aboutsummaryrefslogtreecommitdiff
path: root/ctec/imap_response.py
diff options
context:
space:
mode:
Diffstat (limited to 'ctec/imap_response.py')
-rw-r--r--ctec/imap_response.py286
1 files changed, 140 insertions, 146 deletions
diff --git a/ctec/imap_response.py b/ctec/imap_response.py
index 10d6a54..01f550f 100644
--- a/ctec/imap_response.py
+++ b/ctec/imap_response.py
@@ -14,135 +14,135 @@ __all__ = [
]
# common utility functions, manually typed so mypy doesn't get confused
-def _condense_non_none(data: Sequence[Optional[str]]) -> str:
- return ''.join(x for x in data if x is not None)
+def _condense_non_none(data: Sequence[Optional[bytes]]) -> bytes:
+ return b''.join(x for x in data if x is not None)
# level 0
-ctl: Parser[str] = verify(take_n(1), lambda c: not c.isprintable())
+ctl: Parser[bytes] = verify(take_n(1), lambda c: not c.decode().isprintable())
-digit: Parser[str] = verify(take_n(1), lambda c: c in '0123456789')
+digit: Parser[bytes] = verify(take_n(1), lambda c: c in b'0123456789')
-digit_nz: Parser[str] = verify(take_n(1), lambda c: c in '123456789')
+digit_nz: Parser[bytes] = verify(take_n(1), lambda c: c in b'123456789')
-dquote: Parser[str] = tag('"')
+dquote: Parser[bytes] = tag(b'"')
-list_wildcards: Parser[str] = alt(tag('%'), tag('*'))
+list_wildcards: Parser[bytes] = alt(tag(b'%'), tag(b'*'))
-nil: Parser[None] = map_parser(itag('NIL'), lambda _: None)
+nil: Parser[None] = map_parser(itag(b'NIL'), lambda _: None)
-resp_specials: Parser[str] = tag(']')
+resp_specials: Parser[bytes] = tag(b']')
-text_char: Parser[str] = verify(take_n(1), lambda c: c not in '\r\n')
+text_char: Parser[bytes] = verify(take_n(1), lambda c: c not in b'\r\n')
# level 1
-date_day_fixed: Parser[str] = alt(preceded(tag(' '), digit), string_concat(many_m_n(digit, 2, 2)))
+date_day_fixed: Parser[bytes] = alt(preceded(tag(b' '), digit), string_concat(many_m_n(digit, 2, 2)))
-date_month: Parser[str] = alt(*(itag(x) for x in 'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()))
+date_month: Parser[bytes] = alt(*(itag(x) for x in b'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()))
-date_year: Parser[str] = string_concat(many_m_n(digit, 4, 4))
+date_year: Parser[bytes] = string_concat(many_m_n(digit, 4, 4))
-number: Parser[str] = take_while1(as_predicate(digit))
+number: Parser[bytes] = take_while1(as_predicate(digit))
-nz_number: Parser[str] = map_parser(pair(digit_nz, take_while0(as_predicate(digit))), ''.join)
+nz_number: Parser[bytes] = string_concat(pair(digit_nz, take_while0(as_predicate(digit))))
-quoted_specials: Parser[str] = alt(dquote, tag('\\'))
+quoted_specials: Parser[bytes] = alt(dquote, tag(b'\\'))
-time: Parser[str] = map_parser(separated_triple(
+time: Parser[bytes] = map_parser(separated_triple(
string_concat(many_m_n(digit, 2, 2)),
- tag(':'),
+ tag(b':'),
string_concat(many_m_n(digit, 2, 2)),
- tag(':'),
+ tag(b':'),
string_concat(many_m_n(digit, 2, 2))
-), ':'.join)
+), b':'.join)
-zone: Parser[str] = string_concat(pair(alt(tag('+'), tag('-')), string_concat(many_m_n(digit, 4, 4))))
+zone: Parser[bytes] = string_concat(pair(alt(tag(b'+'), tag(b'-')), string_concat(many_m_n(digit, 4, 4))))
# level 2
-atom_specials: Parser[str] = alt(verify(take_n(1), lambda c: c in '(){ '), ctl, list_wildcards, quoted_specials, resp_specials)
+atom_specials: Parser[bytes] = alt(verify(take_n(1), lambda c: c in b'(){ '), ctl, list_wildcards, quoted_specials, resp_specials)
-date_time: Parser[str] = delimited(
+date_time: Parser[bytes] = delimited(
dquote,
map_parser(separated_triple(
- map_parser(separated_triple(date_day_fixed, tag('-'), date_month, tag('-'), date_year), '-'.join),
- tag(' '),
+ map_parser(separated_triple(date_day_fixed, tag(b'-'), date_month, tag(b'-'), date_year), b'-'.join),
+ tag(b' '),
time,
- tag(' '),
+ tag(b' '),
zone
- ), ' '.join),
+ ), b' '.join),
dquote
)
-literal: Parser[str] = and_then(
- delimited(tag('{'), number, tag('}\r\n')),
- lambda n: preceded(delimited(tag('{'), number, tag('}\r\n')), take_n(int(n)))
+literal: Parser[bytes] = and_then(
+ delimited(tag(b'{'), number, tag(b'}\r\n')),
+ lambda n: preceded(delimited(tag(b'{'), number, tag(b'}\r\n')), take_n(int(n)))
)
-quoted_char: Parser[str] = alt(verify(text_char, lambda c: quoted_specials(c) is None), preceded(tag('\\'), quoted_specials))
+quoted_char: Parser[bytes] = alt(verify(text_char, lambda c: quoted_specials(c) is None), preceded(tag(b'\\'), quoted_specials))
-section_part: Parser[str] = map_parser(separated_many1(nz_number, tag('.')), '.'.join)
+section_part: Parser[bytes] = map_parser(separated_many1(nz_number, tag(b'.')), b'.'.join)
# level 3
-atom_char: Parser[str] = verify(take_n(1), lambda c: atom_specials(c) is None)
+atom_char: Parser[bytes] = verify(take_n(1), lambda c: atom_specials(c) is None)
-quoted: Parser[str] = delimited(dquote, take_while0(as_predicate(quoted_char)), dquote)
+quoted: Parser[bytes] = delimited(dquote, take_while0(as_predicate(quoted_char)), dquote)
# level 4
-astring_char: Parser[str] = alt(atom_char, resp_specials)
+astring_char: Parser[bytes] = alt(atom_char, resp_specials)
-atom: Parser[str] = take_while1(as_predicate(atom_char))
+atom: Parser[bytes] = take_while1(as_predicate(atom_char))
-string: Parser[str] = alt(quoted, literal)
+string: Parser[bytes] = alt(quoted, literal)
# level 5
-astring: Parser[str] = alt(take_while1(as_predicate(astring_char)), string)
+astring: Parser[bytes] = alt(take_while1(as_predicate(astring_char)), string)
-flag_keyword: Parser[str] = atom
+flag_keyword: Parser[bytes] = atom
-flag_extension: Parser[str] = map_parser(pair(tag('\\'), atom), ''.join)
+flag_extension: Parser[bytes] = string_concat(pair(tag(b'\\'), atom))
-nstring: Parser[Optional[str]] = alt(string, nil)
+nstring: Parser[Optional[bytes]] = alt(string, nil)
# level 6
-addr_adl: Parser[Optional[str]] = nstring
-addr_host: Parser[Optional[str]] = nstring
-addr_mailbox: Parser[Optional[str]] = nstring
-addr_name: Parser[Optional[str]] = nstring
-
-env_date: Parser[Optional[str]] = nstring
-env_in_reply_to: Parser[Optional[str]] = nstring
-env_message_id: Parser[Optional[str]] = nstring
-env_subject: Parser[Optional[str]] = nstring
-
-flag: Parser[str] = alt(
- *(itag('\\' + x) for x in ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft']),
+addr_adl: Parser[Optional[bytes]] = nstring
+addr_host: Parser[Optional[bytes]] = nstring
+addr_mailbox: Parser[Optional[bytes]] = nstring
+addr_name: Parser[Optional[bytes]] = nstring
+
+env_date: Parser[Optional[bytes]] = nstring
+env_in_reply_to: Parser[Optional[bytes]] = nstring
+env_message_id: Parser[Optional[bytes]] = nstring
+env_subject: Parser[Optional[bytes]] = nstring
+
+flag: Parser[bytes] = alt(
+ *(itag(b'\\' + x) for x in [b'Answered', b'Flagged', b'Deleted', b'Seen', b'Draft']),
flag_keyword,
flag_extension
)
-header_fld_name: Parser[str] = astring
+header_fld_name: Parser[bytes] = astring
-mailbox: Parser[str] = alt(itag('INBOX'), astring)
+mailbox: Parser[bytes] = alt(itag(b'INBOX'), astring)
-mbx_list_flag: Parser[str] = alt(itag(r'\Noselect'), itag(r'\Marked'), itag(r'\Unmarked'), itag(r'\Noinferiors'), flag_extension)
+mbx_list_flag: Parser[bytes] = alt(itag(rb'\Noselect'), itag(rb'\Marked'), itag(rb'\Unmarked'), itag(rb'\Noinferiors'), flag_extension)
# level 7
-_ParsedAddress = Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]
-def _merge_for_address(t: Tuple[Tuple[Optional[str], Optional[str]], Tuple[Optional[str], Optional[str]]]) -> _ParsedAddress:
+_ParsedAddress = Tuple[Optional[bytes], Optional[bytes], Optional[bytes], Optional[bytes]]
+def _merge_for_address(t: Tuple[Tuple[Optional[bytes], Optional[bytes]], Tuple[Optional[bytes], Optional[bytes]]]) -> _ParsedAddress:
return (*t[0], *t[1])
-address: Parser[_ParsedAddress] = map_parser(delimited(tag('('), separated_pair(
- separated_pair(addr_name, tag(' '), addr_adl),
- tag(' '),
- separated_pair(addr_mailbox, tag(' '), addr_host)
-), tag(')')), _merge_for_address)
+address: Parser[_ParsedAddress] = map_parser(delimited(tag(b'('), separated_pair(
+ separated_pair(addr_name, tag(b' '), addr_adl),
+ tag(b' '),
+ separated_pair(addr_mailbox, tag(b' '), addr_host)
+), tag(b')')), _merge_for_address)
-flag_fetch: Parser[str] = alt(itag(r'\Recent'), flag)
+flag_fetch: Parser[bytes] = alt(itag(rb'\Recent'), flag)
-header_list: Parser[ListT[str]] = delimited(tag('('), separated_many1(header_fld_name, tag(' ')), tag(')'))
+header_list: Parser[ListT[bytes]] = delimited(tag(b'('), separated_many1(header_fld_name, tag(b' ')), tag(b')'))
-mbx_list_flags: Parser[ListT[str]] = separated_many0(mbx_list_flag, tag(' '))
+mbx_list_flags: Parser[ListT[bytes]] = separated_many0(mbx_list_flag, tag(b' '))
# level 8
-_address_list: Parser[ListT[_ParsedAddress]] = alt(delimited(tag('('), many1(address), tag(')')), map_parser(nil, lambda _: list()))
+_address_list: Parser[ListT[_ParsedAddress]] = alt(delimited(tag(b'('), many1(address), tag(b')')), map_parser(nil, lambda _: list()))
env_bcc: Parser[ListT[_ParsedAddress]] = _address_list
env_cc: Parser[ListT[_ParsedAddress]] = _address_list
env_from: Parser[ListT[_ParsedAddress]] = _address_list
@@ -150,167 +150,161 @@ env_reply_to: Parser[ListT[_ParsedAddress]] = _address_list
env_sender: Parser[ListT[_ParsedAddress]] = _address_list
env_to: Parser[ListT[_ParsedAddress]] = _address_list
-mailbox_list: Parser[Tuple[ListT[str], Optional[str], str]] = separated_triple(
- delimited(tag('('), mbx_list_flags, tag(')')),
- tag(' '),
+mailbox_list: Parser[Tuple[ListT[bytes], Optional[bytes], bytes]] = separated_triple(
+ delimited(tag(b'('), mbx_list_flags, tag(b')')),
+ tag(b' '),
alt(delimited(dquote, quoted_char, dquote), nil),
- tag(' '),
+ tag(b' '),
mailbox,
)
-msg_att_dynamic: Parser[ListT[str]] = delimited(
- itag('FLAGS ('),
- separated_many0(flag_fetch, tag(' ')),
- tag(')')
+msg_att_dynamic: Parser[ListT[bytes]] = delimited(
+ itag(b'FLAGS ('),
+ separated_many0(flag_fetch, tag(b' ')),
+ tag(b')')
)
-section_msgtext: Parser[str] = alt(
+section_msgtext: Parser[bytes] = alt(
string_concat(separated_pair(
- map_parser(pair(itag('HEADER.FIELDS'), opt(itag('.NOT'))), _condense_non_none),
- tag(' '),
+ map_parser(pair(itag(b'HEADER.FIELDS'), opt(itag(b'.NOT'))), _condense_non_none),
+ tag(b' '),
string_concat(header_list)
)),
- itag('HEADER'),
- itag('TEXT')
+ itag(b'HEADER'),
+ itag(b'TEXT')
)
# level 9
-_Envelope = Dict[str, Union[Optional[str], ListT[_ParsedAddress]]]
-def _label_nstring(name: str) -> Callable[[Optional[str]], Tuple[str, Optional[str]]]:
- def give_label(x: Optional[str]) -> Tuple[str, Optional[str]]:
+_Envelope = Dict[bytes, Union[Optional[bytes], ListT[_ParsedAddress]]]
+def _label_nstring(name: bytes) -> Callable[[Optional[bytes]], Tuple[bytes, Optional[bytes]]]:
+ def give_label(x: Optional[bytes]) -> Tuple[bytes, Optional[bytes]]:
return name, x
return give_label
-def _label_address_list(name: str) -> Callable[[ListT[_ParsedAddress]], Tuple[str, ListT[_ParsedAddress]]]:
- def give_label(x: ListT[_ParsedAddress]) -> Tuple[str, ListT[_ParsedAddress]]:
+def _label_address_list(name: bytes) -> Callable[[ListT[_ParsedAddress]], Tuple[bytes, ListT[_ParsedAddress]]]:
+ def give_label(x: ListT[_ParsedAddress]) -> Tuple[bytes, ListT[_ParsedAddress]]:
return name, x
return give_label
def _flatten_envelope(data: Tuple) -> _Envelope:
def do_flatten(x: Tuple):
- if isinstance(x[0], str):
+ if isinstance(x[0], bytes):
yield x
else:
for datum in x:
yield from do_flatten(datum)
return dict(do_flatten(data))
envelope: Parser[_Envelope] = map_parser(delimited(
- tag('('),
+ tag(b'('),
separated_triple(
separated_triple(
- map_parser(env_date, _label_nstring('date')),
- tag(' '),
- map_parser(env_subject, _label_nstring('subject')),
- tag(' '),
- map_parser(env_from, _label_address_list('from'))
+ map_parser(env_date, _label_nstring(b'date')),
+ tag(b' '),
+ map_parser(env_subject, _label_nstring(b'subject')),
+ tag(b' '),
+ map_parser(env_from, _label_address_list(b'from'))
),
- tag(' '),
+ tag(b' '),
separated_triple(
- map_parser(env_sender, _label_address_list('sender')),
- tag(' '),
- map_parser(env_reply_to, _label_address_list('reply_to')),
- tag(' '),
- map_parser(env_to, _label_address_list('to'))
+ map_parser(env_sender, _label_address_list(b'sender')),
+ tag(b' '),
+ map_parser(env_reply_to, _label_address_list(b'reply_to')),
+ tag(b' '),
+ map_parser(env_to, _label_address_list(b'to'))
),
- tag(' '),
+ tag(b' '),
separated_pair(
separated_pair(
- map_parser(env_cc, _label_address_list('cc')),
- tag(' '),
- map_parser(env_bcc, _label_address_list('bcc'))
+ map_parser(env_cc, _label_address_list(b'cc')),
+ tag(b' '),
+ map_parser(env_bcc, _label_address_list(b'bcc'))
),
- tag(' '),
+ tag(b' '),
separated_pair(
- map_parser(env_in_reply_to, _label_nstring('in_reply_to')),
- tag(' '),
- map_parser(env_message_id, _label_nstring('message_id'))
+ map_parser(env_in_reply_to, _label_nstring(b'in_reply_to')),
+ tag(b' '),
+ map_parser(env_message_id, _label_nstring(b'message_id'))
)
)
),
- tag(')')
+ tag(b')')
), _flatten_envelope)
-section_text: Parser[str] = alt(section_msgtext, itag('MIME'))
+section_text: Parser[bytes] = alt(section_msgtext, itag(b'MIME'))
# level 10
-section_spec: Parser[str] = alt(section_msgtext, map_parser(pair(section_part, opt(preceded(tag('.'), section_text))), _condense_non_none))
+section_spec: Parser[bytes] = alt(section_msgtext, map_parser(pair(section_part, opt(preceded(tag(b'.'), section_text))), _condense_non_none))
# level 11
-section: Parser[str] = map_parser(triple(tag('['), opt(section_spec), tag(']')), _condense_non_none)
+section: Parser[bytes] = map_parser(triple(tag(b'['), opt(section_spec), tag(b']')), _condense_non_none)
# level 12
-msg_att_static: Parser[Tuple[str, Union[Optional[str], _Envelope]]] = alt(
- separated_pair(itag('ENVELOPE'), tag(' '), envelope),
- separated_pair(itag('INTERNALDATE'), tag(' '), date_time),
+msg_att_static: Parser[Tuple[bytes, Union[Optional[bytes], _Envelope]]] = alt(
+ separated_pair(itag(b'ENVELOPE'), tag(b' '), envelope),
+ separated_pair(itag(b'INTERNALDATE'), tag(b' '), date_time),
separated_pair(
map_parser(
- pair(itag('RFC822'), opt(alt(itag('.HEADER'), itag('.TEXT')))),
+ pair(itag(b'RFC822'), opt(alt(itag(b'.HEADER'), itag(b'.TEXT')))),
_condense_non_none
),
- tag(' '),
+ tag(b' '),
nstring
),
- separated_pair(itag('RFC822.SIZE'), tag(' '), number),
+ separated_pair(itag(b'RFC822.SIZE'), tag(b' '), number),
# TODO BODY
separated_pair(
map_parser(
- triple(itag('BODY'), section, opt(delimited(tag('<'), number, tag('>')))),
+ triple(itag(b'BODY'), section, opt(delimited(tag(b'<'), number, tag(b'>')))),
_condense_non_none
),
- tag(' '),
+ tag(b' '),
nstring
)
# TODO UID
)
# level 13
-msg_att: Parser[ListT[Tuple[str, Union[str, None, ListT[str], _Envelope]]]] = delimited(
- tag('('),
- separated_many1(alt(map_parser(msg_att_dynamic, lambda x: ('FLAGS', x)), msg_att_static), tag(' ')),
- tag(')')
+msg_att: Parser[ListT[Tuple[bytes, Union[bytes, None, ListT[bytes], _Envelope]]]] = delimited(
+ tag(b'('),
+ separated_many1(alt(map_parser(msg_att_dynamic, lambda x: (b'FLAGS', x)), msg_att_static), tag(b' ')),
+ tag(b')')
)
# level 14
-message_data: Parser[Tuple[int, ListT[Tuple[str, Union[str, None, ListT[str], _Envelope]]]]] = separated_pair(
+message_data: Parser[Tuple[int, ListT[Tuple[bytes, Union[bytes, None, ListT[bytes], _Envelope]]]]] = separated_pair(
map_parser(nz_number, int),
- tag(' '), # TODO what does imaplib do with expunge replies
+ tag(b' '), # TODO what does imaplib do with expunge replies
msg_att
)
@dataclass
class List:
- attributes: ListT[str]
- delimiter: Optional[str]
- name: str
+ attributes: ListT[bytes]
+ delimiter: Optional[bytes]
+ name: bytes
@staticmethod
- def parse(response_bytes: bytes) -> 'List':
- response = response_bytes.decode('ASCII')
+ def parse(response: bytes) -> 'List':
parser = all_consuming(mailbox_list)
parse_result = parser(response)
if parse_result is None:
- raise ValueError(f'invalid List.parse argument {repr(response)}')
+ raise ValueError(f'invalid List.parse argument:\n{response!r}')
(attributes, delimiter, name), _ = parse_result
return List(attributes, delimiter, name)
@dataclass
class MessageData:
number: int
- flags: Optional[ListT[str]]
- internal_date: Optional[str]
- size: Optional[str]
+ flags: Optional[ListT[bytes]]
+ internal_date: Optional[bytes]
+ size: Optional[bytes]
envelope: Optional[_Envelope]
- body_all_sections: Optional[str]
+ body_all_sections: Optional[bytes]
@staticmethod
- def parse(response_bytes: bytes) -> 'MessageData':
- try:
- response = response_bytes.decode('ASCII')
- except UnicodeError:
- print(f'unicode fucky wucky occurred:\n{response_bytes}')
- response = response_bytes.decode('ASCII', errors='replace')
+ def parse(response: bytes) -> 'MessageData':
parser = all_consuming(message_data)
parse_result = parser(response)
if parse_result is None:
- raise ValueError(f'invalid MessageData.parse argument:\n{response}')
+ raise ValueError(f'invalid MessageData.parse argument:\n{response!r}')
(msg_number, data), _ = parse_result
flags = None
@@ -319,15 +313,15 @@ class MessageData:
env = None
body_all_sections = None
for name, value in data:
- if name == 'FLAGS' and isinstance(value, list):
+ if name == b'FLAGS' and isinstance(value, list):
flags = value
- elif name == 'INTERNALDATE' and isinstance(value, str):
+ elif name == b'INTERNALDATE' and isinstance(value, bytes):
internal_date = value
- elif name == 'RFC822.SIZE' and isinstance(value, str):
+ elif name == b'RFC822.SIZE' and isinstance(value, bytes):
size = value
- elif name == 'ENVELOPE' and isinstance(value, dict):
+ elif name == b'ENVELOPE' and isinstance(value, dict):
env = value
- elif name == 'BODY[]' and isinstance(value, str):
+ elif name == b'BODY[]' and isinstance(value, bytes):
body_all_sections = value
else:
print('warning: ignoring unknown data name', repr(name))