# this should really be in the stdlib imo from dataclasses import dataclass from typing import List as ListT, Tuple, Optional, Union, Dict, Callable, Sequence from .parse_utils import ( Parser, take_while1, tag, delimited, take_n, alt, separated_many0, separated_triple, all_consuming, verify, preceded, and_then, itag, take_while0, as_predicate, separated_pair, pair, separated_many1, opt, map_parser, many_m_n, string_concat, many1, triple ) __all__ = [ 'List', 'MessageData', ] # common utility functions, manually typed so mypy doesn't get confused def _condense_non_none(data: Sequence[Optional[str]]) -> str: return ''.join(x for x in data if x is not None) # level 0 ctl: Parser[str] = verify(take_n(1), lambda c: not c.isprintable()) digit: Parser[str] = verify(take_n(1), lambda c: c in '0123456789') digit_nz: Parser[str] = verify(take_n(1), lambda c: c in '123456789') dquote: Parser[str] = tag('"') list_wildcards: Parser[str] = alt(tag('%'), tag('*')) nil: Parser[None] = map_parser(itag('NIL'), lambda _: None) resp_specials: Parser[str] = tag(']') text_char: Parser[str] = verify(take_n(1), lambda c: c not in '\r\n') # level 1 date_day_fixed: Parser[str] = alt(preceded(tag(' '), digit), string_concat(many_m_n(digit, 2, 2))) date_month: Parser[str] = alt(*(itag(x) for x in 'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split())) date_year: Parser[str] = string_concat(many_m_n(digit, 4, 4)) number: Parser[str] = take_while1(as_predicate(digit)) nz_number: Parser[str] = map_parser(pair(digit_nz, take_while0(as_predicate(digit))), ''.join) quoted_specials: Parser[str] = alt(dquote, tag('\\')) time: Parser[str] = map_parser(separated_triple( string_concat(many_m_n(digit, 2, 2)), tag(':'), string_concat(many_m_n(digit, 2, 2)), tag(':'), string_concat(many_m_n(digit, 2, 2)) ), ':'.join) zone: Parser[str] = string_concat(pair(alt(tag('+'), tag('-')), string_concat(many_m_n(digit, 4, 4)))) # level 2 atom_specials: Parser[str] = alt(verify(take_n(1), lambda c: c in '(){ '), ctl, list_wildcards, quoted_specials, resp_specials) date_time: Parser[str] = delimited( dquote, map_parser(separated_triple( map_parser(separated_triple(date_day_fixed, tag('-'), date_month, tag('-'), date_year), '-'.join), tag(' '), time, tag(' '), zone ), ' '.join), dquote ) literal: Parser[str] = and_then( delimited(tag('{'), number, tag('}\r\n')), lambda n: preceded(delimited(tag('{'), number, tag('}\r\n')), take_n(int(n))) ) quoted_char: Parser[str] = alt(verify(text_char, lambda c: quoted_specials(c) is None), preceded(tag('\\'), quoted_specials)) section_part: Parser[str] = map_parser(separated_many1(nz_number, tag('.')), '.'.join) # level 3 atom_char: Parser[str] = verify(take_n(1), lambda c: atom_specials(c) is None) quoted: Parser[str] = delimited(dquote, take_while0(as_predicate(quoted_char)), dquote) # level 4 astring_char: Parser[str] = alt(atom_char, resp_specials) atom: Parser[str] = take_while1(as_predicate(atom_char)) string: Parser[str] = alt(quoted, literal) # level 5 astring: Parser[str] = alt(take_while1(as_predicate(astring_char)), string) flag_keyword: Parser[str] = atom flag_extension: Parser[str] = map_parser(pair(tag('\\'), atom), ''.join) nstring: Parser[Optional[str]] = alt(string, nil) # level 6 addr_adl: Parser[Optional[str]] = nstring addr_host: Parser[Optional[str]] = nstring addr_mailbox: Parser[Optional[str]] = nstring addr_name: Parser[Optional[str]] = nstring env_date: Parser[Optional[str]] = nstring env_in_reply_to: Parser[Optional[str]] = nstring env_message_id: Parser[Optional[str]] = nstring env_subject: Parser[Optional[str]] = nstring flag: Parser[str] = alt( *(itag('\\' + x) for x in ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft']), flag_keyword, flag_extension ) header_fld_name: Parser[str] = astring mailbox: Parser[str] = alt(itag('INBOX'), astring) mbx_list_flag: Parser[str] = alt(itag(r'\Noselect'), itag(r'\Marked'), itag(r'\Unmarked'), itag(r'\Noinferiors'), flag_extension) # level 7 _ParsedAddress = Tuple[Optional[str], Optional[str], Optional[str], Optional[str]] def _merge_for_address(t: Tuple[Tuple[Optional[str], Optional[str]], Tuple[Optional[str], Optional[str]]]) -> _ParsedAddress: return (*t[0], *t[1]) address: Parser[_ParsedAddress] = map_parser(delimited(tag('('), separated_pair( separated_pair(addr_name, tag(' '), addr_adl), tag(' '), separated_pair(addr_mailbox, tag(' '), addr_host) ), tag(')')), _merge_for_address) flag_fetch: Parser[str] = alt(itag(r'\Recent'), flag) header_list: Parser[ListT[str]] = delimited(tag('('), separated_many1(header_fld_name, tag(' ')), tag(')')) mbx_list_flags: Parser[ListT[str]] = separated_many0(mbx_list_flag, tag(' ')) # level 8 _address_list: Parser[ListT[_ParsedAddress]] = alt(delimited(tag('('), many1(address), tag(')')), map_parser(nil, lambda _: list())) env_bcc: Parser[ListT[_ParsedAddress]] = _address_list env_cc: Parser[ListT[_ParsedAddress]] = _address_list env_from: Parser[ListT[_ParsedAddress]] = _address_list env_reply_to: Parser[ListT[_ParsedAddress]] = _address_list env_sender: Parser[ListT[_ParsedAddress]] = _address_list env_to: Parser[ListT[_ParsedAddress]] = _address_list mailbox_list: Parser[Tuple[ListT[str], Optional[str], str]] = separated_triple( delimited(tag('('), mbx_list_flags, tag(')')), tag(' '), alt(delimited(dquote, quoted_char, dquote), nil), tag(' '), mailbox, ) msg_att_dynamic: Parser[ListT[str]] = delimited( itag('FLAGS ('), separated_many0(flag_fetch, tag(' ')), tag(')') ) section_msgtext: Parser[str] = alt( string_concat(separated_pair( map_parser(pair(itag('HEADER.FIELDS'), opt(itag('.NOT'))), _condense_non_none), tag(' '), string_concat(header_list) )), itag('HEADER'), itag('TEXT') ) # level 9 _Envelope = Dict[str, Union[Optional[str], ListT[_ParsedAddress]]] def _label_nstring(name: str) -> Callable[[Optional[str]], Tuple[str, Optional[str]]]: def give_label(x: Optional[str]) -> Tuple[str, Optional[str]]: return name, x return give_label def _label_address_list(name: str) -> Callable[[ListT[_ParsedAddress]], Tuple[str, ListT[_ParsedAddress]]]: def give_label(x: ListT[_ParsedAddress]) -> Tuple[str, ListT[_ParsedAddress]]: return name, x return give_label def _flatten_envelope(data: Tuple) -> _Envelope: def do_flatten(x: Tuple): if isinstance(x[0], str): yield x else: for datum in x: yield from do_flatten(datum) return dict(do_flatten(data)) envelope: Parser[_Envelope] = map_parser(delimited( tag('('), separated_triple( separated_triple( map_parser(env_date, _label_nstring('date')), tag(' '), map_parser(env_subject, _label_nstring('subject')), tag(' '), map_parser(env_from, _label_address_list('from')) ), tag(' '), separated_triple( map_parser(env_sender, _label_address_list('sender')), tag(' '), map_parser(env_reply_to, _label_address_list('reply_to')), tag(' '), map_parser(env_to, _label_address_list('to')) ), tag(' '), separated_pair( separated_pair( map_parser(env_cc, _label_address_list('cc')), tag(' '), map_parser(env_bcc, _label_address_list('bcc')) ), tag(' '), separated_pair( map_parser(env_in_reply_to, _label_nstring('in_reply_to')), tag(' '), map_parser(env_message_id, _label_nstring('message_id')) ) ) ), tag(')') ), _flatten_envelope) section_text: Parser[str] = alt(section_msgtext, itag('MIME')) # level 10 section_spec: Parser[str] = alt(section_msgtext, map_parser(pair(section_part, opt(preceded(tag('.'), section_text))), _condense_non_none)) # level 11 section: Parser[str] = map_parser(triple(tag('['), opt(section_spec), tag(']')), _condense_non_none) # level 12 msg_att_static: Parser[Tuple[str, Union[Optional[str], _Envelope]]] = alt( separated_pair(itag('ENVELOPE'), tag(' '), envelope), separated_pair(itag('INTERNALDATE'), tag(' '), date_time), separated_pair( map_parser( pair(itag('RFC822'), opt(alt(itag('.HEADER'), itag('.TEXT')))), _condense_non_none ), tag(' '), nstring ), separated_pair(itag('RFC822.SIZE'), tag(' '), number), # TODO BODY separated_pair( map_parser( triple(itag('BODY'), section, opt(delimited(tag('<'), number, tag('>')))), _condense_non_none ), tag(' '), nstring ) # TODO UID ) # level 13 msg_att: Parser[ListT[Tuple[str, Union[str, None, ListT[str], _Envelope]]]] = delimited( tag('('), separated_many1(alt(map_parser(msg_att_dynamic, lambda x: ('FLAGS', x)), msg_att_static), tag(' ')), tag(')') ) # level 14 message_data: Parser[Tuple[int, ListT[Tuple[str, Union[str, None, ListT[str], _Envelope]]]]] = separated_pair( map_parser(nz_number, int), tag(' '), # TODO what does imaplib do with expunge replies msg_att ) @dataclass class List: attributes: ListT[str] delimiter: Optional[str] name: str @staticmethod def parse(response_bytes: bytes) -> 'List': response = response_bytes.decode('ASCII') parser = all_consuming(mailbox_list) parse_result = parser(response) if parse_result is None: raise ValueError(f'invalid List.parse argument {repr(response)}') (attributes, delimiter, name), _ = parse_result return List(attributes, delimiter, name) @dataclass class MessageData: number: int flags: Optional[ListT[str]] internal_date: Optional[str] size: Optional[str] envelope: Optional[_Envelope] body_all_sections: Optional[str] @staticmethod def parse(response_bytes: bytes) -> 'MessageData': try: response = response_bytes.decode('ASCII') except UnicodeError: print(f'unicode fucky wucky occurred:\n{response_bytes}') response = response_bytes.decode('ASCII', errors='replace') parser = all_consuming(message_data) parse_result = parser(response) if parse_result is None: raise ValueError(f'invalid MessageData.parse argument:\n{response}') (msg_number, data), _ = parse_result flags = None internal_date = None size = None env = None body_all_sections = None for name, value in data: if name == 'FLAGS' and isinstance(value, list): flags = value elif name == 'INTERNALDATE' and isinstance(value, str): internal_date = value elif name == 'RFC822.SIZE' and isinstance(value, str): size = value elif name == 'ENVELOPE' and isinstance(value, dict): env = value elif name == 'BODY[]' and isinstance(value, str): body_all_sections = value else: print('warning: ignoring unknown data name', repr(name)) return MessageData(msg_number, flags, internal_date, size, env, body_all_sections)