#!/usr/bin/env python version = (0, 4) """ CHANGELOG 0.4 06-Jun-2007 h@realh.co.uk --- * Fixed preservation of status for IMAP and also implemented for mbox and Maildir 0.3 06-Jun-2007 h@realh.co.uk --- * Hopefully fix attempt to do what 0.2 was supposed to do * Better handling of when unable to parse a date header 0.2 23-Apr-2007 h@realh.co.uk --- * Preserve \Seen and \Flagged status when copying between IMAP folders. """ import email import email.FeedParser import email.Utils import errno import exceptions import fcntl import getpass import imaplib import mailbox import optparse import os import re import socket import stat import sys import time import traceback options = None def _(s): return s PLACEHOLDER_PATTERN = _('^From: Mail System Internal Data dots[1]: sys.stdout.write('.') sys.stdout.flush() dots[1] += 1 def end_dots(): "Simply prints newline if dots are enabled." if not options.quiet and not options.nodots: print def file_size(fp): pos = fp.tell() fp.seek(0, 2) # SEEK_END len = fp.tell() fp.seek(pos) return len def get_message_timestamp(message): def read_date_header(message, header): stamp = message.get(header) if not stamp: if header == 'Date': print >>sys.stderr, _("Message missing Date header") return None try: parsed = email.Utils.parsedate_tz(stamp) if not parsed: print >>sys.stderr, _("Unable to parse date %s") % stamp return None d = email.Utils.mktime_tz(parsed) except: print >>sys.stderr, _("Exception trying to parse date %s") % stamp d = None return d stamp = read_date_header(message, 'Delivery-date') if not stamp: stamp = read_date_header(message, 'Date') if not stamp: print >>sys.stderr, _("Unable to read date from message header") return None return stamp def header_from_fp(fp): parser = email.FeedParser.FeedParser() while True: line = fp.readline() if line: parser.feed(line) if not line.strip(): break else: break return parser.close() def strip_messageid(msgid): msgid = msgid.lstrip('<').rstrip('>') msgid = msgid.replace("\n", "").replace("\r", "") return msgid.replace("\t", "").replace(" ", "") class MailFolder: """ Base class for mail folder. """ def __init__(self, desc): "Give meaningful description string for mail folder." self.desc = desc self.messageids = {} self.anon_bodies = [] self.mid_dups = 0 self.anon_dups = 0 self.mid_accepted = 0 self.anon_accepted = 0 def get_named_header_at_index(self, index, header): "Gets just the content of the header." headers = self.get_entire_header_at_index(self, index) return headers.get(header) def scan_messageids(self, when, age): """ Builds a dict of Message-IDs (and list of entire messages which lack Message-IDs) for messages which are older than 'when' by no more than 'age' days. """ when -= age * 24 * 3600 self.scan() if not options.quiet: print _("Scanning '%s' for Message-IDs") % self.desc total = self.get_num_messages() msgids = 0 bodies = 0 for n in range(total): i = total - n - 1 header = self.get_entire_header_at_index(i) stamp = get_message_timestamp(header) if stamp and stamp < when: break msgid = header.get('Message-ID') if msgid: self.messageids[strip_messageid(msgid)] = True msgids += 1 else: self.anon_bodies.append(self.get_message_at_index(i)) bodies += 1 if not options.quiet: print _("Found %d Message-IDs and %d bodies which lacked IDs" \ " (%d+%d)") % (msgids, bodies, msgids, bodies) def is_placeholder(self, message): if options.skip: pattern = re.compile(options.placeholder) for i in message.items(): line = i[0] + ': ' + i[1] if pattern.search(line): return True return False def check_unique(self, message): """ Checks message's ID against IDs found by scan_messageids(). Returns True if unique or if no Message-ID. Doesn't check bodies so message may be header only. Also returns False for placeholders. """ if self.is_placeholder(message): return False msgid = message.get('Message-ID') if msgid: msgid = strip_messageid(msgid) dup = self.messageids.get(msgid, False) if dup: self.mid_dups += 1 return not dup else: return False def append_if_unique(self, message, status = None): """ Call this if check_unique has returned True but not sure whether it really is unique or lacked Message-ID. message must be entire. """ msgid = message.get('Message-ID') if msgid: self.mid_accepted += 1 else: dup = False for m in self.anon_bodies: if m.as_string() == message.as_string(): dup = True break if dup: self.anon_dups += 1 return else: self.anon_accepted += 1 self.append_message(message, status) """ Following methods should be overridden. """ def scan(self, force = False): """ Scans folder for messages if it hasn't already been scanned or if force is True. """ pass def get_num_messages(self): return 0 def get_entire_header_at_index(self, index): "Return a Message object." return None def get_message_at_index(self, index): "Return a Message object." return None def delete_message(self, index): pass def append_message(self, message, status = None): pass def close(self): pass def get_status(self, index, header = None): """ Returns a pair of bools: (read, flagged). header is an optional Message object to aid efficiency in case the header has to be fetched to determine the status anyway. """ return (False, False) class MailBoxFolder(MailFolder): def __init__(self, filename): MailFolder.__init__(self, filename) self.filename = filename self.fp = open(filename, 'r+') if fcntl.flock(self.fp, fcntl.LOCK_EX): self.fp.close() raise LockFailure(_("Unable to lock '%s'") % filename) self.deleted = [] self.offsets = None def scan(self, force = False): if self.offsets != None and not force: return l = file_size(self.fp) dots = init_dots(_("Scanning mailbox '%s'") % self.filename) self.offsets = [] prev_offset = 0 mbox = mailbox.PortableUnixMailbox(self.fp, self.offset_factory) for m in mbox: self.offsets.append(prev_offset) prev_offset = m if not options.nodots: dot(dots, self.fp.tell(), l) end_dots() if not options.quiet: print _("%d messages") % len(self.offsets) def offset_factory(self, fp): return self.fp.tell() def get_num_messages(self): self.scan() return len(self.offsets) def get_entire_header_at_index(self, index): self.scan() try: self.fp.seek(self.offsets[index]) return header_from_fp(self.fp) except: traceback.print_exc() return None def get_message_at_index(self, index): self.scan() try: self.fp.seek(self.offsets[index]) if index == len(self.offsets) - 1: return email.message_from_file(self.fp) else: end = self.offsets[index + 1] message = "" while self.fp.tell() < end: message += self.fp.readline() return email.message_from_string(message) except: traceback.print_exc() return None def delete_message(self, index): # Messages aren't deleted straight away; their offsets are stored # then the file is recreated with them cut out when it's closed. # Each deletion is stored as a pair of [start, end] offsets self.scan() if index == 0 and options.skip: header = self.get_entire_header_at_index(index) if self.is_placeholder(header): return if index == len(self.offsets) - 1: self.deleted.append([self.offsets[index], file_size(self.fp)]) else: self.deleted.append([self.offsets[index], self.offsets[index + 1]]) @staticmethod def formatdate(): " mutt doesn't recognise email.Utils.formatdate() in From separator. " return time.strftime("%a %b %d %H:%M:%S %Y %z") @staticmethod def __set_status(message, status): if status[0]: hdr = message.get('Status') if not hdr: hdr = '' if not 'R' in hdr: hdr += 'R' message['Status'] = hdr if status[1]: hdr = message.get('X-Status') if not hdr: hdr = '' if not 'F' in hdr: hdr += 'F' message['X-Status'] = hdr def append_message(self, message, status = None): self.fp.seek(0, 2) # SEEK_END if self.offsets != None: self.offsets.append(self.fp.tell()) if not message.get_unixfrom(): message.set_unixfrom('From %s@%s %s' % (getpass.getuser(), socket.getfqdn(), self.formatdate())) if status and (status[0] or status[1]): self.__set_status(message, status) self.fp.write(message.as_string(True)) def __copy_up_to(self, up_to, out_off, dots, total): # At exit in_off = up_to and new out_off is returned in_off = self.fp.tell() while in_off < up_to: l = up_to - in_off if l > 1000000: l = 1000000 s = self.fp.read(l) l = len(s) in_off += l self.fp.seek(out_off) self.fp.write(s) out_off += l dot(dots, out_off, total) self.fp.seek(in_off) return out_off def close(self): if self.deleted: l = file_size(self.fp) # Work out how many bytes will be written to_write = l ranges = [] for d in self.deleted: to_write -= d[1] - d[0] if len(ranges) and ranges[-1][1] == d[0]: ranges[-1][1] = d[1] else: ranges.append(d) dots = init_dots(_("Deleting expired messages from '%s'") \ % self.desc) out_off = 0 in_off = 0 while ranges: d_offs = ranges.pop(0) self.fp.seek(in_off) out_off = self.__copy_up_to(d_offs[0], out_off, dots, to_write) in_off = d_offs[1] if in_off < l: self.fp.seek(in_off) out_off = self.__copy_up_to(l, out_off, dots, to_write) self.fp.truncate(out_off) end_dots() self.fp.close() def get_status(self, index, header = None): self.scan() if not header: header = self.get_entire_header_at_index(index) status = header.get('Status') xstatus = header.get('X-Status') read = False flagged = False if (status and 'R' in status): read = True if (xstatus and 'F' in xstatus): flagged = True return (read, flagged) class MailDirFolder(MailFolder): deliveries = 0 def __init__(self, dirname): MailFolder.__init__(self, dirname) self.dirname = dirname self.names_and_times = None def pathname(self, *leafname): return os.path.join(self.dirname, *leafname) def __scan_dir(self, dir): full_dir = self.pathname(dir) names = os.listdir(full_dir) # Get mtimes, pruning non-files for n in names: s = os.stat(os.path.join(full_dir, n)) if stat.S_ISREG(s.st_mode): self.names_and_times.append([os.path.join(dir, n), s.st_mtime]) def scan(self, force = False): if self.names_and_times != None and not force: return if not options.quiet: print _("Scanning maildir folder '%s'") % self.dirname self.names_and_times = [] # Each entry of names_and_times is [leafname including cur/new, mtime] self.__scan_dir("cur") self.__scan_dir("new") # Sort files by mtime self.names_and_times.sort(key = lambda x: x[1]) if not options.quiet: print _("%d messages") % len(self.names_and_times) def get_num_messages(self): self.scan() return len(self.names_and_times) def filename_at_index(self, index): self.scan() return self.pathname(self.names_and_times[index][0]) def get_entire_header_at_index(self, index): self.scan() try: fp = open(self.filename_at_index(index)) message = header_from_fp(fp) fp.close() return message except: traceback.print_exc() try: fp.close() except: pass return None def get_message_at_index(self, index): self.scan() try: fp = open(self.filename_at_index(index)) message = email.message_from_file(fp) fp.close() return message except: traceback.print_exc() try: fp.close() except: pass return None def delete_message(self, index): self.scan() try: os.remove(self.filename_at_index(index)) except: traceback.print_exc() def append_message(self, message, status = None): os.chdir(self.dirname) self.__class__.deliveries += 1 for n in range(5): now = time.time() prefix = "%d.M%dP%d" % (int(now), int((now - int(now)) * 1000000), os.getpid()) tempname = os.path.join("tmp", prefix + "_%d.%s" % \ (self.__class__.deliveries, socket.gethostname())) try: os.stat(tempname) except exceptions.OSError, e: if e.errno == errno.ENOENT: break elif not options.quiet: traceback.print_exc() except: if not options.quiet: traceback.print_exc() time.sleep(2) else: raise MailDirError( \ _("Unable to create new message in maildir '%s'") \ % self.dirname) try: fp = open(tempname, 'w') fp.write(message.as_string()) fp.flush() os.fsync(fp.fileno()) stat = os.fstat(fp.fileno()) fp.close() prefix += "V%ldI%d_%d.%s,S=%d" % \ (stat.st_dev, stat.st_ino, self.__class__.deliveries, socket.gethostname(), stat.st_size) tempname2 = os.path.join("new", prefix) os.link(self.pathname(tempname), self.pathname(tempname2)) except exceptions.Exception, e: try: fp.close() except: pass try: os.unlink(tempname) except: pass raise e os.unlink(tempname) if status and (status[0] or status[1]): tempname2 = self.__set_status(status, tempname2, prefix) if self.names_and_times != None: self.names_and_times.append([tempname2, now]) def __set_status(self, status, tempname, prefix): suffix = ':2,' if status[0]: suffix += 'S' if status[1]: suffix += 'F' tempname2 = os.path.join('cur', prefix + suffix) os.rename(self.pathname(tempname), self.pathname(tempname2)) return tempname2 def get_status(self, index, header = None): self.scan() filename = self.filename_at_index(index) info = filename.split(':')[-1] return ('S' in info, 'F' in info) def parse_imap(full_desc): """ Parses an imap URI as passed on the command line, returning an array: [proto, host, port, username, password, folder] where proto is "imap" or "imaps". Prompts for password if not given in URI. """ proto, desc = full_desc.split('://', 1) if proto == 'imap': port = 143 else: port = 993 desc = desc.split('/', 1) if len(desc) > 1 and desc[1]: folder = desc[1] else: folder = None user = None passwd = None desc = desc[0].split('@') if len(desc) > 1: user = desc[0].split(':') if len(user) > 1: passwd = user[1] user = user[0] desc = desc[1] else: desc = desc[0] host = desc.split(':') if len(host) > 1: port = host[1] host = host[0] if not passwd: if user: prompt = _("Enter password for '%s': ") % full_desc else: user = getpass.getuser() prompt = _("Enter password for user '%s' on '%s': ") \ % (user, full_desc) passwd = getpass.getpass(prompt) if not user: user = getpass.getuser() if not folder: folder = 'INBOX' return [proto, host, port, user, passwd, folder] class ImapConnection: def __init__(self, desc): """ desc is a tuple: [proto, host, port, username, password]. """ self.desc = desc if desc[0] == 'imap': i = imaplib.IMAP4(desc[1], desc[2]) else: i = imaplib.IMAP4_SSL(desc[1], desc[2]) i.login(desc[3], desc[4]) self.folder = None self.imap = i self.clients = 1 def select_folder(self, folder): "Returns False if folder already selected." if self.folder != folder: self.close_folder() self.imap.select(folder) self.folder = folder return True else: return False def close_folder(self): if self.folder: self.imap.close() self.folder = None def disconnect(self, folder = None): self.clients -= 1 if self.clients: if folder == self.folder: self.close_folder else: self.close_folder() self.imap.logout() class ImapFolder(MailFolder): def __init__(self, uri, cnxn = None): """ uri is description as passed on command line. cnxn is an existing connection that may be reused if everything except the folder is identical. """ MailFolder.__init__(self, uri) desc = parse_imap(uri) if cnxn and desc[:5] == cnxn.desc: self.cnxn = cnxn cnxn.clients += 1 else: self.cnxn = ImapConnection(desc[:5]) self.folder = desc[5] self.deleted = False self.messagenums = None self.scanned = False def scan(self, force = False): if self.messagenums != None and not force: return if not self.select_folder() and not force: return if not options.quiet and not self.scanned: print _("Scanning '%s'") % self.desc typ, data = self.cnxn.imap.search(None, 'ALL') self.messagenums = data[0].split() if not options.quiet and not self.scanned: print _("%d messages") % len(self.messagenums) self.scanned = True def select_folder(self): return self.cnxn.select_folder(self.folder) def get_num_messages(self): self.scan() return len(self.messagenums) def mark_message_unread(self, index): self.cnxn.imap.store(self.messagenums[index], '-FLAGS', r'\Seen') def __preserve_unread_status(self, index, old_status): if not old_status: self.mark_message_unread(index) # NOTE: if remove get_status() call from following 3 methods, replace # with scan() def get_named_header_at_index(self, index, header): self.select_folder() old_seen = self.get_status(index)[0] typ, data = self.cnxn.imap.fetch(self.messagenums[index], '(BODY.PEEK[HEADER.FIELDS (%s)])' % header.upper()) self.__preserve_unread_status(index, old_seen) return data[0][1].split(':')[1:] def get_entire_header_at_index(self, index): self.select_folder() old_seen = self.get_status(index)[0] typ, data = self.cnxn.imap.fetch(self.messagenums[index], '(RFC822.HEADER)') self.__preserve_unread_status(index, old_seen) return email.message_from_string(data[0][1]) def get_message_at_index(self, index): self.select_folder() old_seen = self.get_status(index)[0] typ, data = self.cnxn.imap.fetch(self.messagenums[index], '(RFC822)') self.__preserve_unread_status(index, old_seen) return email.message_from_string(data[0][1].replace("\r","")) def delete_message(self, index): self.select_folder() self.scan() self.cnxn.imap.store(self.messagenums[index], '+FLAGS', r'\Deleted') self.deleted = True def append_message(self, message, flags = None): self.cnxn.close_folder() f = None if flags and (flags[0] or flags[1]): if flags[0]: f = r'\Seen' if flags[1]: if f: f += ' ' f += r'\Flagged' f = '(' + f + ')' self.cnxn.imap.append(self.folder, f, get_message_timestamp(message), str(message)) def close(self): self.cnxn.disconnect(self.folder) def get_status(self, index, header = None): self.scan() flags = self.cnxn.imap.fetch(self.messagenums[index], '(FLAGS)')[1][0] return (r'\Seen' in flags, r'\Flagged' in flags) existing_imap = None def folder_factory(desc): global existing_imap if desc.startswith('imap://') or desc.startswith('imaps://'): i = ImapFolder(desc, existing_imap) if not existing_imap: existing_imap = i return i elif desc[-1] == '/': return MailDirFolder(desc) else: return MailBoxFolder(desc) def check_args(): parse_args() if not options.input: print >> sys.stderr, _("No input folder specified.") sys.exit(2) if not options.expire and not options.copy: print >> sys.stderr, _("Must give --expire or --copy option or both.") sys.exit(2) if options.copy and not options.output: print >> sys.stderr, _("No output folder specified for --copy.") sys.exit(2) def run(): check_args() input = folder_factory(options.input) if options.output: output = folder_factory(options.output) else: output = None if options.expire: expire_limit = time.time() - options.age * 24 * 3600 #print "expire_limit = %x (%s)" \ # % (expire_limit, email.Utils.formatdate(expire_limit)) total = input.get_num_messages() if not total: print "No messages in", options.input return header = input.get_entire_header_at_index(0) header_n = 0 deleted = 0 stamp = get_message_timestamp(header) if not stamp: print >>sys.stderr, _("Was relying on that timestamp to determine " \ "how far back to read Message-IDs, giving up") return if output: output.scan_messageids(stamp, options.agediff) dots = init_dots(_("Processing '%s'") % input.desc) for n in range(total): body = None if options.copy: if header_n != n: header = input.get_entire_header_at_index(n) header_n = n if output.check_unique(header): body = input.get_message_at_index(n) status = input.get_status(n) output.append_if_unique(body, status) if options.expire: if header_n != n: header = input.get_entire_header_at_index(n) header_n = n stamp = get_message_timestamp(header) if not stamp: continue expire = True status = input.get_status(n, header) if options.unread or options.flagged: if options.unread and not status[0]: expire = False if options.flagged and status[1]: expire = False #print "stamp = %x (%s)" % (stamp, email.Utils.formatdate(stamp)) if expire and stamp < expire_limit: if not options.copy and output: if output.check_unique(header): if not body: body = input.get_message_at_index(n) output.append_if_unique(body, status) input.delete_message(n) deleted += 1 dot(dots, n, total) end_dots() if output and not options.quiet: print _("%d+%d messages were added to output, " \ "%d+%d rejected as duplicates.") \ % (output.mid_accepted, output.anon_accepted, output.mid_dups, output.anon_dups) print _("%d message(s) were deleted from input, " "%d preserved.") % (deleted, total - deleted) input.close() if output: output.close() if __name__ == '__main__': run()