diff options
| author | Melody Horn <melody@boringcactus.com> | 2021-04-30 21:53:31 -0600 | 
|---|---|---|
| committer | Melody Horn <melody@boringcactus.com> | 2021-04-30 21:53:31 -0600 | 
| commit | 2ef831feb8cd1b2393196877b7b7c93ac49093d7 (patch) | |
| tree | 704290327fc95c998544720914c5e01625d6d4d3 | |
| parent | bbf01ada6b2f748618db820624bf768989a11924 (diff) | |
| download | ctec-2ef831feb8cd1b2393196877b7b7c93ac49093d7.tar.gz ctec-2ef831feb8cd1b2393196877b7b7c93ac49093d7.zip  | |
overhaul parsing to match ABNF from spec
| -rw-r--r-- | ctec/imap_response.py | 92 | ||||
| -rw-r--r-- | ctec/logic.py | 33 | ||||
| -rw-r--r-- | ctec/parse_utils.py | 71 | 
3 files changed, 153 insertions, 43 deletions
diff --git a/ctec/imap_response.py b/ctec/imap_response.py index 23ca943..cad76c9 100644 --- a/ctec/imap_response.py +++ b/ctec/imap_response.py @@ -1,46 +1,92 @@  # this should really be in the stdlib imo  from dataclasses import dataclass -from typing import List as ListT, Union, ClassVar +from typing import List as ListT, Tuple, Optional -from .parse_utils import ParseResult, Parser, take_while1, tag, delimited, take_n, alt, map_parser, separated_many0, separated_triple, all_consuming +from .parse_utils import ( +    Parser, take_while1, tag, delimited, take_n, alt, separated_many0, separated_triple, all_consuming, verify, +    preceded, and_then, itag, take_while0, as_predicate +)  __all__ = [      'List',  ] -atom: Parser[str] = take_while1(lambda c: c.isalnum() or c in r'\/') +# level 0 +ctl: Parser[str] = verify(take_n(1), lambda c: not c.isprintable()) -number: Parser[int] = map_parser(take_while1(str.isnumeric), int) +digit: Parser[str] = verify(take_n(1), lambda c: c in '0123456789') -def literal_string(text: str) -> ParseResult[str]: -    delimited_result = delimited(tag('{'), number, tag('}\r\n'))(text) -    if delimited_result is None: -        return None -    count, text = delimited_result -    return take_n(count)(text) +dquote: Parser[str] = tag('"') -quoted_string: Parser[str] = delimited(tag('"'), take_while1(lambda c: c not in '\r\n"'), tag('"')) +list_wildcards: Parser[str] = alt(tag('%'), tag('*')) -string: Parser[str] = alt(literal_string, quoted_string) +nil: Parser[str] = itag('NIL') -astring: Parser[str] = alt(atom, string) +resp_specials: Parser[str] = tag(']') -data_item: Parser[Union[int, str]] = alt(number, atom, string) +text_char: Parser[str] = verify(take_n(1), lambda c: c not in '\r\n') -ParensList = ListT[Union[int, str, 'ParensList']] -def parens_list(text: str) -> ParseResult[ParensList]: -    return delimited(tag('('), separated_many0(alt(data_item, parens_list), tag(' ')), tag(')'))(text) +# level 1 +number: Parser[str] = take_while1(as_predicate(digit)) + +quoted_specials: Parser[str] = alt(dquote, tag('\\')) + +# level 2 +atom_specials: Parser[str] = alt(verify(take_n(1), lambda c: c in '(){ '), ctl, list_wildcards, quoted_specials, resp_specials) + +literal: Parser[str] = and_then( +    delimited(tag('{'), number, tag('}\r\n')), +    lambda n: preceded(delimited(tag('{'), number, tag('}\r\n')), take_n(int(n))) +) + +quoted_char: Parser[str] = alt(verify(text_char, lambda c: quoted_specials(c) is None), preceded(tag('\\'), quoted_specials)) + +# level 3 +atom_char: Parser[str] = verify(take_n(1), lambda c: atom_specials(c) is None) + +quoted: Parser[str] = delimited(dquote, take_while0(as_predicate(quoted_char)), dquote) + +# level 4 +astring_char: Parser[str] = alt(atom_char, resp_specials) + +atom: Parser[str] = take_while1(as_predicate(atom_char)) + +string: Parser[str] = alt(quoted, literal) + +# level 5 +astring: Parser[str] = alt(take_while1(as_predicate(astring_char)), string) + +flag_extension: Parser[str] = preceded(tag('\\'), atom) + +# level 6 +mailbox: Parser[str] = alt(itag('INBOX'), astring) + +mbx_list_flag: Parser[str] = alt(itag(r'\Noselect'), itag(r'\Marked'), itag(r'\Unmarked'), itag(r'\Noinferiors'), flag_extension) + +# level 7 +mbx_list_flags: Parser[ListT[str]] = separated_many0(mbx_list_flag, tag(' ')) + +# level 8 +mailbox_list: Parser[Tuple[ListT[str], Optional[str], str]] = separated_triple( +    delimited(tag('('), mbx_list_flags, tag(')')), +    tag(' '), +    alt(delimited(dquote, quoted_char, dquote), nil), +    tag(' '), +    mailbox, +)  @dataclass  class List:      attributes: ListT[str] -    delimiter: str +    delimiter: Optional[str]      name: str      @staticmethod -    def parse(response: bytes) -> 'List': -        response = response.decode('ASCII') -        print(response) -        parser = all_consuming(separated_triple(parens_list, tag(' '), string, tag(' '), astring), debug=True) -        (attributes, delimiter, name), _ = parser(response) +    def parse(response_bytes: bytes) -> 'List': +        response = response_bytes.decode('ASCII') +        parser = all_consuming(mailbox_list, debug=True) +        parse_result = parser(response) +        if parse_result is None: +            raise ValueError('invalid List.parse argument {}', repr(response)) +        (attributes, delimiter, name), _ = parse_result          return List(attributes, delimiter, name) diff --git a/ctec/logic.py b/ctec/logic.py index 57b553c..1769e47 100644 --- a/ctec/logic.py +++ b/ctec/logic.py @@ -11,8 +11,20 @@ mailbox.Maildir.colon = '!'  FLAGS = re.compile(rb'FLAGS \(([^)]+?)\)') +def percent_encode(c: str) -> str: +    return '%' + hex(ord(c))[2:] +  def clean_folder_name(folder_name: str, separator: str) -> str: -    return folder_name.replace('.', '-').replace(separator, '.').replace('/', '-') +    folder_name = folder_name.replace('%', percent_encode('%')) +    folder_name = folder_name.replace('.', percent_encode('.')) +    folder_name = folder_name.replace(separator, '.') +    return folder_name + +def dirty_folder_name(folder_name: str, separator: str = '/') -> str: +    folder_name = folder_name.replace('.', separator) +    folder_name = folder_name.replace(percent_encode('.'), '.') +    folder_name = folder_name.replace(percent_encode('%'), '%') +    return folder_name  class Account:      def __init__(self, address: str, info: dict): @@ -21,15 +33,20 @@ class Account:          self.address = address          self.mailbox = mailbox.Maildir(data_dir / address)          self.info = info +        self.connection = imaplib.IMAP4_SSL(self.info['imap host']) +        self.connection.login(self.address, self.info['password']) + +    def __del__(self): +        self.connection.logout()      def fetch_folders(self): -        with imaplib.IMAP4_SSL(self.info['imap host']) as M: -            M.login(self.address, self.info['password']) -            folder_list: List[bytes] -            typ, folder_list = M.list() -            for folder in folder_list: -                folder_info = imap_response.List.parse(folder) -                self.mailbox.add_folder(clean_folder_name(folder_info.name, folder_info.delimiter)) +        folder_list: List[bytes] +        typ, folder_list = self.connection.list() +        for folder in folder_list: +            folder_info = imap_response.List.parse(folder) +            if folder_info.delimiter != '/': +                raise NotImplementedError(f'who the hell uses {repr(folder_info.delimiter)} as a delimiter') +            self.mailbox.add_folder(clean_folder_name(folder_info.name, folder_info.delimiter))      def fetch_inbox(self):          with imaplib.IMAP4_SSL(self.info['imap host']) as M: diff --git a/ctec/parse_utils.py b/ctec/parse_utils.py index 3dd5e63..fe3ed06 100644 --- a/ctec/parse_utils.py +++ b/ctec/parse_utils.py @@ -3,21 +3,27 @@ from typing import Callable, List, Optional, Tuple, TypeVar  __all__ = [      'ParseResult',      'Parser', +    'as_predicate',      'alt',      'tag', +    'itag',      'take_till1', +    'take_while0',      'take_while1',      'take_n',      'any_char',      'all_consuming',      'map_parser', +    'and_then',      'opt',      'verify',      'many0',      'many1',      'delimited',      'pair', +    'triple',      'preceded', +    'followed',      'separated_pair',      'separated_triple',      'separated_many0', @@ -31,6 +37,11 @@ T3 = TypeVar('T3')  ParseResult = Optional[Tuple[T, str]]  Parser = Callable[[str], ParseResult[T]] +def as_predicate(parser: Parser[T]) -> Callable[[str], bool]: +    def check(text: str) -> bool: +        return parser(text) is not None +    return check +  def alt(*parsers: Parser[T]) -> Parser[T]:      def parse(text: str) -> ParseResult[T]:          for parser in parsers[:-1]: @@ -40,13 +51,29 @@ def alt(*parsers: Parser[T]) -> Parser[T]:          return parsers[-1](text)      return parse -def tag(tag_text: str) -> Parser[None]: -    def parse(text: str) -> ParseResult[None]: +def tag(tag_text: str) -> Parser[str]: +    def parse(text: str) -> ParseResult[str]:          if text.startswith(tag_text): -            return None, text[len(tag_text):] +            return tag_text, text[len(tag_text):] +        return None +    return parse + +# case-insensitive tag +def itag(tag_text: str) -> Parser[str]: +    def parse(text: str) -> ParseResult[str]: +        if text.casefold().startswith(tag_text.casefold()): +            return tag_text, text[len(tag_text):]          return None      return parse +def take_while0(predicate: Callable[[str], bool]) -> Parser[str]: +    def parse(text: str) -> ParseResult[str]: +        for i in range(len(text)): +            if not predicate(text[i]): +                return text[:i], text[i:] +        return text, "" +    return parse +  def take_while1(predicate: Callable[[str], bool]) -> Parser[str]:      def parse(text: str) -> ParseResult[str]:          if len(text) == 0 or not predicate(text[0]): @@ -96,6 +123,15 @@ def map_parser(parser: Parser[T1], mapper: Callable[[T1], T2]) -> Parser[T2]:          return mapper(result), extra      return parse +def and_then(first_parser: Parser[T1], get_second_parser: Callable[[T1], Parser[T2]]) -> Parser[T2]: +    def parse(text: str) -> ParseResult[T2]: +        parsed_result = first_parser(text) +        if parsed_result is None: +            return None +        result, _ = parsed_result +        return get_second_parser(result)(text) +    return parse +  def opt(parser: Parser[T]) -> Parser[Optional[T]]:      def parse(text: str) -> ParseResult[Optional[T]]:          result = parser(text) @@ -194,21 +230,32 @@ def pair(first_parser: Parser[T1], second_parser: Parser[T2]) -> Parser[Tuple[T1          return (first_result, second_result), extra      return parse -def preceded(before_parser: Parser[T1], parser: Parser[T]) -> Parser[T]: -    def parse(text: str) -> ParseResult[T]: -        before_result = before_parser(text) -        if before_result is None: +def triple(first_parser: Parser[T1], second_parser: Parser[T2], third_parser: Parser[T3]) -> Parser[Tuple[T1, T2, T3]]: +    def parse(text: str) -> ParseResult[Tuple[T1, T2, T3]]: +        first_parsed_result = first_parser(text) +        if first_parsed_result is None:              return None -        _, extra = before_result +        first_result, extra = first_parsed_result -        parsed_result = parser(extra) -        if parsed_result is None: +        second_parsed_result = second_parser(extra) +        if second_parsed_result is None:              return None -        result, extra = parsed_result +        second_result, extra = second_parsed_result -        return result, extra +        third_parsed_result = third_parser(extra) +        if third_parsed_result is None: +            return None +        third_result, extra = third_parsed_result + +        return (first_result, second_result, third_result), extra      return parse +def preceded(before_parser: Parser[T1], parser: Parser[T]) -> Parser[T]: +    return map_parser(pair(before_parser, parser), lambda x: x[1]) + +def followed(parser: Parser[T], after_parser: Parser[T1]) -> Parser[T]: +    return map_parser(pair(parser, after_parser), lambda x: x[0]) +  def separated_pair(first_parser: Parser[T1], between_parser: Parser[T], second_parser: Parser[T2]) -> Parser[Tuple[T1, T2]]:      def parse(text: str) -> ParseResult[Tuple[T1, T2]]:          first_parsed_result = first_parser(text)  |