1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
|
import asyncio
import dataclasses
import subprocess
import sys
from collections import defaultdict
from typing import Literal, Optional
from collections.abc import Sequence, Iterable, Mapping
from debian.deb822 import Deb822
from debian.debian_support import Version
# Life-cycle states reported by ``AptCache.state``.
AptCacheState = Literal[
    # Initial state of a freshly constructed cache.
    "not-loaded",
    # Set for the duration of ``AptCache.load``.
    "loading",
    # The lookup table has been populated.
    "loaded",
    # ``apt-get`` exited non-zero or an index file could not be parsed.
    "failed",
    # ``apt-get`` or ``/usr/lib/apt/apt-helper`` could not be found.
    "tooling-not-available",
    # NOTE(review): not assigned by any code visible in this file section.
    "empty-cache",
]
@dataclasses.dataclass(slots=True)
class PackageInformation:
    """Selected fields of one package stanza read from an APT ``Packages`` index.

    Instances are created by ``stanza_to_package_info``; the comments on the
    fields name the stanza field each value is taken from.
    """

    # ``Package`` field.
    name: str
    # ``Architecture`` field.
    architecture: str
    # ``Version`` field, parsed so that versions can be ordered.
    version: Version
    # ``Multi-Arch`` field; ``"no"`` when the stanza does not carry it.
    multi_arch: str
    # NOTE: a ``suites: Sequence[Tuple[str, ...]]`` field was sketched here but
    # is not part of the class.
    # First line of the ``Description`` field.
    synopsis: str
    # ``Section`` field.
    section: str
    # Raw ``Provides`` field (comma-separated clauses), or ``None`` if absent.
    provides: str | None
    # ``Homepage`` field, or ``None`` if absent.
    upstream_homepage: str | None
@dataclasses.dataclass(slots=True, frozen=True)
class PackageLookup:
    """Immutable answer to "what does the APT cache know about this name?"."""

    # The name that was indexed (a real package name and/or a provided name).
    name: str
    # The package carrying exactly this name, or ``None`` when the name is
    # only known through other packages' ``Provides``.
    package: PackageInformation | None
    # Packages that list this name in their ``Provides`` field; may be empty.
    provided_by: Sequence[PackageInformation]
class AptCache:
    """Lookup table of the packages known to the local APT installation.

    A new instance is empty (state ``"not-loaded"``).  Awaiting :meth:`load`
    reads every ``Packages`` index reported by ``apt-get indextargets`` and
    records, per package name, the stanza with the highest version plus the
    packages that list that name in their ``Provides`` field.
    """

    def __init__(self) -> None:
        self._state: AptCacheState = "not-loaded"
        # Human-readable reason for the most recent unsuccessful load, if any.
        self._load_error: str | None = None
        self._lookups: Mapping[str, PackageLookup] = {}

    @property
    def state(self) -> AptCacheState:
        """Return the current life-cycle state of the cache."""
        return self._state

    @property
    def load_error(self) -> str | None:
        """Return why the most recent load did not succeed, or ``None``."""
        return self._load_error

    def lookup(self, name: str) -> PackageLookup | None:
        """Return the entry for *name*, or ``None`` when the name is unknown.

        Until a load has succeeded the table is empty, so every name is
        unknown.
        """
        return self._lookups.get(name)

    async def load(self) -> None:
        """Populate the cache from the APT package indices.

        Anticipated problems are reported through :attr:`state` and
        :attr:`load_error` instead of being raised: missing tooling yields
        ``"tooling-not-available"``, a failing ``apt-get`` or an unparsable
        index yields ``"failed"``.  On success the state becomes ``"loaded"``.

        Any other exception (including cancellation of the awaiting task) is
        re-raised after the state has been set to ``"failed"``, so that a
        later call can retry.

        Raises:
            RuntimeError: If a load is already in progress or has completed.
        """
        if self._state in ("loading", "loaded"):
            raise RuntimeError(f"Already {self._state}")
        self._load_error = None
        self._state = "loading"
        try:
            await self._load_from_apt()
        except BaseException as e:
            # Never leave the cache stuck in "loading": that would make every
            # retry fail with "Already loading".  BaseException is used so
            # that task cancellation at the cooperative yield is covered too.
            self._state = "failed"
            self._load_error = str(e) or type(e).__name__
            raise

    async def _load_from_apt(self) -> None:
        """Do the work of :meth:`load` and set the resulting state."""
        try:
            files_raw = subprocess.check_output(
                [
                    "apt-get",
                    "indextargets",
                    "--format",
                    "$(IDENTIFIER)\x1f$(FILENAME)",
                ]
            ).decode("utf-8")
        except FileNotFoundError:
            self._state = "tooling-not-available"
            self._load_error = "apt-get not available in PATH"
            return
        except subprocess.CalledProcessError as e:
            self._state = "failed"
            self._load_error = f"apt-get exited with {e.returncode}"
            return

        packages = {}
        for raw_file_line in files_raw.split("\n"):
            # ``partition`` tolerates lines without a separator (blank or
            # malformed) as well as filenames that contain the separator,
            # where unpacking the result of ``split`` would raise ValueError.
            identifier, _, filename = raw_file_line.partition("\x1f")
            if identifier != "Packages" or not filename:
                continue
            try:
                for package_info in parse_apt_file(filename):
                    # Let other computations happen if needed.
                    await asyncio.sleep(0)
                    existing = packages.get(package_info.name)
                    if (
                        existing is not None
                        and package_info.version < existing.version
                    ):
                        # Keep the newest version seen for each name.
                        continue
                    packages[package_info.name] = package_info
            except FileNotFoundError:
                self._state = "tooling-not-available"
                self._load_error = "/usr/lib/apt/apt-helper not available"
                return
            except (AttributeError, RuntimeError, IndexError) as e:
                self._state = "failed"
                self._load_error = str(e)
                return

        provides = self._group_by_provided_name(packages)
        self._lookups = {
            name: PackageLookup(
                name,
                packages.get(name),
                tuple(provides.get(name, ())),
            )
            for name in packages.keys() | provides.keys()
        }
        self._state = "loaded"

    @staticmethod
    def _group_by_provided_name(packages):
        """Map every provided name to the list of packages providing it."""
        provides = defaultdict(list)
        for package_info in packages.values():
            if not package_info.provides:
                continue
            # Some packages (`debhelper`) provide the same package multiple
            # times (`debhelper-compat`) with different versions.  Strip the
            # version restriction and normalize that into one entry.
            provided_names = {
                clause.split("(")[0].strip()
                for clause in package_info.provides.split(",")
            }
            # An empty clause (e.g. a trailing comma) is not a package name.
            provided_names.discard("")
            for provided_name in sorted(provided_names):
                provides[provided_name].append(package_info)
        return provides
