"""Mirror an IMAP account into a local Maildir tree."""
import imaplib
import mailbox
import pathlib
from typing import Iterable, Iterator, List, Tuple

from . import import_or_install, imap_response

# Maildir file names normally separate the unique name from the flag info with
# ':'; the mailbox documentation suggests '!' for platforms whose file systems
# do not permit ':' in file names.
mailbox.Maildir.colon = '!'

# Number of messages requested per FETCH command when downloading bodies.
MESSAGE_DOWNLOAD_BATCH = 1

# IMAP system flag -> Maildir flag character.
_IMAP_TO_MAILDIR_FLAGS: Tuple[Tuple[bytes, str], ...] = (
    (rb'\Seen', 'S'),
    (rb'\Answered', 'R'),  # Replied to
    (rb'\Flagged', 'F'),
    (rb'\Deleted', 'T'),  # Trashed
    (rb'\Draft', 'D'),
)


def percent_encode(c: bytes) -> bytes:
    """Return ``b'%'`` followed by the lowercase hex digits of *c*."""
    return b'%' + c.hex().encode()


def clean_folder_name(folder_name: bytes, separator: bytes) -> str:
    """Convert a server-side folder name into a local folder name.

    Literal ``%`` and ``.`` bytes are percent-encoded first so that ``.`` is
    free to stand in for the hierarchy *separator* in the result.
    """
    # '%' must be escaped before '.', otherwise the '%' introduced by
    # escaping '.' would itself be escaped.
    folder_name = folder_name.replace(b'%', percent_encode(b'%'))
    folder_name = folder_name.replace(b'.', percent_encode(b'.'))
    folder_name = folder_name.replace(separator, b'.')
    return folder_name.decode()


def dirty_folder_name(folder_name_str: str, separator: bytes = b'/') -> bytes:
    """Undo :func:`clean_folder_name`, yielding the server-side folder name."""
    folder_name = folder_name_str.encode()
    # Reverse order of clean_folder_name: separators first, then '.', then '%'.
    folder_name = folder_name.replace(b'.', separator)
    folder_name = folder_name.replace(percent_encode(b'.'), b'.')
    folder_name = folder_name.replace(percent_encode(b'%'), b'%')
    return folder_name


def _maildir_flags(imap_flags) -> str:
    """Translate a collection of IMAP flags into a Maildir flag string.

    A falsy *imap_flags* (``None`` or empty) yields an empty string.
    """
    flags_list = imap_flags or []
    return ''.join(
        maildir_flag
        for imap_flag, maildir_flag in _IMAP_TO_MAILDIR_FLAGS
        if imap_flag in flags_list
    )


def _iter_message_data(data: Iterable) -> Iterator:
    """Yield one parsed ``MessageData`` per complete item in a FETCH response.

    ``imaplib`` hands back a list mixing ``bytes`` and tuples of ``bytes``.
    Pieces are accumulated until ``imap_response.MessageData.parse`` accepts
    the concatenation; a ``ValueError`` from the parser is treated as "not
    complete yet".

    Raises:
        TypeError: if an element of *data* is neither ``bytes`` nor ``tuple``.
        ValueError: propagated from the parser if trailing pieces never form
            a parseable item.
    """
    pending: List[bytes] = []
    for fetched_line in data:
        if isinstance(fetched_line, tuple):
            pending.append(b'\r\n'.join(fetched_line))
        elif isinstance(fetched_line, bytes):
            pending.append(fetched_line)
        else:
            raise TypeError(f'what the hell is a {repr(fetched_line)} and why is it in the fetch response')
        # Only the parse is guarded, so a ValueError raised while the caller
        # processes a message is not mistaken for an incomplete response.
        try:
            parsed_data = imap_response.MessageData.parse(b''.join(pending))
        except ValueError:
            continue
        pending = []
        yield parsed_data
    if pending:
        # Leftover pieces never parsed; parse once more so the parser's own
        # error reaches the caller instead of the data being dropped silently.
        imap_response.MessageData.parse(b''.join(pending))


