From 2ef831feb8cd1b2393196877b7b7c93ac49093d7 Mon Sep 17 00:00:00 2001 From: Melody Horn Date: Fri, 30 Apr 2021 21:53:31 -0600 Subject: overhaul parsing to match ABNF from spec --- ctec/imap_response.py | 92 ++++++++++++++++++++++++++++++++++++++------------- ctec/logic.py | 33 +++++++++++++----- ctec/parse_utils.py | 71 ++++++++++++++++++++++++++++++++------- 3 files changed, 153 insertions(+), 43 deletions(-) diff --git a/ctec/imap_response.py b/ctec/imap_response.py index 23ca943..cad76c9 100644 --- a/ctec/imap_response.py +++ b/ctec/imap_response.py @@ -1,46 +1,92 @@ # this should really be in the stdlib imo from dataclasses import dataclass -from typing import List as ListT, Union, ClassVar +from typing import List as ListT, Tuple, Optional -from .parse_utils import ParseResult, Parser, take_while1, tag, delimited, take_n, alt, map_parser, separated_many0, separated_triple, all_consuming +from .parse_utils import ( + Parser, take_while1, tag, delimited, take_n, alt, separated_many0, separated_triple, all_consuming, verify, + preceded, and_then, itag, take_while0, as_predicate +) __all__ = [ 'List', ] -atom: Parser[str] = take_while1(lambda c: c.isalnum() or c in r'\/') +# level 0 +ctl: Parser[str] = verify(take_n(1), lambda c: not c.isprintable()) -number: Parser[int] = map_parser(take_while1(str.isnumeric), int) +digit: Parser[str] = verify(take_n(1), lambda c: c in '0123456789') -def literal_string(text: str) -> ParseResult[str]: - delimited_result = delimited(tag('{'), number, tag('}\r\n'))(text) - if delimited_result is None: - return None - count, text = delimited_result - return take_n(count)(text) +dquote: Parser[str] = tag('"') -quoted_string: Parser[str] = delimited(tag('"'), take_while1(lambda c: c not in '\r\n"'), tag('"')) +list_wildcards: Parser[str] = alt(tag('%'), tag('*')) -string: Parser[str] = alt(literal_string, quoted_string) +nil: Parser[str] = itag('NIL') -astring: Parser[str] = alt(atom, string) +resp_specials: Parser[str] = tag(']') -data_item: Parser[Union[int, str]] = alt(number, atom, string) +text_char: Parser[str] = verify(take_n(1), lambda c: c not in '\r\n') -ParensList = ListT[Union[int, str, 'ParensList']] -def parens_list(text: str) -> ParseResult[ParensList]: - return delimited(tag('('), separated_many0(alt(data_item, parens_list), tag(' ')), tag(')'))(text) +# level 1 +number: Parser[str] = take_while1(as_predicate(digit)) + +quoted_specials: Parser[str] = alt(dquote, tag('\\')) + +# level 2 +atom_specials: Parser[str] = alt(verify(take_n(1), lambda c: c in '(){ '), ctl, list_wildcards, quoted_specials, resp_specials) + +literal: Parser[str] = and_then( + delimited(tag('{'), number, tag('}\r\n')), + lambda n: preceded(delimited(tag('{'), number, tag('}\r\n')), take_n(int(n))) +) + +quoted_char: Parser[str] = alt(verify(text_char, lambda c: quoted_specials(c) is None), preceded(tag('\\'), quoted_specials)) + +# level 3 +atom_char: Parser[str] = verify(take_n(1), lambda c: atom_specials(c) is None) + +quoted: Parser[str] = delimited(dquote, take_while0(as_predicate(quoted_char)), dquote) + +# level 4 +astring_char: Parser[str] = alt(atom_char, resp_specials) + +atom: Parser[str] = take_while1(as_predicate(atom_char)) + +string: Parser[str] = alt(quoted, literal) + +# level 5 +astring: Parser[str] = alt(take_while1(as_predicate(astring_char)), string) + +flag_extension: Parser[str] = preceded(tag('\\'), atom) + +# level 6 +mailbox: Parser[str] = alt(itag('INBOX'), astring) + +mbx_list_flag: Parser[str] = alt(itag(r'\Noselect'), itag(r'\Marked'), itag(r'\Unmarked'), itag(r'\Noinferiors'), flag_extension) + +# level 7 +mbx_list_flags: Parser[ListT[str]] = separated_many0(mbx_list_flag, tag(' ')) + +# level 8 +mailbox_list: Parser[Tuple[ListT[str], Optional[str], str]] = separated_triple( + delimited(tag('('), mbx_list_flags, tag(')')), + tag(' '), + alt(delimited(dquote, quoted_char, dquote), nil), + tag(' '), + mailbox, +) @dataclass class List: attributes: ListT[str] - delimiter: str + delimiter: Optional[str] name: str @staticmethod - def parse(response: bytes) -> 'List': - response = response.decode('ASCII') - print(response) - parser = all_consuming(separated_triple(parens_list, tag(' '), string, tag(' '), astring), debug=True) - (attributes, delimiter, name), _ = parser(response) + def parse(response_bytes: bytes) -> 'List': + response = response_bytes.decode('ASCII') + parser = all_consuming(mailbox_list, debug=True) + parse_result = parser(response) + if parse_result is None: + raise ValueError('invalid List.parse argument {}', repr(response)) + (attributes, delimiter, name), _ = parse_result return List(attributes, delimiter, name) diff --git a/ctec/logic.py b/ctec/logic.py index 57b553c..1769e47 100644 --- a/ctec/logic.py +++ b/ctec/logic.py @@ -11,8 +11,20 @@ mailbox.Maildir.colon = '!' FLAGS = re.compile(rb'FLAGS \(([^)]+?)\)') +def percent_encode(c: str) -> str: + return '%' + hex(ord(c))[2:] + def clean_folder_name(folder_name: str, separator: str) -> str: - return folder_name.replace('.', '-').replace(separator, '.').replace('/', '-') + folder_name = folder_name.replace('%', percent_encode('%')) + folder_name = folder_name.replace('.', percent_encode('.')) + folder_name = folder_name.replace(separator, '.') + return folder_name + +def dirty_folder_name(folder_name: str, separator: str = '/') -> str: + folder_name = folder_name.replace('.', separator) + folder_name = folder_name.replace(percent_encode('.'), '.') + folder_name = folder_name.replace(percent_encode('%'), '%') + return folder_name class Account: def __init__(self, address: str, info: dict): @@ -21,15 +33,20 @@ class Account: self.address = address self.mailbox = mailbox.Maildir(data_dir / address) self.info = info + self.connection = imaplib.IMAP4_SSL(self.info['imap host']) + self.connection.login(self.address, self.info['password']) + + def __del__(self): + self.connection.logout() def fetch_folders(self): - with imaplib.IMAP4_SSL(self.info['imap host']) as M: - M.login(self.address, self.info['password']) - folder_list: List[bytes] - typ, folder_list = M.list() - for folder in folder_list: - folder_info = imap_response.List.parse(folder) - self.mailbox.add_folder(clean_folder_name(folder_info.name, folder_info.delimiter)) + folder_list: List[bytes] + typ, folder_list = self.connection.list() + for folder in folder_list: + folder_info = imap_response.List.parse(folder) + if folder_info.delimiter != '/': + raise NotImplementedError(f'who the hell uses {repr(folder_info.delimiter)} as a delimiter') + self.mailbox.add_folder(clean_folder_name(folder_info.name, folder_info.delimiter)) def fetch_inbox(self): with imaplib.IMAP4_SSL(self.info['imap host']) as M: diff --git a/ctec/parse_utils.py b/ctec/parse_utils.py index 3dd5e63..fe3ed06 100644 --- a/ctec/parse_utils.py +++ b/ctec/parse_utils.py @@ -3,21 +3,27 @@ from typing import Callable, List, Optional, Tuple, TypeVar __all__ = [ 'ParseResult', 'Parser', + 'as_predicate', 'alt', 'tag', + 'itag', 'take_till1', + 'take_while0', 'take_while1', 'take_n', 'any_char', 'all_consuming', 'map_parser', + 'and_then', 'opt', 'verify', 'many0', 'many1', 'delimited', 'pair', + 'triple', 'preceded', + 'followed', 'separated_pair', 'separated_triple', 'separated_many0', @@ -31,6 +37,11 @@ T3 = TypeVar('T3') ParseResult = Optional[Tuple[T, str]] Parser = Callable[[str], ParseResult[T]] +def as_predicate(parser: Parser[T]) -> Callable[[str], bool]: + def check(text: str) -> bool: + return parser(text) is not None + return check + def alt(*parsers: Parser[T]) -> Parser[T]: def parse(text: str) -> ParseResult[T]: for parser in parsers[:-1]: @@ -40,13 +51,29 @@ def alt(*parsers: Parser[T]) -> Parser[T]: return parsers[-1](text) return parse -def tag(tag_text: str) -> Parser[None]: - def parse(text: str) -> ParseResult[None]: +def tag(tag_text: str) -> Parser[str]: + def parse(text: str) -> ParseResult[str]: if text.startswith(tag_text): - return None, text[len(tag_text):] + return tag_text, text[len(tag_text):] + return None + return parse + +# case-insensitive tag +def itag(tag_text: str) -> Parser[str]: + def parse(text: str) -> ParseResult[str]: + if text.casefold().startswith(tag_text.casefold()): + return tag_text, text[len(tag_text):] return None return parse +def take_while0(predicate: Callable[[str], bool]) -> Parser[str]: + def parse(text: str) -> ParseResult[str]: + for i in range(len(text)): + if not predicate(text[i]): + return text[:i], text[i:] + return text, "" + return parse + def take_while1(predicate: Callable[[str], bool]) -> Parser[str]: def parse(text: str) -> ParseResult[str]: if len(text) == 0 or not predicate(text[0]): @@ -96,6 +123,15 @@ def map_parser(parser: Parser[T1], mapper: Callable[[T1], T2]) -> Parser[T2]: return mapper(result), extra return parse +def and_then(first_parser: Parser[T1], get_second_parser: Callable[[T1], Parser[T2]]) -> Parser[T2]: + def parse(text: str) -> ParseResult[T2]: + parsed_result = first_parser(text) + if parsed_result is None: + return None + result, _ = parsed_result + return get_second_parser(result)(text) + return parse + def opt(parser: Parser[T]) -> Parser[Optional[T]]: def parse(text: str) -> ParseResult[Optional[T]]: result = parser(text) @@ -194,21 +230,32 @@ def pair(first_parser: Parser[T1], second_parser: Parser[T2]) -> Parser[Tuple[T1 return (first_result, second_result), extra return parse -def preceded(before_parser: Parser[T1], parser: Parser[T]) -> Parser[T]: - def parse(text: str) -> ParseResult[T]: - before_result = before_parser(text) - if before_result is None: +def triple(first_parser: Parser[T1], second_parser: Parser[T2], third_parser: Parser[T3]) -> Parser[Tuple[T1, T2, T3]]: + def parse(text: str) -> ParseResult[Tuple[T1, T2, T3]]: + first_parsed_result = first_parser(text) + if first_parsed_result is None: return None - _, extra = before_result + first_result, extra = first_parsed_result - parsed_result = parser(extra) - if parsed_result is None: + second_parsed_result = second_parser(extra) + if second_parsed_result is None: return None - result, extra = parsed_result + second_result, extra = second_parsed_result - return result, extra + third_parsed_result = third_parser(extra) + if third_parsed_result is None: + return None + third_result, extra = third_parsed_result + + return (first_result, second_result, third_result), extra return parse +def preceded(before_parser: Parser[T1], parser: Parser[T]) -> Parser[T]: + return map_parser(pair(before_parser, parser), lambda x: x[1]) + +def followed(parser: Parser[T], after_parser: Parser[T1]) -> Parser[T]: + return map_parser(pair(parser, after_parser), lambda x: x[0]) + def separated_pair(first_parser: Parser[T1], between_parser: Parser[T], second_parser: Parser[T2]) -> Parser[Tuple[T1, T2]]: def parse(text: str) -> ParseResult[Tuple[T1, T2]]: first_parsed_result = first_parser(text) -- cgit v1.2.3