aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMelody Horn <melody@boringcactus.com>2021-04-30 18:52:36 -0600
committerMelody Horn <melody@boringcactus.com>2021-04-30 18:52:36 -0600
commitbbf01ada6b2f748618db820624bf768989a11924 (patch)
tree6223527748664f52128adf824055fe3f3d384c9f
parent1cf5e68c25c0505194f23d4018e29767d3b519dd (diff)
downloadctec-bbf01ada6b2f748618db820624bf768989a11924.tar.gz
ctec-bbf01ada6b2f748618db820624bf768989a11924.zip
fetch folders
-rw-r--r--ctec/__main__.py47
-rw-r--r--ctec/imap_response.py46
-rw-r--r--ctec/logic.py25
-rw-r--r--ctec/parse_utils.py260
4 files changed, 368 insertions, 10 deletions
diff --git a/ctec/__main__.py b/ctec/__main__.py
index 8d01960..84f1215 100644
--- a/ctec/__main__.py
+++ b/ctec/__main__.py
@@ -1,3 +1,4 @@
+import re
import threading
from tkinter import *
from tkinter import messagebox
@@ -7,6 +8,9 @@ from . import VERSION
from .config import get_config
from .logic import Account
+def slugify(text: str):
+ return re.sub(r'\W', '-', text)
+
class CtecFrame:
def __init__(self, root: Tk):
self.root = root
@@ -14,6 +18,8 @@ class CtecFrame:
mainframe = ttk.Frame(root, padding="3 3 12 12")
mainframe.grid(column=0, row=0, sticky=(N, W, E, S))
+ mainframe.columnconfigure(0, weight=1)
+ mainframe.rowconfigure(0, weight=1)
root.columnconfigure(0, weight=1)
root.rowconfigure(0, weight=1)
@@ -27,16 +33,49 @@ class CtecFrame:
self.accounts_tree.grid(column=0, row=0, sticky=(N, W, E, S))
self.accounts_tree.insert('', 'end', 'all', text='All Accounts')
for account in self.accounts:
- unread_count = sum(1 for message in account.inbox() if 'S' not in message.get_flags())
- all_count = sum(1 for message in account.inbox())
- self.accounts_tree.insert('', 'end', account.address, text=account.address, values=(unread_count, all_count))
- threading.Thread(target=lambda: account.fetch_inbox()).start()
+ self.accounts_tree.insert('', 'end', account.address, text=account.address, open=True)
+ for folder_name, folder in account.folders():
+ unread_count = sum(1 for message in folder if 'S' not in message.get_flags())
+ all_count = sum(1 for message in folder)
+ tree_item = f'{account.address}-{slugify(folder_name)}'
+ parent_tree_item = account.address
+ if '.' in folder_name:
+ parent_folder_name, child_folder_name = folder_name.rsplit('.', 1)
+ parent_tree_item = f'{account.address}-{slugify(parent_folder_name)}'
+ folder_name = child_folder_name
+ self.accounts_tree.insert(parent_tree_item, 'end', tree_item, text=folder_name, values=(unread_count, all_count), open=True)
+ # this argument is here to work around an obnoxious python misfeature
+ # local variables aren't closed over unless the function is returned??
+ # so setting a default argument value lets us close over the value instead of just using the most recent value in that name
+ def fetch_account_folders(account=account):
+ account.fetch_folders()
+ self.update_accounts_tree(updated_account=account)
+ threading.Thread(target=fetch_account_folders).start()
# self.messages =
# create a menu bar
self.make_menu_bar(root)
+ def update_accounts_tree(self, updated_account=None):
+ selected_accounts = [updated_account]
+ if updated_account is None:
+ selected_accounts = self.accounts
+ for account in selected_accounts:
+ for folder_name, folder in account.folders():
+ unread_count = sum(1 for message in folder if 'S' not in message.get_flags())
+ all_count = sum(1 for message in folder)
+ tree_item = f'{account.address}-{slugify(folder_name)}'
+ if self.accounts_tree.exists(tree_item):
+ self.accounts_tree.item(tree_item, values=(unread_count, all_count))
+ else:
+ parent_tree_item = account.address
+ if '.' in folder_name:
+ parent_folder_name, child_folder_name = folder_name.rsplit('.', 1)
+ parent_tree_item = f'{account.address}-{slugify(parent_folder_name)}'
+ folder_name = child_folder_name
+ self.accounts_tree.insert(parent_tree_item, 'end', tree_item, text=folder_name, values=(unread_count, all_count), open=True)
+
def make_menu_bar(self, root):
root.option_add('*tearOff', FALSE)
diff --git a/ctec/imap_response.py b/ctec/imap_response.py
new file mode 100644
index 0000000..23ca943
--- /dev/null
+++ b/ctec/imap_response.py
@@ -0,0 +1,46 @@
+# this should really be in the stdlib imo
+from dataclasses import dataclass
+from typing import List as ListT, Union, ClassVar
+
+from .parse_utils import ParseResult, Parser, take_while1, tag, delimited, take_n, alt, map_parser, separated_many0, separated_triple, all_consuming
+
+__all__ = [
+ 'List',
+]
+
+atom: Parser[str] = take_while1(lambda c: c.isalnum() or c in r'\/')
+
+number: Parser[int] = map_parser(take_while1(str.isnumeric), int)
+
+def literal_string(text: str) -> ParseResult[str]:
+ delimited_result = delimited(tag('{'), number, tag('}\r\n'))(text)
+ if delimited_result is None:
+ return None
+ count, text = delimited_result
+ return take_n(count)(text)
+
+quoted_string: Parser[str] = delimited(tag('"'), take_while1(lambda c: c not in '\r\n"'), tag('"'))
+
+string: Parser[str] = alt(literal_string, quoted_string)
+
+astring: Parser[str] = alt(atom, string)
+
+data_item: Parser[Union[int, str]] = alt(number, atom, string)
+
+ParensList = ListT[Union[int, str, 'ParensList']]
+def parens_list(text: str) -> ParseResult[ParensList]:
+ return delimited(tag('('), separated_many0(alt(data_item, parens_list), tag(' ')), tag(')'))(text)
+
+@dataclass
+class List:
+ attributes: ListT[str]
+ delimiter: str
+ name: str
+
+ @staticmethod
+ def parse(response: bytes) -> 'List':
+ response = response.decode('ASCII')
+ print(response)
+ parser = all_consuming(separated_triple(parens_list, tag(' '), string, tag(' '), astring), debug=True)
+ (attributes, delimiter, name), _ = parser(response)
+ return List(attributes, delimiter, name)
diff --git a/ctec/logic.py b/ctec/logic.py
index 7afd78a..57b553c 100644
--- a/ctec/logic.py
+++ b/ctec/logic.py
@@ -3,13 +3,17 @@ import mailbox
import pathlib
import pprint
import re
+from typing import List, Union, Tuple
-from . import import_or_install
+from . import import_or_install, imap_response
mailbox.Maildir.colon = '!'
FLAGS = re.compile(rb'FLAGS \(([^)]+?)\)')
+def clean_folder_name(folder_name: str, separator: str) -> str:
+ return folder_name.replace('.', '-').replace(separator, '.').replace('/', '-')
+
class Account:
def __init__(self, address: str, info: dict):
appdirs = import_or_install('appdirs')
@@ -18,21 +22,27 @@ class Account:
self.mailbox = mailbox.Maildir(data_dir / address)
self.info = info
+ def fetch_folders(self):
+ with imaplib.IMAP4_SSL(self.info['imap host']) as M:
+ M.login(self.address, self.info['password'])
+ folder_list: List[bytes]
+ typ, folder_list = M.list()
+ for folder in folder_list:
+ folder_info = imap_response.List.parse(folder)
+ self.mailbox.add_folder(clean_folder_name(folder_info.name, folder_info.delimiter))
+
def fetch_inbox(self):
with imaplib.IMAP4_SSL(self.info['imap host']) as M:
M.login(self.address, self.info['password'])
- try:
- inbox = self.mailbox.get_folder('Inbox')
- except mailbox.NoSuchMailboxError:
- inbox = self.mailbox.add_folder('Inbox')
+ inbox = self.mailbox.add_folder('Inbox')
M.select()
typ, data = M.search(None, 'ALL')
for num in data[0].split():
typ, data = M.fetch(num, '(FLAGS RFC822)')
for prefix, message in data[:-1]:
flags = FLAGS.search(prefix).group(1).split()
- print(flags)
message = mailbox.MaildirMessage(message)
+ print(message['Subject'], flags)
if rb'\Seen' in flags:
message.add_flag('S')
message.set_subdir('cur')
@@ -45,5 +55,8 @@ class Account:
inbox.add(message)
M.close()
+ def folders(self) -> List[Tuple[str, mailbox.Maildir]]:
+ return [(folder, self.mailbox.get_folder(folder)) for folder in self.mailbox.list_folders()]
+
def inbox(self) -> mailbox.Maildir:
return self.mailbox.add_folder('Inbox')
diff --git a/ctec/parse_utils.py b/ctec/parse_utils.py
new file mode 100644
index 0000000..3dd5e63
--- /dev/null
+++ b/ctec/parse_utils.py
@@ -0,0 +1,260 @@
+from typing import Callable, List, Optional, Tuple, TypeVar
+
+__all__ = [
+ 'ParseResult',
+ 'Parser',
+ 'alt',
+ 'tag',
+ 'take_till1',
+ 'take_while1',
+ 'take_n',
+ 'any_char',
+ 'all_consuming',
+ 'map_parser',
+ 'opt',
+ 'verify',
+ 'many0',
+ 'many1',
+ 'delimited',
+ 'pair',
+ 'preceded',
+ 'separated_pair',
+ 'separated_triple',
+ 'separated_many0',
+]
+
+T = TypeVar('T')
+T1 = TypeVar('T1')
+T2 = TypeVar('T2')
+T3 = TypeVar('T3')
+
+ParseResult = Optional[Tuple[T, str]]
+Parser = Callable[[str], ParseResult[T]]
+
+def alt(*parsers: Parser[T]) -> Parser[T]:
+ def parse(text: str) -> ParseResult[T]:
+ for parser in parsers[:-1]:
+ result = parser(text)
+ if result is not None:
+ return result
+ return parsers[-1](text)
+ return parse
+
+def tag(tag_text: str) -> Parser[None]:
+ def parse(text: str) -> ParseResult[None]:
+ if text.startswith(tag_text):
+ return None, text[len(tag_text):]
+ return None
+ return parse
+
+def take_while1(predicate: Callable[[str], bool]) -> Parser[str]:
+ def parse(text: str) -> ParseResult[str]:
+ if len(text) == 0 or not predicate(text[0]):
+ return None
+ for i in range(1, len(text)):
+ if not predicate(text[i]):
+ return text[:i], text[i:]
+ return text, ""
+ return parse
+
+def take_till1(predicate: Callable[[str], bool]) -> Parser[str]:
+ return take_while1(lambda x: not predicate(x))
+
+def take_n(n: int) -> Parser[str]:
+ def parse(text: str) -> ParseResult[str]:
+ if len(text) < n:
+ return None
+ return text[:n], text[n:]
+ return parse
+
+def any_char(text: str) -> ParseResult[str]:
+ if len(text) > 0:
+ return text[0], text[1:]
+ return None
+
+def all_consuming(parser: Parser[T], *, debug=False) -> Parser[T]:
+ def parse(text: str) -> ParseResult[T]:
+ parsed_result = parser(text)
+ if parsed_result is None:
+ if debug:
+ print('all_consuming: parser failed')
+ return None
+ result, extra = parsed_result
+ if len(extra) > 0:
+ if debug:
+ print('all_consuming: leftover text {}', repr(extra))
+ return None
+ return result, ''
+ return parse
+
+def map_parser(parser: Parser[T1], mapper: Callable[[T1], T2]) -> Parser[T2]:
+ def parse(text: str) -> ParseResult[T2]:
+ parsed_result = parser(text)
+ if parsed_result is None:
+ return None
+ result, extra = parsed_result
+ return mapper(result), extra
+ return parse
+
+def opt(parser: Parser[T]) -> Parser[Optional[T]]:
+ def parse(text: str) -> ParseResult[Optional[T]]:
+ result = parser(text)
+ if result is None:
+ return None, text
+ return result
+ return parse
+
+def verify(parser: Parser[T], predicate: Callable[[T], bool]) -> Parser[T]:
+ def parse(text: str) -> ParseResult[T]:
+ parsed_result = parser(text)
+ if parsed_result is None:
+ return None
+ result, extra = parsed_result
+ if predicate(result):
+ return result, extra
+ return None
+ return parse
+
+def many0(parser: Parser[T]) -> Parser[List[T]]:
+ def parse(text: str) -> ParseResult[List[T]]:
+ result = []
+ parser_result = parser(text)
+ while parser_result is not None:
+ this_result, text = parser_result
+ result.append(this_result)
+ parser_result = parser(text)
+ return result, text
+ return parse
+
+def many1(parser: Parser[T]) -> Parser[List[T]]:
+ def parse(text: str) -> ParseResult[List[T]]:
+ parser_result = parser(text)
+ if parser_result is None:
+ return None
+ this_result, extra = parser_result
+ result = [this_result]
+
+ parser_result = parser(extra)
+ while parser_result is not None:
+ this_result, extra = parser_result
+ result.append(this_result)
+ parser_result = parser(extra)
+ return result, extra
+ return parse
+
+def separated_many0(parser: Parser[T], separator_parser: Parser) -> Parser[List[T]]:
+ def parse(text: str) -> ParseResult[List[T]]:
+ result = []
+ while True:
+ parser_result = parser(text)
+ if parser_result is None:
+ break
+ this_result, text = parser_result
+ result.append(this_result)
+
+ separator_result = separator_parser(text)
+ if separator_result is None:
+ break
+ _, text = separator_result
+ return result, text
+ return parse
+
+def delimited(before_parser: Parser[T1], parser: Parser[T], after_parser: Parser[T2]) -> Parser[T]:
+ def parse(text: str) -> ParseResult[T]:
+ before_result = before_parser(text)
+ if before_result is None:
+ return None
+ _, extra = before_result
+
+ parsed_result = parser(extra)
+ if parsed_result is None:
+ return None
+ result, extra = parsed_result
+
+ after_result = after_parser(extra)
+ if after_result is None:
+ return None
+ _, extra = after_result
+
+ return result, extra
+ return parse
+
+def pair(first_parser: Parser[T1], second_parser: Parser[T2]) -> Parser[Tuple[T1, T2]]:
+ def parse(text: str) -> ParseResult[Tuple[T1, T2]]:
+ first_parsed_result = first_parser(text)
+ if first_parsed_result is None:
+ return None
+ first_result, extra = first_parsed_result
+
+ second_parsed_result = second_parser(extra)
+ if second_parsed_result is None:
+ return None
+ second_result, extra = second_parsed_result
+
+ return (first_result, second_result), extra
+ return parse
+
+def preceded(before_parser: Parser[T1], parser: Parser[T]) -> Parser[T]:
+ def parse(text: str) -> ParseResult[T]:
+ before_result = before_parser(text)
+ if before_result is None:
+ return None
+ _, extra = before_result
+
+ parsed_result = parser(extra)
+ if parsed_result is None:
+ return None
+ result, extra = parsed_result
+
+ return result, extra
+ return parse
+
+def separated_pair(first_parser: Parser[T1], between_parser: Parser[T], second_parser: Parser[T2]) -> Parser[Tuple[T1, T2]]:
+ def parse(text: str) -> ParseResult[Tuple[T1, T2]]:
+ first_parsed_result = first_parser(text)
+ if first_parsed_result is None:
+ return None
+ first_result, extra = first_parsed_result
+
+ between_result = between_parser(extra)
+ if between_result is None:
+ return None
+ _, extra = between_result
+
+ second_parsed_result = second_parser(extra)
+ if second_parsed_result is None:
+ return None
+ second_result, extra = second_parsed_result
+
+ return (first_result, second_result), extra
+ return parse
+
+def separated_triple(first_parser: Parser[T1], between12_parser: Parser, second_parser: Parser[T2], between23_parser: Parser, third_parser: Parser[T3]) -> Parser[Tuple[T1, T2, T3]]:
+ def parse(text: str) -> ParseResult[Tuple[T1, T2, T3]]:
+ first_parsed_result = first_parser(text)
+ if first_parsed_result is None:
+ return None
+ first_result, extra = first_parsed_result
+
+ between_result = between12_parser(extra)
+ if between_result is None:
+ return None
+ _, extra = between_result
+
+ second_parsed_result = second_parser(extra)
+ if second_parsed_result is None:
+ return None
+ second_result, extra = second_parsed_result
+
+ between_result = between23_parser(extra)
+ if between_result is None:
+ return None
+ _, extra = between_result
+
+ third_parsed_result = third_parser(extra)
+ if third_parsed_result is None:
+ return None
+ third_result, extra = third_parsed_result
+
+ return (first_result, second_result, third_result), extra
+ return parse