"""Fetch, cache and parse remote repository indexes into version mappings."""

from dataclasses import dataclass, asdict as dataclass_asdict
import datetime
from functools import total_ordering
import gzip
import json
import os
from pathlib import Path
import re
from typing import Any, Callable, Mapping

import requests
import semver

__all__ = [
    'Repository',
    'Version',
]

# strftime/strptime pattern used for the If-Modified-Since / Last-Modified
# headers. The zone is the literal text "GMT", so parsed values are naive and
# must be treated as UTC explicitly.
HTTP_DATE = '%a, %d %b %Y %H:%M:%S GMT'

# Runs of non-word characters; each run becomes a single dash in slug().
SLUGIFY = re.compile(r'\W+')

# Number of bytes requested per chunk while streaming a download to disk.
_DOWNLOAD_CHUNK_SIZE = 64 * 1024


def slug(text: str) -> str:
    """Return *text* lower-cased with non-word runs collapsed to dashes.

    Leading and trailing dashes are stripped from the result.
    """
    return SLUGIFY.sub('-', text.lower()).strip('-')


@total_ordering
@dataclass()
class Version:
    """A version string paired with a cleaned-up form used for comparison.

    Equality is the field-wise comparison generated by ``dataclass``; the
    remaining ordering operators are derived from ``__lt__`` by
    ``total_ordering``.

    Attributes:
        original: The version text; this is what ``str()`` returns.
        clean: The text handed to ``semver`` when ordering (presumably a
            normalised form of ``original`` -- confirm against the parsers).
    """

    original: str
    clean: str

    def __str__(self) -> str:
        return self.original

    def __lt__(self, other: Any):
        """Order by semver when both sides are valid, else by raw text."""
        if not isinstance(other, Version):
            return NotImplemented
        if semver.VersionInfo.isvalid(self.clean) and semver.VersionInfo.isvalid(other.clean):
            return semver.compare(self.clean, other.clean) < 0
        # At least one side is not valid semver: fall back to plain string
        # comparison of the original text.
        return self.original < other.original


class JSONEncoder(json.JSONEncoder):
    """JSON encoder that serialises ``Version`` objects as plain dicts."""

    def default(self, o: Any) -> Any:
        if isinstance(o, Version):
            return dataclass_asdict(o)
        return super().default(o)


class JSONDecoder(json.JSONDecoder):
    """JSON decoder that rebuilds ``Version`` objects written by JSONEncoder."""

    @staticmethod
    def object_hook(o: dict) -> Any:
        """Turn a dict with exactly the Version fields back into a Version."""
        if o.keys() == {'original', 'clean'}:
            return Version(**o)
        return o

    def __init__(self):
        super().__init__(object_hook=self.object_hook)


@dataclass()
class Repository:
    """A remote index that can be downloaded, cached on disk and parsed.

    Attributes:
        family: First component of the display name and of the cache path.
        repo: Second component of the display name and of the cache path.
        index_url: URL fetched by ``get_versions``.
        parse: Callable given the path of the downloaded index file; its
            return value is what ``get_versions`` caches and returns.
    """

    family: str
    repo: str
    index_url: str
    parse: Callable[[Path], Mapping[str, Version]]

    def _full_name(self):
        """Return the human-readable name used in progress output."""
        return f'{self.family} {self.repo}'

    def _cache_dir(self) -> Path:
        """Return the (relative) directory holding this repository's cache."""
        return Path('data') / slug(self.family) / slug(self.repo)

    def _cache_file(self, name: str) -> Path:
        """Return the path of the cache file called *name*."""
        return self._cache_dir() / name

    def _store_download(self, response: requests.Response, target: Path) -> None:
        """Stream the body of *response* into *target* atomically.

        The body is written to a sibling ``.part`` file that is renamed over
        *target* only after the transfer completed, so an interrupted download
        never leaves a truncated file carrying a fresh modification time.
        When the server sent ``Last-Modified``, that instant becomes the
        file's mtime so it can be echoed back as ``If-Modified-Since``.

        Raises:
            ValueError: If ``Last-Modified`` is present but does not match
                ``HTTP_DATE``.
        """
        last_modified = response.headers.get('Last-Modified', '')
        modified_at = None
        if last_modified:
            # strptime yields a naive datetime; the header is GMT, so attach
            # UTC explicitly. A naive .timestamp() would be interpreted in
            # the local zone and skew the stored mtime by the UTC offset.
            modified_at = datetime.datetime.strptime(
                last_modified, HTTP_DATE,
            ).replace(tzinfo=datetime.timezone.utc)

        partial = target.with_name(target.name + '.part')
        try:
            with partial.open('wb') as f:
                for chunk in response.iter_content(chunk_size=_DOWNLOAD_CHUNK_SIZE):
                    f.write(chunk)
            if modified_at is not None:
                os.utime(partial, (datetime.datetime.now().timestamp(), modified_at.timestamp()))
            partial.replace(target)
        finally:
            # Only still present when something above failed.
            if partial.exists():
                partial.unlink()

    def get_versions(self) -> Mapping[str, Version]:
        """Return the parsed index, refreshing the on-disk cache if needed.

        A conditional GET is sent using the cached download's mtime. If the
        server reports a change the index is downloaded again and re-parsed;
        otherwise the previously parsed result is loaded from
        ``parsed.json.gz`` (or re-created from the cached download when that
        file is missing).

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        self._cache_dir().mkdir(parents=True, exist_ok=True)
        downloaded_file = self._cache_file('downloaded')
        parsed_file = self._cache_file('parsed.json.gz')

        if downloaded_file.exists():
            mtime = downloaded_file.stat().st_mtime
        else:
            # No cached copy: the epoch makes the condition always true.
            mtime = 0
        since = datetime.datetime.fromtimestamp(mtime, datetime.timezone.utc).strftime(HTTP_DATE)

        # NOTE(review): no timeout is set, so a stalled server blocks forever;
        # left unchanged to keep the existing behaviour.
        # The context manager releases the streamed connection on every path.
        with requests.get(self.index_url, headers={
            'If-Modified-Since': since,
        }, stream=True) as response:
            modified = response.status_code != requests.codes.not_modified
            if modified:
                response.raise_for_status()
                print('Re-downloading', self._full_name())
                self._store_download(response, downloaded_file)

        if modified or not parsed_file.exists():
            parsed_data = self.parse(downloaded_file)
            with gzip.open(parsed_file, 'wt') as f:
                json.dump(parsed_data, f, cls=JSONEncoder)
            return parsed_data

        with gzip.open(parsed_file, 'rt') as f:
            return json.load(f, cls=JSONDecoder)