aboutsummaryrefslogtreecommitdiff
path: root/ctec/imap_response.py
diff options
context:
space:
mode:
authorMelody Horn <melody@boringcactus.com>2021-05-01 04:13:08 -0600
committerMelody Horn <melody@boringcactus.com>2021-05-01 04:13:08 -0600
commite824ee1564806359b580cb7f9975dc8f0aaa2e73 (patch)
treeff0c76a300c976a1520779fa180ecf5f573a393e /ctec/imap_response.py
parent2ef831feb8cd1b2393196877b7b7c93ac49093d7 (diff)
downloadctec-e824ee1564806359b580cb7f9975dc8f0aaa2e73.tar.gz
ctec-e824ee1564806359b580cb7f9975dc8f0aaa2e73.zip
fetch message contents
this still has some duplicates and breaks on MIME Content-Transfer-Encoding: 8bit but like. that's life
Diffstat (limited to 'ctec/imap_response.py')
-rw-r--r--ctec/imap_response.py254
1 files changed, 248 insertions, 6 deletions
diff --git a/ctec/imap_response.py b/ctec/imap_response.py
index cad76c9..10d6a54 100644
--- a/ctec/imap_response.py
+++ b/ctec/imap_response.py
@@ -1,39 +1,77 @@
# this should really be in the stdlib imo
from dataclasses import dataclass
-from typing import List as ListT, Tuple, Optional
+from typing import List as ListT, Tuple, Optional, Union, Dict, Callable, Sequence
from .parse_utils import (
Parser, take_while1, tag, delimited, take_n, alt, separated_many0, separated_triple, all_consuming, verify,
- preceded, and_then, itag, take_while0, as_predicate
+ preceded, and_then, itag, take_while0, as_predicate, separated_pair, pair, separated_many1, opt, map_parser,
+ many_m_n, string_concat, many1, triple
)
__all__ = [
'List',
+ 'MessageData',
]
+# common utility functions, manually typed so mypy doesn't get confused
+def _condense_non_none(data: Sequence[Optional[str]]) -> str:
+ return ''.join(x for x in data if x is not None)
+
# level 0
ctl: Parser[str] = verify(take_n(1), lambda c: not c.isprintable())
digit: Parser[str] = verify(take_n(1), lambda c: c in '0123456789')
+digit_nz: Parser[str] = verify(take_n(1), lambda c: c in '123456789')
+
dquote: Parser[str] = tag('"')
list_wildcards: Parser[str] = alt(tag('%'), tag('*'))
-nil: Parser[str] = itag('NIL')
+nil: Parser[None] = map_parser(itag('NIL'), lambda _: None)
resp_specials: Parser[str] = tag(']')
text_char: Parser[str] = verify(take_n(1), lambda c: c not in '\r\n')
# level 1
+date_day_fixed: Parser[str] = alt(preceded(tag(' '), digit), string_concat(many_m_n(digit, 2, 2)))
+
+date_month: Parser[str] = alt(*(itag(x) for x in 'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()))
+
+date_year: Parser[str] = string_concat(many_m_n(digit, 4, 4))
+
number: Parser[str] = take_while1(as_predicate(digit))
+nz_number: Parser[str] = map_parser(pair(digit_nz, take_while0(as_predicate(digit))), ''.join)
+
quoted_specials: Parser[str] = alt(dquote, tag('\\'))
+time: Parser[str] = map_parser(separated_triple(
+ string_concat(many_m_n(digit, 2, 2)),
+ tag(':'),
+ string_concat(many_m_n(digit, 2, 2)),
+ tag(':'),
+ string_concat(many_m_n(digit, 2, 2))
+), ':'.join)
+
+zone: Parser[str] = string_concat(pair(alt(tag('+'), tag('-')), string_concat(many_m_n(digit, 4, 4))))
+
# level 2
atom_specials: Parser[str] = alt(verify(take_n(1), lambda c: c in '(){ '), ctl, list_wildcards, quoted_specials, resp_specials)
+date_time: Parser[str] = delimited(
+ dquote,
+ map_parser(separated_triple(
+ map_parser(separated_triple(date_day_fixed, tag('-'), date_month, tag('-'), date_year), '-'.join),
+ tag(' '),
+ time,
+ tag(' '),
+ zone
+ ), ' '.join),
+ dquote
+)
+
literal: Parser[str] = and_then(
delimited(tag('{'), number, tag('}\r\n')),
lambda n: preceded(delimited(tag('{'), number, tag('}\r\n')), take_n(int(n)))
@@ -41,6 +79,8 @@ literal: Parser[str] = and_then(
quoted_char: Parser[str] = alt(verify(text_char, lambda c: quoted_specials(c) is None), preceded(tag('\\'), quoted_specials))
+section_part: Parser[str] = map_parser(separated_many1(nz_number, tag('.')), '.'.join)
+
# level 3
atom_char: Parser[str] = verify(take_n(1), lambda c: atom_specials(c) is None)
@@ -56,17 +96,60 @@ string: Parser[str] = alt(quoted, literal)
# level 5
astring: Parser[str] = alt(take_while1(as_predicate(astring_char)), string)
-flag_extension: Parser[str] = preceded(tag('\\'), atom)
+flag_keyword: Parser[str] = atom
+
+flag_extension: Parser[str] = map_parser(pair(tag('\\'), atom), ''.join)
+
+nstring: Parser[Optional[str]] = alt(string, nil)
# level 6
+addr_adl: Parser[Optional[str]] = nstring
+addr_host: Parser[Optional[str]] = nstring
+addr_mailbox: Parser[Optional[str]] = nstring
+addr_name: Parser[Optional[str]] = nstring
+
+env_date: Parser[Optional[str]] = nstring
+env_in_reply_to: Parser[Optional[str]] = nstring
+env_message_id: Parser[Optional[str]] = nstring
+env_subject: Parser[Optional[str]] = nstring
+
+flag: Parser[str] = alt(
+ *(itag('\\' + x) for x in ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft']),
+ flag_keyword,
+ flag_extension
+)
+
+header_fld_name: Parser[str] = astring
+
mailbox: Parser[str] = alt(itag('INBOX'), astring)
mbx_list_flag: Parser[str] = alt(itag(r'\Noselect'), itag(r'\Marked'), itag(r'\Unmarked'), itag(r'\Noinferiors'), flag_extension)
# level 7
+_ParsedAddress = Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]
+def _merge_for_address(t: Tuple[Tuple[Optional[str], Optional[str]], Tuple[Optional[str], Optional[str]]]) -> _ParsedAddress:
+ return (*t[0], *t[1])
+address: Parser[_ParsedAddress] = map_parser(delimited(tag('('), separated_pair(
+ separated_pair(addr_name, tag(' '), addr_adl),
+ tag(' '),
+ separated_pair(addr_mailbox, tag(' '), addr_host)
+), tag(')')), _merge_for_address)
+
+flag_fetch: Parser[str] = alt(itag(r'\Recent'), flag)
+
+header_list: Parser[ListT[str]] = delimited(tag('('), separated_many1(header_fld_name, tag(' ')), tag(')'))
+
mbx_list_flags: Parser[ListT[str]] = separated_many0(mbx_list_flag, tag(' '))
# level 8
+_address_list: Parser[ListT[_ParsedAddress]] = alt(delimited(tag('('), many1(address), tag(')')), map_parser(nil, lambda _: list()))
+env_bcc: Parser[ListT[_ParsedAddress]] = _address_list
+env_cc: Parser[ListT[_ParsedAddress]] = _address_list
+env_from: Parser[ListT[_ParsedAddress]] = _address_list
+env_reply_to: Parser[ListT[_ParsedAddress]] = _address_list
+env_sender: Parser[ListT[_ParsedAddress]] = _address_list
+env_to: Parser[ListT[_ParsedAddress]] = _address_list
+
mailbox_list: Parser[Tuple[ListT[str], Optional[str], str]] = separated_triple(
delimited(tag('('), mbx_list_flags, tag(')')),
tag(' '),
@@ -75,6 +158,123 @@ mailbox_list: Parser[Tuple[ListT[str], Optional[str], str]] = separated_triple(
mailbox,
)
+msg_att_dynamic: Parser[ListT[str]] = delimited(
+ itag('FLAGS ('),
+ separated_many0(flag_fetch, tag(' ')),
+ tag(')')
+)
+
+section_msgtext: Parser[str] = alt(
+ string_concat(separated_pair(
+ map_parser(pair(itag('HEADER.FIELDS'), opt(itag('.NOT'))), _condense_non_none),
+ tag(' '),
+ string_concat(header_list)
+ )),
+ itag('HEADER'),
+ itag('TEXT')
+)
+
+# level 9
+_Envelope = Dict[str, Union[Optional[str], ListT[_ParsedAddress]]]
+def _label_nstring(name: str) -> Callable[[Optional[str]], Tuple[str, Optional[str]]]:
+ def give_label(x: Optional[str]) -> Tuple[str, Optional[str]]:
+ return name, x
+ return give_label
+def _label_address_list(name: str) -> Callable[[ListT[_ParsedAddress]], Tuple[str, ListT[_ParsedAddress]]]:
+ def give_label(x: ListT[_ParsedAddress]) -> Tuple[str, ListT[_ParsedAddress]]:
+ return name, x
+ return give_label
+def _flatten_envelope(data: Tuple) -> _Envelope:
+ def do_flatten(x: Tuple):
+ if isinstance(x[0], str):
+ yield x
+ else:
+ for datum in x:
+ yield from do_flatten(datum)
+ return dict(do_flatten(data))
+envelope: Parser[_Envelope] = map_parser(delimited(
+ tag('('),
+ separated_triple(
+ separated_triple(
+ map_parser(env_date, _label_nstring('date')),
+ tag(' '),
+ map_parser(env_subject, _label_nstring('subject')),
+ tag(' '),
+ map_parser(env_from, _label_address_list('from'))
+ ),
+ tag(' '),
+ separated_triple(
+ map_parser(env_sender, _label_address_list('sender')),
+ tag(' '),
+ map_parser(env_reply_to, _label_address_list('reply_to')),
+ tag(' '),
+ map_parser(env_to, _label_address_list('to'))
+ ),
+ tag(' '),
+ separated_pair(
+ separated_pair(
+ map_parser(env_cc, _label_address_list('cc')),
+ tag(' '),
+ map_parser(env_bcc, _label_address_list('bcc'))
+ ),
+ tag(' '),
+ separated_pair(
+ map_parser(env_in_reply_to, _label_nstring('in_reply_to')),
+ tag(' '),
+ map_parser(env_message_id, _label_nstring('message_id'))
+ )
+ )
+ ),
+ tag(')')
+), _flatten_envelope)
+
+section_text: Parser[str] = alt(section_msgtext, itag('MIME'))
+
+# level 10
+section_spec: Parser[str] = alt(section_msgtext, map_parser(pair(section_part, opt(preceded(tag('.'), section_text))), _condense_non_none))
+
+# level 11
+section: Parser[str] = map_parser(triple(tag('['), opt(section_spec), tag(']')), _condense_non_none)
+
+# level 12
+msg_att_static: Parser[Tuple[str, Union[Optional[str], _Envelope]]] = alt(
+ separated_pair(itag('ENVELOPE'), tag(' '), envelope),
+ separated_pair(itag('INTERNALDATE'), tag(' '), date_time),
+ separated_pair(
+ map_parser(
+ pair(itag('RFC822'), opt(alt(itag('.HEADER'), itag('.TEXT')))),
+ _condense_non_none
+ ),
+ tag(' '),
+ nstring
+ ),
+ separated_pair(itag('RFC822.SIZE'), tag(' '), number),
+ # TODO BODY
+ separated_pair(
+ map_parser(
+ triple(itag('BODY'), section, opt(delimited(tag('<'), number, tag('>')))),
+ _condense_non_none
+ ),
+ tag(' '),
+ nstring
+ )
+ # TODO UID
+)
+
+# level 13
+msg_att: Parser[ListT[Tuple[str, Union[str, None, ListT[str], _Envelope]]]] = delimited(
+ tag('('),
+ separated_many1(alt(map_parser(msg_att_dynamic, lambda x: ('FLAGS', x)), msg_att_static), tag(' ')),
+ tag(')')
+)
+
+# level 14
+message_data: Parser[Tuple[int, ListT[Tuple[str, Union[str, None, ListT[str], _Envelope]]]]] = separated_pair(
+ map_parser(nz_number, int),
+ tag(' '), # TODO what does imaplib do with expunge replies
+ msg_att
+)
+
@dataclass
class List:
attributes: ListT[str]
@@ -84,9 +284,51 @@ class List:
@staticmethod
def parse(response_bytes: bytes) -> 'List':
response = response_bytes.decode('ASCII')
- parser = all_consuming(mailbox_list, debug=True)
+ parser = all_consuming(mailbox_list)
parse_result = parser(response)
if parse_result is None:
- raise ValueError('invalid List.parse argument {}', repr(response))
+ raise ValueError(f'invalid List.parse argument {repr(response)}')
(attributes, delimiter, name), _ = parse_result
return List(attributes, delimiter, name)
+
+@dataclass
+class MessageData:
+ number: int
+ flags: Optional[ListT[str]]
+ internal_date: Optional[str]
+ size: Optional[str]
+ envelope: Optional[_Envelope]
+ body_all_sections: Optional[str]
+
+ @staticmethod
+ def parse(response_bytes: bytes) -> 'MessageData':
+ try:
+ response = response_bytes.decode('ASCII')
+ except UnicodeError:
+ print(f'unicode fucky wucky occurred:\n{response_bytes}')
+ response = response_bytes.decode('ASCII', errors='replace')
+ parser = all_consuming(message_data)
+ parse_result = parser(response)
+ if parse_result is None:
+ raise ValueError(f'invalid MessageData.parse argument:\n{response}')
+ (msg_number, data), _ = parse_result
+
+ flags = None
+ internal_date = None
+ size = None
+ env = None
+ body_all_sections = None
+ for name, value in data:
+ if name == 'FLAGS' and isinstance(value, list):
+ flags = value
+ elif name == 'INTERNALDATE' and isinstance(value, str):
+ internal_date = value
+ elif name == 'RFC822.SIZE' and isinstance(value, str):
+ size = value
+ elif name == 'ENVELOPE' and isinstance(value, dict):
+ env = value
+ elif name == 'BODY[]' and isinstance(value, str):
+ body_all_sections = value
+ else:
+ print('warning: ignoring unknown data name', repr(name))
+ return MessageData(msg_number, flags, internal_date, size, env, body_all_sections)