aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ctec/imap_response.py286
-rw-r--r--ctec/logic.py49
-rw-r--r--ctec/parse_utils.py80
3 files changed, 206 insertions, 209 deletions
diff --git a/ctec/imap_response.py b/ctec/imap_response.py
index 10d6a54..01f550f 100644
--- a/ctec/imap_response.py
+++ b/ctec/imap_response.py
@@ -14,135 +14,135 @@ __all__ = [
]
# common utility functions, manually typed so mypy doesn't get confused
-def _condense_non_none(data: Sequence[Optional[str]]) -> str:
- return ''.join(x for x in data if x is not None)
+def _condense_non_none(data: Sequence[Optional[bytes]]) -> bytes:
+ return b''.join(x for x in data if x is not None)
# level 0
-ctl: Parser[str] = verify(take_n(1), lambda c: not c.isprintable())
+ctl: Parser[bytes] = verify(take_n(1), lambda c: not c.decode().isprintable())
-digit: Parser[str] = verify(take_n(1), lambda c: c in '0123456789')
+digit: Parser[bytes] = verify(take_n(1), lambda c: c in b'0123456789')
-digit_nz: Parser[str] = verify(take_n(1), lambda c: c in '123456789')
+digit_nz: Parser[bytes] = verify(take_n(1), lambda c: c in b'123456789')
-dquote: Parser[str] = tag('"')
+dquote: Parser[bytes] = tag(b'"')
-list_wildcards: Parser[str] = alt(tag('%'), tag('*'))
+list_wildcards: Parser[bytes] = alt(tag(b'%'), tag(b'*'))
-nil: Parser[None] = map_parser(itag('NIL'), lambda _: None)
+nil: Parser[None] = map_parser(itag(b'NIL'), lambda _: None)
-resp_specials: Parser[str] = tag(']')
+resp_specials: Parser[bytes] = tag(b']')
-text_char: Parser[str] = verify(take_n(1), lambda c: c not in '\r\n')
+text_char: Parser[bytes] = verify(take_n(1), lambda c: c not in b'\r\n')
# level 1
-date_day_fixed: Parser[str] = alt(preceded(tag(' '), digit), string_concat(many_m_n(digit, 2, 2)))
+date_day_fixed: Parser[bytes] = alt(preceded(tag(b' '), digit), string_concat(many_m_n(digit, 2, 2)))
-date_month: Parser[str] = alt(*(itag(x) for x in 'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()))
+date_month: Parser[bytes] = alt(*(itag(x) for x in b'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()))
-date_year: Parser[str] = string_concat(many_m_n(digit, 4, 4))
+date_year: Parser[bytes] = string_concat(many_m_n(digit, 4, 4))
-number: Parser[str] = take_while1(as_predicate(digit))
+number: Parser[bytes] = take_while1(as_predicate(digit))
-nz_number: Parser[str] = map_parser(pair(digit_nz, take_while0(as_predicate(digit))), ''.join)
+nz_number: Parser[bytes] = string_concat(pair(digit_nz, take_while0(as_predicate(digit))))
-quoted_specials: Parser[str] = alt(dquote, tag('\\'))
+quoted_specials: Parser[bytes] = alt(dquote, tag(b'\\'))
-time: Parser[str] = map_parser(separated_triple(
+time: Parser[bytes] = map_parser(separated_triple(
string_concat(many_m_n(digit, 2, 2)),
- tag(':'),
+ tag(b':'),
string_concat(many_m_n(digit, 2, 2)),
- tag(':'),
+ tag(b':'),
string_concat(many_m_n(digit, 2, 2))
-), ':'.join)
+), b':'.join)
-zone: Parser[str] = string_concat(pair(alt(tag('+'), tag('-')), string_concat(many_m_n(digit, 4, 4))))
+zone: Parser[bytes] = string_concat(pair(alt(tag(b'+'), tag(b'-')), string_concat(many_m_n(digit, 4, 4))))
# level 2
-atom_specials: Parser[str] = alt(verify(take_n(1), lambda c: c in '(){ '), ctl, list_wildcards, quoted_specials, resp_specials)
+atom_specials: Parser[bytes] = alt(verify(take_n(1), lambda c: c in b'(){ '), ctl, list_wildcards, quoted_specials, resp_specials)
-date_time: Parser[str] = delimited(
+date_time: Parser[bytes] = delimited(
dquote,
map_parser(separated_triple(
- map_parser(separated_triple(date_day_fixed, tag('-'), date_month, tag('-'), date_year), '-'.join),
- tag(' '),
+ map_parser(separated_triple(date_day_fixed, tag(b'-'), date_month, tag(b'-'), date_year), b'-'.join),
+ tag(b' '),
time,
- tag(' '),
+ tag(b' '),
zone
- ), ' '.join),
+ ), b' '.join),
dquote
)
-literal: Parser[str] = and_then(
- delimited(tag('{'), number, tag('}\r\n')),
- lambda n: preceded(delimited(tag('{'), number, tag('}\r\n')), take_n(int(n)))
+literal: Parser[bytes] = and_then(
+ delimited(tag(b'{'), number, tag(b'}\r\n')),
+ lambda n: preceded(delimited(tag(b'{'), number, tag(b'}\r\n')), take_n(int(n)))
)
-quoted_char: Parser[str] = alt(verify(text_char, lambda c: quoted_specials(c) is None), preceded(tag('\\'), quoted_specials))
+quoted_char: Parser[bytes] = alt(verify(text_char, lambda c: quoted_specials(c) is None), preceded(tag(b'\\'), quoted_specials))
-section_part: Parser[str] = map_parser(separated_many1(nz_number, tag('.')), '.'.join)
+section_part: Parser[bytes] = map_parser(separated_many1(nz_number, tag(b'.')), b'.'.join)
# level 3
-atom_char: Parser[str] = verify(take_n(1), lambda c: atom_specials(c) is None)
+atom_char: Parser[bytes] = verify(take_n(1), lambda c: atom_specials(c) is None)
-quoted: Parser[str] = delimited(dquote, take_while0(as_predicate(quoted_char)), dquote)
+quoted: Parser[bytes] = delimited(dquote, take_while0(as_predicate(quoted_char)), dquote)
# level 4
-astring_char: Parser[str] = alt(atom_char, resp_specials)
+astring_char: Parser[bytes] = alt(atom_char, resp_specials)
-atom: Parser[str] = take_while1(as_predicate(atom_char))
+atom: Parser[bytes] = take_while1(as_predicate(atom_char))
-string: Parser[str] = alt(quoted, literal)
+string: Parser[bytes] = alt(quoted, literal)
# level 5
-astring: Parser[str] = alt(take_while1(as_predicate(astring_char)), string)
+astring: Parser[bytes] = alt(take_while1(as_predicate(astring_char)), string)
-flag_keyword: Parser[str] = atom
+flag_keyword: Parser[bytes] = atom
-flag_extension: Parser[str] = map_parser(pair(tag('\\'), atom), ''.join)
+flag_extension: Parser[bytes] = string_concat(pair(tag(b'\\'), atom))
-nstring: Parser[Optional[str]] = alt(string, nil)
+nstring: Parser[Optional[bytes]] = alt(string, nil)
# level 6
-addr_adl: Parser[Optional[str]] = nstring
-addr_host: Parser[Optional[str]] = nstring
-addr_mailbox: Parser[Optional[str]] = nstring
-addr_name: Parser[Optional[str]] = nstring
-
-env_date: Parser[Optional[str]] = nstring
-env_in_reply_to: Parser[Optional[str]] = nstring
-env_message_id: Parser[Optional[str]] = nstring
-env_subject: Parser[Optional[str]] = nstring
-
-flag: Parser[str] = alt(
- *(itag('\\' + x) for x in ['Answered', 'Flagged', 'Deleted', 'Seen', 'Draft']),
+addr_adl: Parser[Optional[bytes]] = nstring
+addr_host: Parser[Optional[bytes]] = nstring
+addr_mailbox: Parser[Optional[bytes]] = nstring
+addr_name: Parser[Optional[bytes]] = nstring
+
+env_date: Parser[Optional[bytes]] = nstring
+env_in_reply_to: Parser[Optional[bytes]] = nstring
+env_message_id: Parser[Optional[bytes]] = nstring
+env_subject: Parser[Optional[bytes]] = nstring
+
+flag: Parser[bytes] = alt(
+ *(itag(b'\\' + x) for x in [b'Answered', b'Flagged', b'Deleted', b'Seen', b'Draft']),
flag_keyword,
flag_extension
)
-header_fld_name: Parser[str] = astring
+header_fld_name: Parser[bytes] = astring
-mailbox: Parser[str] = alt(itag('INBOX'), astring)
+mailbox: Parser[bytes] = alt(itag(b'INBOX'), astring)
-mbx_list_flag: Parser[str] = alt(itag(r'\Noselect'), itag(r'\Marked'), itag(r'\Unmarked'), itag(r'\Noinferiors'), flag_extension)
+mbx_list_flag: Parser[bytes] = alt(itag(rb'\Noselect'), itag(rb'\Marked'), itag(rb'\Unmarked'), itag(rb'\Noinferiors'), flag_extension)
# level 7
-_ParsedAddress = Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]
-def _merge_for_address(t: Tuple[Tuple[Optional[str], Optional[str]], Tuple[Optional[str], Optional[str]]]) -> _ParsedAddress:
+_ParsedAddress = Tuple[Optional[bytes], Optional[bytes], Optional[bytes], Optional[bytes]]
+def _merge_for_address(t: Tuple[Tuple[Optional[bytes], Optional[bytes]], Tuple[Optional[bytes], Optional[bytes]]]) -> _ParsedAddress:
return (*t[0], *t[1])
-address: Parser[_ParsedAddress] = map_parser(delimited(tag('('), separated_pair(
- separated_pair(addr_name, tag(' '), addr_adl),
- tag(' '),
- separated_pair(addr_mailbox, tag(' '), addr_host)
-), tag(')')), _merge_for_address)
+address: Parser[_ParsedAddress] = map_parser(delimited(tag(b'('), separated_pair(
+ separated_pair(addr_name, tag(b' '), addr_adl),
+ tag(b' '),
+ separated_pair(addr_mailbox, tag(b' '), addr_host)
+), tag(b')')), _merge_for_address)
-flag_fetch: Parser[str] = alt(itag(r'\Recent'), flag)
+flag_fetch: Parser[bytes] = alt(itag(rb'\Recent'), flag)
-header_list: Parser[ListT[str]] = delimited(tag('('), separated_many1(header_fld_name, tag(' ')), tag(')'))
+header_list: Parser[ListT[bytes]] = delimited(tag(b'('), separated_many1(header_fld_name, tag(b' ')), tag(b')'))
-mbx_list_flags: Parser[ListT[str]] = separated_many0(mbx_list_flag, tag(' '))
+mbx_list_flags: Parser[ListT[bytes]] = separated_many0(mbx_list_flag, tag(b' '))
# level 8
-_address_list: Parser[ListT[_ParsedAddress]] = alt(delimited(tag('('), many1(address), tag(')')), map_parser(nil, lambda _: list()))
+_address_list: Parser[ListT[_ParsedAddress]] = alt(delimited(tag(b'('), many1(address), tag(b')')), map_parser(nil, lambda _: list()))
env_bcc: Parser[ListT[_ParsedAddress]] = _address_list
env_cc: Parser[ListT[_ParsedAddress]] = _address_list
env_from: Parser[ListT[_ParsedAddress]] = _address_list
@@ -150,167 +150,161 @@ env_reply_to: Parser[ListT[_ParsedAddress]] = _address_list
env_sender: Parser[ListT[_ParsedAddress]] = _address_list
env_to: Parser[ListT[_ParsedAddress]] = _address_list
-mailbox_list: Parser[Tuple[ListT[str], Optional[str], str]] = separated_triple(
- delimited(tag('('), mbx_list_flags, tag(')')),
- tag(' '),
+mailbox_list: Parser[Tuple[ListT[bytes], Optional[bytes], bytes]] = separated_triple(
+ delimited(tag(b'('), mbx_list_flags, tag(b')')),
+ tag(b' '),
alt(delimited(dquote, quoted_char, dquote), nil),
- tag(' '),
+ tag(b' '),
mailbox,
)
-msg_att_dynamic: Parser[ListT[str]] = delimited(
- itag('FLAGS ('),
- separated_many0(flag_fetch, tag(' ')),
- tag(')')
+msg_att_dynamic: Parser[ListT[bytes]] = delimited(
+ itag(b'FLAGS ('),
+ separated_many0(flag_fetch, tag(b' ')),
+ tag(b')')
)
-section_msgtext: Parser[str] = alt(
+section_msgtext: Parser[bytes] = alt(
string_concat(separated_pair(
- map_parser(pair(itag('HEADER.FIELDS'), opt(itag('.NOT'))), _condense_non_none),
- tag(' '),
+ map_parser(pair(itag(b'HEADER.FIELDS'), opt(itag(b'.NOT'))), _condense_non_none),
+ tag(b' '),
string_concat(header_list)
)),
- itag('HEADER'),
- itag('TEXT')
+ itag(b'HEADER'),
+ itag(b'TEXT')
)
# level 9
-_Envelope = Dict[str, Union[Optional[str], ListT[_ParsedAddress]]]
-def _label_nstring(name: str) -> Callable[[Optional[str]], Tuple[str, Optional[str]]]:
- def give_label(x: Optional[str]) -> Tuple[str, Optional[str]]:
+_Envelope = Dict[bytes, Union[Optional[bytes], ListT[_ParsedAddress]]]
+def _label_nstring(name: bytes) -> Callable[[Optional[bytes]], Tuple[bytes, Optional[bytes]]]:
+ def give_label(x: Optional[bytes]) -> Tuple[bytes, Optional[bytes]]:
return name, x
return give_label
-def _label_address_list(name: str) -> Callable[[ListT[_ParsedAddress]], Tuple[str, ListT[_ParsedAddress]]]:
- def give_label(x: ListT[_ParsedAddress]) -> Tuple[str, ListT[_ParsedAddress]]:
+def _label_address_list(name: bytes) -> Callable[[ListT[_ParsedAddress]], Tuple[bytes, ListT[_ParsedAddress]]]:
+ def give_label(x: ListT[_ParsedAddress]) -> Tuple[bytes, ListT[_ParsedAddress]]:
return name, x
return give_label
def _flatten_envelope(data: Tuple) -> _Envelope:
def do_flatten(x: Tuple):
- if isinstance(x[0], str):
+ if isinstance(x[0], bytes):
yield x
else:
for datum in x:
yield from do_flatten(datum)
return dict(do_flatten(data))
envelope: Parser[_Envelope] = map_parser(delimited(
- tag('('),
+ tag(b'('),
separated_triple(
separated_triple(
- map_parser(env_date, _label_nstring('date')),
- tag(' '),
- map_parser(env_subject, _label_nstring('subject')),
- tag(' '),
- map_parser(env_from, _label_address_list('from'))
+ map_parser(env_date, _label_nstring(b'date')),
+ tag(b' '),
+ map_parser(env_subject, _label_nstring(b'subject')),
+ tag(b' '),
+ map_parser(env_from, _label_address_list(b'from'))
),
- tag(' '),
+ tag(b' '),
separated_triple(
- map_parser(env_sender, _label_address_list('sender')),
- tag(' '),
- map_parser(env_reply_to, _label_address_list('reply_to')),
- tag(' '),
- map_parser(env_to, _label_address_list('to'))
+ map_parser(env_sender, _label_address_list(b'sender')),
+ tag(b' '),
+ map_parser(env_reply_to, _label_address_list(b'reply_to')),
+ tag(b' '),
+ map_parser(env_to, _label_address_list(b'to'))
),
- tag(' '),
+ tag(b' '),
separated_pair(
separated_pair(
- map_parser(env_cc, _label_address_list('cc')),
- tag(' '),
- map_parser(env_bcc, _label_address_list('bcc'))
+ map_parser(env_cc, _label_address_list(b'cc')),
+ tag(b' '),
+ map_parser(env_bcc, _label_address_list(b'bcc'))
),
- tag(' '),
+ tag(b' '),
separated_pair(
- map_parser(env_in_reply_to, _label_nstring('in_reply_to')),
- tag(' '),
- map_parser(env_message_id, _label_nstring('message_id'))
+ map_parser(env_in_reply_to, _label_nstring(b'in_reply_to')),
+ tag(b' '),
+ map_parser(env_message_id, _label_nstring(b'message_id'))
)
)
),
- tag(')')
+ tag(b')')
), _flatten_envelope)
-section_text: Parser[str] = alt(section_msgtext, itag('MIME'))
+section_text: Parser[bytes] = alt(section_msgtext, itag(b'MIME'))
# level 10
-section_spec: Parser[str] = alt(section_msgtext, map_parser(pair(section_part, opt(preceded(tag('.'), section_text))), _condense_non_none))
+section_spec: Parser[bytes] = alt(section_msgtext, map_parser(pair(section_part, opt(preceded(tag(b'.'), section_text))), _condense_non_none))
# level 11
-section: Parser[str] = map_parser(triple(tag('['), opt(section_spec), tag(']')), _condense_non_none)
+section: Parser[bytes] = map_parser(triple(tag(b'['), opt(section_spec), tag(b']')), _condense_non_none)
# level 12
-msg_att_static: Parser[Tuple[str, Union[Optional[str], _Envelope]]] = alt(
- separated_pair(itag('ENVELOPE'), tag(' '), envelope),
- separated_pair(itag('INTERNALDATE'), tag(' '), date_time),
+msg_att_static: Parser[Tuple[bytes, Union[Optional[bytes], _Envelope]]] = alt(
+ separated_pair(itag(b'ENVELOPE'), tag(b' '), envelope),
+ separated_pair(itag(b'INTERNALDATE'), tag(b' '), date_time),
separated_pair(
map_parser(
- pair(itag('RFC822'), opt(alt(itag('.HEADER'), itag('.TEXT')))),
+ pair(itag(b'RFC822'), opt(alt(itag(b'.HEADER'), itag(b'.TEXT')))),
_condense_non_none
),
- tag(' '),
+ tag(b' '),
nstring
),
- separated_pair(itag('RFC822.SIZE'), tag(' '), number),
+ separated_pair(itag(b'RFC822.SIZE'), tag(b' '), number),
# TODO BODY
separated_pair(
map_parser(
- triple(itag('BODY'), section, opt(delimited(tag('<'), number, tag('>')))),
+ triple(itag(b'BODY'), section, opt(delimited(tag(b'<'), number, tag(b'>')))),
_condense_non_none
),
- tag(' '),
+ tag(b' '),
nstring
)
# TODO UID
)
# level 13
-msg_att: Parser[ListT[Tuple[str, Union[str, None, ListT[str], _Envelope]]]] = delimited(
- tag('('),
- separated_many1(alt(map_parser(msg_att_dynamic, lambda x: ('FLAGS', x)), msg_att_static), tag(' ')),
- tag(')')
+msg_att: Parser[ListT[Tuple[bytes, Union[bytes, None, ListT[bytes], _Envelope]]]] = delimited(
+ tag(b'('),
+ separated_many1(alt(map_parser(msg_att_dynamic, lambda x: (b'FLAGS', x)), msg_att_static), tag(b' ')),
+ tag(b')')
)
# level 14
-message_data: Parser[Tuple[int, ListT[Tuple[str, Union[str, None, ListT[str], _Envelope]]]]] = separated_pair(
+message_data: Parser[Tuple[int, ListT[Tuple[bytes, Union[bytes, None, ListT[bytes], _Envelope]]]]] = separated_pair(
map_parser(nz_number, int),
- tag(' '), # TODO what does imaplib do with expunge replies
+ tag(b' '), # TODO what does imaplib do with expunge replies
msg_att
)
@dataclass
class List:
- attributes: ListT[str]
- delimiter: Optional[str]
- name: str
+ attributes: ListT[bytes]
+ delimiter: Optional[bytes]
+ name: bytes
@staticmethod
- def parse(response_bytes: bytes) -> 'List':
- response = response_bytes.decode('ASCII')
+ def parse(response: bytes) -> 'List':
parser = all_consuming(mailbox_list)
parse_result = parser(response)
if parse_result is None:
- raise ValueError(f'invalid List.parse argument {repr(response)}')
+ raise ValueError(f'invalid List.parse argument:\n{response!r}')
(attributes, delimiter, name), _ = parse_result
return List(attributes, delimiter, name)
@dataclass
class MessageData:
number: int
- flags: Optional[ListT[str]]
- internal_date: Optional[str]
- size: Optional[str]
+ flags: Optional[ListT[bytes]]
+ internal_date: Optional[bytes]
+ size: Optional[bytes]
envelope: Optional[_Envelope]
- body_all_sections: Optional[str]
+ body_all_sections: Optional[bytes]
@staticmethod
- def parse(response_bytes: bytes) -> 'MessageData':
- try:
- response = response_bytes.decode('ASCII')
- except UnicodeError:
- print(f'unicode fucky wucky occurred:\n{response_bytes}')
- response = response_bytes.decode('ASCII', errors='replace')
+ def parse(response: bytes) -> 'MessageData':
parser = all_consuming(message_data)
parse_result = parser(response)
if parse_result is None:
- raise ValueError(f'invalid MessageData.parse argument:\n{response}')
+ raise ValueError(f'invalid MessageData.parse argument:\n{response!r}')
(msg_number, data), _ = parse_result
flags = None
@@ -319,15 +313,15 @@ class MessageData:
env = None
body_all_sections = None
for name, value in data:
- if name == 'FLAGS' and isinstance(value, list):
+ if name == b'FLAGS' and isinstance(value, list):
flags = value
- elif name == 'INTERNALDATE' and isinstance(value, str):
+ elif name == b'INTERNALDATE' and isinstance(value, bytes):
internal_date = value
- elif name == 'RFC822.SIZE' and isinstance(value, str):
+ elif name == b'RFC822.SIZE' and isinstance(value, bytes):
size = value
- elif name == 'ENVELOPE' and isinstance(value, dict):
+ elif name == b'ENVELOPE' and isinstance(value, dict):
env = value
- elif name == 'BODY[]' and isinstance(value, str):
+ elif name == b'BODY[]' and isinstance(value, bytes):
body_all_sections = value
else:
print('warning: ignoring unknown data name', repr(name))
diff --git a/ctec/logic.py b/ctec/logic.py
index d151a0f..d2767bc 100644
--- a/ctec/logic.py
+++ b/ctec/logic.py
@@ -9,19 +9,20 @@ mailbox.Maildir.colon = '!'
MESSAGE_DOWNLOAD_BATCH = 1
-def percent_encode(c: str) -> str:
- return '%' + hex(ord(c))[2:]
+def percent_encode(c: bytes) -> bytes:
+ return b'%' + c.hex().encode()
-def clean_folder_name(folder_name: str, separator: str) -> str:
- folder_name = folder_name.replace('%', percent_encode('%'))
- folder_name = folder_name.replace('.', percent_encode('.'))
- folder_name = folder_name.replace(separator, '.')
- return folder_name
+def clean_folder_name(folder_name: bytes, separator: bytes) -> str:
+ folder_name = folder_name.replace(b'%', percent_encode(b'%'))
+ folder_name = folder_name.replace(b'.', percent_encode(b'.'))
+ folder_name = folder_name.replace(separator, b'.')
+ return folder_name.decode()
-def dirty_folder_name(folder_name: str, separator: str = '/') -> str:
- folder_name = folder_name.replace('.', separator)
- folder_name = folder_name.replace(percent_encode('.'), '.')
- folder_name = folder_name.replace(percent_encode('%'), '%')
+def dirty_folder_name(folder_name_str: str, separator: bytes = b'/') -> bytes:
+ folder_name = folder_name_str.encode()
+ folder_name = folder_name.replace(b'.', separator)
+ folder_name = folder_name.replace(percent_encode(b'.'), b'.')
+ folder_name = folder_name.replace(percent_encode(b'%'), b'%')
return folder_name
class Account:
@@ -42,13 +43,13 @@ class Account:
typ, folder_list = self.connection.list()
for folder in folder_list:
folder_info = imap_response.List.parse(folder)
- if folder_info.delimiter != '/':
+ if folder_info.delimiter != b'/':
raise NotImplementedError(f'who the hell uses {repr(folder_info.delimiter)} as a delimiter')
self.mailbox.add_folder(clean_folder_name(folder_info.name, folder_info.delimiter))
def fetch_folder(self, folder_name: str):
folder = self.mailbox.add_folder(folder_name)
- message_id_to_key = dict((message['Message-ID'], key) for key, message in folder.iteritems() if 'Message-ID' in message)
+ message_id_to_key = dict((message['Message-ID'].encode(), key) for key, message in folder.iteritems() if 'Message-ID' in message)
typ, count_data = self.connection.select(dirty_folder_name(folder_name))
if typ != 'OK':
print(typ, count_data)
@@ -72,22 +73,22 @@ class Account:
if parsed_data.envelope is None:
print('hold the fuck up where did that envelope go', parsed_data)
else:
- message_id = parsed_data.envelope.get('message_id', None)
+ message_id = parsed_data.envelope.get(b'message_id', None)
if message_id is None:
print('no message ID for', parsed_data)
elif message_id in message_id_to_key:
message: mailbox.MaildirMessage = folder[message_id_to_key[message_id]]
flags_list = parsed_data.flags or []
flags = ''
- if r'\Seen' in flags_list:
+ if rb'\Seen' in flags_list:
flags += 'S'
- if r'\Answered' in flags_list:
+ if rb'\Answered' in flags_list:
flags += 'R' # Replied to
- if r'\Flagged' in flags_list:
+ if rb'\Flagged' in flags_list:
flags += 'F'
- if r'\Deleted' in flags_list:
+ if rb'\Deleted' in flags_list:
flags += 'T' # Trashed
- if r'\Draft' in flags_list:
+ if rb'\Draft' in flags_list:
flags += 'D'
message.set_flags(flags)
else:
@@ -119,15 +120,15 @@ class Account:
message = mailbox.MaildirMessage(parsed_data.body_all_sections)
flags_list = parsed_data.flags or []
flags = ''
- if r'\Seen' in flags_list:
+ if rb'\Seen' in flags_list:
flags += 'S'
- if r'\Answered' in flags_list:
+ if rb'\Answered' in flags_list:
flags += 'R' # Replied to
- if r'\Flagged' in flags_list:
+ if rb'\Flagged' in flags_list:
flags += 'F'
- if r'\Deleted' in flags_list:
+ if rb'\Deleted' in flags_list:
flags += 'T' # Trashed
- if r'\Draft' in flags_list:
+ if rb'\Draft' in flags_list:
flags += 'D'
message.set_flags(flags)
folder.add(message)
diff --git a/ctec/parse_utils.py b/ctec/parse_utils.py
index 890d36f..7ba2ccf 100644
--- a/ctec/parse_utils.py
+++ b/ctec/parse_utils.py
@@ -37,16 +37,16 @@ T1 = TypeVar('T1')
T2 = TypeVar('T2')
T3 = TypeVar('T3')
-ParseResult = Optional[Tuple[T, str]]
-Parser = Callable[[str], ParseResult[T]]
+ParseResult = Optional[Tuple[T, bytes]]
+Parser = Callable[[bytes], ParseResult[T]]
-def as_predicate(parser: Parser[T]) -> Callable[[str], bool]:
- def check(text: str) -> bool:
- return parser(text) is not None
+def as_predicate(parser: Parser[T]) -> Callable[[int], bool]:
+ def check(text: int) -> bool:
+ return parser(bytes([text])) is not None
return check
def alt(*parsers: Parser[T]) -> Parser[T]:
- def parse(text: str) -> ParseResult[T]:
+ def parse(text: bytes) -> ParseResult[T]:
for parser in parsers[:-1]:
result = parser(text)
if result is not None:
@@ -54,56 +54,58 @@ def alt(*parsers: Parser[T]) -> Parser[T]:
return parsers[-1](text)
return parse
-def tag(tag_text: str) -> Parser[str]:
- def parse(text: str) -> ParseResult[str]:
+def tag(tag_text: bytes) -> Parser[bytes]:
+ def parse(text: bytes) -> ParseResult[bytes]:
if text.startswith(tag_text):
return tag_text, text[len(tag_text):]
return None
return parse
# case-insensitive tag
-def itag(tag_text: str) -> Parser[str]:
- def parse(text: str) -> ParseResult[str]:
- if text.casefold().startswith(tag_text.casefold()):
+def itag(tag_text: bytes) -> Parser[bytes]:
+ def parse(text: bytes) -> ParseResult[bytes]:
+ tag_str = tag_text.decode()
+ text_str = text.decode()
+ if text_str.casefold().startswith(tag_str.casefold()):
return tag_text, text[len(tag_text):]
return None
return parse
-def take_while0(predicate: Callable[[str], bool]) -> Parser[str]:
- def parse(text: str) -> ParseResult[str]:
+def take_while0(predicate: Callable[[int], bool]) -> Parser[bytes]:
+ def parse(text: bytes) -> ParseResult[bytes]:
for i in range(len(text)):
if not predicate(text[i]):
return text[:i], text[i:]
- return text, ""
+ return text, b""
return parse
-def take_while1(predicate: Callable[[str], bool]) -> Parser[str]:
- def parse(text: str) -> ParseResult[str]:
+def take_while1(predicate: Callable[[int], bool]) -> Parser[bytes]:
+ def parse(text: bytes) -> ParseResult[bytes]:
if len(text) == 0 or not predicate(text[0]):
return None
for i in range(1, len(text)):
if not predicate(text[i]):
return text[:i], text[i:]
- return text, ""
+ return text, b""
return parse
-def take_till1(predicate: Callable[[str], bool]) -> Parser[str]:
+def take_till1(predicate: Callable[[int], bool]) -> Parser[bytes]:
return take_while1(lambda x: not predicate(x))
-def take_n(n: int) -> Parser[str]:
- def parse(text: str) -> ParseResult[str]:
+def take_n(n: int) -> Parser[bytes]:
+ def parse(text: bytes) -> ParseResult[bytes]:
if len(text) < n:
return None
return text[:n], text[n:]
return parse
-def any_char(text: str) -> ParseResult[str]:
+def any_char(text: bytes) -> ParseResult[bytes]:
if len(text) > 0:
- return text[0], text[1:]
+ return text[0:1], text[1:]
return None
def all_consuming(parser: Parser[T], *, debug=False) -> Parser[T]:
- def parse(text: str) -> ParseResult[T]:
+ def parse(text: bytes) -> ParseResult[T]:
parsed_result = parser(text)
if parsed_result is None:
if debug:
@@ -114,11 +116,11 @@ def all_consuming(parser: Parser[T], *, debug=False) -> Parser[T]:
if debug:
print('all_consuming: leftover text {}', repr(extra))
return None
- return result, ''
+ return result, b''
return parse
def map_parser(parser: Parser[T1], mapper: Callable[[T1], T2]) -> Parser[T2]:
- def parse(text: str) -> ParseResult[T2]:
+ def parse(text: bytes) -> ParseResult[T2]:
parsed_result = parser(text)
if parsed_result is None:
return None
@@ -127,7 +129,7 @@ def map_parser(parser: Parser[T1], mapper: Callable[[T1], T2]) -> Parser[T2]:
return parse
def and_then(first_parser: Parser[T1], get_second_parser: Callable[[T1], Parser[T2]]) -> Parser[T2]:
- def parse(text: str) -> ParseResult[T2]:
+ def parse(text: bytes) -> ParseResult[T2]:
parsed_result = first_parser(text)
if parsed_result is None:
return None
@@ -136,7 +138,7 @@ def and_then(first_parser: Parser[T1], get_second_parser: Callable[[T1], Parser[
return parse
def opt(parser: Parser[T]) -> Parser[Optional[T]]:
- def parse(text: str) -> ParseResult[Optional[T]]:
+ def parse(text: bytes) -> ParseResult[Optional[T]]:
result = parser(text)
if result is None:
return None, text
@@ -144,7 +146,7 @@ def opt(parser: Parser[T]) -> Parser[Optional[T]]:
return parse
def verify(parser: Parser[T], predicate: Callable[[T], bool]) -> Parser[T]:
- def parse(text: str) -> ParseResult[T]:
+ def parse(text: bytes) -> ParseResult[T]:
parsed_result = parser(text)
if parsed_result is None:
return None
@@ -155,7 +157,7 @@ def verify(parser: Parser[T], predicate: Callable[[T], bool]) -> Parser[T]:
return parse
def many0(parser: Parser[T]) -> Parser[List[T]]:
- def parse(text: str) -> ParseResult[List[T]]:
+ def parse(text: bytes) -> ParseResult[List[T]]:
result = []
parser_result = parser(text)
while parser_result is not None:
@@ -166,7 +168,7 @@ def many0(parser: Parser[T]) -> Parser[List[T]]:
return parse
def many1(parser: Parser[T]) -> Parser[List[T]]:
- def parse(text: str) -> ParseResult[List[T]]:
+ def parse(text: bytes) -> ParseResult[List[T]]:
parser_result = parser(text)
if parser_result is None:
return None
@@ -182,7 +184,7 @@ def many1(parser: Parser[T]) -> Parser[List[T]]:
return parse
def many_m_n(parser: Parser[T], min_inclusive: int, max_inclusive: int) -> Parser[List[T]]:
- def parse(text: str) -> ParseResult[List[T]]:
+ def parse(text: bytes) -> ParseResult[List[T]]:
result: List[T] = []
while len(result) < min_inclusive:
parser_result = parser(text)
@@ -201,7 +203,7 @@ def many_m_n(parser: Parser[T], min_inclusive: int, max_inclusive: int) -> Parse
return parse
def separated_many0(parser: Parser[T], separator_parser: Parser) -> Parser[List[T]]:
- def parse(text: str) -> ParseResult[List[T]]:
+ def parse(text: bytes) -> ParseResult[List[T]]:
result = []
while True:
parser_result = parser(text)
@@ -221,7 +223,7 @@ def separated_many1(parser: Parser[T], separator_parser: Parser) -> Parser[List[
return verify(separated_many0(parser, separator_parser), lambda result: len(result) > 0)
def delimited(before_parser: Parser[T1], parser: Parser[T], after_parser: Parser[T2]) -> Parser[T]:
- def parse(text: str) -> ParseResult[T]:
+ def parse(text: bytes) -> ParseResult[T]:
before_result = before_parser(text)
if before_result is None:
return None
@@ -241,7 +243,7 @@ def delimited(before_parser: Parser[T1], parser: Parser[T], after_parser: Parser
return parse
def pair(first_parser: Parser[T1], second_parser: Parser[T2]) -> Parser[Tuple[T1, T2]]:
- def parse(text: str) -> ParseResult[Tuple[T1, T2]]:
+ def parse(text: bytes) -> ParseResult[Tuple[T1, T2]]:
first_parsed_result = first_parser(text)
if first_parsed_result is None:
return None
@@ -256,7 +258,7 @@ def pair(first_parser: Parser[T1], second_parser: Parser[T2]) -> Parser[Tuple[T1
return parse
def triple(first_parser: Parser[T1], second_parser: Parser[T2], third_parser: Parser[T3]) -> Parser[Tuple[T1, T2, T3]]:
- def parse(text: str) -> ParseResult[Tuple[T1, T2, T3]]:
+ def parse(text: bytes) -> ParseResult[Tuple[T1, T2, T3]]:
first_parsed_result = first_parser(text)
if first_parsed_result is None:
return None
@@ -286,7 +288,7 @@ def followed(parser: Parser[T], after_parser: Parser[T1]) -> Parser[T]:
return map_parser(pair(parser, after_parser), first)
def separated_pair(first_parser: Parser[T1], between_parser: Parser[T], second_parser: Parser[T2]) -> Parser[Tuple[T1, T2]]:
- def parse(text: str) -> ParseResult[Tuple[T1, T2]]:
+ def parse(text: bytes) -> ParseResult[Tuple[T1, T2]]:
first_parsed_result = first_parser(text)
if first_parsed_result is None:
return None
@@ -306,7 +308,7 @@ def separated_pair(first_parser: Parser[T1], between_parser: Parser[T], second_p
return parse
def separated_triple(first_parser: Parser[T1], between12_parser: Parser, second_parser: Parser[T2], between23_parser: Parser, third_parser: Parser[T3]) -> Parser[Tuple[T1, T2, T3]]:
- def parse(text: str) -> ParseResult[Tuple[T1, T2, T3]]:
+ def parse(text: bytes) -> ParseResult[Tuple[T1, T2, T3]]:
first_parsed_result = first_parser(text)
if first_parsed_result is None:
return None
@@ -335,5 +337,5 @@ def separated_triple(first_parser: Parser[T1], between12_parser: Parser, second_
return (first_result, second_result, third_result), extra
return parse
-def string_concat(parser: Parser[Sequence[str]]) -> Parser[str]:
- return map_parser(parser, ''.join)
+def string_concat(parser: Parser[Sequence[bytes]]) -> Parser[bytes]:
+ return map_parser(parser, b''.join)