From bbf01ada6b2f748618db820624bf768989a11924 Mon Sep 17 00:00:00 2001 From: Melody Horn Date: Fri, 30 Apr 2021 18:52:36 -0600 Subject: fetch folders --- ctec/__main__.py | 47 ++++++++- ctec/imap_response.py | 46 +++++++++ ctec/logic.py | 25 +++-- ctec/parse_utils.py | 260 ++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 368 insertions(+), 10 deletions(-) create mode 100644 ctec/imap_response.py create mode 100644 ctec/parse_utils.py (limited to 'ctec') diff --git a/ctec/__main__.py b/ctec/__main__.py index 8d01960..84f1215 100644 --- a/ctec/__main__.py +++ b/ctec/__main__.py @@ -1,3 +1,4 @@ +import re import threading from tkinter import * from tkinter import messagebox @@ -7,6 +8,9 @@ from . import VERSION from .config import get_config from .logic import Account +def slugify(text: str): + return re.sub(r'\W', '-', text) + class CtecFrame: def __init__(self, root: Tk): self.root = root @@ -14,6 +18,8 @@ class CtecFrame: mainframe = ttk.Frame(root, padding="3 3 12 12") mainframe.grid(column=0, row=0, sticky=(N, W, E, S)) + mainframe.columnconfigure(0, weight=1) + mainframe.rowconfigure(0, weight=1) root.columnconfigure(0, weight=1) root.rowconfigure(0, weight=1) @@ -27,16 +33,49 @@ class CtecFrame: self.accounts_tree.grid(column=0, row=0, sticky=(N, W, E, S)) self.accounts_tree.insert('', 'end', 'all', text='All Accounts') for account in self.accounts: - unread_count = sum(1 for message in account.inbox() if 'S' not in message.get_flags()) - all_count = sum(1 for message in account.inbox()) - self.accounts_tree.insert('', 'end', account.address, text=account.address, values=(unread_count, all_count)) - threading.Thread(target=lambda: account.fetch_inbox()).start() + self.accounts_tree.insert('', 'end', account.address, text=account.address, open=True) + for folder_name, folder in account.folders(): + unread_count = sum(1 for message in folder if 'S' not in message.get_flags()) + all_count = sum(1 for message in folder) + tree_item = f'{account.address}-{slugify(folder_name)}' + parent_tree_item = account.address + if '.' in folder_name: + parent_folder_name, child_folder_name = folder_name.rsplit('.', 1) + parent_tree_item = f'{account.address}-{slugify(parent_folder_name)}' + folder_name = child_folder_name + self.accounts_tree.insert(parent_tree_item, 'end', tree_item, text=folder_name, values=(unread_count, all_count), open=True) + # this argument is here to work around an obnoxious python misfeature + # local variables aren't closed over unless the function is returned?? + # so setting a default argument value lets us close over the value instead of just using the most recent value in that name + def fetch_account_folders(account=account): + account.fetch_folders() + self.update_accounts_tree(updated_account=account) + threading.Thread(target=fetch_account_folders).start() # self.messages = # create a menu bar self.make_menu_bar(root) + def update_accounts_tree(self, updated_account=None): + selected_accounts = [updated_account] + if updated_account is None: + selected_accounts = self.accounts + for account in selected_accounts: + for folder_name, folder in account.folders(): + unread_count = sum(1 for message in folder if 'S' not in message.get_flags()) + all_count = sum(1 for message in folder) + tree_item = f'{account.address}-{slugify(folder_name)}' + if self.accounts_tree.exists(tree_item): + self.accounts_tree.item(tree_item, values=(unread_count, all_count)) + else: + parent_tree_item = account.address + if '.' in folder_name: + parent_folder_name, child_folder_name = folder_name.rsplit('.', 1) + parent_tree_item = f'{account.address}-{slugify(parent_folder_name)}' + folder_name = child_folder_name + self.accounts_tree.insert(parent_tree_item, 'end', tree_item, text=folder_name, values=(unread_count, all_count), open=True) + def make_menu_bar(self, root): root.option_add('*tearOff', FALSE) diff --git a/ctec/imap_response.py b/ctec/imap_response.py new file mode 100644 index 0000000..23ca943 --- /dev/null +++ b/ctec/imap_response.py @@ -0,0 +1,46 @@ +# this should really be in the stdlib imo +from dataclasses import dataclass +from typing import List as ListT, Union, ClassVar + +from .parse_utils import ParseResult, Parser, take_while1, tag, delimited, take_n, alt, map_parser, separated_many0, separated_triple, all_consuming + +__all__ = [ + 'List', +] + +atom: Parser[str] = take_while1(lambda c: c.isalnum() or c in r'\/') + +number: Parser[int] = map_parser(take_while1(str.isnumeric), int) + +def literal_string(text: str) -> ParseResult[str]: + delimited_result = delimited(tag('{'), number, tag('}\r\n'))(text) + if delimited_result is None: + return None + count, text = delimited_result + return take_n(count)(text) + +quoted_string: Parser[str] = delimited(tag('"'), take_while1(lambda c: c not in '\r\n"'), tag('"')) + +string: Parser[str] = alt(literal_string, quoted_string) + +astring: Parser[str] = alt(atom, string) + +data_item: Parser[Union[int, str]] = alt(number, atom, string) + +ParensList = ListT[Union[int, str, 'ParensList']] +def parens_list(text: str) -> ParseResult[ParensList]: + return delimited(tag('('), separated_many0(alt(data_item, parens_list), tag(' ')), tag(')'))(text) + +@dataclass +class List: + attributes: ListT[str] + delimiter: str + name: str + + @staticmethod + def parse(response: bytes) -> 'List': + response = response.decode('ASCII') + print(response) + parser = all_consuming(separated_triple(parens_list, tag(' '), string, tag(' '), astring), debug=True) + (attributes, delimiter, name), _ = parser(response) + return List(attributes, delimiter, name) diff --git a/ctec/logic.py b/ctec/logic.py index 7afd78a..57b553c 100644 --- a/ctec/logic.py +++ b/ctec/logic.py @@ -3,13 +3,17 @@ import mailbox import pathlib import pprint import re +from typing import List, Union, Tuple -from . import import_or_install +from . import import_or_install, imap_response mailbox.Maildir.colon = '!' FLAGS = re.compile(rb'FLAGS \(([^)]+?)\)') +def clean_folder_name(folder_name: str, separator: str) -> str: + return folder_name.replace('.', '-').replace(separator, '.').replace('/', '-') + class Account: def __init__(self, address: str, info: dict): appdirs = import_or_install('appdirs') @@ -18,21 +22,27 @@ class Account: self.mailbox = mailbox.Maildir(data_dir / address) self.info = info + def fetch_folders(self): + with imaplib.IMAP4_SSL(self.info['imap host']) as M: + M.login(self.address, self.info['password']) + folder_list: List[bytes] + typ, folder_list = M.list() + for folder in folder_list: + folder_info = imap_response.List.parse(folder) + self.mailbox.add_folder(clean_folder_name(folder_info.name, folder_info.delimiter)) + def fetch_inbox(self): with imaplib.IMAP4_SSL(self.info['imap host']) as M: M.login(self.address, self.info['password']) - try: - inbox = self.mailbox.get_folder('Inbox') - except mailbox.NoSuchMailboxError: - inbox = self.mailbox.add_folder('Inbox') + inbox = self.mailbox.add_folder('Inbox') M.select() typ, data = M.search(None, 'ALL') for num in data[0].split(): typ, data = M.fetch(num, '(FLAGS RFC822)') for prefix, message in data[:-1]: flags = FLAGS.search(prefix).group(1).split() - print(flags) message = mailbox.MaildirMessage(message) + print(message['Subject'], flags) if rb'\Seen' in flags: message.add_flag('S') message.set_subdir('cur') @@ -45,5 +55,8 @@ class Account: inbox.add(message) M.close() + def folders(self) -> List[Tuple[str, mailbox.Maildir]]: + return [(folder, self.mailbox.get_folder(folder)) for folder in self.mailbox.list_folders()] + def inbox(self) -> mailbox.Maildir: return self.mailbox.add_folder('Inbox') diff --git a/ctec/parse_utils.py b/ctec/parse_utils.py new file mode 100644 index 0000000..3dd5e63 --- /dev/null +++ b/ctec/parse_utils.py @@ -0,0 +1,260 @@ +from typing import Callable, List, Optional, Tuple, TypeVar + +__all__ = [ + 'ParseResult', + 'Parser', + 'alt', + 'tag', + 'take_till1', + 'take_while1', + 'take_n', + 'any_char', + 'all_consuming', + 'map_parser', + 'opt', + 'verify', + 'many0', + 'many1', + 'delimited', + 'pair', + 'preceded', + 'separated_pair', + 'separated_triple', + 'separated_many0', +] + +T = TypeVar('T') +T1 = TypeVar('T1') +T2 = TypeVar('T2') +T3 = TypeVar('T3') + +ParseResult = Optional[Tuple[T, str]] +Parser = Callable[[str], ParseResult[T]] + +def alt(*parsers: Parser[T]) -> Parser[T]: + def parse(text: str) -> ParseResult[T]: + for parser in parsers[:-1]: + result = parser(text) + if result is not None: + return result + return parsers[-1](text) + return parse + +def tag(tag_text: str) -> Parser[None]: + def parse(text: str) -> ParseResult[None]: + if text.startswith(tag_text): + return None, text[len(tag_text):] + return None + return parse + +def take_while1(predicate: Callable[[str], bool]) -> Parser[str]: + def parse(text: str) -> ParseResult[str]: + if len(text) == 0 or not predicate(text[0]): + return None + for i in range(1, len(text)): + if not predicate(text[i]): + return text[:i], text[i:] + return text, "" + return parse + +def take_till1(predicate: Callable[[str], bool]) -> Parser[str]: + return take_while1(lambda x: not predicate(x)) + +def take_n(n: int) -> Parser[str]: + def parse(text: str) -> ParseResult[str]: + if len(text) < n: + return None + return text[:n], text[n:] + return parse + +def any_char(text: str) -> ParseResult[str]: + if len(text) > 0: + return text[0], text[1:] + return None + +def all_consuming(parser: Parser[T], *, debug=False) -> Parser[T]: + def parse(text: str) -> ParseResult[T]: + parsed_result = parser(text) + if parsed_result is None: + if debug: + print('all_consuming: parser failed') + return None + result, extra = parsed_result + if len(extra) > 0: + if debug: + print('all_consuming: leftover text {}', repr(extra)) + return None + return result, '' + return parse + +def map_parser(parser: Parser[T1], mapper: Callable[[T1], T2]) -> Parser[T2]: + def parse(text: str) -> ParseResult[T2]: + parsed_result = parser(text) + if parsed_result is None: + return None + result, extra = parsed_result + return mapper(result), extra + return parse + +def opt(parser: Parser[T]) -> Parser[Optional[T]]: + def parse(text: str) -> ParseResult[Optional[T]]: + result = parser(text) + if result is None: + return None, text + return result + return parse + +def verify(parser: Parser[T], predicate: Callable[[T], bool]) -> Parser[T]: + def parse(text: str) -> ParseResult[T]: + parsed_result = parser(text) + if parsed_result is None: + return None + result, extra = parsed_result + if predicate(result): + return result, extra + return None + return parse + +def many0(parser: Parser[T]) -> Parser[List[T]]: + def parse(text: str) -> ParseResult[List[T]]: + result = [] + parser_result = parser(text) + while parser_result is not None: + this_result, text = parser_result + result.append(this_result) + parser_result = parser(text) + return result, text + return parse + +def many1(parser: Parser[T]) -> Parser[List[T]]: + def parse(text: str) -> ParseResult[List[T]]: + parser_result = parser(text) + if parser_result is None: + return None + this_result, extra = parser_result + result = [this_result] + + parser_result = parser(extra) + while parser_result is not None: + this_result, extra = parser_result + result.append(this_result) + parser_result = parser(extra) + return result, extra + return parse + +def separated_many0(parser: Parser[T], separator_parser: Parser) -> Parser[List[T]]: + def parse(text: str) -> ParseResult[List[T]]: + result = [] + while True: + parser_result = parser(text) + if parser_result is None: + break + this_result, text = parser_result + result.append(this_result) + + separator_result = separator_parser(text) + if separator_result is None: + break + _, text = separator_result + return result, text + return parse + +def delimited(before_parser: Parser[T1], parser: Parser[T], after_parser: Parser[T2]) -> Parser[T]: + def parse(text: str) -> ParseResult[T]: + before_result = before_parser(text) + if before_result is None: + return None + _, extra = before_result + + parsed_result = parser(extra) + if parsed_result is None: + return None + result, extra = parsed_result + + after_result = after_parser(extra) + if after_result is None: + return None + _, extra = after_result + + return result, extra + return parse + +def pair(first_parser: Parser[T1], second_parser: Parser[T2]) -> Parser[Tuple[T1, T2]]: + def parse(text: str) -> ParseResult[Tuple[T1, T2]]: + first_parsed_result = first_parser(text) + if first_parsed_result is None: + return None + first_result, extra = first_parsed_result + + second_parsed_result = second_parser(extra) + if second_parsed_result is None: + return None + second_result, extra = second_parsed_result + + return (first_result, second_result), extra + return parse + +def preceded(before_parser: Parser[T1], parser: Parser[T]) -> Parser[T]: + def parse(text: str) -> ParseResult[T]: + before_result = before_parser(text) + if before_result is None: + return None + _, extra = before_result + + parsed_result = parser(extra) + if parsed_result is None: + return None + result, extra = parsed_result + + return result, extra + return parse + +def separated_pair(first_parser: Parser[T1], between_parser: Parser[T], second_parser: Parser[T2]) -> Parser[Tuple[T1, T2]]: + def parse(text: str) -> ParseResult[Tuple[T1, T2]]: + first_parsed_result = first_parser(text) + if first_parsed_result is None: + return None + first_result, extra = first_parsed_result + + between_result = between_parser(extra) + if between_result is None: + return None + _, extra = between_result + + second_parsed_result = second_parser(extra) + if second_parsed_result is None: + return None + second_result, extra = second_parsed_result + + return (first_result, second_result), extra + return parse + +def separated_triple(first_parser: Parser[T1], between12_parser: Parser, second_parser: Parser[T2], between23_parser: Parser, third_parser: Parser[T3]) -> Parser[Tuple[T1, T2, T3]]: + def parse(text: str) -> ParseResult[Tuple[T1, T2, T3]]: + first_parsed_result = first_parser(text) + if first_parsed_result is None: + return None + first_result, extra = first_parsed_result + + between_result = between12_parser(extra) + if between_result is None: + return None + _, extra = between_result + + second_parsed_result = second_parser(extra) + if second_parsed_result is None: + return None + second_result, extra = second_parsed_result + + between_result = between23_parser(extra) + if between_result is None: + return None + _, extra = between_result + + third_parsed_result = third_parser(extra) + if third_parsed_result is None: + return None + third_result, extra = third_parsed_result + + return (first_result, second_result, third_result), extra + return parse -- cgit v1.2.3