class Account:
    """An IMAP account together with its local Maildir mirror."""

    def __init__(self, address: str, info: dict):
        """Open the local Maildir and log in to the IMAP server.

        Args:
            address: Login name; also used as the Maildir directory name.
            info: Account settings; the keys ``'imap host'`` and
                ``'password'`` are read here.
        """
        appdirs = import_or_install('appdirs')
        data_dir = pathlib.Path(appdirs.user_data_dir(appname='ctec', appauthor=False))
        # Maildir creates its own directory but not missing parents, so make
        # sure the application data directory exists first.
        data_dir.mkdir(parents=True, exist_ok=True)
        self.address = address
        self.mailbox = mailbox.Maildir(data_dir / address)
        self.info = info
        self.connection = imaplib.IMAP4_SSL(self.info['imap host'])
        self.connection.login(self.address, self.info['password'])

    def __del__(self):
        """Log out of the IMAP server if a connection was established."""
        # __init__ may have raised before self.connection was assigned.
        connection = getattr(self, 'connection', None)
        if connection is not None:
            connection.logout()

    def fetch_folders(self) -> None:
        """Create a local folder for every folder the server lists.

        Raises:
            NotImplementedError: if a folder's delimiter is not ``b'/'``.
        """
        folder_list: List[bytes]
        typ, folder_list = self.connection.list()
        for folder in folder_list:
            folder_info = imap_response.List.parse(folder)
            if folder_info.delimiter != b'/':
                raise NotImplementedError(f'who the hell uses {repr(folder_info.delimiter)} as a delimiter')
            self.mailbox.add_folder(clean_folder_name(folder_info.name, folder_info.delimiter))

    def fetch_folder(self, folder_name: str) -> None:
        """Synchronise one folder from the server into the local Maildir.

        Messages already stored locally (matched by ``Message-ID``) get their
        flags refreshed; messages not yet stored are downloaded and added.

        Args:
            folder_name: Local (cleaned) folder name, as produced by
                :func:`clean_folder_name`.
        """
        folder = self.mailbox.add_folder(folder_name)
        message_id_to_key = {
            message['Message-ID'].encode(): key
            for key, message in folder.iteritems()
            if 'Message-ID' in message
        }

        typ, count_data = self.connection.select(dirty_folder_name(folder_name))
        if typ != 'OK':
            # NOTE(review): a failed SELECT is only reported here; the int()
            # conversion below will then most likely raise -- confirm whether
            # this should raise explicitly instead.
            print(typ, count_data)
        count = int(count_data[0].decode('ASCII'))

        # Pass 1: refresh flags of known messages, collect unknown ones.
        needs_fetching = []
        if count > 0:
            typ, data = self.connection.fetch('1:*', '(FLAGS ENVELOPE)')
            if typ != 'OK':
                print(typ, data)
            for parsed_data in _iter_message_data(data):
                if parsed_data.envelope is None:
                    print('hold the fuck up where did that envelope go', parsed_data)
                    continue
                message_id = parsed_data.envelope.get(b'message_id', None)
                if message_id is None:
                    print('no message ID for', parsed_data)
                elif message_id in message_id_to_key:
                    key = message_id_to_key[message_id]
                    message: mailbox.MaildirMessage = folder[key]
                    flags = _maildir_flags(parsed_data.flags)
                    # folder[key] returns a copy, so the change has to be
                    # stored back; skip the rewrite when nothing changed.
                    if set(message.get_flags()) != set(flags):
                        message.set_flags(flags)
                        folder[key] = message
                else:
                    needs_fetching.append(parsed_data.number)

        # Pass 2: download the bodies of messages not stored locally yet.
        for fetch_start in range(0, len(needs_fetching), MESSAGE_DOWNLOAD_BATCH):
            batch = needs_fetching[fetch_start:fetch_start + MESSAGE_DOWNLOAD_BATCH]
            typ, data = self.connection.fetch(','.join(str(x) for x in batch), '(FLAGS BODY.PEEK[])')
            if typ != 'OK':
                print(typ, data)
            for parsed_data in _iter_message_data(data):
                if parsed_data.body_all_sections is None:
                    print('hold the fuck up where did that body go', parsed_data)
                    continue
                message = mailbox.MaildirMessage(parsed_data.body_all_sections)
                message.set_flags(_maildir_flags(parsed_data.flags))
                folder.add(message)

        self.connection.close()

    def folders(self) -> List[Tuple[str, mailbox.Maildir]]:
        """Return ``(name, Maildir)`` pairs for every local folder."""
        return [(folder, self.mailbox.get_folder(folder)) for folder in self.mailbox.list_folders()]

    def inbox(self) -> mailbox.Maildir:
        """Return the local ``'Inbox'`` folder, creating it if necessary."""
        return self.mailbox.add_folder('Inbox')