From 895f8a20d35a18f3a0cc6e1530eb40292270fc7c Mon Sep 17 00:00:00 2001 From: Dan McGee Date: Thu, 9 Jun 2011 17:25:32 -0500 Subject: reporead: allow batching of package updates The real reason I originally added transactions to this code was to prevent half-updates; e.g. a package gets in without the matching depends values. We can safely commit between packages and resume processing the database at a later time. Take advantage of this fact and commit every so often in batch fashion if we have a lot of updates piling up. In the case of updating the files DB, this can really cut down on the need to hold open a long-running, statement heavy transaction and get the information public faster. Signed-off-by: Dan McGee --- devel/management/commands/reporead.py | 58 +++++++++++++++++++++++++++++------ 1 file changed, 49 insertions(+), 9 deletions(-) (limited to 'devel/management') diff --git a/devel/management/commands/reporead.py b/devel/management/commands/reporead.py index 1adc359e..e9878c93 100644 --- a/devel/management/commands/reporead.py +++ b/devel/management/commands/reporead.py @@ -221,6 +221,8 @@ def create_multivalued(dbpkg, repopkg, db_attr, repo_attr): collection.create(name=name) def populate_pkg(dbpkg, repopkg, force=False, timestamp=None): + db_score = 1 + if repopkg.base: dbpkg.pkgbase = repopkg.base else: @@ -251,7 +253,7 @@ def populate_pkg(dbpkg, repopkg, force=False, timestamp=None): dbpkg.last_update = timestamp dbpkg.save() - populate_files(dbpkg, repopkg, force=force) + db_score += populate_files(dbpkg, repopkg, force=force) dbpkg.packagedepend_set.all().delete() for y in repopkg.depends: @@ -272,6 +274,15 @@ def populate_pkg(dbpkg, repopkg, force=False, timestamp=None): create_multivalued(dbpkg, repopkg, 'groups', 'groups') create_multivalued(dbpkg, repopkg, 'licenses', 'license') + related_score = (len(repopkg.depends) + len(repopkg.optdepends) + + len(repopkg.conflicts) + len(repopkg.provides) + + len(repopkg.replaces) + len(repopkg.groups) + + len(repopkg.license)) + if related_score: + db_score += (related_score / 20) + 1 + + return db_score + def populate_files(dbpkg, repopkg, force=False): if not force: @@ -280,11 +291,11 @@ def populate_files(dbpkg, repopkg, force=False): logger.info("DB version (%s) didn't match repo version " "(%s) for package %s, skipping file list addition", dbpkg.full_version, repopkg.full_version, dbpkg.pkgname) - return + return 0 if not dbpkg.files_last_update or not dbpkg.last_update: pass elif dbpkg.files_last_update > dbpkg.last_update: - return + return 0 # only delete files if we are reading a DB that contains them if repopkg.has_files: dbpkg.packagefile_set.all().delete() @@ -303,6 +314,28 @@ def populate_files(dbpkg, repopkg, force=False): pkgfile.save(force_insert=True) dbpkg.files_last_update = datetime.utcnow() dbpkg.save() + return (len(repopkg.files) / 50) + 1 + return 0 + + +class Batcher(object): + def __init__(self, threshold, start=0): + self.threshold = threshold + self.meter = start + + def batch_commit(self, score): + """ + Track updates to the database and perform a commit if the batch + becomes sufficiently large. "Large" is defined by waiting for the + sum of scores to exceed the arbitrary threshold value; once it is + hit a commit is issued. + """ + self.meter += score + if self.meter > self.threshold: + logger.debug("Committing transaction, batch threshold hit") + transaction.commit() + self.meter = 0 + @transaction.commit_on_success def db_update(archname, reponame, pkgs, options): @@ -355,19 +388,23 @@ def db_update(archname, reponame, pkgs, options): elif dbpercent < 75.0: logger.warning(msg) + batcher = Batcher(100) + if not filesonly: # packages in syncdb and not in database (add to database) for p in [x for x in pkgs if x.name in in_sync_not_db]: logger.info("Adding package %s", p.name) pkg = Package(pkgname = p.name, arch = architecture, repo = repository) - populate_pkg(pkg, p, timestamp=datetime.utcnow()) + score = populate_pkg(pkg, p, timestamp=datetime.utcnow()) + batcher.batch_commit(score) # packages in database and not in syncdb (remove from database) in_db_not_sync = dbset - syncset for p in in_db_not_sync: - logger.info("Removing package %s from database", p) + logger.info("Removing package %s", p) dbp = dbdict[p] dbp.delete() + batcher.batch_commit(score) # packages in both database and in syncdb (update in database) pkg_in_both = syncset & dbset @@ -385,12 +422,15 @@ def db_update(archname, reponame, pkgs, options): continue else: timestamp = datetime.utcnow() + if filesonly: - logger.debug("Checking files for package %s in database", p.name) - populate_files(dbp, p, force=force) + logger.debug("Checking files for package %s", p.name) + score = populate_files(dbp, p, force=force) else: - logger.info("Updating package %s in database", p.name) - populate_pkg(dbp, p, force=force, timestamp=timestamp) + logger.info("Updating package %s", p.name) + score = populate_pkg(dbp, p, force=force, timestamp=timestamp) + + batcher.batch_commit(score) logger.info('Finished updating Arch: %s', archname) -- cgit v1.2.3-54-g00ecf