From 2180a5802310032fa048643da03825c82a91ea32 Mon Sep 17 00:00:00 2001 From: Melody Horn Date: Sat, 1 May 2021 04:47:02 -0600 Subject: avoid unicode fuckery wherever possible --- ctec/imap_response.py | 286 ++++++++++++++++++++++++-------------------------- ctec/logic.py | 49 ++++----- ctec/parse_utils.py | 80 +++++++------- 3 files changed, 206 insertions(+), 209 deletions(-) diff --git a/ctec/imap_response.py b/ctec/imap_response.py index 10d6a54..01f550f 100644 --- a/ctec/imap_response.py +++ b/ctec/imap_response.py @@ -14,135 +14,135 @@ __all__ = [ ] # common utility functions, manually typed so mypy doesn't get confused -def _condense_non_none(data: Sequence[Optional[str]]) -> str: - return ''.join(x for x in data if x is not None) +def _condense_non_none(data: Sequence[Optional[bytes]]) -> bytes: + return b''.join(x for x in data if x is not None) # level 0 -ctl: Parser[str] = verify(take_n(1), lambda c: not c.isprintable()) +ctl: Parser[bytes] = verify(take_n(1), lambda c: not c.decode().isprintable()) -digit: Parser[str] = verify(take_n(1), lambda c: c in '0123456789') +digit: Parser[bytes] = verify(take_n(1), lambda c: c in b'0123456789') -digit_nz: Parser[str] = verify(take_n(1), lambda c: c in '123456789') +digit_nz: Parser[bytes] = verify(take_n(1), lambda c: c in b'123456789') -dquote: Parser[str] = tag('"') +dquote: Parser[bytes] = tag(b'"') -list_wildcards: Parser[str] = alt(tag('%'), tag('*')) +list_wildcards: Parser[bytes] = alt(tag(b'%'), tag(b'*')) -nil: Parser[None] = map_parser(itag('NIL'), lambda _: None) +nil: Parser[None] = map_parser(itag(b'NIL'), lambda _: None) -resp_specials: Parser[str] = tag(']') +resp_specials: Parser[bytes] = tag(b']') -text_char: Parser[str] = verify(take_n(1), lambda c: c not in '\r\n') +text_char: Parser[bytes] = verify(take_n(1), lambda c: c not in b'\r\n') # level 1 -date_day_fixed: Parser[str] = alt(preceded(tag(' '), digit), string_concat(many_m_n(digit, 2, 2))) +date_day_fixed: Parser[bytes] = alt(preceded(tag(b' '), digit), string_concat(many_m_n(digit, 2, 2))) -date_month: Parser[str] = alt(*(itag(x) for x in 'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split())) +date_month: Parser[bytes] = alt(*(itag(x) for x in b'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split())) -date_year: Parser[str] = string_concat(many_m_n(digit, 4, 4)) +date_year: Parser[bytes] = string_concat(many_m_n(digit, 4, 4)) -number: Parser[str] = take_while1(as_predicate(digit)) +number: Parser[bytes] = take_while1(as_predicate(digit)) -nz_number: Parser[str] = map_parser(pair(digit_nz, take_while0(as_predicate(digit))), ''.join) +nz_number: Parser[bytes] = string_concat(pair(digit_nz, take_while0(as_predicate(digit)))) -quoted_specials: Parser[str] = alt(dquote, tag('\\')) +quoted_specials: Parser[bytes] = alt(dquote, tag(b'\\')) -time: Parser[str] = map_parser(separated_triple( +time: Parser[bytes] = map_parser(separated_triple( string_concat(many_m_n(digit, 2, 2)), - tag(':'), + tag(b':'), string_concat(many_m_n(digit, 2, 2)), - tag(':'), + tag(b':'), string_concat(many_m_n(digit, 2, 2)) -), ':'.join) +), b':'.join) -zone: Parser[str] = string_concat(pair(alt(tag('+'), tag('-')), string_concat(many_m_n(digit, 4, 4)))) +zone: Parser[bytes] = string_concat(pair(alt(tag(b'+'), tag(b'-')), string_concat(many_m_n(digit, 4, 4)))) # level 2 -atom_specials: Parser[str] = alt(verify(take_n(1), lambda c: c in '(){ '), ctl, list_wildcards, quoted_specials, resp_specials) +atom_specials: Parser[bytes] = alt(verify(take_n(1), lambda c: c in b'(){ '), ctl, list_wildcards, quoted_specials, resp_specials) -date_time: Parser[str] = delimited( +date_time: Parser[bytes] = delimited( dquote, map_parser(separated_triple( - map_parser(separated_triple(date_day_fixed, tag('-'), date_month, tag('-'), date_year), '-'.join), - tag(' '), + map_parser(separated_triple(date_day_fixed, tag(b'-'), date_month, tag(b'-'), date_year), b'-'.join), + tag(b' '), time, - tag(' '), + tag(b' '), zone - ), ' '.join), + ), b' '.join), dquote ) -literal: Parser[str] = and_then( - delimited(tag('{'), number, tag('}\r\n')), - lambda n: preceded(delimited(tag('{'), number, tag('}\r\n')), take_n(int(n))) +literal: Parser[bytes] = and_then( + delimited(tag(b'{'), number, tag(b'}\r\n')), + lambda n: preceded(delimited(tag(b'{'), number, tag(b'}\r\n')), take_n(int(n))) ) -quoted_char: Parser[str] = alt(verify(text_char, lambda c: quoted_specials(c) is None), preceded(tag('\\'), quoted_specials)) +quoted_char: Parser[bytes] = alt(verify(text_char, lambda c: quoted_specials(c) is None), preceded(tag(b'\\'), quoted_specials)) -section_part: Parser[str] = map_parser(separated_many1(nz_number, tag('.')), '.'.join) +section_part: Parser[bytes] = map_parser(separated_many1(nz_number, tag(b'.')), b'.'.join) # level 3 -atom_char: Parser[str] = verify(take_n(1), lambda c: atom_specials(c) is None) +atom_char: Parser[bytes] = verify(take_n(1), lambda c: atom_specials(c) is None) -quoted: Parser[str] = delimited(dquote, take_while0(as_predicate(quoted_char)), dquote) +quoted: Parser[bytes] = delimited(dquote, take_while0(as_predicate(quoted_char)), dquote) # level 4 -astring_char: Parser[str] = alt(atom_char, resp_specials) +astring_char: Parser[bytes] = alt(atom_char, resp_specials) -atom: Parser[str] = take_while1(as_predicate(atom_char)) +atom: Parser[bytes] = take_while1(as_predicate(atom_char)) -string: Parser[str] = alt(quoted, literal) +string: Parser[bytes] = alt(quoted, literal) # level 5 -astring: Parser[str] = alt(take_while1(as_predicate(astring_char)), string) +astring: Parser[bytes] = alt(take_while1(as_predicate(astring_char)), string) -flag_keyword: Parser[str] = atom +flag_keyword: Parser[bytes] = atom -flag_extension: Parser[str] = map_parser(pair(tag('\\'), atom), ''.join) +flag_extension: Parser[bytes] = string_concat(pair(tag(b'\\'), atom)) -nstring: Parser[Optional[str]] = alt(string, nil) +nstring: Parser[Optional[bytes]] = alt(string, nil) # level 6 -addr_adl: Parser[Optional[str]] = nstring -addr_host: Parser[Optional[str]] = nstring -addr_mailbox: Parser[Optional[str]] = nstring -addr_name: Parser[Optional[str]] = nstring - -env_date: Parser[Optional[str]] = nstring -env_in_reply_to: Parser[Optional[str]] = nstring -env_message_id: Parser[Optional[str]] = nstring -env_subject: Parser[Optional[str]] = nstring - -flag: Parser[str] = alt( - *(itag('\\' + x) for x in ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft']), +addr_adl: Parser[Optional[bytes]] = nstring +addr_host: Parser[Optional[bytes]] = nstring +addr_mailbox: Parser[Optional[bytes]] = nstring +addr_name: Parser[Optional[bytes]] = nstring + +env_date: Parser[Optional[bytes]] = nstring +env_in_reply_to: Parser[Optional[bytes]] = nstring +env_message_id: Parser[Optional[bytes]] = nstring +env_subject: Parser[Optional[bytes]] = nstring + +flag: Parser[bytes] = alt( + *(itag(b'\\' + x) for x in [b'Answered', b'Flagged', b'Deleted', b'Seen', b'Draft']), flag_keyword, flag_extension ) -header_fld_name: Parser[str] = astring +header_fld_name: Parser[bytes] = astring -mailbox: Parser[str] = alt(itag('INBOX'), astring) +mailbox: Parser[bytes] = alt(itag(b'INBOX'), astring) -mbx_list_flag: Parser[str] = alt(itag(r'\Noselect'), itag(r'\Marked'), itag(r'\Unmarked'), itag(r'\Noinferiors'), flag_extension) +mbx_list_flag: Parser[bytes] = alt(itag(rb'\Noselect'), itag(rb'\Marked'), itag(rb'\Unmarked'), itag(rb'\Noinferiors'), flag_extension) # level 7 -_ParsedAddress = Tuple[Optional[str], Optional[str], Optional[str], Optional[str]] -def _merge_for_address(t: Tuple[Tuple[Optional[str], Optional[str]], Tuple[Optional[str], Optional[str]]]) -> _ParsedAddress: +_ParsedAddress = Tuple[Optional[bytes], Optional[bytes], Optional[bytes], Optional[bytes]] +def _merge_for_address(t: Tuple[Tuple[Optional[bytes], Optional[bytes]], Tuple[Optional[bytes], Optional[bytes]]]) -> _ParsedAddress: return (*t[0], *t[1]) -address: Parser[_ParsedAddress] = map_parser(delimited(tag('('), separated_pair( - separated_pair(addr_name, tag(' '), addr_adl), - tag(' '), - separated_pair(addr_mailbox, tag(' '), addr_host) -), tag(')')), _merge_for_address) +address: Parser[_ParsedAddress] = map_parser(delimited(tag(b'('), separated_pair( + separated_pair(addr_name, tag(b' '), addr_adl), + tag(b' '), + separated_pair(addr_mailbox, tag(b' '), addr_host) +), tag(b')')), _merge_for_address) -flag_fetch: Parser[str] = alt(itag(r'\Recent'), flag) +flag_fetch: Parser[bytes] = alt(itag(rb'\Recent'), flag) -header_list: Parser[ListT[str]] = delimited(tag('('), separated_many1(header_fld_name, tag(' ')), tag(')')) +header_list: Parser[ListT[bytes]] = delimited(tag(b'('), separated_many1(header_fld_name, tag(b' ')), tag(b')')) -mbx_list_flags: Parser[ListT[str]] = separated_many0(mbx_list_flag, tag(' ')) +mbx_list_flags: Parser[ListT[bytes]] = separated_many0(mbx_list_flag, tag(b' ')) # level 8 -_address_list: Parser[ListT[_ParsedAddress]] = alt(delimited(tag('('), many1(address), tag(')')), map_parser(nil, lambda _: list())) +_address_list: Parser[ListT[_ParsedAddress]] = alt(delimited(tag(b'('), many1(address), tag(b')')), map_parser(nil, lambda _: list())) env_bcc: Parser[ListT[_ParsedAddress]] = _address_list env_cc: Parser[ListT[_ParsedAddress]] = _address_list env_from: Parser[ListT[_ParsedAddress]] = _address_list @@ -150,167 +150,161 @@ env_reply_to: Parser[ListT[_ParsedAddress]] = _address_list env_sender: Parser[ListT[_ParsedAddress]] = _address_list env_to: Parser[ListT[_ParsedAddress]] = _address_list -mailbox_list: Parser[Tuple[ListT[str], Optional[str], str]] = separated_triple( - delimited(tag('('), mbx_list_flags, tag(')')), - tag(' '), +mailbox_list: Parser[Tuple[ListT[bytes], Optional[bytes], bytes]] = separated_triple( + delimited(tag(b'('), mbx_list_flags, tag(b')')), + tag(b' '), alt(delimited(dquote, quoted_char, dquote), nil), - tag(' '), + tag(b' '), mailbox, ) -msg_att_dynamic: Parser[ListT[str]] = delimited( - itag('FLAGS ('), - separated_many0(flag_fetch, tag(' ')), - tag(')') +msg_att_dynamic: Parser[ListT[bytes]] = delimited( + itag(b'FLAGS ('), + separated_many0(flag_fetch, tag(b' ')), + tag(b')') ) -section_msgtext: Parser[str] = alt( +section_msgtext: Parser[bytes] = alt( string_concat(separated_pair( - map_parser(pair(itag('HEADER.FIELDS'), opt(itag('.NOT'))), _condense_non_none), - tag(' '), + map_parser(pair(itag(b'HEADER.FIELDS'), opt(itag(b'.NOT'))), _condense_non_none), + tag(b' '), string_concat(header_list) )), - itag('HEADER'), - itag('TEXT') + itag(b'HEADER'), + itag(b'TEXT') ) # level 9 -_Envelope = Dict[str, Union[Optional[str], ListT[_ParsedAddress]]] -def _label_nstring(name: str) -> Callable[[Optional[str]], Tuple[str, Optional[str]]]: - def give_label(x: Optional[str]) -> Tuple[str, Optional[str]]: +_Envelope = Dict[bytes, Union[Optional[bytes], ListT[_ParsedAddress]]] +def _label_nstring(name: bytes) -> Callable[[Optional[bytes]], Tuple[bytes, Optional[bytes]]]: + def give_label(x: Optional[bytes]) -> Tuple[bytes, Optional[bytes]]: return name, x return give_label -def _label_address_list(name: str) -> Callable[[ListT[_ParsedAddress]], Tuple[str, ListT[_ParsedAddress]]]: - def give_label(x: ListT[_ParsedAddress]) -> Tuple[str, ListT[_ParsedAddress]]: +def _label_address_list(name: bytes) -> Callable[[ListT[_ParsedAddress]], Tuple[bytes, ListT[_ParsedAddress]]]: + def give_label(x: ListT[_ParsedAddress]) -> Tuple[bytes, ListT[_ParsedAddress]]: return name, x return give_label def _flatten_envelope(data: Tuple) -> _Envelope: def do_flatten(x: Tuple): - if isinstance(x[0], str): + if isinstance(x[0], bytes): yield x else: for datum in x: yield from do_flatten(datum) return dict(do_flatten(data)) envelope: Parser[_Envelope] = map_parser(delimited( - tag('('), + tag(b'('), separated_triple( separated_triple( - map_parser(env_date, _label_nstring('date')), - tag(' '), - map_parser(env_subject, _label_nstring('subject')), - tag(' '), - map_parser(env_from, _label_address_list('from')) + map_parser(env_date, _label_nstring(b'date')), + tag(b' '), + map_parser(env_subject, _label_nstring(b'subject')), + tag(b' '), + map_parser(env_from, _label_address_list(b'from')) ), - tag(' '), + tag(b' '), separated_triple( - map_parser(env_sender, _label_address_list('sender')), - tag(' '), - map_parser(env_reply_to, _label_address_list('reply_to')), - tag(' '), - map_parser(env_to, _label_address_list('to')) + map_parser(env_sender, _label_address_list(b'sender')), + tag(b' '), + map_parser(env_reply_to, _label_address_list(b'reply_to')), + tag(b' '), + map_parser(env_to, _label_address_list(b'to')) ), - tag(' '), + tag(b' '), separated_pair( separated_pair( - map_parser(env_cc, _label_address_list('cc')), - tag(' '), - map_parser(env_bcc, _label_address_list('bcc')) + map_parser(env_cc, _label_address_list(b'cc')), + tag(b' '), + map_parser(env_bcc, _label_address_list(b'bcc')) ), - tag(' '), + tag(b' '), separated_pair( - map_parser(env_in_reply_to, _label_nstring('in_reply_to')), - tag(' '), - map_parser(env_message_id, _label_nstring('message_id')) + map_parser(env_in_reply_to, _label_nstring(b'in_reply_to')), + tag(b' '), + map_parser(env_message_id, _label_nstring(b'message_id')) ) ) ), - tag(')') + tag(b')') ), _flatten_envelope) -section_text: Parser[str] = alt(section_msgtext, itag('MIME')) +section_text: Parser[bytes] = alt(section_msgtext, itag(b'MIME')) # level 10 -section_spec: Parser[str] = alt(section_msgtext, map_parser(pair(section_part, opt(preceded(tag('.'), section_text))), _condense_non_none)) +section_spec: Parser[bytes] = alt(section_msgtext, map_parser(pair(section_part, opt(preceded(tag(b'.'), section_text))), _condense_non_none)) # level 11 -section: Parser[str] = map_parser(triple(tag('['), opt(section_spec), tag(']')), _condense_non_none) +section: Parser[bytes] = map_parser(triple(tag(b'['), opt(section_spec), tag(b']')), _condense_non_none) # level 12 -msg_att_static: Parser[Tuple[str, Union[Optional[str], _Envelope]]] = alt( - separated_pair(itag('ENVELOPE'), tag(' '), envelope), - separated_pair(itag('INTERNALDATE'), tag(' '), date_time), +msg_att_static: Parser[Tuple[bytes, Union[Optional[bytes], _Envelope]]] = alt( + separated_pair(itag(b'ENVELOPE'), tag(b' '), envelope), + separated_pair(itag(b'INTERNALDATE'), tag(b' '), date_time), separated_pair( map_parser( - pair(itag('RFC822'), opt(alt(itag('.HEADER'), itag('.TEXT')))), + pair(itag(b'RFC822'), opt(alt(itag(b'.HEADER'), itag(b'.TEXT')))), _condense_non_none ), - tag(' '), + tag(b' '), nstring ), - separated_pair(itag('RFC822.SIZE'), tag(' '), number), + separated_pair(itag(b'RFC822.SIZE'), tag(b' '), number), # TODO BODY separated_pair( map_parser( - triple(itag('BODY'), section, opt(delimited(tag('<'), number, tag('>')))), + triple(itag(b'BODY'), section, opt(delimited(tag(b'<'), number, tag(b'>')))), _condense_non_none ), - tag(' '), + tag(b' '), nstring ) # TODO UID ) # level 13 -msg_att: Parser[ListT[Tuple[str, Union[str, None, ListT[str], _Envelope]]]] = delimited( - tag('('), - separated_many1(alt(map_parser(msg_att_dynamic, lambda x: ('FLAGS', x)), msg_att_static), tag(' ')), - tag(')') +msg_att: Parser[ListT[Tuple[bytes, Union[bytes, None, ListT[bytes], _Envelope]]]] = delimited( + tag(b'('), + separated_many1(alt(map_parser(msg_att_dynamic, lambda x: (b'FLAGS', x)), msg_att_static), tag(b' ')), + tag(b')') ) # level 14 -message_data: Parser[Tuple[int, ListT[Tuple[str, Union[str, None, ListT[str], _Envelope]]]]] = separated_pair( +message_data: Parser[Tuple[int, ListT[Tuple[bytes, Union[bytes, None, ListT[bytes], _Envelope]]]]] = separated_pair( map_parser(nz_number, int), - tag(' '), # TODO what does imaplib do with expunge replies + tag(b' '), # TODO what does imaplib do with expunge replies msg_att ) @dataclass class List: - attributes: ListT[str] - delimiter: Optional[str] - name: str + attributes: ListT[bytes] + delimiter: Optional[bytes] + name: bytes @staticmethod - def parse(response_bytes: bytes) -> 'List': - response = response_bytes.decode('ASCII') + def parse(response: bytes) -> 'List': parser = all_consuming(mailbox_list) parse_result = parser(response) if parse_result is None: - raise ValueError(f'invalid List.parse argument {repr(response)}') + raise ValueError(f'invalid List.parse argument:\n{response!r}') (attributes, delimiter, name), _ = parse_result return List(attributes, delimiter, name) @dataclass class MessageData: number: int - flags: Optional[ListT[str]] - internal_date: Optional[str] - size: Optional[str] + flags: Optional[ListT[bytes]] + internal_date: Optional[bytes] + size: Optional[bytes] envelope: Optional[_Envelope] - body_all_sections: Optional[str] + body_all_sections: Optional[bytes] @staticmethod - def parse(response_bytes: bytes) -> 'MessageData': - try: - response = response_bytes.decode('ASCII') - except UnicodeError: - print(f'unicode fucky wucky occurred:\n{response_bytes}') - response = response_bytes.decode('ASCII', errors='replace') + def parse(response: bytes) -> 'MessageData': parser = all_consuming(message_data) parse_result = parser(response) if parse_result is None: - raise ValueError(f'invalid MessageData.parse argument:\n{response}') + raise ValueError(f'invalid MessageData.parse argument:\n{response!r}') (msg_number, data), _ = parse_result flags = None @@ -319,15 +313,15 @@ class MessageData: env = None body_all_sections = None for name, value in data: - if name == 'FLAGS' and isinstance(value, list): + if name == b'FLAGS' and isinstance(value, list): flags = value - elif name == 'INTERNALDATE' and isinstance(value, str): + elif name == b'INTERNALDATE' and isinstance(value, bytes): internal_date = value - elif name == 'RFC822.SIZE' and isinstance(value, str): + elif name == b'RFC822.SIZE' and isinstance(value, bytes): size = value - elif name == 'ENVELOPE' and isinstance(value, dict): + elif name == b'ENVELOPE' and isinstance(value, dict): env = value - elif name == 'BODY[]' and isinstance(value, str): + elif name == b'BODY[]' and isinstance(value, bytes): body_all_sections = value else: print('warning: ignoring unknown data name', repr(name)) diff --git a/ctec/logic.py b/ctec/logic.py index d151a0f..d2767bc 100644 --- a/ctec/logic.py +++ b/ctec/logic.py @@ -9,19 +9,20 @@ mailbox.Maildir.colon = '!' MESSAGE_DOWNLOAD_BATCH = 1 -def percent_encode(c: str) -> str: - return '%' + hex(ord(c))[2:] +def percent_encode(c: bytes) -> bytes: + return b'%' + c.hex().encode() -def clean_folder_name(folder_name: str, separator: str) -> str: - folder_name = folder_name.replace('%', percent_encode('%')) - folder_name = folder_name.replace('.', percent_encode('.')) - folder_name = folder_name.replace(separator, '.') - return folder_name +def clean_folder_name(folder_name: bytes, separator: bytes) -> str: + folder_name = folder_name.replace(b'%', percent_encode(b'%')) + folder_name = folder_name.replace(b'.', percent_encode(b'.')) + folder_name = folder_name.replace(separator, b'.') + return folder_name.decode() -def dirty_folder_name(folder_name: str, separator: str = '/') -> str: - folder_name = folder_name.replace('.', separator) - folder_name = folder_name.replace(percent_encode('.'), '.') - folder_name = folder_name.replace(percent_encode('%'), '%') +def dirty_folder_name(folder_name_str: str, separator: bytes = b'/') -> bytes: + folder_name = folder_name_str.encode() + folder_name = folder_name.replace(b'.', separator) + folder_name = folder_name.replace(percent_encode(b'.'), b'.') + folder_name = folder_name.replace(percent_encode(b'%'), b'%') return folder_name class Account: @@ -42,13 +43,13 @@ class Account: typ, folder_list = self.connection.list() for folder in folder_list: folder_info = imap_response.List.parse(folder) - if folder_info.delimiter != '/': + if folder_info.delimiter != b'/': raise NotImplementedError(f'who the hell uses {repr(folder_info.delimiter)} as a delimiter') self.mailbox.add_folder(clean_folder_name(folder_info.name, folder_info.delimiter)) def fetch_folder(self, folder_name: str): folder = self.mailbox.add_folder(folder_name) - message_id_to_key = dict((message['Message-ID'], key) for key, message in folder.iteritems() if 'Message-ID' in message) + message_id_to_key = dict((message['Message-ID'].encode(), key) for key, message in folder.iteritems() if 'Message-ID' in message) typ, count_data = self.connection.select(dirty_folder_name(folder_name)) if typ != 'OK': print(typ, count_data) @@ -72,22 +73,22 @@ class Account: if parsed_data.envelope is None: print('hold the fuck up where did that envelope go', parsed_data) else: - message_id = parsed_data.envelope.get('message_id', None) + message_id = parsed_data.envelope.get(b'message_id', None) if message_id is None: print('no message ID for', parsed_data) elif message_id in message_id_to_key: message: mailbox.MaildirMessage = folder[message_id_to_key[message_id]] flags_list = parsed_data.flags or [] flags = '' - if r'\Seen' in flags_list: + if rb'\Seen' in flags_list: flags += 'S' - if r'\Answered' in flags_list: + if rb'\Answered' in flags_list: flags += 'R' # Replied to - if r'\Flagged' in flags_list: + if rb'\Flagged' in flags_list: flags += 'F' - if r'\Deleted' in flags_list: + if rb'\Deleted' in flags_list: flags += 'T' # Trashed - if r'\Draft' in flags_list: + if rb'\Draft' in flags_list: flags += 'D' message.set_flags(flags) else: @@ -119,15 +120,15 @@ class Account: message = mailbox.MaildirMessage(parsed_data.body_all_sections) flags_list = parsed_data.flags or [] flags = '' - if r'\Seen' in flags_list: + if rb'\Seen' in flags_list: flags += 'S' - if r'\Answered' in flags_list: + if rb'\Answered' in flags_list: flags += 'R' # Replied to - if r'\Flagged' in flags_list: + if rb'\Flagged' in flags_list: flags += 'F' - if r'\Deleted' in flags_list: + if rb'\Deleted' in flags_list: flags += 'T' # Trashed - if r'\Draft' in flags_list: + if rb'\Draft' in flags_list: flags += 'D' message.set_flags(flags) folder.add(message) diff --git a/ctec/parse_utils.py b/ctec/parse_utils.py index 890d36f..7ba2ccf 100644 --- a/ctec/parse_utils.py +++ b/ctec/parse_utils.py @@ -37,16 +37,16 @@ T1 = TypeVar('T1') T2 = TypeVar('T2') T3 = TypeVar('T3') -ParseResult = Optional[Tuple[T, str]] -Parser = Callable[[str], ParseResult[T]] +ParseResult = Optional[Tuple[T, bytes]] +Parser = Callable[[bytes], ParseResult[T]] -def as_predicate(parser: Parser[T]) -> Callable[[str], bool]: - def check(text: str) -> bool: - return parser(text) is not None +def as_predicate(parser: Parser[T]) -> Callable[[int], bool]: + def check(text: int) -> bool: + return parser(bytes([text])) is not None return check def alt(*parsers: Parser[T]) -> Parser[T]: - def parse(text: str) -> ParseResult[T]: + def parse(text: bytes) -> ParseResult[T]: for parser in parsers[:-1]: result = parser(text) if result is not None: @@ -54,56 +54,58 @@ def alt(*parsers: Parser[T]) -> Parser[T]: return parsers[-1](text) return parse -def tag(tag_text: str) -> Parser[str]: - def parse(text: str) -> ParseResult[str]: +def tag(tag_text: bytes) -> Parser[bytes]: + def parse(text: bytes) -> ParseResult[bytes]: if text.startswith(tag_text): return tag_text, text[len(tag_text):] return None return parse # case-insensitive tag -def itag(tag_text: str) -> Parser[str]: - def parse(text: str) -> ParseResult[str]: - if text.casefold().startswith(tag_text.casefold()): +def itag(tag_text: bytes) -> Parser[bytes]: + def parse(text: bytes) -> ParseResult[bytes]: + tag_str = tag_text.decode() + text_str = text.decode() + if text_str.casefold().startswith(tag_str.casefold()): return tag_text, text[len(tag_text):] return None return parse -def take_while0(predicate: Callable[[str], bool]) -> Parser[str]: - def parse(text: str) -> ParseResult[str]: +def take_while0(predicate: Callable[[int], bool]) -> Parser[bytes]: + def parse(text: bytes) -> ParseResult[bytes]: for i in range(len(text)): if not predicate(text[i]): return text[:i], text[i:] - return text, "" + return text, b"" return parse -def take_while1(predicate: Callable[[str], bool]) -> Parser[str]: - def parse(text: str) -> ParseResult[str]: +def take_while1(predicate: Callable[[int], bool]) -> Parser[bytes]: + def parse(text: bytes) -> ParseResult[bytes]: if len(text) == 0 or not predicate(text[0]): return None for i in range(1, len(text)): if not predicate(text[i]): return text[:i], text[i:] - return text, "" + return text, b"" return parse -def take_till1(predicate: Callable[[str], bool]) -> Parser[str]: +def take_till1(predicate: Callable[[int], bool]) -> Parser[bytes]: return take_while1(lambda x: not predicate(x)) -def take_n(n: int) -> Parser[str]: - def parse(text: str) -> ParseResult[str]: +def take_n(n: int) -> Parser[bytes]: + def parse(text: bytes) -> ParseResult[bytes]: if len(text) < n: return None return text[:n], text[n:] return parse -def any_char(text: str) -> ParseResult[str]: +def any_char(text: bytes) -> ParseResult[bytes]: if len(text) > 0: - return text[0], text[1:] + return text[0:1], text[1:] return None def all_consuming(parser: Parser[T], *, debug=False) -> Parser[T]: - def parse(text: str) -> ParseResult[T]: + def parse(text: bytes) -> ParseResult[T]: parsed_result = parser(text) if parsed_result is None: if debug: @@ -114,11 +116,11 @@ def all_consuming(parser: Parser[T], *, debug=False) -> Parser[T]: if debug: print('all_consuming: leftover text {}', repr(extra)) return None - return result, '' + return result, b'' return parse def map_parser(parser: Parser[T1], mapper: Callable[[T1], T2]) -> Parser[T2]: - def parse(text: str) -> ParseResult[T2]: + def parse(text: bytes) -> ParseResult[T2]: parsed_result = parser(text) if parsed_result is None: return None @@ -127,7 +129,7 @@ def map_parser(parser: Parser[T1], mapper: Callable[[T1], T2]) -> Parser[T2]: return parse def and_then(first_parser: Parser[T1], get_second_parser: Callable[[T1], Parser[T2]]) -> Parser[T2]: - def parse(text: str) -> ParseResult[T2]: + def parse(text: bytes) -> ParseResult[T2]: parsed_result = first_parser(text) if parsed_result is None: return None @@ -136,7 +138,7 @@ def and_then(first_parser: Parser[T1], get_second_parser: Callable[[T1], Parser[ return parse def opt(parser: Parser[T]) -> Parser[Optional[T]]: - def parse(text: str) -> ParseResult[Optional[T]]: + def parse(text: bytes) -> ParseResult[Optional[T]]: result = parser(text) if result is None: return None, text @@ -144,7 +146,7 @@ def opt(parser: Parser[T]) -> Parser[Optional[T]]: return parse def verify(parser: Parser[T], predicate: Callable[[T], bool]) -> Parser[T]: - def parse(text: str) -> ParseResult[T]: + def parse(text: bytes) -> ParseResult[T]: parsed_result = parser(text) if parsed_result is None: return None @@ -155,7 +157,7 @@ def verify(parser: Parser[T], predicate: Callable[[T], bool]) -> Parser[T]: return parse def many0(parser: Parser[T]) -> Parser[List[T]]: - def parse(text: str) -> ParseResult[List[T]]: + def parse(text: bytes) -> ParseResult[List[T]]: result = [] parser_result = parser(text) while parser_result is not None: @@ -166,7 +168,7 @@ def many0(parser: Parser[T]) -> Parser[List[T]]: return parse def many1(parser: Parser[T]) -> Parser[List[T]]: - def parse(text: str) -> ParseResult[List[T]]: + def parse(text: bytes) -> ParseResult[List[T]]: parser_result = parser(text) if parser_result is None: return None @@ -182,7 +184,7 @@ def many1(parser: Parser[T]) -> Parser[List[T]]: return parse def many_m_n(parser: Parser[T], min_inclusive: int, max_inclusive: int) -> Parser[List[T]]: - def parse(text: str) -> ParseResult[List[T]]: + def parse(text: bytes) -> ParseResult[List[T]]: result: List[T] = [] while len(result) < min_inclusive: parser_result = parser(text) @@ -201,7 +203,7 @@ def many_m_n(parser: Parser[T], min_inclusive: int, max_inclusive: int) -> Parse return parse def separated_many0(parser: Parser[T], separator_parser: Parser) -> Parser[List[T]]: - def parse(text: str) -> ParseResult[List[T]]: + def parse(text: bytes) -> ParseResult[List[T]]: result = [] while True: parser_result = parser(text) @@ -221,7 +223,7 @@ def separated_many1(parser: Parser[T], separator_parser: Parser) -> Parser[List[ return verify(separated_many0(parser, separator_parser), lambda result: len(result) > 0) def delimited(before_parser: Parser[T1], parser: Parser[T], after_parser: Parser[T2]) -> Parser[T]: - def parse(text: str) -> ParseResult[T]: + def parse(text: bytes) -> ParseResult[T]: before_result = before_parser(text) if before_result is None: return None @@ -241,7 +243,7 @@ def delimited(before_parser: Parser[T1], parser: Parser[T], after_parser: Parser return parse def pair(first_parser: Parser[T1], second_parser: Parser[T2]) -> Parser[Tuple[T1, T2]]: - def parse(text: str) -> ParseResult[Tuple[T1, T2]]: + def parse(text: bytes) -> ParseResult[Tuple[T1, T2]]: first_parsed_result = first_parser(text) if first_parsed_result is None: return None @@ -256,7 +258,7 @@ def pair(first_parser: Parser[T1], second_parser: Parser[T2]) -> Parser[Tuple[T1 return parse def triple(first_parser: Parser[T1], second_parser: Parser[T2], third_parser: Parser[T3]) -> Parser[Tuple[T1, T2, T3]]: - def parse(text: str) -> ParseResult[Tuple[T1, T2, T3]]: + def parse(text: bytes) -> ParseResult[Tuple[T1, T2, T3]]: first_parsed_result = first_parser(text) if first_parsed_result is None: return None @@ -286,7 +288,7 @@ def followed(parser: Parser[T], after_parser: Parser[T1]) -> Parser[T]: return map_parser(pair(parser, after_parser), first) def separated_pair(first_parser: Parser[T1], between_parser: Parser[T], second_parser: Parser[T2]) -> Parser[Tuple[T1, T2]]: - def parse(text: str) -> ParseResult[Tuple[T1, T2]]: + def parse(text: bytes) -> ParseResult[Tuple[T1, T2]]: first_parsed_result = first_parser(text) if first_parsed_result is None: return None @@ -306,7 +308,7 @@ def separated_pair(first_parser: Parser[T1], between_parser: Parser[T], second_p return parse def separated_triple(first_parser: Parser[T1], between12_parser: Parser, second_parser: Parser[T2], between23_parser: Parser, third_parser: Parser[T3]) -> Parser[Tuple[T1, T2, T3]]: - def parse(text: str) -> ParseResult[Tuple[T1, T2, T3]]: + def parse(text: bytes) -> ParseResult[Tuple[T1, T2, T3]]: first_parsed_result = first_parser(text) if first_parsed_result is None: return None @@ -335,5 +337,5 @@ def separated_triple(first_parser: Parser[T1], between12_parser: Parser, second_ return (first_result, second_result, third_result), extra return parse -def string_concat(parser: Parser[Sequence[str]]) -> Parser[str]: - return map_parser(parser, ''.join) +def string_concat(parser: Parser[Sequence[bytes]]) -> Parser[bytes]: + return map_parser(parser, b''.join) -- cgit v1.2.3