diff options
Diffstat (limited to 'mirrors/management/commands/mirrorcheck.py')
-rw-r--r-- | mirrors/management/commands/mirrorcheck.py | 204 |
1 files changed, 154 insertions, 50 deletions
diff --git a/mirrors/management/commands/mirrorcheck.py b/mirrors/management/commands/mirrorcheck.py index 7ffb7773..d6de8f22 100644 --- a/mirrors/management/commands/mirrorcheck.py +++ b/mirrors/management/commands/mirrorcheck.py @@ -9,24 +9,30 @@ we encounter errors, record those as well. Usage: ./manage.py mirrorcheck """ -from django.core.management.base import NoArgsCommand -from django.db import transaction - from collections import deque from datetime import datetime +from httplib import HTTPException import logging +import os +from optparse import make_option +from pytz import utc import re import socket +import subprocess import sys import time +import tempfile from threading import Thread import types -from pytz import utc from Queue import Queue, Empty import urllib2 -from main.utils import utc_now -from mirrors.models import MirrorUrl, MirrorLog +from django.core.management.base import NoArgsCommand +from django.db import transaction +from django.utils.timezone import now + +from mirrors.models import MirrorUrl, MirrorLog, CheckLocation + logging.basicConfig( level=logging.WARNING, @@ -35,7 +41,14 @@ logging.basicConfig( stream=sys.stderr) logger = logging.getLogger() + class Command(NoArgsCommand): + option_list = NoArgsCommand.option_list + ( + make_option('-t', '--timeout', dest='timeout', type='float', default=10.0, + help='Timeout value for connecting to URL'), + make_option('-l', '--location', dest='location', type='int', + help='ID of CheckLocation object to use for this run'), + ) help = "Runs a check on all known mirror URLs to determine their up-to-date status." def handle_noargs(self, **options): @@ -47,36 +60,73 @@ class Command(NoArgsCommand): elif v == 2: logger.level = logging.DEBUG - return check_current_mirrors() + timeout = options.get('timeout') + + urls = MirrorUrl.objects.select_related('protocol').filter( + mirror__active=True, mirror__public=True) + + location = options.get('location', None) + if location: + location = CheckLocation.objects.get(id=location) + family = location.family + monkeypatch_getaddrinfo(family) + if family == socket.AF_INET6: + urls = urls.filter(has_ipv6=True) + elif family == socket.AF_INET: + urls = urls.filter(has_ipv4=True) + + pool = MirrorCheckPool(urls, location, timeout) + pool.run() + return 0 + + +def monkeypatch_getaddrinfo(force_family=socket.AF_INET): + '''Force the Python socket module to connect over the designated family; + e.g. socket.AF_INET or socket.AF_INET6.''' + orig = socket.getaddrinfo + + def wrapper(host, port, family=0, socktype=0, proto=0, flags=0): + return orig(host, port, force_family, socktype, proto, flags) + + socket.getaddrinfo = wrapper + + +def parse_lastsync(log, data): + '''lastsync file should be an epoch value created by us.''' + try: + parsed_time = datetime.utcfromtimestamp(int(data)) + log.last_sync = parsed_time.replace(tzinfo=utc) + except ValueError: + # it is bad news to try logging the lastsync value; + # sometimes we get a crazy-encoded web page. + # if we couldn't parse a time, this is a failure. + log.last_sync = None + log.error = "Could not parse time from lastsync" + log.is_success = False + + +def check_mirror_url(mirror_url, location, timeout): + if location: + if location.family == socket.AF_INET6: + ipopt = '--ipv6' + elif location.family == socket.AF_INET: + ipopt = '--ipv4' -def check_mirror_url(mirror_url): url = mirror_url.url + 'lastsync' logger.info("checking URL %s", url) - log = MirrorLog(url=mirror_url, check_time=utc_now()) + log = MirrorLog(url=mirror_url, check_time=now(), location=location) + headers = {'User-Agent': 'archweb/1.0'} + req = urllib2.Request(url, None, headers) try: start = time.time() - result = urllib2.urlopen(url, timeout=10) + result = urllib2.urlopen(req, timeout=timeout) data = result.read() result.close() end = time.time() - # lastsync should be an epoch value created by us - parsed_time = None - try: - parsed_time = datetime.utcfromtimestamp(int(data)) - parsed_time = parsed_time.replace(tzinfo=utc) - except ValueError: - # it is bad news to try logging the lastsync value; - # sometimes we get a crazy-encoded web page. - pass - - log.last_sync = parsed_time - # if we couldn't parse a time, this is a failure - if parsed_time is None: - log.error = "Could not parse time from lastsync" - log.is_success = False + parse_lastsync(log, data) log.duration = end - start logger.debug("success: %s, %.2f", url, log.duration) - except urllib2.HTTPError, e: + except urllib2.HTTPError as e: if e.code == 404: # we have a duration, just not a success end = time.time() @@ -84,7 +134,7 @@ def check_mirror_url(mirror_url): log.is_success = False log.error = str(e) logger.debug("failed: %s, %s", url, log.error) - except urllib2.URLError, e: + except urllib2.URLError as e: log.is_success = False log.error = e.reason if isinstance(e.reason, types.StringTypes) and \ @@ -97,35 +147,102 @@ def check_mirror_url(mirror_url): elif isinstance(e.reason, socket.error): log.error = e.reason.args[1] logger.debug("failed: %s, %s", url, log.error) - except socket.timeout, e: + except HTTPException as e: + # e.g., BadStatusLine + log.is_success = False + log.error = "Exception in processing HTTP request." + logger.debug("failed: %s, %s", url, log.error) + except socket.timeout as e: log.is_success = False log.error = "Connection timed out." logger.debug("failed: %s, %s", url, log.error) + except socket.error as e: + log.is_success = False + log.error = str(e) + logger.debug("failed: %s, %s", url, log.error) return log -def mirror_url_worker(work, output): + +def check_rsync_url(mirror_url, location, timeout): + url = mirror_url.url + 'lastsync' + logger.info("checking URL %s", url) + log = MirrorLog(url=mirror_url, check_time=now(), location=location) + + tempdir = tempfile.mkdtemp() + ipopt = '' + if location: + if location.family == socket.AF_INET6: + ipopt = '--ipv6' + elif location.family == socket.AF_INET: + ipopt = '--ipv4' + lastsync_path = os.path.join(tempdir, 'lastsync') + rsync_cmd = ["rsync", "--quiet", "--contimeout=%d" % timeout, + "--timeout=%d" % timeout] + if ipopt: + rsync_cmd.append(ipopt) + rsync_cmd.append(url) + rsync_cmd.append(lastsync_path) + try: + with open(os.devnull, 'w') as devnull: + logger.debug("rsync cmd: %s", ' '.join(rsync_cmd)) + proc = subprocess.Popen(rsync_cmd, stdout=devnull, + stderr=subprocess.PIPE) + start = time.time() + _, errdata = proc.communicate() + end = time.time() + log.duration = end - start + if proc.returncode != 0: + logger.debug("error: %s, %s", url, errdata) + log.is_success = False + log.error = errdata.strip() + # look at rsync error code- if we had a command error or timed out, + # don't record a duration as it is misleading + if proc.returncode in (1, 30, 35): + log.duration = None + else: + logger.debug("success: %s, %.2f", url, log.duration) + with open(lastsync_path, 'r') as lastsync: + parse_lastsync(log, lastsync.read()) + finally: + if os.path.exists(lastsync_path): + os.unlink(lastsync_path) + os.rmdir(tempdir) + + return log + + +def mirror_url_worker(work, output, location, timeout): while True: try: - item = work.get(block=False) + url = work.get(block=False) try: - log = check_mirror_url(item) - output.append(log) + if url.protocol.protocol == 'rsync': + log = check_rsync_url(url, location, timeout) + elif (url.protocol.protocol == 'ftp' and location and + location.family == socket.AF_INET6): + # IPv6 + FTP don't work; skip checking completely + log = None + else: + log = check_mirror_url(url, location, timeout) + if log: + output.append(log) finally: work.task_done() except Empty: return 0 + class MirrorCheckPool(object): - def __init__(self, work, num_threads=10): + def __init__(self, urls, location, timeout=10, num_threads=10): self.tasks = Queue() self.logs = deque() - for i in list(work): - self.tasks.put(i) + for url in list(urls): + self.tasks.put(url) self.threads = [] for i in range(num_threads): thread = Thread(target=mirror_url_worker, - args=(self.tasks, self.logs)) + args=(self.tasks, self.logs, location, timeout)) thread.daemon = True self.threads.append(thread) @@ -136,21 +253,8 @@ class MirrorCheckPool(object): thread.start() logger.debug("joining on all threads") self.tasks.join() - logger.debug("processing log entries") + logger.debug("processing %d log entries", len(self.logs)) MirrorLog.objects.bulk_create(self.logs) logger.debug("log entries saved") -def check_current_mirrors(): - urls = MirrorUrl.objects.filter( - protocol__is_download=True, - mirror__active=True, mirror__public=True) - - pool = MirrorCheckPool(urls) - pool.run() - return 0 - -# For lack of a better place to put it, here is a query to get latest check -# result joined with mirror details: -# SELECT mu.*, m.*, ml.* FROM mirrors_mirrorurl mu JOIN mirrors_mirror m ON mu.mirror_id = m.id JOIN mirrors_mirrorlog ml ON mu.id = ml.url_id LEFT JOIN mirrors_mirrorlog ml2 ON ml.url_id = ml2.url_id AND ml.id < ml2.id WHERE ml2.id IS NULL AND m.active = 1 AND m.public = 1; - # vim: set ts=4 sw=4 et: |