def parse_apt_file(filename: str) -> Iterable[PackageInformation]:
    """Yield a ``PackageInformation`` for each usable stanza of an APT index.

    The content of *filename* is obtained by running
    ``/usr/lib/apt/apt-helper cat-file`` and parsing its standard output as
    deb822 paragraphs.  Stanzas that ``stanza_to_package_info`` rejects are
    skipped.  The helper's exit status is not inspected.

    This is a generator, so the helper is only started once iteration begins;
    a missing helper therefore surfaces as ``FileNotFoundError`` on the first
    iteration step rather than at call time.
    """
    helper_command = ["/usr/lib/apt/apt-helper", "cat-file", filename]
    # The context manager closes the pipe and waits for the helper on exit,
    # including when the consumer abandons the generator early.
    with subprocess.Popen(
        helper_command,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
    ) as helper:
        for paragraph in Deb822.iter_paragraphs(helper.stdout):
            parsed = stanza_to_package_info(paragraph)
            if parsed is None:
                continue
            yield parsed
def stanza_to_package_info(stanza: Deb822) -> PackageInformation | None:
    """Convert one ``Packages`` stanza into a ``PackageInformation``.

    ``Package``, ``Architecture``, ``Version``, ``Description`` and
    ``Section`` are mandatory; ``Multi-Arch`` defaults to ``"no"`` and
    ``Provides`` / ``Homepage`` default to ``None``.

    Returns:
        The extracted information, or ``None`` when a mandatory field is
        missing or the version string cannot be parsed.
    """
    try:
        name = stanza["Package"]
        # These fields take few distinct values across many stanzas, so the
        # strings are interned to share one object per value.
        architecture = sys.intern(stanza["Architecture"])
        version = Version(stanza["Version"])
        multi_arch = sys.intern(stanza.get("Multi-Arch", "no"))
        synopsis = stanza["Description"]
        section = sys.intern(stanza["Section"])
        provides = stanza.get("Provides")
        homepage = stanza.get("Homepage")
    except (KeyError, ValueError):
        # KeyError: a mandatory field is absent.
        # ValueError: ``Version`` rejected a malformed version string; skip
        # that stanza instead of aborting the whole index.
        return None
    # "Modern" Packages files do not have the full description.  But in case
    # we see a (very old) one, behave consistently with the modern ones and
    # keep only the first line.
    synopsis = synopsis.partition("\n")[0]
    return PackageInformation(
        name=name,
        architecture=architecture,
        version=version,
        multi_arch=multi_arch,
        synopsis=synopsis,
        section=section,
        provides=provides,
        upstream_homepage=homepage,
    )
|