diff options
author | Melody Horn <melody@boringcactus.com> | 2021-05-01 04:13:08 -0600 |
---|---|---|
committer | Melody Horn <melody@boringcactus.com> | 2021-05-01 04:13:08 -0600 |
commit | e824ee1564806359b580cb7f9975dc8f0aaa2e73 (patch) | |
tree | ff0c76a300c976a1520779fa180ecf5f573a393e /ctec/imap_response.py | |
parent | 2ef831feb8cd1b2393196877b7b7c93ac49093d7 (diff) | |
download | ctec-e824ee1564806359b580cb7f9975dc8f0aaa2e73.tar.gz ctec-e824ee1564806359b580cb7f9975dc8f0aaa2e73.zip |
fetch message contents
this still has some duplicates and breaks on MIME Content-Transfer-Encoding: 8bit, but, like, that's life
Diffstat (limited to 'ctec/imap_response.py')
-rw-r--r-- | ctec/imap_response.py | 254 |
1 files changed, 248 insertions, 6 deletions
diff --git a/ctec/imap_response.py b/ctec/imap_response.py index cad76c9..10d6a54 100644 --- a/ctec/imap_response.py +++ b/ctec/imap_response.py @@ -1,39 +1,77 @@ # this should really be in the stdlib imo from dataclasses import dataclass -from typing import List as ListT, Tuple, Optional +from typing import List as ListT, Tuple, Optional, Union, Dict, Callable, Sequence from .parse_utils import ( Parser, take_while1, tag, delimited, take_n, alt, separated_many0, separated_triple, all_consuming, verify, - preceded, and_then, itag, take_while0, as_predicate + preceded, and_then, itag, take_while0, as_predicate, separated_pair, pair, separated_many1, opt, map_parser, + many_m_n, string_concat, many1, triple ) __all__ = [ 'List', + 'MessageData', ] +# common utility functions, manually typed so mypy doesn't get confused +def _condense_non_none(data: Sequence[Optional[str]]) -> str: + return ''.join(x for x in data if x is not None) + # level 0 ctl: Parser[str] = verify(take_n(1), lambda c: not c.isprintable()) digit: Parser[str] = verify(take_n(1), lambda c: c in '0123456789') +digit_nz: Parser[str] = verify(take_n(1), lambda c: c in '123456789') + dquote: Parser[str] = tag('"') list_wildcards: Parser[str] = alt(tag('%'), tag('*')) -nil: Parser[str] = itag('NIL') +nil: Parser[None] = map_parser(itag('NIL'), lambda _: None) resp_specials: Parser[str] = tag(']') text_char: Parser[str] = verify(take_n(1), lambda c: c not in '\r\n') # level 1 +date_day_fixed: Parser[str] = alt(preceded(tag(' '), digit), string_concat(many_m_n(digit, 2, 2))) + +date_month: Parser[str] = alt(*(itag(x) for x in 'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split())) + +date_year: Parser[str] = string_concat(many_m_n(digit, 4, 4)) + number: Parser[str] = take_while1(as_predicate(digit)) +nz_number: Parser[str] = map_parser(pair(digit_nz, take_while0(as_predicate(digit))), ''.join) + quoted_specials: Parser[str] = alt(dquote, tag('\\')) +time: Parser[str] = map_parser(separated_triple( + 
string_concat(many_m_n(digit, 2, 2)), + tag(':'), + string_concat(many_m_n(digit, 2, 2)), + tag(':'), + string_concat(many_m_n(digit, 2, 2)) +), ':'.join) + +zone: Parser[str] = string_concat(pair(alt(tag('+'), tag('-')), string_concat(many_m_n(digit, 4, 4)))) + # level 2 atom_specials: Parser[str] = alt(verify(take_n(1), lambda c: c in '(){ '), ctl, list_wildcards, quoted_specials, resp_specials) +date_time: Parser[str] = delimited( + dquote, + map_parser(separated_triple( + map_parser(separated_triple(date_day_fixed, tag('-'), date_month, tag('-'), date_year), '-'.join), + tag(' '), + time, + tag(' '), + zone + ), ' '.join), + dquote +) + literal: Parser[str] = and_then( delimited(tag('{'), number, tag('}\r\n')), lambda n: preceded(delimited(tag('{'), number, tag('}\r\n')), take_n(int(n))) @@ -41,6 +79,8 @@ literal: Parser[str] = and_then( quoted_char: Parser[str] = alt(verify(text_char, lambda c: quoted_specials(c) is None), preceded(tag('\\'), quoted_specials)) +section_part: Parser[str] = map_parser(separated_many1(nz_number, tag('.')), '.'.join) + # level 3 atom_char: Parser[str] = verify(take_n(1), lambda c: atom_specials(c) is None) @@ -56,17 +96,60 @@ string: Parser[str] = alt(quoted, literal) # level 5 astring: Parser[str] = alt(take_while1(as_predicate(astring_char)), string) -flag_extension: Parser[str] = preceded(tag('\\'), atom) +flag_keyword: Parser[str] = atom + +flag_extension: Parser[str] = map_parser(pair(tag('\\'), atom), ''.join) + +nstring: Parser[Optional[str]] = alt(string, nil) # level 6 +addr_adl: Parser[Optional[str]] = nstring +addr_host: Parser[Optional[str]] = nstring +addr_mailbox: Parser[Optional[str]] = nstring +addr_name: Parser[Optional[str]] = nstring + +env_date: Parser[Optional[str]] = nstring +env_in_reply_to: Parser[Optional[str]] = nstring +env_message_id: Parser[Optional[str]] = nstring +env_subject: Parser[Optional[str]] = nstring + +flag: Parser[str] = alt( + *(itag('\\' + x) for x in ['Answered', 'Flagged', 'Deleted', 
'Seen', 'Draft']), + flag_keyword, + flag_extension +) + +header_fld_name: Parser[str] = astring + mailbox: Parser[str] = alt(itag('INBOX'), astring) mbx_list_flag: Parser[str] = alt(itag(r'\Noselect'), itag(r'\Marked'), itag(r'\Unmarked'), itag(r'\Noinferiors'), flag_extension) # level 7 +_ParsedAddress = Tuple[Optional[str], Optional[str], Optional[str], Optional[str]] +def _merge_for_address(t: Tuple[Tuple[Optional[str], Optional[str]], Tuple[Optional[str], Optional[str]]]) -> _ParsedAddress: + return (*t[0], *t[1]) +address: Parser[_ParsedAddress] = map_parser(delimited(tag('('), separated_pair( + separated_pair(addr_name, tag(' '), addr_adl), + tag(' '), + separated_pair(addr_mailbox, tag(' '), addr_host) +), tag(')')), _merge_for_address) + +flag_fetch: Parser[str] = alt(itag(r'\Recent'), flag) + +header_list: Parser[ListT[str]] = delimited(tag('('), separated_many1(header_fld_name, tag(' ')), tag(')')) + mbx_list_flags: Parser[ListT[str]] = separated_many0(mbx_list_flag, tag(' ')) # level 8 +_address_list: Parser[ListT[_ParsedAddress]] = alt(delimited(tag('('), many1(address), tag(')')), map_parser(nil, lambda _: list())) +env_bcc: Parser[ListT[_ParsedAddress]] = _address_list +env_cc: Parser[ListT[_ParsedAddress]] = _address_list +env_from: Parser[ListT[_ParsedAddress]] = _address_list +env_reply_to: Parser[ListT[_ParsedAddress]] = _address_list +env_sender: Parser[ListT[_ParsedAddress]] = _address_list +env_to: Parser[ListT[_ParsedAddress]] = _address_list + mailbox_list: Parser[Tuple[ListT[str], Optional[str], str]] = separated_triple( delimited(tag('('), mbx_list_flags, tag(')')), tag(' '), @@ -75,6 +158,123 @@ mailbox_list: Parser[Tuple[ListT[str], Optional[str], str]] = separated_triple( mailbox, ) +msg_att_dynamic: Parser[ListT[str]] = delimited( + itag('FLAGS ('), + separated_many0(flag_fetch, tag(' ')), + tag(')') +) + +section_msgtext: Parser[str] = alt( + string_concat(separated_pair( + map_parser(pair(itag('HEADER.FIELDS'), opt(itag('.NOT'))), 
_condense_non_none), + tag(' '), + string_concat(header_list) + )), + itag('HEADER'), + itag('TEXT') +) + +# level 9 +_Envelope = Dict[str, Union[Optional[str], ListT[_ParsedAddress]]] +def _label_nstring(name: str) -> Callable[[Optional[str]], Tuple[str, Optional[str]]]: + def give_label(x: Optional[str]) -> Tuple[str, Optional[str]]: + return name, x + return give_label +def _label_address_list(name: str) -> Callable[[ListT[_ParsedAddress]], Tuple[str, ListT[_ParsedAddress]]]: + def give_label(x: ListT[_ParsedAddress]) -> Tuple[str, ListT[_ParsedAddress]]: + return name, x + return give_label +def _flatten_envelope(data: Tuple) -> _Envelope: + def do_flatten(x: Tuple): + if isinstance(x[0], str): + yield x + else: + for datum in x: + yield from do_flatten(datum) + return dict(do_flatten(data)) +envelope: Parser[_Envelope] = map_parser(delimited( + tag('('), + separated_triple( + separated_triple( + map_parser(env_date, _label_nstring('date')), + tag(' '), + map_parser(env_subject, _label_nstring('subject')), + tag(' '), + map_parser(env_from, _label_address_list('from')) + ), + tag(' '), + separated_triple( + map_parser(env_sender, _label_address_list('sender')), + tag(' '), + map_parser(env_reply_to, _label_address_list('reply_to')), + tag(' '), + map_parser(env_to, _label_address_list('to')) + ), + tag(' '), + separated_pair( + separated_pair( + map_parser(env_cc, _label_address_list('cc')), + tag(' '), + map_parser(env_bcc, _label_address_list('bcc')) + ), + tag(' '), + separated_pair( + map_parser(env_in_reply_to, _label_nstring('in_reply_to')), + tag(' '), + map_parser(env_message_id, _label_nstring('message_id')) + ) + ) + ), + tag(')') +), _flatten_envelope) + +section_text: Parser[str] = alt(section_msgtext, itag('MIME')) + +# level 10 +section_spec: Parser[str] = alt(section_msgtext, map_parser(pair(section_part, opt(preceded(tag('.'), section_text))), _condense_non_none)) + +# level 11 +section: Parser[str] = map_parser(triple(tag('['), 
opt(section_spec), tag(']')), _condense_non_none) + +# level 12 +msg_att_static: Parser[Tuple[str, Union[Optional[str], _Envelope]]] = alt( + separated_pair(itag('ENVELOPE'), tag(' '), envelope), + separated_pair(itag('INTERNALDATE'), tag(' '), date_time), + separated_pair( + map_parser( + pair(itag('RFC822'), opt(alt(itag('.HEADER'), itag('.TEXT')))), + _condense_non_none + ), + tag(' '), + nstring + ), + separated_pair(itag('RFC822.SIZE'), tag(' '), number), + # TODO BODY + separated_pair( + map_parser( + triple(itag('BODY'), section, opt(delimited(tag('<'), number, tag('>')))), + _condense_non_none + ), + tag(' '), + nstring + ) + # TODO UID +) + +# level 13 +msg_att: Parser[ListT[Tuple[str, Union[str, None, ListT[str], _Envelope]]]] = delimited( + tag('('), + separated_many1(alt(map_parser(msg_att_dynamic, lambda x: ('FLAGS', x)), msg_att_static), tag(' ')), + tag(')') +) + +# level 14 +message_data: Parser[Tuple[int, ListT[Tuple[str, Union[str, None, ListT[str], _Envelope]]]]] = separated_pair( + map_parser(nz_number, int), + tag(' '), # TODO what does imaplib do with expunge replies + msg_att +) + @dataclass class List: attributes: ListT[str] @@ -84,9 +284,51 @@ class List: @staticmethod def parse(response_bytes: bytes) -> 'List': response = response_bytes.decode('ASCII') - parser = all_consuming(mailbox_list, debug=True) + parser = all_consuming(mailbox_list) parse_result = parser(response) if parse_result is None: - raise ValueError('invalid List.parse argument {}', repr(response)) + raise ValueError(f'invalid List.parse argument {repr(response)}') (attributes, delimiter, name), _ = parse_result return List(attributes, delimiter, name) + +@dataclass +class MessageData: + number: int + flags: Optional[ListT[str]] + internal_date: Optional[str] + size: Optional[str] + envelope: Optional[_Envelope] + body_all_sections: Optional[str] + + @staticmethod + def parse(response_bytes: bytes) -> 'MessageData': + try: + response = response_bytes.decode('ASCII') + 
except UnicodeError: + print(f'unicode fucky wucky occurred:\n{response_bytes}') + response = response_bytes.decode('ASCII', errors='replace') + parser = all_consuming(message_data) + parse_result = parser(response) + if parse_result is None: + raise ValueError(f'invalid MessageData.parse argument:\n{response}') + (msg_number, data), _ = parse_result + + flags = None + internal_date = None + size = None + env = None + body_all_sections = None + for name, value in data: + if name == 'FLAGS' and isinstance(value, list): + flags = value + elif name == 'INTERNALDATE' and isinstance(value, str): + internal_date = value + elif name == 'RFC822.SIZE' and isinstance(value, str): + size = value + elif name == 'ENVELOPE' and isinstance(value, dict): + env = value + elif name == 'BODY[]' and isinstance(value, str): + body_all_sections = value + else: + print('warning: ignoring unknown data name', repr(name)) + return MessageData(msg_number, flags, internal_date, size, env, body_all_sections) |