1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
from dataclasses import dataclass
import datetime
import email.utils
import gzip
import json
import os
from pathlib import Path
import re
from typing import Callable, Mapping

import requests
# strftime()/strptime() pattern for an HTTP date with a literal "GMT" suffix,
# e.g. "Sun, 06 Nov 1994 08:49:37 GMT".
# NOTE: %a and %b follow the process locale, so the pattern only produces and
# accepts English day/month names under the C/English locale.
HTTP_DATE = '%a, %d %b %Y %H:%M:%S GMT'
# One or more consecutive non-word characters.  Written as a raw string so
# the backslash reaches the regex engine instead of being treated as an
# (invalid) string escape sequence.
SLUGIFY = re.compile(r'\W+')


def slug(text: str) -> str:
    """Return a lower-case, dash-separated version of *text*.

    Every run of non-word characters (anything other than letters, digits
    and underscore) is collapsed into a single ``-``; dashes left at either
    end of the result are removed.
    """
    return SLUGIFY.sub('-', text.lower()).strip('-')
@dataclass()
class Repository:
    """A remote index that is downloaded, parsed and cached on disk.

    The raw index is kept at ``data/<family>/<repo>/downloaded`` and the
    output of ``parse`` next to it as ``parsed.json.gz``.  The raw file's
    mtime mirrors the server's ``Last-Modified`` header so that it can be
    sent back as ``If-Modified-Since`` on the next call.
    """

    # First component of the display name and of the cache directory.
    family: str
    # Second component of the display name and of the cache directory.
    repo: str
    # URL the raw index is fetched from.
    index_url: str
    # Converts the downloaded index file into a JSON-serialisable mapping.
    parse: Callable[[Path], Mapping[str, str]]

    def _full_name(self) -> str:
        """Return the human-readable name, ``"<family> <repo>"``."""
        return f'{self.family} {self.repo}'

    def _cache_dir(self) -> Path:
        """Return the directory that holds this repository's cache files."""
        return Path('data') / slug(self.family) / slug(self.repo)

    def _cache_file(self, name: str) -> Path:
        """Return the path of the cache file called *name*."""
        return self._cache_dir() / name

    @staticmethod
    def _http_date(timestamp: float) -> str:
        """Format a POSIX timestamp as an HTTP date expressed in GMT."""
        # formatdate() always emits English day/month names, independent of
        # the process locale.
        return email.utils.formatdate(timestamp, usegmt=True)

    @staticmethod
    def _last_modified_timestamp(value: str) -> 'float | None':
        """Convert a ``Last-Modified`` header value to a POSIX timestamp.

        Returns ``None`` when *value* is empty or cannot be parsed, so the
        caller can fall back to the file's natural mtime.
        """
        if not value:
            return None
        try:
            parsed = email.utils.parsedate_to_datetime(value)
        except (TypeError, ValueError):
            # Older Python versions raise TypeError, newer ones ValueError.
            return None
        if parsed.tzinfo is None:
            # A naive datetime would be interpreted as *local* time by
            # timestamp(); HTTP dates are GMT, so pin the zone explicitly.
            parsed = parsed.replace(tzinfo=datetime.timezone.utc)
        return parsed.timestamp()

    def _download(self, response, destination: Path) -> None:
        """Stream *response* into *destination* and stamp its mtime.

        The body is written to a sibling ``.part`` file and moved into place
        only once complete, so an interrupted transfer never leaves a
        truncated file carrying a fresh mtime.
        """
        partial = destination.with_name(destination.name + '.part')
        with partial.open('wb') as f:
            for chunk in response.iter_content(chunk_size=64 * 1024):
                f.write(chunk)
        modified = self._last_modified_timestamp(
            response.headers.get('Last-Modified', ''))
        if modified is not None:
            os.utime(partial, (datetime.datetime.now().timestamp(), modified))
        os.replace(partial, destination)

    def get_versions(self) -> Mapping[str, str]:
        """Return the parsed index, refreshing the cache when it is stale.

        A conditional GET is issued using the cached download's mtime.  On
        ``304 Not Modified`` the cached parse result is returned; otherwise
        the index is downloaded again, parsed, and the result is cached.

        Raises:
            requests.HTTPError: the server answered with an error status.
        """
        self._cache_dir().mkdir(parents=True, exist_ok=True)
        downloaded_file = self._cache_file('downloaded')
        parsed_file = self._cache_file('parsed.json.gz')

        try:
            cached_mtime = downloaded_file.stat().st_mtime
        except FileNotFoundError:
            # Nothing cached yet: the epoch makes the server send the body.
            cached_mtime = 0

        # NOTE(review): no timeout is passed, so a stalled server blocks
        # this call indefinitely.
        # The context manager releases the streamed connection on every path.
        with requests.get(self.index_url, headers={
            'If-Modified-Since': self._http_date(cached_mtime),
        }, stream=True) as response:
            not_modified = (
                response.status_code == requests.codes.not_modified)
            if not not_modified:
                response.raise_for_status()
                print('Re-downloading', self._full_name())
                self._download(response, downloaded_file)

        if not_modified and parsed_file.exists():
            with gzip.open(parsed_file, 'rt') as f:
                return json.load(f)

        # Either fresh data arrived, or the parse cache is missing (e.g. a
        # previous run failed while parsing): rebuild it from the download.
        parsed_data = self.parse(downloaded_file)
        with gzip.open(parsed_file, 'wt') as f:
            json.dump(parsed_data, f)
        return parsed_data
|