from dataclasses import dataclass, asdict as dataclass_asdict
import datetime
from functools import total_ordering
import gzip
import json
import os
from pathlib import Path
import re
from typing import Any, Callable, Mapping
import requests
import semver
# Public API of this module.
__all__ = [
    'Repository',
    'Version',
]
# Format of HTTP date headers (If-Modified-Since / Last-Modified), per RFC 7231.
HTTP_DATE = '%a, %d %b %Y %H:%M:%S GMT'
# Matches runs of non-word characters; used by slug() to build safe path segments.
SLUGIFY = re.compile(r'\W+')
def slug(text: str) -> str:
    """Lower-case *text*, collapse runs of non-word characters into single
    dashes, and trim any leading/trailing dashes."""
    collapsed = re.sub(r'\W+', '-', text.lower())
    return collapsed.strip('-')
@total_ordering
@dataclass()
class Version:
    """A package version: the raw upstream string plus a normalized form.

    Ordering prefers semantic-version comparison of the normalized form and
    falls back to plain string comparison of the originals; the remaining
    comparison operators are filled in by ``total_ordering``.
    """

    original: str  # version string exactly as published upstream
    clean: str     # normalized candidate string for semver comparison

    def __str__(self) -> str:
        return self.original

    def __lt__(self, other: Any):
        if not isinstance(other, Version):
            return NotImplemented
        both_semver = (semver.VersionInfo.isvalid(self.clean)
                       and semver.VersionInfo.isvalid(other.clean))
        if both_semver:
            return semver.compare(self.clean, other.clean) < 0
        # Not comparable as semver: fall back to lexicographic order.
        return self.original < other.original
class JSONEncoder(json.JSONEncoder):
    """JSON encoder that additionally serializes Version dataclasses."""

    def default(self, o: Any) -> Any:
        # Anything other than a Version defers to the base class, which
        # raises TypeError for unsupported types.
        if not isinstance(o, Version):
            return super().default(o)
        return dataclass_asdict(o)
class JSONDecoder(json.JSONDecoder):
    """JSON decoder that revives Version objects written by JSONEncoder."""

    @staticmethod
    def object_hook(o: dict) -> Any:
        # A dict with exactly the two Version field names is treated as a
        # serialized Version; any other dict passes through untouched.
        if set(o) == {'original', 'clean'}:
            return Version(**o)
        return o

    def __init__(self):
        super().__init__(object_hook=self.object_hook)
@dataclass()
class Repository:
    """A remote package index that is downloaded, parsed, and cached on disk.

    Cache layout: ``data/<family-slug>/<repo-slug>/downloaded`` holds the raw
    index (its mtime mirrors the server's Last-Modified), and
    ``parsed.json.gz`` holds the parsed ``{package: Version}`` mapping.
    """

    family: str     # distribution family the repo belongs to
    repo: str       # repository name within the family
    index_url: str  # URL of the package index to download
    # Parser turning the downloaded index file into {package_name: Version}.
    parse: Callable[[Path], Mapping[str, Version]]

    def _full_name(self):
        """Human-readable identifier used in log output."""
        return f'{self.family} {self.repo}'

    def _cache_dir(self) -> Path:
        """Directory under data/ holding this repository's cache files."""
        return Path('data') / slug(self.family) / slug(self.repo)

    def _cache_file(self, name: str) -> Path:
        """Path of a named cache file inside the cache directory."""
        return self._cache_dir() / name

    def get_versions(self) -> Mapping[str, Version]:
        """Return {package: Version}, re-downloading/re-parsing only when stale.

        Sends ``If-Modified-Since`` based on the cached download's mtime; on a
        304 response the previously parsed mapping is loaded from the gzip
        JSON cache instead of re-fetching.

        Raises:
            requests.HTTPError: if the server returns an error status.
        """
        self._cache_dir().mkdir(parents=True, exist_ok=True)
        downloaded_file = self._cache_file('downloaded')
        if downloaded_file.exists():
            mtime = downloaded_file.stat().st_mtime
        else:
            mtime = 0  # epoch: guarantees a full download on first run
        last_known = datetime.datetime.fromtimestamp(mtime, datetime.timezone.utc)
        parsed_file = self._cache_file('parsed.json.gz')
        # Stream the index; the `with` releases the connection even on error.
        with requests.get(self.index_url, headers={
            'If-Modified-Since': last_known.strftime(HTTP_DATE),
        }, stream=True) as response:
            not_modified = response.status_code == requests.codes.not_modified
            if not not_modified:
                response.raise_for_status()
                print('Re-downloading', self._full_name())
                set_mtime = response.headers.get('Last-Modified', '')
                with downloaded_file.open('wb') as f:
                    for chunk in response.iter_content(chunk_size=256):
                        f.write(chunk)
                if set_mtime:
                    # HTTP dates are GMT; mark the parsed datetime as UTC so
                    # .timestamp() is not misinterpreted in local time (which
                    # would skew the stored mtime and break If-Modified-Since).
                    remote = datetime.datetime.strptime(set_mtime, HTTP_DATE)
                    remote = remote.replace(tzinfo=datetime.timezone.utc)
                    os.utime(downloaded_file,
                             (datetime.datetime.now().timestamp(),
                              remote.timestamp()))
        if not not_modified or not parsed_file.exists():
            # Fresh download, or the parsed cache is missing: (re)parse.
            parsed_data = self.parse(downloaded_file)
            with gzip.open(parsed_file, 'wt') as f:
                json.dump(parsed_data, f, cls=JSONEncoder)
            return parsed_data
        with gzip.open(parsed_file, 'rt') as f:
            return json.load(f, cls=JSONDecoder)