diff options
Diffstat (limited to 'mirrors/management')
-rw-r--r-- | mirrors/management/commands/mirrorcheck.py | 286 |
-rw-r--r-- | mirrors/management/commands/mirrorresolv.py | 65 |
2 files changed, 263 insertions, 88 deletions
diff --git a/mirrors/management/commands/mirrorcheck.py b/mirrors/management/commands/mirrorcheck.py index 1662b15c..1a33073a 100644 --- a/mirrors/management/commands/mirrorcheck.py +++ b/mirrors/management/commands/mirrorcheck.py @@ -9,145 +9,255 @@ we encounter errors, record those as well. Usage: ./manage.py mirrorcheck """ -from django.core.management.base import NoArgsCommand -from django.db.models import Q - -from datetime import datetime, timedelta +from collections import deque +from datetime import datetime +from httplib import HTTPException import logging +import os +from optparse import make_option +from pytz import utc import re import socket +import ssl +import subprocess import sys import time -import thread +import tempfile from threading import Thread +import types from Queue import Queue, Empty import urllib2 -from logging import ERROR, WARNING, INFO, DEBUG +from django.core.management.base import NoArgsCommand +from django.db import transaction +from django.utils.timezone import now + +from mirrors.models import MirrorUrl, MirrorLog, CheckLocation -from mirrors.models import Mirror, MirrorUrl, MirrorLog logging.basicConfig( - level=WARNING, + level=logging.WARNING, format='%(asctime)s -> %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', stream=sys.stderr) logger = logging.getLogger() + class Command(NoArgsCommand): + option_list = NoArgsCommand.option_list + ( + make_option('-t', '--timeout', dest='timeout', type='float', default=10.0, + help='Timeout value for connecting to URL'), + make_option('-l', '--location', dest='location', type='int', + help='ID of CheckLocation object to use for this run'), + ) help = "Runs a check on all known mirror URLs to determine their up-to-date status." 
def handle_noargs(self, **options): v = int(options.get('verbosity', 0)) if v == 0: - logger.level = ERROR + logger.level = logging.ERROR elif v == 1: - logger.level = WARNING - elif v == 2: - logger.level = DEBUG - - import signal, traceback - handler = lambda sig, stack: traceback.print_stack(stack) - signal.signal(signal.SIGQUIT, handler) - signal.signal(signal.SIGUSR1, handler) - - return check_current_mirrors() - -def parse_rfc3339_datetime(time): - # '2010-09-02 11:05:06+02:00' - m = re.match('^(\d{4})-(\d{2})-(\d{2}) (\d{2}):(\d{2}):(\d{2})([-+])(\d{2}):(\d{2})', time) - if m: - vals = m.groups() - parsed = datetime(int(vals[0]), int(vals[1]), int(vals[2]), - int(vals[3]), int(vals[4]), int(vals[5])) - # now account for time zone offset - sign = vals[6] - offset = timedelta(hours=int(sign + vals[7]), - minutes=int(sign + vals[8])) - # subtract the offset, e.g. '-04:00' should be moved up 4 hours - return parsed - offset - return None - -def check_mirror_url(mirror_url): + logger.level = logging.WARNING + elif v >= 2: + logger.level = logging.DEBUG + + timeout = options.get('timeout') + + urls = MirrorUrl.objects.select_related('protocol').filter( + active=True, mirror__active=True, mirror__public=True) + + location = options.get('location', None) + if location: + location = CheckLocation.objects.get(id=location) + family = location.family + monkeypatch_getaddrinfo(family) + if family == socket.AF_INET6: + urls = urls.filter(has_ipv6=True) + elif family == socket.AF_INET: + urls = urls.filter(has_ipv4=True) + + pool = MirrorCheckPool(urls, location, timeout) + pool.run() + return 0 + + +def monkeypatch_getaddrinfo(force_family=socket.AF_INET): + '''Force the Python socket module to connect over the designated family; + e.g. 
socket.AF_INET or socket.AF_INET6.''' + orig = socket.getaddrinfo + + def wrapper(host, port, family=0, socktype=0, proto=0, flags=0): + return orig(host, port, force_family, socktype, proto, flags) + + socket.getaddrinfo = wrapper + + +def parse_lastsync(log, data): + '''lastsync file should be an epoch value created by us.''' + try: + parsed_time = datetime.utcfromtimestamp(int(data)) + log.last_sync = parsed_time.replace(tzinfo=utc) + except (TypeError, ValueError): + # it is bad news to try logging the lastsync value; + # sometimes we get a crazy-encoded web page. + # if we couldn't parse a time, this is a failure. + log.last_sync = None + log.error = "Could not parse time from lastsync" + log.is_success = False + + +def check_mirror_url(mirror_url, location, timeout): url = mirror_url.url + 'lastsync' - logger.info("checking URL %s" % url) - log = MirrorLog(url=mirror_url, check_time=datetime.utcnow()) + logger.info("checking URL %s", url) + log = MirrorLog(url=mirror_url, check_time=now(), location=location) + headers = {'User-Agent': 'archweb/1.0'} + req = urllib2.Request(url, None, headers) + start = time.time() try: - start = time.time() - result = urllib2.urlopen(url, timeout=10) + result = urllib2.urlopen(req, timeout=timeout) data = result.read() result.close() end = time.time() - # lastsync should be an epoch value, but some mirrors - # are creating their own in RFC-3339 format: - # '2010-09-02 11:05:06+02:00' - try: - parsed_time = datetime.utcfromtimestamp(int(data)) - except ValueError: - # it is bad news to try logging the lastsync value; - # sometimes we get a crazy-encoded web page. 
- logger.info("attempting to parse generated lastsync file" - " from mirror %s" % url) - parsed_time = parse_rfc3339_datetime(data) - - log.last_sync = parsed_time + parse_lastsync(log, data) log.duration = end - start - logger.debug("success: %s, %.2f" % (url, log.duration)) - except urllib2.HTTPError, e: + logger.debug("success: %s, %.2f", url, log.duration) + except urllib2.HTTPError as e: + if e.code == 404: + # we have a duration, just not a success + end = time.time() + log.duration = end - start + log.is_success = False + log.error = str(e) + logger.debug("failed: %s, %s", url, log.error) + except urllib2.URLError as e: log.is_success = False - log.error =str(e) - logger.debug("failed: %s, %s" % (url, log.error)) - except urllib2.URLError, e: - log.is_success=False log.error = e.reason + if isinstance(e.reason, types.StringTypes) and \ + re.search(r'550.*No such file', e.reason): + # similar to 404 case above, still record duration + end = time.time() + log.duration = end - start if isinstance(e.reason, socket.timeout): log.error = "Connection timed out." elif isinstance(e.reason, socket.error): - log.error = e.reason.args[1] - logger.debug("failed: %s, %s" % (url, log.error)) + log.error = e.reason.args[-1] + logger.debug("failed: %s, %s", url, log.error) + except HTTPException: + # e.g., BadStatusLine + log.is_success = False + log.error = "Exception in processing HTTP request." + logger.debug("failed: %s, %s", url, log.error) + except ssl.CertificateError as e: + log.is_success = False + log.error = str(e) + logger.debug("failed: %s, %s", url, log.error) + except socket.timeout: + log.is_success = False + log.error = "Connection timed out." 
+ logger.debug("failed: %s, %s", url, log.error) + except socket.error as e: + log.is_success = False + log.error = str(e) + logger.debug("failed: %s, %s", url, log.error) - log.save() return log -def mirror_url_worker(queue): + +def check_rsync_url(mirror_url, location, timeout): + url = mirror_url.url + 'lastsync' + logger.info("checking URL %s", url) + log = MirrorLog(url=mirror_url, check_time=now(), location=location) + + tempdir = tempfile.mkdtemp() + ipopt = '' + if location: + if location.family == socket.AF_INET6: + ipopt = '--ipv6' + elif location.family == socket.AF_INET: + ipopt = '--ipv4' + lastsync_path = os.path.join(tempdir, 'lastsync') + rsync_cmd = ["rsync", "--quiet", "--contimeout=%d" % timeout, + "--timeout=%d" % timeout] + if ipopt: + rsync_cmd.append(ipopt) + rsync_cmd.append(url) + rsync_cmd.append(lastsync_path) + try: + with open(os.devnull, 'w') as devnull: + if logger.isEnabledFor(logging.DEBUG): + logger.debug("rsync cmd: %s", ' '.join(rsync_cmd)) + start = time.time() + proc = subprocess.Popen(rsync_cmd, stdout=devnull, + stderr=subprocess.PIPE) + _, errdata = proc.communicate() + end = time.time() + log.duration = end - start + if proc.returncode != 0: + logger.debug("error: %s, %s", url, errdata) + log.is_success = False + log.error = errdata.strip() + # look at rsync error code- if we had a command error or timed out, + # don't record a duration as it is misleading + if proc.returncode in (1, 30, 35): + log.duration = None + else: + logger.debug("success: %s, %.2f", url, log.duration) + if os.path.exists(lastsync_path): + with open(lastsync_path, 'r') as lastsync: + parse_lastsync(log, lastsync.read()) + else: + parse_lastsync(log, None) + finally: + if os.path.exists(lastsync_path): + os.unlink(lastsync_path) + os.rmdir(tempdir) + + return log + + +def mirror_url_worker(work, output, location, timeout): while True: try: - item = queue.get(block=False) - check_mirror_url(item) - queue.task_done() + url = work.get(block=False) + try: 
+ if url.protocol.protocol == 'rsync': + log = check_rsync_url(url, location, timeout) + elif (url.protocol.protocol == 'ftp' and location and + location.family == socket.AF_INET6): + # IPv6 + FTP don't work; skip checking completely + log = None + else: + log = check_mirror_url(url, location, timeout) + if log: + output.append(log) + finally: + work.task_done() except Empty: return 0 + class MirrorCheckPool(object): - def __init__(self, work, num_threads=10): + def __init__(self, urls, location, timeout=10, num_threads=10): self.tasks = Queue() - for i in work: - self.tasks.put(i) + self.logs = deque() + for url in list(urls): + self.tasks.put(url) self.threads = [] - for i in range(num_threads): - thread = Thread(target=mirror_url_worker, args=(self.tasks,)) + for _ in range(num_threads): + thread = Thread(target=mirror_url_worker, + args=(self.tasks, self.logs, location, timeout)) thread.daemon = True self.threads.append(thread) - def run_and_join(self): + @transaction.atomic + def run(self): logger.debug("starting threads") - for t in self.threads: - t.start() + for thread in self.threads: + thread.start() logger.debug("joining on all threads") self.tasks.join() - -def check_current_mirrors(): - urls = MirrorUrl.objects.filter( - Q(protocol__protocol__iexact='HTTP') | - Q(protocol__protocol__iexact='FTP'), - mirror__active=True, mirror__public=True) - - pool = MirrorCheckPool(urls) - pool.run_and_join() - return 0 - -# For lack of a better place to put it, here is a query to get latest check -# result joined with mirror details: -# SELECT mu.*, m.*, ml.* FROM mirrors_mirrorurl mu JOIN mirrors_mirror m ON mu.mirror_id = m.id JOIN mirrors_mirrorlog ml ON mu.id = ml.url_id LEFT JOIN mirrors_mirrorlog ml2 ON ml.url_id = ml2.url_id AND ml.id < ml2.id WHERE ml2.id IS NULL AND m.active = 1 AND m.public = 1; + logger.debug("processing %d log entries", len(self.logs)) + MirrorLog.objects.bulk_create(self.logs) + logger.debug("log entries saved") # vim: set ts=4 sw=4 et: 
diff --git a/mirrors/management/commands/mirrorresolv.py b/mirrors/management/commands/mirrorresolv.py new file mode 100644 index 00000000..0e71894b --- /dev/null +++ b/mirrors/management/commands/mirrorresolv.py @@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +""" +mirrorresolv command + +Poll all mirror URLs and determine whether they have IPv4 and/or IPv6 addresses +available. + +Usage: ./manage.py mirrorresolv +""" + +from django.core.management.base import NoArgsCommand + +import sys +import logging +import socket + +from mirrors.models import MirrorUrl + +logging.basicConfig( + level=logging.WARNING, + format='%(asctime)s -> %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + stream=sys.stderr) +logger = logging.getLogger() + +class Command(NoArgsCommand): + help = "Runs a check on all active mirror URLs to determine if they are reachable via IPv4 and/or v6." + + def handle_noargs(self, **options): + v = int(options.get('verbosity', 0)) + if v == 0: + logger.level = logging.ERROR + elif v == 1: + logger.level = logging.WARNING + elif v >= 2: + logger.level = logging.DEBUG + + return resolve_mirrors() + +def resolve_mirrors(): + logger.debug("requesting list of mirror URLs") + for mirrorurl in MirrorUrl.objects.filter(active=True, mirror__active=True): + try: + # save old values, we can skip no-op updates this way + oldvals = (mirrorurl.has_ipv4, mirrorurl.has_ipv6) + logger.debug("resolving %3i (%s)", mirrorurl.id, mirrorurl.hostname) + families = mirrorurl.address_families() + mirrorurl.has_ipv4 = socket.AF_INET in families + mirrorurl.has_ipv6 = socket.AF_INET6 in families + logger.debug("%s: v4: %s v6: %s", mirrorurl.hostname, + mirrorurl.has_ipv4, mirrorurl.has_ipv6) + # now check new values, only update if new != old + newvals = (mirrorurl.has_ipv4, mirrorurl.has_ipv6) + if newvals != oldvals: + logger.debug("values changed for %s", mirrorurl) + mirrorurl.save(update_fields=('has_ipv4', 'has_ipv6')) + except socket.gaierror, e: + if e.errno == 
socket.EAI_NONAME: + logger.debug("gaierror resolving %s: %s", mirrorurl.hostname, e) + else: + logger.warn("gaierror resolving %s: %s", mirrorurl.hostname, e) + except socket.error, e: + logger.warn("error resolving %s: %s", mirrorurl.hostname, e) + +# vim: set ts=4 sw=4 et: