aboutsummaryrefslogtreecommitdiff
path: root/repos/base.py
blob: 220a30fcead0cc8b9603376c3f32aaf7d350dba2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from dataclasses import dataclass, asdict as dataclass_asdict
import datetime
from functools import total_ordering
import gzip
import json
import os
from pathlib import Path
import re
from typing import Any, Callable, Mapping

import requests
import semver

__all__ = [
    'Repository',
    'Version',
]

HTTP_DATE = '%a, %d %b %Y %H:%M:%S GMT'
SLUGIFY = re.compile(r'\W+')

def slug(text: str) -> str:
    """Return a filesystem-friendly slug: lower-cased, with every run of
    non-word characters collapsed to a single '-' and edge dashes removed."""
    lowered = text.lower()
    dashed = re.sub(r'\W+', '-', lowered)
    return dashed.strip('-')

@total_ordering
@dataclass()
class Version:
    """A package version: the raw index string plus a normalized form
    intended for semantic-version comparison."""

    # Version string exactly as it appeared in the repository index.
    original: str
    # Normalized candidate used for semver comparison when valid.
    clean: str

    def __str__(self) -> str:
        return self.original

    def __lt__(self, other: Any):
        if not isinstance(other, Version):
            return NotImplemented
        both_semver = (semver.VersionInfo.isvalid(self.clean)
                       and semver.VersionInfo.isvalid(other.clean))
        if both_semver:
            return semver.compare(self.clean, other.clean) < 0
        # Fall back to plain lexicographic ordering of the raw strings
        # when either side is not valid semver.
        return self.original < other.original

class JSONEncoder(json.JSONEncoder):
    """json.JSONEncoder that serializes Version instances as their field dict."""

    def default(self, o: Any) -> Any:
        # Only Version gets special treatment; anything else defers to the
        # base class (which raises TypeError for unknown types).
        if isinstance(o, Version):
            return dataclass_asdict(o)
        return json.JSONEncoder.default(self, o)

class JSONDecoder(json.JSONDecoder):
    """json.JSONDecoder that revives dicts shaped exactly like a Version."""

    @staticmethod
    def object_hook(o: dict) -> Any:
        # A dict whose keys are exactly the Version field names is assumed
        # to be a serialized Version; everything else passes through.
        if set(o) == {'original', 'clean'}:
            return Version(original=o['original'], clean=o['clean'])
        return o

    def __init__(self):
        super().__init__(object_hook=self.object_hook)


@dataclass()
class Repository:
    """A remote package index that is downloaded, cached on disk, and parsed
    lazily via HTTP conditional GET."""

    # Distribution family and repository name; together they identify the
    # on-disk cache directory and the human-readable log name.
    family: str
    repo: str
    # URL of the package index to fetch.
    index_url: str
    # Callback that turns the downloaded index file into {package: Version}.
    parse: Callable[[Path], Mapping[str, Version]]

    def _full_name(self):
        """Human-readable identifier used in log messages."""
        return f'{self.family} {self.repo}'

    def _cache_dir(self) -> Path:
        """Directory under ./data holding this repository's cached files."""
        return Path('data') / slug(self.family) / slug(self.repo)

    def _cache_file(self, name: str) -> Path:
        """Path of a named file inside this repository's cache directory."""
        return self._cache_dir() / name

    def get_versions(self) -> Mapping[str, Version]:
        """Return {package: Version}, re-downloading/re-parsing only when needed.

        Sends If-Modified-Since keyed on the cached download's mtime.  On
        304 Not Modified the cached gzipped-JSON parse result is reused;
        otherwise the index is re-downloaded, parsed, and the parse result
        re-cached.

        Raises:
            requests.HTTPError: if the server returns an error status.
        """
        self._cache_dir().mkdir(parents=True, exist_ok=True)
        downloaded_file = self._cache_file('downloaded')
        parsed_file = self._cache_file('parsed.json.gz')

        # mtime of the cached download (epoch 0 when absent), rendered as an
        # HTTP date for the conditional request.
        mtime = downloaded_file.stat().st_mtime if downloaded_file.exists() else 0
        if_modified_since = datetime.datetime.fromtimestamp(
            mtime, datetime.timezone.utc).strftime(HTTP_DATE)

        response = requests.get(self.index_url, headers={
            'If-Modified-Since': if_modified_since,
        }, stream=True)
        modified = response.status_code != requests.codes.not_modified
        if modified:
            response.raise_for_status()
            print('Re-downloading', self._full_name())
            last_modified = response.headers.get('Last-Modified', '')
            with downloaded_file.open('wb') as f:
                for chunk in response.iter_content(chunk_size=256):
                    f.write(chunk)
            if last_modified:
                # BUG FIX: strptime() returns a *naive* datetime, and
                # Last-Modified is always GMT.  Calling .timestamp() on the
                # naive value interpreted it in the local timezone, skewing
                # the stored mtime (and the next If-Modified-Since header)
                # by the local UTC offset.  Pin it to UTC first.
                set_mtime = datetime.datetime.strptime(
                    last_modified, HTTP_DATE).replace(tzinfo=datetime.timezone.utc)
                os.utime(downloaded_file,
                         (datetime.datetime.now().timestamp(), set_mtime.timestamp()))

        if modified or not parsed_file.exists():
            # Fresh download (or missing parse cache): parse and re-cache.
            parsed_data = self.parse(downloaded_file)
            with gzip.open(parsed_file, 'wt') as f:
                json.dump(parsed_data, f, cls=JSONEncoder)
            return parsed_data
        # 304 Not Modified and the parse cache exists: reuse it.
        with gzip.open(parsed_file, 'rt') as f:
            return json.load(f, cls=JSONDecoder)