aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMelody Horn <melody@boringcactus.com>2021-04-30 21:53:31 -0600
committerMelody Horn <melody@boringcactus.com>2021-04-30 21:53:31 -0600
commit2ef831feb8cd1b2393196877b7b7c93ac49093d7 (patch)
tree704290327fc95c998544720914c5e01625d6d4d3
parentbbf01ada6b2f748618db820624bf768989a11924 (diff)
downloadctec-2ef831feb8cd1b2393196877b7b7c93ac49093d7.tar.gz
ctec-2ef831feb8cd1b2393196877b7b7c93ac49093d7.zip
overhaul parsing to match ABNF from spec
-rw-r--r--ctec/imap_response.py92
-rw-r--r--ctec/logic.py33
-rw-r--r--ctec/parse_utils.py71
3 files changed, 153 insertions, 43 deletions
diff --git a/ctec/imap_response.py b/ctec/imap_response.py
index 23ca943..cad76c9 100644
--- a/ctec/imap_response.py
+++ b/ctec/imap_response.py
@@ -1,46 +1,92 @@
# this should really be in the stdlib imo
from dataclasses import dataclass
-from typing import List as ListT, Union, ClassVar
+from typing import List as ListT, Tuple, Optional
-from .parse_utils import ParseResult, Parser, take_while1, tag, delimited, take_n, alt, map_parser, separated_many0, separated_triple, all_consuming
+from .parse_utils import (
+ Parser, take_while1, tag, delimited, take_n, alt, separated_many0, separated_triple, all_consuming, verify,
+ preceded, and_then, itag, take_while0, as_predicate
+)
__all__ = [
'List',
]
-atom: Parser[str] = take_while1(lambda c: c.isalnum() or c in r'\/')
+# level 0
+ctl: Parser[str] = verify(take_n(1), lambda c: not c.isprintable())
-number: Parser[int] = map_parser(take_while1(str.isnumeric), int)
+digit: Parser[str] = verify(take_n(1), lambda c: c in '0123456789')
-def literal_string(text: str) -> ParseResult[str]:
- delimited_result = delimited(tag('{'), number, tag('}\r\n'))(text)
- if delimited_result is None:
- return None
- count, text = delimited_result
- return take_n(count)(text)
+dquote: Parser[str] = tag('"')
-quoted_string: Parser[str] = delimited(tag('"'), take_while1(lambda c: c not in '\r\n"'), tag('"'))
+list_wildcards: Parser[str] = alt(tag('%'), tag('*'))
-string: Parser[str] = alt(literal_string, quoted_string)
+nil: Parser[str] = itag('NIL')
-astring: Parser[str] = alt(atom, string)
+resp_specials: Parser[str] = tag(']')
-data_item: Parser[Union[int, str]] = alt(number, atom, string)
+text_char: Parser[str] = verify(take_n(1), lambda c: c not in '\r\n')
-ParensList = ListT[Union[int, str, 'ParensList']]
-def parens_list(text: str) -> ParseResult[ParensList]:
- return delimited(tag('('), separated_many0(alt(data_item, parens_list), tag(' ')), tag(')'))(text)
+# level 1
+number: Parser[str] = take_while1(as_predicate(digit))
+
+quoted_specials: Parser[str] = alt(dquote, tag('\\'))
+
+# level 2
+atom_specials: Parser[str] = alt(verify(take_n(1), lambda c: c in '(){ '), ctl, list_wildcards, quoted_specials, resp_specials)
+
+literal: Parser[str] = and_then(
+ delimited(tag('{'), number, tag('}\r\n')),
+ lambda n: preceded(delimited(tag('{'), number, tag('}\r\n')), take_n(int(n)))
+)
+
+quoted_char: Parser[str] = alt(verify(text_char, lambda c: quoted_specials(c) is None), preceded(tag('\\'), quoted_specials))
+
+# level 3
+atom_char: Parser[str] = verify(take_n(1), lambda c: atom_specials(c) is None)
+
+quoted: Parser[str] = delimited(dquote, take_while0(as_predicate(quoted_char)), dquote)
+
+# level 4
+astring_char: Parser[str] = alt(atom_char, resp_specials)
+
+atom: Parser[str] = take_while1(as_predicate(atom_char))
+
+string: Parser[str] = alt(quoted, literal)
+
+# level 5
+astring: Parser[str] = alt(take_while1(as_predicate(astring_char)), string)
+
+flag_extension: Parser[str] = preceded(tag('\\'), atom)
+
+# level 6
+mailbox: Parser[str] = alt(itag('INBOX'), astring)
+
+mbx_list_flag: Parser[str] = alt(itag(r'\Noselect'), itag(r'\Marked'), itag(r'\Unmarked'), itag(r'\Noinferiors'), flag_extension)
+
+# level 7
+mbx_list_flags: Parser[ListT[str]] = separated_many0(mbx_list_flag, tag(' '))
+
+# level 8
+mailbox_list: Parser[Tuple[ListT[str], Optional[str], str]] = separated_triple(
+ delimited(tag('('), mbx_list_flags, tag(')')),
+ tag(' '),
+ alt(delimited(dquote, quoted_char, dquote), nil),
+ tag(' '),
+ mailbox,
+)
@dataclass
class List:
attributes: ListT[str]
- delimiter: str
+ delimiter: Optional[str]
name: str
@staticmethod
- def parse(response: bytes) -> 'List':
- response = response.decode('ASCII')
- print(response)
- parser = all_consuming(separated_triple(parens_list, tag(' '), string, tag(' '), astring), debug=True)
- (attributes, delimiter, name), _ = parser(response)
+ def parse(response_bytes: bytes) -> 'List':
+ response = response_bytes.decode('ASCII')
+ parser = all_consuming(mailbox_list, debug=True)
+ parse_result = parser(response)
+ if parse_result is None:
+ raise ValueError('invalid List.parse argument {}', repr(response))
+ (attributes, delimiter, name), _ = parse_result
return List(attributes, delimiter, name)
diff --git a/ctec/logic.py b/ctec/logic.py
index 57b553c..1769e47 100644
--- a/ctec/logic.py
+++ b/ctec/logic.py
@@ -11,8 +11,20 @@ mailbox.Maildir.colon = '!'
FLAGS = re.compile(rb'FLAGS \(([^)]+?)\)')
+def percent_encode(c: str) -> str:
+ return '%' + hex(ord(c))[2:]
+
def clean_folder_name(folder_name: str, separator: str) -> str:
- return folder_name.replace('.', '-').replace(separator, '.').replace('/', '-')
+ folder_name = folder_name.replace('%', percent_encode('%'))
+ folder_name = folder_name.replace('.', percent_encode('.'))
+ folder_name = folder_name.replace(separator, '.')
+ return folder_name
+
+def dirty_folder_name(folder_name: str, separator: str = '/') -> str:
+ folder_name = folder_name.replace('.', separator)
+ folder_name = folder_name.replace(percent_encode('.'), '.')
+ folder_name = folder_name.replace(percent_encode('%'), '%')
+ return folder_name
class Account:
def __init__(self, address: str, info: dict):
@@ -21,15 +33,20 @@ class Account:
self.address = address
self.mailbox = mailbox.Maildir(data_dir / address)
self.info = info
+ self.connection = imaplib.IMAP4_SSL(self.info['imap host'])
+ self.connection.login(self.address, self.info['password'])
+
+ def __del__(self):
+ self.connection.logout()
def fetch_folders(self):
- with imaplib.IMAP4_SSL(self.info['imap host']) as M:
- M.login(self.address, self.info['password'])
- folder_list: List[bytes]
- typ, folder_list = M.list()
- for folder in folder_list:
- folder_info = imap_response.List.parse(folder)
- self.mailbox.add_folder(clean_folder_name(folder_info.name, folder_info.delimiter))
+ folder_list: List[bytes]
+ typ, folder_list = self.connection.list()
+ for folder in folder_list:
+ folder_info = imap_response.List.parse(folder)
+ if folder_info.delimiter != '/':
+ raise NotImplementedError(f'who the hell uses {repr(folder_info.delimiter)} as a delimiter')
+ self.mailbox.add_folder(clean_folder_name(folder_info.name, folder_info.delimiter))
def fetch_inbox(self):
with imaplib.IMAP4_SSL(self.info['imap host']) as M:
diff --git a/ctec/parse_utils.py b/ctec/parse_utils.py
index 3dd5e63..fe3ed06 100644
--- a/ctec/parse_utils.py
+++ b/ctec/parse_utils.py
@@ -3,21 +3,27 @@ from typing import Callable, List, Optional, Tuple, TypeVar
__all__ = [
'ParseResult',
'Parser',
+ 'as_predicate',
'alt',
'tag',
+ 'itag',
'take_till1',
+ 'take_while0',
'take_while1',
'take_n',
'any_char',
'all_consuming',
'map_parser',
+ 'and_then',
'opt',
'verify',
'many0',
'many1',
'delimited',
'pair',
+ 'triple',
'preceded',
+ 'followed',
'separated_pair',
'separated_triple',
'separated_many0',
@@ -31,6 +37,11 @@ T3 = TypeVar('T3')
ParseResult = Optional[Tuple[T, str]]
Parser = Callable[[str], ParseResult[T]]
+def as_predicate(parser: Parser[T]) -> Callable[[str], bool]:
+ def check(text: str) -> bool:
+ return parser(text) is not None
+ return check
+
def alt(*parsers: Parser[T]) -> Parser[T]:
def parse(text: str) -> ParseResult[T]:
for parser in parsers[:-1]:
@@ -40,13 +51,29 @@ def alt(*parsers: Parser[T]) -> Parser[T]:
return parsers[-1](text)
return parse
-def tag(tag_text: str) -> Parser[None]:
- def parse(text: str) -> ParseResult[None]:
+def tag(tag_text: str) -> Parser[str]:
+ def parse(text: str) -> ParseResult[str]:
if text.startswith(tag_text):
- return None, text[len(tag_text):]
+ return tag_text, text[len(tag_text):]
+ return None
+ return parse
+
+# case-insensitive tag
+def itag(tag_text: str) -> Parser[str]:
+ def parse(text: str) -> ParseResult[str]:
+ if text.casefold().startswith(tag_text.casefold()):
+ return tag_text, text[len(tag_text):]
return None
return parse
+def take_while0(predicate: Callable[[str], bool]) -> Parser[str]:
+ def parse(text: str) -> ParseResult[str]:
+ for i in range(len(text)):
+ if not predicate(text[i]):
+ return text[:i], text[i:]
+ return text, ""
+ return parse
+
def take_while1(predicate: Callable[[str], bool]) -> Parser[str]:
def parse(text: str) -> ParseResult[str]:
if len(text) == 0 or not predicate(text[0]):
@@ -96,6 +123,15 @@ def map_parser(parser: Parser[T1], mapper: Callable[[T1], T2]) -> Parser[T2]:
return mapper(result), extra
return parse
+def and_then(first_parser: Parser[T1], get_second_parser: Callable[[T1], Parser[T2]]) -> Parser[T2]:
+ def parse(text: str) -> ParseResult[T2]:
+ parsed_result = first_parser(text)
+ if parsed_result is None:
+ return None
+ result, _ = parsed_result
+ return get_second_parser(result)(text)
+ return parse
+
def opt(parser: Parser[T]) -> Parser[Optional[T]]:
def parse(text: str) -> ParseResult[Optional[T]]:
result = parser(text)
@@ -194,21 +230,32 @@ def pair(first_parser: Parser[T1], second_parser: Parser[T2]) -> Parser[Tuple[T1
return (first_result, second_result), extra
return parse
-def preceded(before_parser: Parser[T1], parser: Parser[T]) -> Parser[T]:
- def parse(text: str) -> ParseResult[T]:
- before_result = before_parser(text)
- if before_result is None:
+def triple(first_parser: Parser[T1], second_parser: Parser[T2], third_parser: Parser[T3]) -> Parser[Tuple[T1, T2, T3]]:
+ def parse(text: str) -> ParseResult[Tuple[T1, T2, T3]]:
+ first_parsed_result = first_parser(text)
+ if first_parsed_result is None:
return None
- _, extra = before_result
+ first_result, extra = first_parsed_result
- parsed_result = parser(extra)
- if parsed_result is None:
+ second_parsed_result = second_parser(extra)
+ if second_parsed_result is None:
return None
- result, extra = parsed_result
+ second_result, extra = second_parsed_result
- return result, extra
+ third_parsed_result = third_parser(extra)
+ if third_parsed_result is None:
+ return None
+ third_result, extra = third_parsed_result
+
+ return (first_result, second_result, third_result), extra
return parse
+def preceded(before_parser: Parser[T1], parser: Parser[T]) -> Parser[T]:
+ return map_parser(pair(before_parser, parser), lambda x: x[1])
+
+def followed(parser: Parser[T], after_parser: Parser[T1]) -> Parser[T]:
+ return map_parser(pair(parser, after_parser), lambda x: x[0])
+
def separated_pair(first_parser: Parser[T1], between_parser: Parser[T], second_parser: Parser[T2]) -> Parser[Tuple[T1, T2]]:
def parse(text: str) -> ParseResult[Tuple[T1, T2]]:
first_parsed_result = first_parser(text)