# this should really be in the stdlib imo
"""Parser-combinator grammar for IMAP server responses.

The module builds ``Parser`` values out of the primitives imported from
``.parse_utils`` and exposes two dataclasses, ``List`` and ``MessageData``,
whose ``parse`` static methods run the grammar over a raw response.

The rule names appear to follow the formal syntax of RFC 3501, and the
``level N`` headings group rules so that each one only refers to rules
defined in an earlier level.
"""
from dataclasses import dataclass
from typing import List as ListT, Tuple, Optional, Union, Dict, Callable, Sequence

from .parse_utils import (
    Parser, take_while1, tag, delimited, take_n, alt, separated_many0, separated_triple, all_consuming, verify,
    preceded, and_then, itag, take_while0, as_predicate, separated_pair, pair, separated_many1, opt, map_parser,
    many_m_n, string_concat, many1, triple
)

__all__ = [
    'List',
    'MessageData',
]


# common utility functions, manually typed so mypy doesn't get confused

def _condense_non_none(data: Sequence[Optional[bytes]]) -> bytes:
    """Concatenate the entries of *data*, skipping every ``None``."""
    return b''.join(x for x in data if x is not None)


def _is_ctl(c: bytes) -> bool:
    """Return True when *c* contains an ASCII control byte (0x00-0x1F or 0x7F).

    The test works on raw byte values on purpose: decoding the byte first
    raises ``UnicodeDecodeError`` for any lone byte >= 0x80, which would abort
    the whole parse instead of producing an ordinary "no match".
    """
    return any(b < 0x20 or b == 0x7f for b in c)


# level 0

ctl: Parser[bytes] = verify(take_n(1), _is_ctl)
digit: Parser[bytes] = verify(take_n(1), lambda c: c in b'0123456789')
digit_nz: Parser[bytes] = verify(take_n(1), lambda c: c in b'123456789')
dquote: Parser[bytes] = tag(b'"')
list_wildcards: Parser[bytes] = alt(tag(b'%'), tag(b'*'))
nil: Parser[None] = map_parser(itag(b'NIL'), lambda _: None)
resp_specials: Parser[bytes] = tag(b']')
text_char: Parser[bytes] = verify(take_n(1), lambda c: c not in b'\r\n')

# level 1

date_day_fixed: Parser[bytes] = alt(preceded(tag(b' '), digit), string_concat(many_m_n(digit, 2, 2)))
date_month: Parser[bytes] = alt(*(itag(x) for x in b'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()))
date_year: Parser[bytes] = string_concat(many_m_n(digit, 4, 4))
number: Parser[bytes] = take_while1(as_predicate(digit))
nz_number: Parser[bytes] = string_concat(pair(digit_nz, take_while0(as_predicate(digit))))
quoted_specials: Parser[bytes] = alt(dquote, tag(b'\\'))
time: Parser[bytes] = map_parser(separated_triple(
    string_concat(many_m_n(digit, 2, 2)), tag(b':'),
    string_concat(many_m_n(digit, 2, 2)), tag(b':'),
    string_concat(many_m_n(digit, 2, 2))
), b':'.join)
zone: Parser[bytes] = string_concat(pair(alt(tag(b'+'), tag(b'-')), string_concat(many_m_n(digit, 4, 4))))

# level 2

atom_specials: Parser[bytes] = alt(
    verify(take_n(1), lambda c: c in b'(){ '), ctl, list_wildcards, quoted_specials, resp_specials
)
date_time: Parser[bytes] = delimited(
    dquote,
    map_parser(separated_triple(
        map_parser(separated_triple(date_day_fixed, tag(b'-'), date_month, tag(b'-'), date_year), b'-'.join),
        tag(b' '),
        time,
        tag(b' '),
        zone
    ), b' '.join),
    dquote
)
# NOTE(review): the parser built from the byte count consumes the "{n}CRLF"
# header a second time, so and_then presumably applies it to the original
# input rather than to the remainder - confirm against parse_utils.and_then.
literal: Parser[bytes] = and_then(
    delimited(tag(b'{'), number, tag(b'}\r\n')),
    lambda n: preceded(delimited(tag(b'{'), number, tag(b'}\r\n')), take_n(int(n)))
)
quoted_char: Parser[bytes] = alt(
    verify(text_char, lambda c: quoted_specials(c) is None),
    preceded(tag(b'\\'), quoted_specials)
)
section_part: Parser[bytes] = map_parser(separated_many1(nz_number, tag(b'.')), b'.'.join)

# level 3

atom_char: Parser[bytes] = verify(take_n(1), lambda c: atom_specials(c) is None)
quoted: Parser[bytes] = delimited(dquote, take_while0(as_predicate(quoted_char)), dquote)

# level 4

astring_char: Parser[bytes] = alt(atom_char, resp_specials)
atom: Parser[bytes] = take_while1(as_predicate(atom_char))
string: Parser[bytes] = alt(quoted, literal)

# level 5

astring: Parser[bytes] = alt(take_while1(as_predicate(astring_char)), string)
flag_keyword: Parser[bytes] = atom
flag_extension: Parser[bytes] = string_concat(pair(tag(b'\\'), atom))
nstring: Parser[Optional[bytes]] = alt(string, nil)

# level 6

addr_adl: Parser[Optional[bytes]] = nstring
addr_host: Parser[Optional[bytes]] = nstring
addr_mailbox: Parser[Optional[bytes]] = nstring
addr_name: Parser[Optional[bytes]] = nstring
env_date: Parser[Optional[bytes]] = nstring
env_in_reply_to: Parser[Optional[bytes]] = nstring
env_message_id: Parser[Optional[bytes]] = nstring
env_subject: Parser[Optional[bytes]] = nstring
flag: Parser[bytes] = alt(
    *(itag(b'\\' + x) for x in [b'Answered', b'Flagged', b'Deleted', b'Seen', b'Draft']),
    flag_keyword,
    flag_extension
)
header_fld_name: Parser[bytes] = astring
# NOTE(review): INBOX is tried before astring; if alt commits to the first
# alternative that succeeds, an unquoted name such as INBOX.Sent would only
# match its INBOX prefix - confirm against parse_utils.alt.
mailbox: Parser[bytes] = alt(itag(b'INBOX'), astring)
mbx_list_flag: Parser[bytes] = alt(
    itag(rb'\Noselect'), itag(rb'\Marked'), itag(rb'\Unmarked'), itag(rb'\Noinferiors'), flag_extension
)

# level 7

# (name, adl, mailbox, host), each of which may be None
_ParsedAddress = Tuple[Optional[bytes], Optional[bytes], Optional[bytes], Optional[bytes]]


def _merge_for_address(t: Tuple[Tuple[Optional[bytes], Optional[bytes]],
                                Tuple[Optional[bytes], Optional[bytes]]]) -> _ParsedAddress:
    """Flatten the two nested pairs produced by the address parser into one 4-tuple."""
    return (*t[0], *t[1])


address: Parser[_ParsedAddress] = map_parser(delimited(tag(b'('), separated_pair(
    separated_pair(addr_name, tag(b' '), addr_adl),
    tag(b' '),
    separated_pair(addr_mailbox, tag(b' '), addr_host)
), tag(b')')), _merge_for_address)
flag_fetch: Parser[bytes] = alt(itag(rb'\Recent'), flag)
header_list: Parser[ListT[bytes]] = delimited(tag(b'('), separated_many1(header_fld_name, tag(b' ')), tag(b')'))
mbx_list_flags: Parser[ListT[bytes]] = separated_many0(mbx_list_flag, tag(b' '))

# level 8

# either a parenthesised run of addresses or NIL, the latter mapped to an empty list
_address_list: Parser[ListT[_ParsedAddress]] = alt(
    delimited(tag(b'('), many1(address), tag(b')')),
    map_parser(nil, lambda _: list())
)
env_bcc: Parser[ListT[_ParsedAddress]] = _address_list
env_cc: Parser[ListT[_ParsedAddress]] = _address_list
env_from: Parser[ListT[_ParsedAddress]] = _address_list
env_reply_to: Parser[ListT[_ParsedAddress]] = _address_list
env_sender: Parser[ListT[_ParsedAddress]] = _address_list
env_to: Parser[ListT[_ParsedAddress]] = _address_list
mailbox_list: Parser[Tuple[ListT[bytes], Optional[bytes], bytes]] = separated_triple(
    delimited(tag(b'('), mbx_list_flags, tag(b')')),
    tag(b' '),
    alt(delimited(dquote, quoted_char, dquote), nil),
    tag(b' '),
    mailbox,
)
msg_att_dynamic: Parser[ListT[bytes]] = delimited(
    itag(b'FLAGS ('),
    separated_many0(flag_fetch, tag(b' ')),
    tag(b')')
)
section_msgtext: Parser[bytes] = alt(
    string_concat(separated_pair(
        map_parser(pair(itag(b'HEADER.FIELDS'), opt(itag(b'.NOT'))), _condense_non_none),
        tag(b' '),
        string_concat(header_list)
    )),
    itag(b'HEADER'),
    itag(b'TEXT')
)

# level 9

# maps a field label (b'date', b'from', ...) to the parsed value of that field
_Envelope = Dict[bytes, Union[Optional[bytes], ListT[_ParsedAddress]]]


def _label_nstring(name: bytes) -> Callable[[Optional[bytes]], Tuple[bytes, Optional[bytes]]]:
    """Return a function that pairs an nstring result with the label *name*."""
    def give_label(x: Optional[bytes]) -> Tuple[bytes, Optional[bytes]]:
        return name, x
    return give_label


def _label_address_list(name: bytes) -> Callable[[ListT[_ParsedAddress]], Tuple[bytes, ListT[_ParsedAddress]]]:
    """Return a function that pairs an address-list result with the label *name*."""
    def give_label(x: ListT[_ParsedAddress]) -> Tuple[bytes, ListT[_ParsedAddress]]:
        return name, x
    return give_label


def _flatten_envelope(data: Tuple) -> _Envelope:
    """Collect every ``(label, value)`` pair nested inside *data* into a dict.

    A tuple whose first element is ``bytes`` is taken to be a labelled pair;
    any other tuple is descended into recursively.
    """
    def do_flatten(x: Tuple):
        if isinstance(x[0], bytes):
            yield x
        else:
            for datum in x:
                yield from do_flatten(datum)
    return dict(do_flatten(data))


envelope: Parser[_Envelope] = map_parser(delimited(
    tag(b'('),
    separated_triple(
        separated_triple(
            map_parser(env_date, _label_nstring(b'date')),
            tag(b' '),
            map_parser(env_subject, _label_nstring(b'subject')),
            tag(b' '),
            map_parser(env_from, _label_address_list(b'from'))
        ),
        tag(b' '),
        separated_triple(
            map_parser(env_sender, _label_address_list(b'sender')),
            tag(b' '),
            map_parser(env_reply_to, _label_address_list(b'reply_to')),
            tag(b' '),
            map_parser(env_to, _label_address_list(b'to'))
        ),
        tag(b' '),
        separated_pair(
            separated_pair(
                map_parser(env_cc, _label_address_list(b'cc')),
                tag(b' '),
                map_parser(env_bcc, _label_address_list(b'bcc'))
            ),
            tag(b' '),
            separated_pair(
                map_parser(env_in_reply_to, _label_nstring(b'in_reply_to')),
                tag(b' '),
                map_parser(env_message_id, _label_nstring(b'message_id'))
            )
        )
    ),
    tag(b')')
), _flatten_envelope)
section_text: Parser[bytes] = alt(section_msgtext, itag(b'MIME'))

# level 10

section_spec: Parser[bytes] = alt(
    section_msgtext,
    map_parser(pair(section_part, opt(preceded(tag(b'.'), section_text))), _condense_non_none)
)

# level 11

section: Parser[bytes] = map_parser(triple(tag(b'['), opt(section_spec), tag(b']')), _condense_non_none)

# level 12

msg_att_static: Parser[Tuple[bytes, Union[Optional[bytes], _Envelope]]] = alt(
    separated_pair(itag(b'ENVELOPE'), tag(b' '), envelope),
    separated_pair(itag(b'INTERNALDATE'), tag(b' '), date_time),
    separated_pair(
        map_parser(
            pair(itag(b'RFC822'), opt(alt(itag(b'.HEADER'), itag(b'.TEXT')))),
            _condense_non_none
        ),
        tag(b' '),
        nstring
    ),
    separated_pair(itag(b'RFC822.SIZE'), tag(b' '), number),
    # TODO BODY
    separated_pair(
        map_parser(
            triple(itag(b'BODY'), section, opt(delimited(tag(b'<'), number, tag(b'>')))),
            _condense_non_none
        ),
        tag(b' '),
        nstring
    )
    # TODO UID
)

# level 13

msg_att: Parser[ListT[Tuple[bytes, Union[bytes, None, ListT[bytes], _Envelope]]]] = delimited(
    tag(b'('),
    separated_many1(alt(map_parser(msg_att_dynamic, lambda x: (b'FLAGS', x)), msg_att_static), tag(b' ')),
    tag(b')')
)

# level 14

message_data: Parser[Tuple[int, ListT[Tuple[bytes, Union[bytes, None, ListT[bytes], _Envelope]]]]] = separated_pair(
    map_parser(nz_number, int),
    tag(b' '),
    # TODO what does imaplib do with expunge replies
    msg_att
)


@dataclass
class List:
    """The three components produced by the ``mailbox_list`` parser."""

    # mailbox flags found between the parentheses
    attributes: ListT[bytes]
    # the quoted single-character delimiter, or None when the response says NIL
    delimiter: Optional[bytes]
    # the mailbox name
    name: bytes

    @staticmethod
    def parse(response: bytes) -> 'List':
        """Build a ``List`` from *response*.

        The input is run through ``all_consuming(mailbox_list)``.

        Raises:
            ValueError: if the parser returns ``None`` for *response*.
        """
        parser = all_consuming(mailbox_list)
        parse_result = parser(response)
        if parse_result is None:
            raise ValueError(f'invalid List.parse argument:\n{response!r}')
        (attributes, delimiter, name), _ = parse_result
        return List(attributes, delimiter, name)


@dataclass
class MessageData:
    """Selected items extracted from a parsed ``message_data`` response.

    Every field except ``number`` stays ``None`` when the corresponding item
    is absent from the response.
    """

    # the leading non-zero number of the response, converted to int
    number: int
    # value of the FLAGS item
    flags: Optional[ListT[bytes]]
    # value of the INTERNALDATE item, kept as the raw bytes between the quotes
    internal_date: Optional[bytes]
    # value of the RFC822.SIZE item, kept as the raw digit bytes
    size: Optional[bytes]
    # value of the ENVELOPE item
    envelope: Optional[_Envelope]
    # value of the BODY[] item
    body_all_sections: Optional[bytes]

    @staticmethod
    def parse(response: bytes) -> 'MessageData':
        """Build a ``MessageData`` from *response*.

        The input is run through ``all_consuming(message_data)``; each parsed
        ``(name, value)`` item is then stored in the matching field. Items
        whose name/value combination is not handled are reported with a
        warning on standard output and otherwise ignored.

        Raises:
            ValueError: if the parser returns ``None`` for *response*.
        """
        parser = all_consuming(message_data)
        parse_result = parser(response)
        if parse_result is None:
            raise ValueError(f'invalid MessageData.parse argument:\n{response!r}')
        (msg_number, data), _ = parse_result
        flags = None
        internal_date = None
        size = None
        env = None
        body_all_sections = None
        for name, value in data:
            # the isinstance checks narrow the Union value type for mypy
            if name == b'FLAGS' and isinstance(value, list):
                flags = value
            elif name == b'INTERNALDATE' and isinstance(value, bytes):
                internal_date = value
            elif name == b'RFC822.SIZE' and isinstance(value, bytes):
                size = value
            elif name == b'ENVELOPE' and isinstance(value, dict):
                env = value
            elif name == b'BODY[]' and isinstance(value, bytes):
                body_all_sections = value
            else:
                print('warning: ignoring unknown data name', repr(name))
        return MessageData(msg_number, flags, internal_date, size, env, body_all_sections)