From 7fb9136bf70951a3da3acfedc3d5cff12e7dc12c Mon Sep 17 00:00:00 2001
From: Melody Horn
Date: Mon, 29 Mar 2021 15:55:43 -0600
Subject: throw together a very rough draft

---
 repos/__init__.py     | 30 ++++++++++++++++
 repos/alpine_linux.py | 89 ++++++++++++++++++++++++++++++++++++++++++++++++
 repos/base.py         | 94 +++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 213 insertions(+)
 create mode 100644 repos/__init__.py
 create mode 100644 repos/alpine_linux.py
 create mode 100644 repos/base.py

(limited to 'repos')

diff --git a/repos/__init__.py b/repos/__init__.py
new file mode 100644
index 0000000..0f71dd1
--- /dev/null
+++ b/repos/__init__.py
@@ -0,0 +1,30 @@
+"""Look up package versions across every known repository."""
+
+from typing import Mapping, List, Any
+
+from . import alpine_linux
+from .base import Repository
+
+__all__ = [
+    'get_versions',
+]
+
+def repos_from(module):
+    """Yield every Repository instance that `module` lists in its __all__."""
+    for exported in module.__all__:
+        attr = getattr(module, exported)
+        if isinstance(attr, Repository):
+            yield attr
+
+all_repos: List[Repository] = [
+    *repos_from(alpine_linux),
+]
+
+def get_versions(package: str) -> Mapping[str, Mapping[str, str]]:
+    """Return {family: {repo: version}} for each repository carrying `package`."""
+    result = dict()
+    for repo in all_repos:
+        repo_versions = repo.get_versions()
+        if package in repo_versions:
+            result.setdefault(repo.family, dict())[repo.repo] = repo_versions[package]
+    return result
diff --git a/repos/alpine_linux.py b/repos/alpine_linux.py
new file mode 100644
index 0000000..64f159a
--- /dev/null
+++ b/repos/alpine_linux.py
@@ -0,0 +1,89 @@
+"""Alpine Linux repositories, read from each branch's APKINDEX archive."""
+
+from io import TextIOWrapper
+from pathlib import Path
+import tarfile
+from typing import Mapping, TextIO
+
+from .base import Repository
+
+__all__ = [
+    'stable_main_x86_64',
+    'stable_community_x86_64',
+    'edge_main_x86_64',
+    'edge_community_x86_64',
+    'edge_testing_x86_64',
+]
+
+# Field letters that may appear in an APKINDEX record but are not needed here.
+# ('C' looks like the record checksum -- confirm against the apk index docs.)
+IGNORED_FIELDS = frozenset('CASITULomtcDpik')
+
+def parse_apkindex(apkindex: TextIO) -> Mapping[str, str]:
+    """Map each package name in an APKINDEX text stream to its version.
+
+    Records are runs of `X:value` lines separated by blank lines; `P` holds
+    the package name and `V` its version. A line without a colon is printed
+    and skipped.
+
+    Raises:
+        ValueError: if a line starts with an unrecognised field letter.
+    """
+    result = dict()
+    current_package = None
+    current_version = None
+    for line in apkindex:
+        line = line.strip()
+        if len(line) == 0:
+            if current_package is not None and current_version is not None:
+                result[current_package] = current_version
+            current_package = None
+            current_version = None
+            continue
+        try:
+            line_type, line_data = line.split(':', 1)
+        except ValueError:
+            print('what uhhhh the fuck', line, line.split(':', 1))
+            continue
+        if line_type == 'P':
+            current_package = line_data
+        elif line_type == 'V':
+            current_version = line_data
+        elif line_type not in IGNORED_FIELDS:
+            raise ValueError('unknown line type: ' + line_type + ' in line ' + repr(line))
+    # The last record is not necessarily followed by a blank line, so flush it.
+    if current_package is not None and current_version is not None:
+        result[current_package] = current_version
+    return result
+
+def parse_cached(cached: Path) -> Mapping[str, str]:
+    """Parse the APKINDEX member of a downloaded APKINDEX.tar.gz archive.
+
+    Raises:
+        ValueError: if the archive contains no APKINDEX member.
+    """
+    # The context manager closes the archive once parsing has finished.
+    with tarfile.open(cached) as archive:
+        for archive_member in archive.getmembers():
+            if archive_member.name == 'APKINDEX':
+                apkindex_file = archive.extractfile(archive_member)
+                # Decode explicitly rather than with the locale's default
+                # encoding (the index is assumed to be UTF-8 -- TODO confirm).
+                return parse_apkindex(TextIOWrapper(apkindex_file, encoding='utf-8'))
+    raise ValueError('no APKINDEX member in ' + str(cached))
+
+def build_repo(name: str, url_path: str) -> Repository:
+    """Build the Repository whose index lives under `url_path` on the Alpine CDN."""
+    url = f'http://dl-cdn.alpinelinux.org/alpine/{url_path}/APKINDEX.tar.gz'
+    return Repository(
+        family='Alpine Linux',
+        repo=name,
+        index_url=url,
+        parse=parse_cached,
+    )
+
+stable_main_x86_64 = build_repo('Stable (main/x86_64)', 'latest-stable/main/x86_64')
+stable_community_x86_64 = build_repo('Stable (community/x86_64)', 'latest-stable/community/x86_64')
+edge_main_x86_64 = build_repo('Edge (main/x86_64)', 'edge/main/x86_64')
+edge_community_x86_64 = build_repo('Edge (community/x86_64)', 'edge/community/x86_64')
+edge_testing_x86_64 = build_repo('Edge (testing/x86_64)', 'edge/testing/x86_64')
diff --git a/repos/base.py b/repos/base.py
new file mode 100644
index 0000000..66ecf2d
--- /dev/null
+++ b/repos/base.py
@@ -0,0 +1,94 @@
+"""Shared machinery for downloading, parsing and caching package indexes."""
+
+from dataclasses import dataclass
+import datetime
+import gzip
+import json
+import os
+from pathlib import Path
+import re
+from typing import Callable, Mapping
+
+import requests
+
+HTTP_DATE = '%a, %d %b %Y %H:%M:%S GMT'
+SLUGIFY = re.compile(r'\W+')
+
+def slug(text: str) -> str:
+    """Lowercase `text` and replace each run of non-word characters with '-'."""
+    return SLUGIFY.sub('-', text.lower()).strip('-')
+
+@dataclass()
+class Repository:
+    """One package repository whose index is downloaded, parsed and cached."""
+    family: str
+    repo: str
+    index_url: str
+    # Turns the downloaded index file into a {package name: version} mapping.
+    parse: Callable[[Path], Mapping[str, str]]
+
+    def _full_name(self):
+        return f'{self.family} {self.repo}'
+
+    def _cache_dir(self) -> Path:
+        return Path('data') / slug(self.family) / slug(self.repo)
+
+    def _cache_file(self, name: str) -> Path:
+        return self._cache_dir() / name
+
+    def _parse_and_cache(self, downloaded_file: Path, parsed_file: Path) -> Mapping[str, str]:
+        """Parse the downloaded index and store the result as gzipped JSON."""
+        parsed_data = self.parse(downloaded_file)
+        with gzip.open(parsed_file, 'wt') as f:
+            json.dump(parsed_data, f)
+        return parsed_data
+
+    def get_versions(self) -> Mapping[str, str]:
+        """Return {package name: version}, downloading the index only if it changed.
+
+        The cached download's mtime is sent as If-Modified-Since; on a 304 the
+        previously parsed result is reused.
+
+        Raises:
+            requests.HTTPError: if the server answers with an error status.
+        """
+        self._cache_dir().mkdir(parents=True, exist_ok=True)
+        downloaded_file = self._cache_file('downloaded')
+        parsed_file = self._cache_file('parsed.json.gz')
+
+        if downloaded_file.exists():
+            mtime = downloaded_file.stat().st_mtime
+        else:
+            mtime = 0
+        mtime = datetime.datetime.fromtimestamp(mtime, datetime.timezone.utc)
+
+        # `with` releases the streamed connection even if the download fails.
+        with requests.get(self.index_url, headers={
+            'If-Modified-Since': mtime.strftime(HTTP_DATE),
+        }, stream=True) as response:
+            if response.status_code == requests.codes.not_modified:
+                if parsed_file.exists():
+                    with gzip.open(parsed_file, 'rt') as f:
+                        return json.load(f)
+                # The download is current but its parsed form is missing (for
+                # example an earlier run died before writing it): rebuild it.
+                return self._parse_and_cache(downloaded_file, parsed_file)
+
+            response.raise_for_status()
+            print('Re-downloading', self._full_name())
+            # Download to a scratch file first so an interrupted transfer never
+            # leaves a truncated index in place of the cached one.
+            partial_file = self._cache_file('downloaded.part')
+            with partial_file.open('wb') as f:
+                for chunk in response.iter_content(chunk_size=64 * 1024):
+                    f.write(chunk)
+            partial_file.replace(downloaded_file)
+            set_mtime = response.headers.get('Last-Modified', '')
+
+        if len(set_mtime) > 0:
+            # strptime returns a naive datetime; the header is in GMT, so tag it
+            # as UTC or timestamp() would interpret it as local time.
+            set_mtime = datetime.datetime.strptime(set_mtime, HTTP_DATE)
+            set_mtime = set_mtime.replace(tzinfo=datetime.timezone.utc)
+            os.utime(downloaded_file, (datetime.datetime.now().timestamp(), set_mtime.timestamp()))
+        return self._parse_and_cache(downloaded_file, parsed_file)
-- 
cgit v1.2.3