"""Package repository index download, caching and version lookup."""
from dataclasses import dataclass, asdict as dataclass_asdict
from functools import total_ordering
import json
from pathlib import Path
import re
from typing import Any, Callable, Mapping, Optional

import requests
import semver

from . import db

__all__ = [
    'Repository',
    'slug',
    'Version',
]

# strftime-style pattern for HTTP dates (e.g. the Last-Modified header).
HTTP_DATE = '%a, %d %b %Y %H:%M:%S GMT'
# Matches runs of non-word characters; used by slug().
SLUGIFY = re.compile(r'\W+')


def slug(text: Optional[str]) -> str:
    """Return a lowercase, dash-separated form of *text*.

    Runs of non-word characters are collapsed into a single ``-`` and
    leading/trailing dashes are removed.  ``None`` yields ``''``.
    """
    if text is None:
        return ''
    else:
        return SLUGIFY.sub('-', text.lower()).strip('-')


@total_ordering
@dataclass()
class Version:
    """A package version: the string as published plus a cleaned-up form.

    Ordering uses semver comparison of ``clean`` when both sides are valid
    semver strings, and plain string comparison of ``original`` otherwise.

    NOTE(review): equality is the dataclass-generated field comparison, so
    two versions with the same ``clean`` but different ``original`` are
    unequal even though neither is less than the other; the comparisons
    derived by ``total_ordering`` can therefore disagree in that case.
    """

    # Version string exactly as found in the repository data.
    original: str
    # Normalised version string; compared as semver when it is valid semver.
    clean: str

    def __str__(self) -> str:
        return self.original

    def __lt__(self, other: Any):
        if not isinstance(other, Version):
            return NotImplemented
        if semver.VersionInfo.isvalid(self.clean) and semver.VersionInfo.isvalid(other.clean):
            return semver.compare(self.clean, other.clean) < 0
        # Fall back to lexicographic order when either side is not semver.
        return self.original < other.original


class JSONEncoder(json.JSONEncoder):
    """JSON encoder that serialises ``Version`` as a dict of its fields."""

    def default(self, o: Any) -> Any:
        if isinstance(o, Version):
            return dataclass_asdict(o)
        return super().default(o)


class JSONDecoder(json.JSONDecoder):
    """JSON decoder that turns ``Version``-shaped objects back into ``Version``."""

    @staticmethod
    def object_hook(o: dict) -> Any:
        """Return a ``Version`` for objects whose keys are exactly its fields."""
        if o.keys() == {'original', 'clean'}:
            return Version(**o)
        return o

    def __init__(self):
        super().__init__(object_hook=self.object_hook)


@dataclass(frozen=True)
class Repository:
    """A downloadable package index together with the function that parses it."""

    # Optional grouping name; prefixes the repository name when set.
    family: Optional[str]
    # Repository name.
    repo: str
    # Optional sub-section; suffixes the repository name when set.
    section: Optional[str]
    # URL the index is downloaded from.
    index_url: str
    # Turns the downloaded index file into a mapping of names to versions
    # (presumably keyed by package name -- confirm against the parsers).
    parse: Callable[[Path], Mapping[str, Version]]

    def full_name(self) -> str:
        """Return ``"<family> <repo> <section>"``, omitting unset parts."""
        prefix = ''
        if self.family is not None:
            prefix = self.family + ' '
        suffix = ''
        if self.section is not None:
            suffix = ' ' + self.section
        return f'{prefix}{self.repo}{suffix}'

    @property
    def full_repo_name(self) -> str:
        """``"<family> <repo>"`` (no section), omitting the family if unset."""
        prefix = ''
        if self.family is not None:
            prefix = self.family + ' '
        return f'{prefix}{self.repo}'

    def _cache_dir(self) -> Path:
        """Return ``data/<family>/<repo>/<section>`` built from slugged names."""
        result = Path('data')
        if self.family is not None:
            result = result / slug(self.family)
        result = result / slug(self.repo)
        if self.section is not None:
            result = result / slug(self.section)
        return result

    def _cache_file(self, name: str) -> Path:
        """Return the path of the file *name* inside the cache directory."""
        return self._cache_dir() / name

    def update(self) -> None:
        """Refresh the cached index if needed, then parse it and store the result.

        The index is fetched with a conditional request built from the
        validators saved by the previous download.  Unless the server
        answers 304 Not Modified, the body is stored as the new cached copy.
        The cached copy is then parsed and handed to ``db.write`` under
        ``full_name()``.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        self._cache_dir().mkdir(parents=True, exist_ok=True)
        downloaded_file = self._cache_file('downloaded')
        mtime_file = self._cache_file('last-modified')
        etag_file = self._cache_file('etag')

        headers = {}
        # Only ask for a conditional response when there is a cached copy to
        # fall back on; otherwise a 304 would leave nothing to parse.
        if downloaded_file.exists():
            if mtime_file.exists():
                headers['If-Modified-Since'] = mtime_file.read_text()
            if etag_file.exists():
                headers['If-None-Match'] = etag_file.read_text()

        # NOTE(review): no timeout is passed, so a stalled server blocks this
        # call indefinitely; consider adding one.
        # The context manager releases the streamed connection on every path.
        with requests.get(self.index_url, headers=headers, stream=True) as response:
            if response.status_code != requests.codes.not_modified:
                response.raise_for_status()
                print('Re-downloading', self.full_name())
                # Download next to the cached copy and swap it in once
                # complete, so an interrupted transfer never leaves a
                # truncated index paired with the old validators.
                partial_file = self._cache_file('downloaded.part')
                with partial_file.open('wb') as f:
                    for chunk in response.iter_content(chunk_size=65536):
                        f.write(chunk)
                partial_file.replace(downloaded_file)
                # Remember the validators for the next conditional request.
                if 'Last-Modified' in response.headers:
                    mtime_file.write_text(response.headers['Last-Modified'])
                if 'ETag' in response.headers:
                    etag_file.write_text(response.headers['ETag'])

        parsed_data = self.parse(downloaded_file)
        db.write(self.full_name(), parsed_data)

    def get_version(self, package_name: str) -> Optional[Version]:
        """Return the stored version of *package_name*, or ``None`` if absent.

        The value returned by ``db.read`` is unpacked as keyword arguments
        into ``Version``.
        """
        db_result = db.read(self.full_name(), package_name)
        if db_result is None:
            return None
        return Version(**db_result)