import os from typing import Mapping, TYPE_CHECKING import psycopg2 if TYPE_CHECKING: from .base import Version def connect(): return psycopg2.connect(os.environ['DATABASE_URL']) def poke_table(force=False): with connect() as conn: with conn.cursor() as cur: if force: cur.execute('DROP TABLE IF EXISTS versions CASCADE;') cur.execute('CREATE TABLE IF NOT EXISTS versions (' 'repo text,' 'package_name text,' 'clean_version text,' 'orig_version text,' 'PRIMARY KEY (repo, package_name)' ');') def write(repo: str, data: Mapping[str, 'Version']): with connect() as conn: with conn.cursor() as cur: insert = 'INSERT INTO versions (repo, package_name, clean_version, orig_version)' \ 'VALUES (%(repo)s, %(package_name)s, %(clean_version)s, %(orig_version)s)' \ 'ON CONFLICT (repo, package_name) DO UPDATE SET ' \ 'clean_version = EXCLUDED.clean_version,' \ 'orig_version = EXCLUDED.orig_version' for package, version in data.items(): cur.execute(insert, {'repo': repo, 'package_name': package, 'clean_version': version.clean, 'orig_version': version.original}) def read(repo: str, package: str): with connect() as conn: with conn.cursor() as cur: sql = 'SELECT clean_version, orig_version FROM versions ' \ 'WHERE repo = %(repo)s AND package_name = %(package_name)s' cur.execute(sql, {'repo': repo, 'package_name': package}) result = cur.fetchone() if result is None: return None clean, orig = result return {'clean': clean, 'original': orig}