From 2180a5802310032fa048643da03825c82a91ea32 Mon Sep 17 00:00:00 2001 From: Melody Horn Date: Sat, 1 May 2021 04:47:02 -0600 Subject: avoid unicode fuckery wherever possible --- ctec/imap_response.py | 286 ++++++++++++++++++++++++-------------------------- 1 file changed, 140 insertions(+), 146 deletions(-) (limited to 'ctec/imap_response.py') diff --git a/ctec/imap_response.py b/ctec/imap_response.py index 10d6a54..01f550f 100644 --- a/ctec/imap_response.py +++ b/ctec/imap_response.py @@ -14,135 +14,135 @@ __all__ = [ ] # common utility functions, manually typed so mypy doesn't get confused -def _condense_non_none(data: Sequence[Optional[str]]) -> str: - return ''.join(x for x in data if x is not None) +def _condense_non_none(data: Sequence[Optional[bytes]]) -> bytes: + return b''.join(x for x in data if x is not None) # level 0 -ctl: Parser[str] = verify(take_n(1), lambda c: not c.isprintable()) +ctl: Parser[bytes] = verify(take_n(1), lambda c: not c.decode().isprintable()) -digit: Parser[str] = verify(take_n(1), lambda c: c in '0123456789') +digit: Parser[bytes] = verify(take_n(1), lambda c: c in b'0123456789') -digit_nz: Parser[str] = verify(take_n(1), lambda c: c in '123456789') +digit_nz: Parser[bytes] = verify(take_n(1), lambda c: c in b'123456789') -dquote: Parser[str] = tag('"') +dquote: Parser[bytes] = tag(b'"') -list_wildcards: Parser[str] = alt(tag('%'), tag('*')) +list_wildcards: Parser[bytes] = alt(tag(b'%'), tag(b'*')) -nil: Parser[None] = map_parser(itag('NIL'), lambda _: None) +nil: Parser[None] = map_parser(itag(b'NIL'), lambda _: None) -resp_specials: Parser[str] = tag(']') +resp_specials: Parser[bytes] = tag(b']') -text_char: Parser[str] = verify(take_n(1), lambda c: c not in '\r\n') +text_char: Parser[bytes] = verify(take_n(1), lambda c: c not in b'\r\n') # level 1 -date_day_fixed: Parser[str] = alt(preceded(tag(' '), digit), string_concat(many_m_n(digit, 2, 2))) +date_day_fixed: Parser[bytes] = alt(preceded(tag(b' '), digit), string_concat(many_m_n(digit, 2, 2))) -date_month: Parser[str] = alt(*(itag(x) for x in 'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split())) +date_month: Parser[bytes] = alt(*(itag(x) for x in b'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split())) -date_year: Parser[str] = string_concat(many_m_n(digit, 4, 4)) +date_year: Parser[bytes] = string_concat(many_m_n(digit, 4, 4)) -number: Parser[str] = take_while1(as_predicate(digit)) +number: Parser[bytes] = take_while1(as_predicate(digit)) -nz_number: Parser[str] = map_parser(pair(digit_nz, take_while0(as_predicate(digit))), ''.join) +nz_number: Parser[bytes] = string_concat(pair(digit_nz, take_while0(as_predicate(digit)))) -quoted_specials: Parser[str] = alt(dquote, tag('\\')) +quoted_specials: Parser[bytes] = alt(dquote, tag(b'\\')) -time: Parser[str] = map_parser(separated_triple( +time: Parser[bytes] = map_parser(separated_triple( string_concat(many_m_n(digit, 2, 2)), - tag(':'), + tag(b':'), string_concat(many_m_n(digit, 2, 2)), - tag(':'), + tag(b':'), string_concat(many_m_n(digit, 2, 2)) -), ':'.join) +), b':'.join) -zone: Parser[str] = string_concat(pair(alt(tag('+'), tag('-')), string_concat(many_m_n(digit, 4, 4)))) +zone: Parser[bytes] = string_concat(pair(alt(tag(b'+'), tag(b'-')), string_concat(many_m_n(digit, 4, 4)))) # level 2 -atom_specials: Parser[str] = alt(verify(take_n(1), lambda c: c in '(){ '), ctl, list_wildcards, quoted_specials, resp_specials) +atom_specials: Parser[bytes] = alt(verify(take_n(1), lambda c: c in b'(){ '), ctl, list_wildcards, quoted_specials, resp_specials) -date_time: Parser[str] = delimited( +date_time: Parser[bytes] = delimited( dquote, map_parser(separated_triple( - map_parser(separated_triple(date_day_fixed, tag('-'), date_month, tag('-'), date_year), '-'.join), - tag(' '), + map_parser(separated_triple(date_day_fixed, tag(b'-'), date_month, tag(b'-'), date_year), b'-'.join), + tag(b' '), time, - tag(' '), + tag(b' '), zone - ), ' '.join), + ), b' '.join), dquote ) -literal: Parser[str] = and_then( - delimited(tag('{'), number, tag('}\r\n')), - lambda n: preceded(delimited(tag('{'), number, tag('}\r\n')), take_n(int(n))) +literal: Parser[bytes] = and_then( + delimited(tag(b'{'), number, tag(b'}\r\n')), + lambda n: preceded(delimited(tag(b'{'), number, tag(b'}\r\n')), take_n(int(n))) ) -quoted_char: Parser[str] = alt(verify(text_char, lambda c: quoted_specials(c) is None), preceded(tag('\\'), quoted_specials)) +quoted_char: Parser[bytes] = alt(verify(text_char, lambda c: quoted_specials(c) is None), preceded(tag(b'\\'), quoted_specials)) -section_part: Parser[str] = map_parser(separated_many1(nz_number, tag('.')), '.'.join) +section_part: Parser[bytes] = map_parser(separated_many1(nz_number, tag(b'.')), b'.'.join) # level 3 -atom_char: Parser[str] = verify(take_n(1), lambda c: atom_specials(c) is None) +atom_char: Parser[bytes] = verify(take_n(1), lambda c: atom_specials(c) is None) -quoted: Parser[str] = delimited(dquote, take_while0(as_predicate(quoted_char)), dquote) +quoted: Parser[bytes] = delimited(dquote, take_while0(as_predicate(quoted_char)), dquote) # level 4 -astring_char: Parser[str] = alt(atom_char, resp_specials) +astring_char: Parser[bytes] = alt(atom_char, resp_specials) -atom: Parser[str] = take_while1(as_predicate(atom_char)) +atom: Parser[bytes] = take_while1(as_predicate(atom_char)) -string: Parser[str] = alt(quoted, literal) +string: Parser[bytes] = alt(quoted, literal) # level 5 -astring: Parser[str] = alt(take_while1(as_predicate(astring_char)), string) +astring: Parser[bytes] = alt(take_while1(as_predicate(astring_char)), string) -flag_keyword: Parser[str] = atom +flag_keyword: Parser[bytes] = atom -flag_extension: Parser[str] = map_parser(pair(tag('\\'), atom), ''.join) +flag_extension: Parser[bytes] = string_concat(pair(tag(b'\\'), atom)) -nstring: Parser[Optional[str]] = alt(string, nil) +nstring: Parser[Optional[bytes]] = alt(string, nil) # level 6 -addr_adl: Parser[Optional[str]] = nstring -addr_host: Parser[Optional[str]] = nstring -addr_mailbox: Parser[Optional[str]] = nstring -addr_name: Parser[Optional[str]] = nstring - -env_date: Parser[Optional[str]] = nstring -env_in_reply_to: Parser[Optional[str]] = nstring -env_message_id: Parser[Optional[str]] = nstring -env_subject: Parser[Optional[str]] = nstring - -flag: Parser[str] = alt( - *(itag('\\' + x) for x in ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft']), +addr_adl: Parser[Optional[bytes]] = nstring +addr_host: Parser[Optional[bytes]] = nstring +addr_mailbox: Parser[Optional[bytes]] = nstring +addr_name: Parser[Optional[bytes]] = nstring + +env_date: Parser[Optional[bytes]] = nstring +env_in_reply_to: Parser[Optional[bytes]] = nstring +env_message_id: Parser[Optional[bytes]] = nstring +env_subject: Parser[Optional[bytes]] = nstring + +flag: Parser[bytes] = alt( + *(itag(b'\\' + x) for x in [b'Answered', b'Flagged', b'Deleted', b'Seen', b'Draft']), flag_keyword, flag_extension ) -header_fld_name: Parser[str] = astring +header_fld_name: Parser[bytes] = astring -mailbox: Parser[str] = alt(itag('INBOX'), astring) +mailbox: Parser[bytes] = alt(itag(b'INBOX'), astring) -mbx_list_flag: Parser[str] = alt(itag(r'\Noselect'), itag(r'\Marked'), itag(r'\Unmarked'), itag(r'\Noinferiors'), flag_extension) +mbx_list_flag: Parser[bytes] = alt(itag(rb'\Noselect'), itag(rb'\Marked'), itag(rb'\Unmarked'), itag(rb'\Noinferiors'), flag_extension) # level 7 -_ParsedAddress = Tuple[Optional[str], Optional[str], Optional[str], Optional[str]] -def _merge_for_address(t: Tuple[Tuple[Optional[str], Optional[str]], Tuple[Optional[str], Optional[str]]]) -> _ParsedAddress: +_ParsedAddress = Tuple[Optional[bytes], Optional[bytes], Optional[bytes], Optional[bytes]] +def _merge_for_address(t: Tuple[Tuple[Optional[bytes], Optional[bytes]], Tuple[Optional[bytes], Optional[bytes]]]) -> _ParsedAddress: return (*t[0], *t[1]) -address: Parser[_ParsedAddress] = map_parser(delimited(tag('('), separated_pair( - separated_pair(addr_name, tag(' '), addr_adl), - tag(' '), - separated_pair(addr_mailbox, tag(' '), addr_host) -), tag(')')), _merge_for_address) +address: Parser[_ParsedAddress] = map_parser(delimited(tag(b'('), separated_pair( + separated_pair(addr_name, tag(b' '), addr_adl), + tag(b' '), + separated_pair(addr_mailbox, tag(b' '), addr_host) +), tag(b')')), _merge_for_address) -flag_fetch: Parser[str] = alt(itag(r'\Recent'), flag) +flag_fetch: Parser[bytes] = alt(itag(rb'\Recent'), flag) -header_list: Parser[ListT[str]] = delimited(tag('('), separated_many1(header_fld_name, tag(' ')), tag(')')) +header_list: Parser[ListT[bytes]] = delimited(tag(b'('), separated_many1(header_fld_name, tag(b' ')), tag(b')')) -mbx_list_flags: Parser[ListT[str]] = separated_many0(mbx_list_flag, tag(' ')) +mbx_list_flags: Parser[ListT[bytes]] = separated_many0(mbx_list_flag, tag(b' ')) # level 8 -_address_list: Parser[ListT[_ParsedAddress]] = alt(delimited(tag('('), many1(address), tag(')')), map_parser(nil, lambda _: list())) +_address_list: Parser[ListT[_ParsedAddress]] = alt(delimited(tag(b'('), many1(address), tag(b')')), map_parser(nil, lambda _: list())) env_bcc: Parser[ListT[_ParsedAddress]] = _address_list env_cc: Parser[ListT[_ParsedAddress]] = _address_list env_from: Parser[ListT[_ParsedAddress]] = _address_list @@ -150,167 +150,161 @@ env_reply_to: Parser[ListT[_ParsedAddress]] = _address_list env_sender: Parser[ListT[_ParsedAddress]] = _address_list env_to: Parser[ListT[_ParsedAddress]] = _address_list -mailbox_list: Parser[Tuple[ListT[str], Optional[str], str]] = separated_triple( - delimited(tag('('), mbx_list_flags, tag(')')), - tag(' '), +mailbox_list: Parser[Tuple[ListT[bytes], Optional[bytes], bytes]] = separated_triple( + delimited(tag(b'('), mbx_list_flags, tag(b')')), + tag(b' '), alt(delimited(dquote, quoted_char, dquote), nil), - tag(' '), + tag(b' '), mailbox, ) -msg_att_dynamic: Parser[ListT[str]] = delimited( - itag('FLAGS ('), - separated_many0(flag_fetch, tag(' ')), - tag(')') +msg_att_dynamic: Parser[ListT[bytes]] = delimited( + itag(b'FLAGS ('), + separated_many0(flag_fetch, tag(b' ')), + tag(b')') ) -section_msgtext: Parser[str] = alt( +section_msgtext: Parser[bytes] = alt( string_concat(separated_pair( - map_parser(pair(itag('HEADER.FIELDS'), opt(itag('.NOT'))), _condense_non_none), - tag(' '), + map_parser(pair(itag(b'HEADER.FIELDS'), opt(itag(b'.NOT'))), _condense_non_none), + tag(b' '), string_concat(header_list) )), - itag('HEADER'), - itag('TEXT') + itag(b'HEADER'), + itag(b'TEXT') ) # level 9 -_Envelope = Dict[str, Union[Optional[str], ListT[_ParsedAddress]]] -def _label_nstring(name: str) -> Callable[[Optional[str]], Tuple[str, Optional[str]]]: - def give_label(x: Optional[str]) -> Tuple[str, Optional[str]]: +_Envelope = Dict[bytes, Union[Optional[bytes], ListT[_ParsedAddress]]] +def _label_nstring(name: bytes) -> Callable[[Optional[bytes]], Tuple[bytes, Optional[bytes]]]: + def give_label(x: Optional[bytes]) -> Tuple[bytes, Optional[bytes]]: return name, x return give_label -def _label_address_list(name: str) -> Callable[[ListT[_ParsedAddress]], Tuple[str, ListT[_ParsedAddress]]]: - def give_label(x: ListT[_ParsedAddress]) -> Tuple[str, ListT[_ParsedAddress]]: +def _label_address_list(name: bytes) -> Callable[[ListT[_ParsedAddress]], Tuple[bytes, ListT[_ParsedAddress]]]: + def give_label(x: ListT[_ParsedAddress]) -> Tuple[bytes, ListT[_ParsedAddress]]: return name, x return give_label def _flatten_envelope(data: Tuple) -> _Envelope: def do_flatten(x: Tuple): - if isinstance(x[0], str): + if isinstance(x[0], bytes): yield x else: for datum in x: yield from do_flatten(datum) return dict(do_flatten(data)) envelope: Parser[_Envelope] = map_parser(delimited( - tag('('), + tag(b'('), separated_triple( separated_triple( - map_parser(env_date, _label_nstring('date')), - tag(' '), - map_parser(env_subject, _label_nstring('subject')), - tag(' '), - map_parser(env_from, _label_address_list('from')) + map_parser(env_date, _label_nstring(b'date')), + tag(b' '), + map_parser(env_subject, _label_nstring(b'subject')), + tag(b' '), + map_parser(env_from, _label_address_list(b'from')) ), - tag(' '), + tag(b' '), separated_triple( - map_parser(env_sender, _label_address_list('sender')), - tag(' '), - map_parser(env_reply_to, _label_address_list('reply_to')), - tag(' '), - map_parser(env_to, _label_address_list('to')) + map_parser(env_sender, _label_address_list(b'sender')), + tag(b' '), + map_parser(env_reply_to, _label_address_list(b'reply_to')), + tag(b' '), + map_parser(env_to, _label_address_list(b'to')) ), - tag(' '), + tag(b' '), separated_pair( separated_pair( - map_parser(env_cc, _label_address_list('cc')), - tag(' '), - map_parser(env_bcc, _label_address_list('bcc')) + map_parser(env_cc, _label_address_list(b'cc')), + tag(b' '), + map_parser(env_bcc, _label_address_list(b'bcc')) ), - tag(' '), + tag(b' '), separated_pair( - map_parser(env_in_reply_to, _label_nstring('in_reply_to')), - tag(' '), - map_parser(env_message_id, _label_nstring('message_id')) + map_parser(env_in_reply_to, _label_nstring(b'in_reply_to')), + tag(b' '), + map_parser(env_message_id, _label_nstring(b'message_id')) ) ) ), - tag(')') + tag(b')') ), _flatten_envelope) -section_text: Parser[str] = alt(section_msgtext, itag('MIME')) +section_text: Parser[bytes] = alt(section_msgtext, itag(b'MIME')) # level 10 -section_spec: Parser[str] = alt(section_msgtext, map_parser(pair(section_part, opt(preceded(tag('.'), section_text))), _condense_non_none)) +section_spec: Parser[bytes] = alt(section_msgtext, map_parser(pair(section_part, opt(preceded(tag(b'.'), section_text))), _condense_non_none)) # level 11 -section: Parser[str] = map_parser(triple(tag('['), opt(section_spec), tag(']')), _condense_non_none) +section: Parser[bytes] = map_parser(triple(tag(b'['), opt(section_spec), tag(b']')), _condense_non_none) # level 12 -msg_att_static: Parser[Tuple[str, Union[Optional[str], _Envelope]]] = alt( - separated_pair(itag('ENVELOPE'), tag(' '), envelope), - separated_pair(itag('INTERNALDATE'), tag(' '), date_time), +msg_att_static: Parser[Tuple[bytes, Union[Optional[bytes], _Envelope]]] = alt( + separated_pair(itag(b'ENVELOPE'), tag(b' '), envelope), + separated_pair(itag(b'INTERNALDATE'), tag(b' '), date_time), separated_pair( map_parser( - pair(itag('RFC822'), opt(alt(itag('.HEADER'), itag('.TEXT')))), + pair(itag(b'RFC822'), opt(alt(itag(b'.HEADER'), itag(b'.TEXT')))), _condense_non_none ), - tag(' '), + tag(b' '), nstring ), - separated_pair(itag('RFC822.SIZE'), tag(' '), number), + separated_pair(itag(b'RFC822.SIZE'), tag(b' '), number), # TODO BODY separated_pair( map_parser( - triple(itag('BODY'), section, opt(delimited(tag('<'), number, tag('>')))), + triple(itag(b'BODY'), section, opt(delimited(tag(b'<'), number, tag(b'>')))), _condense_non_none ), - tag(' '), + tag(b' '), nstring ) # TODO UID ) # level 13 -msg_att: Parser[ListT[Tuple[str, Union[str, None, ListT[str], _Envelope]]]] = delimited( - tag('('), - separated_many1(alt(map_parser(msg_att_dynamic, lambda x: ('FLAGS', x)), msg_att_static), tag(' ')), - tag(')') +msg_att: Parser[ListT[Tuple[bytes, Union[bytes, None, ListT[bytes], _Envelope]]]] = delimited( + tag(b'('), + separated_many1(alt(map_parser(msg_att_dynamic, lambda x: (b'FLAGS', x)), msg_att_static), tag(b' ')), + tag(b')') ) # level 14 -message_data: Parser[Tuple[int, ListT[Tuple[str, Union[str, None, ListT[str], _Envelope]]]]] = separated_pair( +message_data: Parser[Tuple[int, ListT[Tuple[bytes, Union[bytes, None, ListT[bytes], _Envelope]]]]] = separated_pair( map_parser(nz_number, int), - tag(' '), # TODO what does imaplib do with expunge replies + tag(b' '), # TODO what does imaplib do with expunge replies msg_att ) @dataclass class List: - attributes: ListT[str] - delimiter: Optional[str] - name: str + attributes: ListT[bytes] + delimiter: Optional[bytes] + name: bytes @staticmethod - def parse(response_bytes: bytes) -> 'List': - response = response_bytes.decode('ASCII') + def parse(response: bytes) -> 'List': parser = all_consuming(mailbox_list) parse_result = parser(response) if parse_result is None: - raise ValueError(f'invalid List.parse argument {repr(response)}') + raise ValueError(f'invalid List.parse argument:\n{response!r}') (attributes, delimiter, name), _ = parse_result return List(attributes, delimiter, name) @dataclass class MessageData: number: int - flags: Optional[ListT[str]] - internal_date: Optional[str] - size: Optional[str] + flags: Optional[ListT[bytes]] + internal_date: Optional[bytes] + size: Optional[bytes] envelope: Optional[_Envelope] - body_all_sections: Optional[str] + body_all_sections: Optional[bytes] @staticmethod - def parse(response_bytes: bytes) -> 'MessageData': - try: - response = response_bytes.decode('ASCII') - except UnicodeError: - print(f'unicode fucky wucky occurred:\n{response_bytes}') - response = response_bytes.decode('ASCII', errors='replace') + def parse(response: bytes) -> 'MessageData': parser = all_consuming(message_data) parse_result = parser(response) if parse_result is None: - raise ValueError(f'invalid MessageData.parse argument:\n{response}') + raise ValueError(f'invalid MessageData.parse argument:\n{response!r}') (msg_number, data), _ = parse_result flags = None @@ -319,15 +313,15 @@ class MessageData: env = None body_all_sections = None for name, value in data: - if name == 'FLAGS' and isinstance(value, list): + if name == b'FLAGS' and isinstance(value, list): flags = value - elif name == 'INTERNALDATE' and isinstance(value, str): + elif name == b'INTERNALDATE' and isinstance(value, bytes): internal_date = value - elif name == 'RFC822.SIZE' and isinstance(value, str): + elif name == b'RFC822.SIZE' and isinstance(value, bytes): size = value - elif name == 'ENVELOPE' and isinstance(value, dict): + elif name == b'ENVELOPE' and isinstance(value, dict): env = value - elif name == 'BODY[]' and isinstance(value, str): + elif name == b'BODY[]' and isinstance(value, bytes): body_all_sections = value else: print('warning: ignoring unknown data name', repr(name)) -- cgit v1.2.3