diff options
author | Dan McGee <dan@archlinux.org> | 2011-11-17 12:34:12 -0600 |
---|---|---|
committer | Dan McGee <dan@archlinux.org> | 2011-11-17 12:49:10 -0600 |
commit | a9819e3d715ce3e5c20c9665db9a6100f06ab562 (patch) | |
tree | 0d5b457e7de06401095a17993735b1530f20f454 /devel | |
parent | 9d2fdbe5bc6a0d9ab2907b377056851fc5eb56c3 (diff) |
Ensure reporead is protected against simultaneous runs
This adds a bunch of transaction magic and SELECT FOR UPDATE stuff to
reporead to cope with the now-concurrent runs of reporead we get when
invoked from our inotify-based updater. The collision occurs with 'any'
architecture packages as both repo databases contain the new version,
and the updates occur at exactly the same time.
Signed-off-by: Dan McGee <dan@archlinux.org>
Diffstat (limited to 'devel')
-rw-r--r-- | devel/management/commands/reporead.py | 206 |
1 files changed, 106 insertions, 100 deletions
diff --git a/devel/management/commands/reporead.py b/devel/management/commands/reporead.py index ad76db4d..b6bd8457 100644 --- a/devel/management/commands/reporead.py +++ b/devel/management/commands/reporead.py @@ -13,10 +13,6 @@ Example: ./manage.py reporead i686 /tmp/core.db.tar.gz """ -from django.core.management.base import BaseCommand, CommandError -from django.contrib.auth.models import User -from django.db import transaction - from collections import defaultdict import io import os @@ -27,6 +23,11 @@ import logging from datetime import datetime from optparse import make_option +from django.core.management.base import BaseCommand, CommandError +from django.contrib.auth.models import User +from django.db import connections, router, transaction +from django.db.utils import IntegrityError + from devel.utils import UserFinder from main.models import Arch, Package, PackageDepend, PackageFile, Repo from packages.models import Conflict, Provision, Replacement @@ -189,8 +190,6 @@ def create_multivalued(dbpkg, repopkg, db_attr, repo_attr): finder = UserFinder() def populate_pkg(dbpkg, repopkg, force=False, timestamp=None): - db_score = 1 - if repopkg.base: dbpkg.pkgbase = repopkg.base else: @@ -214,7 +213,7 @@ def populate_pkg(dbpkg, repopkg, force=False, timestamp=None): dbpkg.last_update = timestamp dbpkg.save() - db_score += populate_files(dbpkg, repopkg, force=force) + populate_files(dbpkg, repopkg, force=force) dbpkg.packagedepend_set.all().delete() for y in repopkg.depends: @@ -235,28 +234,23 @@ def populate_pkg(dbpkg, repopkg, force=False, timestamp=None): create_multivalued(dbpkg, repopkg, 'groups', 'groups') create_multivalued(dbpkg, repopkg, 'licenses', 'license') - related_score = (len(repopkg.depends) + len(repopkg.optdepends) - + len(repopkg.conflicts) + len(repopkg.provides) - + len(repopkg.replaces) + len(repopkg.groups) - + len(repopkg.license)) - if related_score: - db_score += (related_score / 20) + 1 - return db_score +pkg_same_version = lambda pkg, dbpkg: pkg.ver == dbpkg.pkgver \ + and pkg.rel == dbpkg.pkgrel and pkg.epoch == dbpkg.epoch def populate_files(dbpkg, repopkg, force=False): if not force: - if dbpkg.pkgver != repopkg.ver or dbpkg.pkgrel != repopkg.rel \ - or dbpkg.epoch != repopkg.epoch: + if not pkg_same_version(repopkg, dbpkg): logger.info("DB version (%s) didn't match repo version " "(%s) for package %s, skipping file list addition", dbpkg.full_version, repopkg.full_version, dbpkg.pkgname) - return 0 + return if not dbpkg.files_last_update or not dbpkg.last_update: pass elif dbpkg.files_last_update > dbpkg.last_update: - return 0 + return + # only delete files if we are reading a DB that contains them if repopkg.has_files: dbpkg.packagefile_set.all().delete() @@ -275,30 +269,19 @@ def populate_files(dbpkg, repopkg, force=False): pkgfile.save(force_insert=True) dbpkg.files_last_update = datetime.utcnow() dbpkg.save() - return (len(repopkg.files) / 50) + 1 - return 0 - -class Batcher(object): - def __init__(self, threshold, start=0): - self.threshold = threshold - self.meter = start - def batch_commit(self, score): - """ - Track updates to the database and perform a commit if the batch - becomes sufficiently large. "Large" is defined by waiting for the - sum of scores to exceed the arbitrary threshold value; once it is - hit a commit is issued. - """ - self.meter += score - if self.meter > self.threshold: - logger.debug("Committing transaction, batch threshold hit") - transaction.commit() - self.meter = 0 +def select_pkg_for_update(dbpkg): + database = router.db_for_write(Package, instance=dbpkg) + connection = connections[database] + if 'sqlite' in connection.settings_dict['ENGINE'].lower(): + return dbpkg + new_pkg = Package.objects.raw( + 'SELECT * FROM packages WHERE id = %s FOR UPDATE', + [dbpkg.id]) + return list(new_pkg)[0] -@transaction.commit_on_success def db_update(archname, reponame, pkgs, options): """ Parses a list and updates the Arch dev database accordingly. @@ -310,88 +293,111 @@ def db_update(archname, reponame, pkgs, options): logger.info('Updating Arch: %s', archname) force = options.get('force', False) filesonly = options.get('filesonly', False) - repository = Repo.objects.get(name__iexact=reponame) - architecture = Arch.objects.get(name__iexact=archname) - # no-arg order_by() removes even the default ordering; we don't need it - dbpkgs = Package.objects.filter( - arch=architecture, repo=repository).order_by() - # This makes our inner loop where we find packages by name *way* more - # efficient by not having to go to the database for each package to - # SELECT them by name. - dbdict = dict([(pkg.pkgname, pkg) for pkg in dbpkgs]) - - logger.debug("Creating sets") - dbset = set(dbdict.keys()) - syncset = set([pkg.name for pkg in pkgs]) - logger.info("%d packages in current web DB", len(dbset)) - logger.info("%d packages in new updating db", len(syncset)) - in_sync_not_db = syncset - dbset - logger.info("%d packages in sync not db", len(in_sync_not_db)) - - # Try to catch those random package deletions that make Eric so unhappy. - if len(dbset): - dbpercent = 100.0 * len(syncset) / len(dbset) - else: - dbpercent = 0.0 - logger.info("DB package ratio: %.1f%%", dbpercent) - - # Fewer than 20 packages makes the percentage check unreliable, but it also - # means we expect the repo to fluctuate a lot. - msg = "Package database has %.1f%% the number of packages in the " \ - "web database" % dbpercent - if len(dbset) == 0 and len(syncset) == 0: - pass - elif not filesonly and \ - len(dbset) > 20 and dbpercent < 50.0 and \ - not repository.testing and not repository.staging: - logger.error(msg) - raise Exception(msg) - elif dbpercent < 75.0: - logger.warning(msg) - - batcher = Batcher(100) + + with transaction.commit_manually(): + repository = Repo.objects.get(name__iexact=reponame) + architecture = Arch.objects.get(name__iexact=archname) + # no-arg order_by() removes even the default ordering; we don't need it + dbpkgs = Package.objects.filter( + arch=architecture, repo=repository).order_by() + # This makes our inner loop where we find packages by name *way* more + # efficient by not having to go to the database for each package to + # SELECT them by name. + dbdict = dict((dbpkg.pkgname, dbpkg) for dbpkg in dbpkgs) + + logger.debug("Creating sets") + dbset = set(dbdict.keys()) + syncset = set([pkg.name for pkg in pkgs]) + logger.info("%d packages in current web DB", len(dbset)) + logger.info("%d packages in new updating db", len(syncset)) + in_sync_not_db = syncset - dbset + logger.info("%d packages in sync not db", len(in_sync_not_db)) + + # Try to catch those random package deletions that make Eric so unhappy. + if len(dbset): + dbpercent = 100.0 * len(syncset) / len(dbset) + else: + dbpercent = 0.0 + logger.info("DB package ratio: %.1f%%", dbpercent) + + # Fewer than 20 packages makes the percentage check unreliable, but it also + # means we expect the repo to fluctuate a lot. + msg = "Package database has %.1f%% the number of packages in the " \ + "web database" % dbpercent + if len(dbset) == 0 and len(syncset) == 0: + pass + elif not filesonly and \ + len(dbset) > 20 and dbpercent < 50.0 and \ + not repository.testing and not repository.staging: + logger.error(msg) + raise Exception(msg) + elif dbpercent < 75.0: + logger.warning(msg) + + # If isolation level is repeatable-read, we need to ensure each package + # update starts a new transaction and re-queries the database as necessary + # to guard against simultaneous updates + transaction.commit() if not filesonly: # packages in syncdb and not in database (add to database) - for p in [x for x in pkgs if x.name in in_sync_not_db]: - logger.info("Adding package %s", p.name) - pkg = Package(pkgname=p.name, arch=architecture, repo=repository) - score = populate_pkg(pkg, p, timestamp=datetime.utcnow()) - batcher.batch_commit(score) + for pkg in (pkg for pkg in pkgs if pkg.name in in_sync_not_db): + logger.info("Adding package %s", pkg.name) + dbpkg = Package(pkgname=pkg.name, arch=architecture, repo=repository) + try: + with transaction.commit_on_success(): + populate_pkg(dbpkg, pkg, timestamp=datetime.utcnow()) + except IntegrityError: + logger.warning("Could not add package %s; " + "not fatal if another thread beat us to it.", + pkg.name, exc_info=True) # packages in database and not in syncdb (remove from database) - in_db_not_sync = dbset - syncset - for p in in_db_not_sync: - logger.info("Removing package %s", p) - dbp = dbdict[p] - dbp.delete() - batcher.batch_commit(1) + for pkgname in (dbset - syncset): + logger.info("Removing package %s", pkgname) + dbpkg = dbdict[pkgname] + with transaction.commit_on_success(): + # no race condition here as long as simultaneous threads both + # issue deletes; second delete will be a no-op + dbpkg.delete() # packages in both database and in syncdb (update in database) pkg_in_both = syncset & dbset - for p in [x for x in pkgs if x.name in pkg_in_both]: - logger.debug("Checking package %s", p.name) - dbp = dbdict[p.name] + for pkg in (x for x in pkgs if x.name in pkg_in_both): + logger.debug("Checking package %s", pkg.name) + dbpkg = dbdict[pkg.name] timestamp = None # for a force, we don't want to update the timestamp. # for a non-force, we don't want to do anything at all. if filesonly: pass - elif p.ver == dbp.pkgver and p.rel == dbp.pkgrel \ - and p.epoch == dbp.epoch: + elif pkg_same_version(pkg, dbpkg): if not force: continue else: timestamp = datetime.utcnow() + # The odd select_for_update song and dance here are to ensure + # simultaneous updates don't happen on a package, causing + # files/depends/all related items to be double-imported. if filesonly: - logger.debug("Checking files for package %s", p.name) - score = populate_files(dbp, p, force=force) + with transaction.commit_on_success(): + # TODO Django 1.4 select_for_update() will work once released + dbpkg = select_pkg_for_update(dbpkg) + if pkg_same_version(pkg, dbpkg): + logger.debug("Package %s was already updated", pkg.name) + continue + logger.debug("Checking files for package %s", pkg.name) + populate_files(dbpkg, pkg, force=force) else: - logger.info("Updating package %s", p.name) - score = populate_pkg(dbp, p, force=force, timestamp=timestamp) - - batcher.batch_commit(score) + with transaction.commit_on_success(): + # TODO Django 1.4 select_for_update() will work once released + dbpkg = select_pkg_for_update(dbpkg) + if pkg_same_version(pkg, dbpkg): + logger.debug("Package %s was already updated", pkg.name) + continue + logger.info("Updating package %s", pkg.name) + populate_pkg(dbpkg, pkg, force=force, timestamp=timestamp) logger.info('Finished updating Arch: %s', archname) |