1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
import os
from typing import Mapping, TYPE_CHECKING
import psycopg2
if TYPE_CHECKING:
from .base import Version
def connect():
    """Open a new psycopg2 connection to the configured database.

    The connection string is read from the ``DATABASE_URL`` environment
    variable; a ``KeyError`` is raised if that variable is not set.

    Returns:
        The connection object returned by ``psycopg2.connect``.
    """
    dsn = os.environ['DATABASE_URL']
    return psycopg2.connect(dsn)
def poke_table(force=False):
    """Make sure the ``versions`` table exists.

    Args:
        force: When true, drop any existing ``versions`` table (with
            ``CASCADE``) before creating it, discarding its rows.
    """
    conn = connect()
    try:
        # psycopg2's ``with conn`` only commits (or rolls back on error);
        # it does NOT close the connection, so close it explicitly below.
        with conn:
            with conn.cursor() as cur:
                if force:
                    cur.execute('DROP TABLE IF EXISTS versions CASCADE;')
                cur.execute('CREATE TABLE IF NOT EXISTS versions ('
                            'repo text,'
                            'package_name text,'
                            'clean_version text,'
                            'orig_version text,'
                            'PRIMARY KEY (repo, package_name)'
                            ');')
    finally:
        conn.close()
def write(repo: str, data: Mapping[str, 'Version']):
    """Insert or update the stored versions for every package of a repo.

    Each entry is upserted on the ``(repo, package_name)`` key, so an
    existing row has its ``clean_version`` and ``orig_version`` replaced.
    All rows are written in a single transaction.

    Args:
        repo: Repository identifier stored in the ``repo`` column.
        data: Mapping of package name to a version object; its ``clean``
            and ``original`` attributes are stored.
    """
    upsert = ('INSERT INTO versions (repo, package_name, clean_version, orig_version)'
              'VALUES (%(repo)s, %(package_name)s, %(clean_version)s, %(orig_version)s)'
              'ON CONFLICT (repo, package_name) DO UPDATE SET '
              'clean_version = EXCLUDED.clean_version,'
              'orig_version = EXCLUDED.orig_version')
    conn = connect()
    try:
        # psycopg2's ``with conn`` only commits (or rolls back on error);
        # it does NOT close the connection, so close it explicitly below.
        with conn:
            with conn.cursor() as cur:
                for package, version in data.items():
                    cur.execute(upsert, {
                        'repo': repo,
                        'package_name': package,
                        'clean_version': version.clean,
                        'orig_version': version.original,
                    })
    finally:
        conn.close()
def read(repo: str, package: str):
    """Look up the stored version of one package in one repo.

    Args:
        repo: Repository identifier to match against the ``repo`` column.
        package: Package name to match against ``package_name``.

    Returns:
        ``None`` when no matching row exists, otherwise a dict with the
        keys ``'clean'`` and ``'original'`` holding the stored
        ``clean_version`` and ``orig_version`` values.
    """
    sql = ('SELECT clean_version, orig_version FROM versions '
           'WHERE repo = %(repo)s AND package_name = %(package_name)s')
    conn = connect()
    try:
        # psycopg2's ``with conn`` only ends the transaction; it does NOT
        # close the connection, so close it explicitly below.
        with conn:
            with conn.cursor() as cur:
                cur.execute(sql, {'repo': repo, 'package_name': package})
                row = cur.fetchone()
    finally:
        conn.close()

    if row is None:
        return None
    clean, orig = row
    return {'clean': clean, 'original': orig}
|