diff options
Diffstat (limited to 'devel/utils.py')
-rw-r--r-- | devel/utils.py | 164 |
1 files changed, 161 insertions, 3 deletions
diff --git a/devel/utils.py b/devel/utils.py index abfdabe5..e745c7a9 100644 --- a/devel/utils.py +++ b/devel/utils.py @@ -1,12 +1,21 @@ +import re + +from django.conf import settings from django.contrib.auth.models import User +from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned from django.db import connection +from django.db.models import Count, Q +from devel.models import UserProfile from main.utils import cache_function +from main.models import Package from packages.models import PackageRelation -@cache_function(300) +@cache_function(283) def get_annotated_maintainers(): - maintainers = User.objects.filter(is_active=True).order_by( + profile_ids = UserProfile.allowed_repos.through.objects.values('userprofile_id') + maintainers = User.objects.filter( + is_active=True, userprofile__id__in=profile_ids).order_by( 'first_name', 'last_name') # annotate the maintainers with # of maintained and flagged packages @@ -28,10 +37,159 @@ SELECT pr.user_id, COUNT(*), COUNT(p.flag_date) pkg_count[k] = total flag_count[k] = flagged + update_count = Package.objects.values_list('packager').order_by( + 'packager').annotate(Count('packager')) + update_count = dict(update_count) + for m in maintainers: m.package_count = pkg_count.get(m.id, 0) m.flagged_count = flag_count.get(m.id, 0) + m.updated_count = update_count.get(m.id, 0) + + # force non-QS context, otherwise pickling doesn't work + return list(maintainers) + + +def ignore_does_not_exist(func): + def new_func(*args, **kwargs): + try: + return func(*args, **kwargs) + except (ObjectDoesNotExist, MultipleObjectsReturned): + return None + return new_func + + +class UserFinder(object): + def __init__(self): + self.cache = {} + self.username_cache = {} + self.email_cache = {} + self.pgp_cache = {} + + @staticmethod + @ignore_does_not_exist + def user_email(name, email): + if email: + return User.objects.get(email=email) + return None + + @staticmethod + @ignore_does_not_exist + def username_email(name, email): + if email and '@' in email: + # split email addr at '@' symbol, ensure domain matches + # or is a subdomain of parabola.nu + username, domain = email.split('@', 1) + if re.match(settings.DOMAIN_RE, domain): + return User.objects.get(username=username) + return None + + @staticmethod + @ignore_does_not_exist + def profile_email(name, email): + if email: + return User.objects.get(userprofile__public_email=email) + return None + + @staticmethod + @ignore_does_not_exist + def user_name(name, email): + # yes, a bit odd but this is the easiest way since we can't always be + # sure how to split the name. Ensure every 'token' appears in at least + # one of the two name fields. + if not name: + return None + name_q = Q() + for token in name.split(): + # ignore quoted parts; e.g. nicknames in strings + if re.match(r'^[\'"].*[\'"]$', token): + continue + name_q &= (Q(first_name__icontains=token) | + Q(last_name__icontains=token)) + return User.objects.get(name_q) + + def find(self, userstring): + ''' + Attempt to find the corresponding User object for a standard + packager string, e.g. something like + 'A. U. Thor <author@example.com>'. + We start by searching for a matching email address; we then move onto + matching by first/last name. If we cannot find a user, then return None. + ''' + if not userstring: + return None + if userstring in self.cache: + return self.cache[userstring] + + name = email = None + + matches = re.match(r'^([^<]+)? ?<([^>]*)>?', userstring) + if not matches: + name = userstring.strip() + else: + name = matches.group(1) + email = matches.group(2) + + user = None + find_methods = (self.user_email, self.profile_email, + self.username_email, self.user_name) + for matcher in find_methods: + user = matcher(name, email) + if user is not None: + break + + self.cache[userstring] = user + self.email_cache[email] = user + return user + + def find_by_username(self, username): + if not username: + return None + if username in self.username_cache: + return self.username_cache[username] + + try: + user = User.objects.get(username=username) + except User.DoesNotExist: + user = None + + self.username_cache[username] = user + return user + + def find_by_email(self, email): + if not email: + return None + if email in self.email_cache: + return self.email_cache[email] + + user = self.user_email(None, email) + if user is None: + user = self.profile_email(None, email) + if user is None: + user = self.username_email(None, email) + + self.email_cache[email] = user + return user + + def find_by_pgp_key(self, pgp_key): + if not pgp_key: + return None + if pgp_key in self.pgp_cache: + return self.pgp_cache[pgp_key] + + try: + user = User.objects.get( + userprofile__pgp_key__endswith=pgp_key) + except User.DoesNotExist: + user = None + + self.pgp_cache[pgp_key] = user + return user - return maintainers + def clear_cache(self): + self.cache = {} + self.username_cache = {} + self.email_cache = {} + self.pgp_cache = {} # vim: set ts=4 sw=4 et: |