diff options
-rwxr-xr-x | devel/management/commands/reporead_inotify.py | 188 |
1 files changed, 188 insertions, 0 deletions
# -*- coding: utf-8 -*-
"""
reporead_inotify command

Watches repo.files.tar.gz files for updates and parses them after a short delay
in order to catch all updates in a single bulk update.

Usage: ./manage.py reporead_inotify [path_template]

Where 'path_template' is an optional path_template for finding the
repo.files.tar.gz files. The form is '/srv/ftp/%(repo)s/os/%(arch)s/', which is
also the default template if none is specified. While 'repo' is not required to
be present in the path_template, note that 'arch' is so reporead can function
correctly.
"""

import logging
import os.path
import pyinotify
import sys
import threading
import time

from django.core.management.base import BaseCommand, CommandError

from main.models import Arch, Repo
from .reporead import read_repo

logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)s -> %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    stream=sys.stderr)
logger = logging.getLogger()


class Command(BaseCommand):
    help = "Watch database files and run an update when necessary."
    args = "[path_template]"

    def handle(self, path_template=None, **options):
        '''Configure logging, build the inotify notifier and loop forever.

        path_template is an optional Python %-format string containing
        '%(arch)s' and, optionally, '%(repo)s'; when it is empty or None the
        default '/srv/ftp/%(repo)s/os/%(arch)s/' is used. The 'verbosity'
        entry of options selects the log level: 0 is ERROR, 1 is INFO and 2
        or above is DEBUG.'''
        v = int(options.get('verbosity', 0))
        # Go through setLevel() rather than assigning logger.level so the
        # logging module can do its own bookkeeping when the level changes.
        if v == 0:
            logger.setLevel(logging.ERROR)
        elif v == 1:
            logger.setLevel(logging.INFO)
        elif v >= 2:
            # verbosity 3 must not silently fall back to the WARNING default
            logger.setLevel(logging.DEBUG)

        if not path_template:
            path_template = '/srv/ftp/%(repo)s/os/%(arch)s/'
        self.path_template = path_template

        notifier = self.setup_notifier()
        logger.info('Entering notifier loop')
        notifier.loop()

    def setup_notifier(self):
        '''Set up and configure the inotify machinery and logic.

        This takes the provided or default path_template and builds a list of
        directories we need to watch for database updates. It then validates
        and passes these on to the various pyinotify pieces as necessary and
        finally builds and returns a notifier object.

        Raises CommandError if the template maps one directory to more than
        one architecture.'''
        arches = Arch.objects.filter(agnostic=False)
        repos = Repo.objects.all()
        arch_path_map = dict((arch, None) for arch in arches)
        all_paths = set()
        total_paths = 0
        for arch in arches:
            combos = ({'repo': repo.name.lower(), 'arch': arch.name}
                      for repo in repos)
            # take a python format string and generate all unique combinations
            # of directories from it; using set() ensures we filter it down
            paths = set(self.path_template % values for values in combos)
            total_paths += len(paths)
            all_paths |= paths
            arch_path_map[arch] = paths

        logger.info('Watching %d total paths', total_paths)
        logger.debug(all_paths)

        # sanity check- basically ensure every path we created from the
        # template mapped to only one architecture
        if total_paths != len(all_paths):
            raise CommandError('path template did not uniquely '
                               'determine architecture for each file')

        # A proper atomic replacement of the database as done by rsync is type
        # IN_MOVED_TO. repo-add/remove will finish with a IN_CLOSE_WRITE.
        mask = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO

        manager = pyinotify.WatchManager()
        for name in all_paths:
            manager.add_watch(name, mask)

        handler = EventHandler(arch_paths=arch_path_map)
        return pyinotify.Notifier(manager, handler)


class Database(object):
    '''An object representing a pacman database on the filesystem. It stores
    various bits of metadata and state representing the file path, when we last
    updated, how long our delay is before performing the update, whether we are
    updating now, etc.'''

    def __init__(self, arch, path, delay=60.0):
        '''Remember the arch and path handed to read_repo() and the number of
        seconds (delay) to wait after the last change before importing.'''
        self.arch = arch
        self.path = path
        self.delay = delay
        self.mtime = None
        self.last_import = None
        self.update_thread = None
        self.updating = False
        self.run_again = False
        # guards the mutable state above, which is touched both by the
        # caller of queue_for_update() and by the timer threads
        self.lock = threading.Lock()

    def _start_update_countdown(self):
        '''Start a timer thread that calls update() after self.delay seconds.

        Both callers in this class invoke this while holding self.lock.'''
        self.update_thread = threading.Timer(self.delay, self.update)
        logger.info('Starting %.1f second countdown to update %s',
                    self.delay, self.path)
        self.update_thread.start()

    def queue_for_update(self, mtime):
        '''Record the new mtime and (re)start the countdown to an update.

        If an update is running right now, only flag that another run is
        needed once it finishes. Otherwise any pending countdown is cancelled
        and restarted, so a burst of changes results in a single update.'''
        logger.debug('Queueing database %s...', self.path)
        with self.lock:
            self.mtime = mtime
            if self.updating:
                # store the fact that we will need to run it again
                self.run_again = True
                return
            if self.update_thread:
                self.update_thread.cancel()
            self._start_update_countdown()

    def update(self):
        '''Run reporead on this database; invoked by the countdown timer.

        Whether read_repo() succeeds or raises, the state is reset
        afterwards and, if changes were queued while the update was running,
        a fresh countdown is started.'''
        logger.debug('Updating database %s...', self.path)
        with self.lock:
            self.last_import = time.time()
            self.updating = True

        try:
            # invoke reporead's primary method
            read_repo(self.arch, self.path, {})
        finally:
            logger.debug('Done updating database %s.', self.path)
            with self.lock:
                self.update_thread = None
                self.updating = False
                if self.run_again:
                    self.run_again = False
                    self._start_update_countdown()


class EventHandler(pyinotify.ProcessEvent):
    '''Our main event handler which listens for database change events. Because
    we are watching the whole directory, we filter down and only look at those
    events dealing with files databases.'''

    def my_init(self, **kwargs):
        '''Build the lookup tables from the 'arch_paths' keyword argument.

        'arch_paths' maps each arch to the set of directory paths that belong
        to it, as built by Command.setup_notifier().'''
        self.databases = {}
        self.arch_lookup = {}

        # we really want a single path to arch mapping, so massage the data
        arch_paths = kwargs['arch_paths']
        for arch, paths in arch_paths.items():
            # NOTE(review): trailing slashes are stripped presumably because
            # event.path is reported without one -- confirm against pyinotify
            self.arch_lookup.update((path.rstrip('/'), arch) for path in paths)

    def process_default(self, event):
        '''Primary event processing function which kicks off reporead timer
        threads if a files database was updated.'''
        if not event.name:
            return
        # screen to only the files we care about
        if not event.name.endswith('.files.tar.gz'):
            return

        path = event.pathname
        try:
            mtime = os.stat(path).st_mtime
        except OSError as exc:
            # The file can be replaced or removed again between the event
            # and this stat call; a failure here must not escape into the
            # notifier loop and take the whole watcher down.
            logger.warning('Could not stat %s, skipping update: %s',
                           path, exc)
            return

        database = self.databases.get(path, None)
        if database is None:
            arch = self.arch_lookup.get(event.path, None)
            if arch is None:
                logger.warning(
                    'Could not determine arch for %s, skipping update',
                    path)
                return
            database = Database(arch, path)
            self.databases[path] = database
        database.queue_for_update(mtime)


# vim: set ts=4 sw=4 et: