diff options
-rw-r--r-- | config.py | 3 | ||||
-rw-r--r-- | filter.py | 165 | ||||
-rw-r--r-- | pato2.py | 339 | ||||
-rw-r--r-- | test/blacklist_sample | 2 | ||||
-rw-r--r-- | test/core.db.tar.gz | bin | 0 -> 1345 bytes | |||
-rw-r--r-- | test/depends | 4 | ||||
-rw-r--r-- | test/desc | 39 | ||||
-rw-r--r-- | test/rsync_output_sample | 14 | ||||
-rw-r--r-- | test/test1.py | 66 | ||||
-rw-r--r-- | test/test_filter.py | 192 |
10 files changed, 547 insertions, 277 deletions
@@ -32,6 +32,9 @@ for var in boolvars: else: print('%s is not True or False' % var) +# Rsync commands +rsync_list_command="rsync -a --no-motd --list-only " + # Classes and Exceptions class NonValidFile(ValueError): pass class NonValidDir(ValueError): pass @@ -1,56 +1,66 @@ -#! /usr/bin/python + #! /usr/bin/python #-*- encoding: utf-8 -*- -import commands -import os -import re +from glob import glob from repm.config import * from repm.pato2 import * -rsync_list_command="rsync -a --no-motd --list-only " +def pkginfo_from_filename(filename): + """ Generates a Package object with info from a filename, + filename can be relative or absolute + + Parameters: + ---------- + filename -> str Must contain .pkg.tar. -def generate_rsync_command(base_command, dir_list, destdir=repodir, mirror_name=mirror, - mirror_path=mirrorpath, blacklist_file=False): - """ Generates an rsync command for executing it by combining all parameters. + Returns: + ---------- + pkg -> Package object""" + if ".pkg.tar." not in filename: + raise NonValidFile + pkg = Package() + pkg["location"] = filename + fileattrs = os.path.basename(filename).split("-") + pkg["arch"] = fileattrs.pop(-1).split(".")[0] + pkg["release"] = fileattrs.pop(-1) + pkg["version"] = fileattrs.pop(-1) + pkg["name"] = "-".join(fileattrs) + return pkg + +def pkginfo_from_desc(filename): + """ Returns pkginfo from desc file. Parameters: ---------- - base_command -> str - mirror_name -> str - mirror_path -> str - dir_list -> list or tuple - destdir -> str Path to dir, dir must exist. - blacklist_file -> False or str Path to file, file must exist. + filename -> str File must exist - Return: + Returns: ---------- - rsync_command -> str """ - from os.path import isfile, isdir - - if blacklist_file and not isfile(blacklist_file): - print(blacklist_file + " is not a file") + pkg -> Package object""" + if not os.path.isfile(filename): raise NonValidFile - - if not os.path.isdir(destdir): - print(destdir + " is not a directory") - raise NonValidDir - - dir_list="{" + ",".join(dir_list) + "}" - - if blacklist_file: - return " ".join((base_command, "--exclude-from-file="+blacklist_file, - mirror_name + mirror_path + dir_list, destdir)) - return " ".join((base_command, mirror_name + mirror_path + dir_list, destdir)) - -def run_rsync(base_for_rsync=rsync_list_command, dir_list_for_rsync=(repo_list + dir_list), - debug=verbose): - """ Runs rsync and gets returns it's output """ - cmd = str(generate_rsync_command(rsync_list_command, (repo_list + dir_list))) - if debug: - printf("rsync_command" + cmd) - return commands.getoutput(cmd) - -def get_file_list_from_rsync_output(rsync_output): - """ Generates a list of packages and versions from an rsync output using --list-only --no-motd. + try: + f=open(filename) + info=f.read().rsplit() + finally: + f.close() + pkg = Package() + info_map={"name" :("%NAME%" , None), + "version" :("%VERSION%" , 0 ), + "release" :("%VERSION%" , 1 ), + "arch" :("%ARCH%" , None), + "license" :("%LICENSE%" , None), + "location":("%FILENAME%", None),} + + for key in info_map.keys(): + field,pos=info_map[key] + pkg[key]=info[info.index(field)+1] + if pos is not None: + pkg[key]=pkg[key].split("-")[pos] + return pkg + +def pkginfo_from_rsync_output(rsync_output): + """ Generates a list of packages and versions from an rsync output + wich uses --list-only and --no-motd options. Parameters: ---------- @@ -59,34 +69,54 @@ def get_file_list_from_rsync_output(rsync_output): Returns: ---------- package_list -> tuple Contains Package objects. """ - a=list() - - def directory(line): - pass def package_or_link(line): """ Take info out of filename """ location_field = 4 - pkg = Package() - pkg["location"] = line.rsplit()[location_field] - fileattrs = pkg["location"].split("/")[-1].split("-") - pkg["arch"] = fileattrs.pop(-1).split(".")[0] - pkg["release"] = fileattrs.pop(-1) - pkg["version"] = fileattrs.pop(-1) - pkg["name"] = "-".join(fileattrs) - return pkg - - options = { "d": directory, + return pkginfo_from_filename(line.rsplit()[location_field]) + + def do_nothing(): + pass + + options = { "d": do_nothing, "l": package_or_link, - "-": package_or_link} + "-": package_or_link, + " ": do_nothing} + + package_list=list() + + lines=[x for x in rsync_output.split("\n") if ".pkg.tar" in x] + + for line in lines: + pkginfo=options[line[0]](line) + if pkginfo: + package_list.append(pkginfo) + + return tuple(package_list) + +def pkginfo_from_files_in_dir(directory): + """ Returns pkginfo from filenames of packages in dir + wich has .pkg.tar. on them + + Parameters: + ---------- + directory -> str Directory must exist - for line in rsync_output.split("\n"): - if ".pkg.tar" in line: - pkginfo=options[line[0]](line) - if pkginfo: - a.append(pkginfo) + Returns: + ---------- + package_list -> tuple Contains Package objects """ + package_list=list() - return tuple(a) + if not os.path.isdir(directory): + raise NonValidDir + + for filename in glob(os.path.join(directory,"*")): + if ".pkg.tar." in filename: + package_list.append(pkginfo_from_filename(filename)) + return tuple(package_list) + +def pkginfo_from_db(path_to_db): + """ """ def generate_exclude_list_from_blacklist(packages_iterable, blacklisted_names, exclude_file=rsync_blacklist, debug=verbose): @@ -111,8 +141,7 @@ def generate_exclude_list_from_blacklist(packages_iterable, blacklisted_names, a.append(package["location"]) if debug: - printf(a) - + return a try: fsock = open(exclude_file,"w") try: @@ -121,4 +150,8 @@ def generate_exclude_list_from_blacklist(packages_iterable, blacklisted_names, fsock.close() except IOError: printf("%s wasnt written" % blacklist_file) - + +if __name__ == "__main__": + a=run_rsync(rsync_list_command) + packages=pkginfo_from_rsync_output(a) + generate_exclude_list_from_blacklist(packages,listado(blacklist)) @@ -25,175 +25,224 @@ """ from repm.config import * - +from repm.filter import * import tarfile from glob import glob from os.path import isdir, isfile def printf(text,output_=output): - """Guarda el texto en la variable log y puede imprimir en pantalla.""" - log_file = open(logname, 'a') - log_file.write("\n" + str(text) + "\n") - log_file.close() - if output_: print (str(text) + "\n") + """Guarda el texto en la variable log y puede imprimir en pantalla.""" + log_file = open(logname, 'a') + log_file.write("\n" + str(text) + "\n") + log_file.close() + if output_: print (str(text) + "\n") def listado(filename_): - """Obtiene una lista de paquetes de un archivo.""" - archivo = open(filename_,"r") - lista = archivo.read().split("\n") - archivo.close() - return [pkg.split(":")[0] for pkg in lista if pkg] + """Obtiene una lista de paquetes de un archivo.""" + archivo = open(filename_,"r") + lista = archivo.read().split("\n") + archivo.close() + return [pkg.split(":")[0].rstrip() for pkg in lista if pkg] def db(repo_,arch_): - """Construye un nombre para sincronizar una base de datos.""" - return "/%s/os/%s/%s.db.tar.gz" % (repo_, arch_, repo_) + """Construye un nombre para sincronizar una base de datos.""" + return "/%s/os/%s/%s.db.tar.gz" % (repo_, arch_, repo_) def packages(repo_, arch_, expr="*"): - """ Get packages on a repo, arch folder """ - return tuple( glob( repodir + "/" + repo_ + "/os/" + arch_ + "/" + expr ) ) - -def sync_all_repo(verbose_=verbose): - folders = ",".join(repo_list + dir_list) - cmd_ = "rsync -av --delete-after --delay-updates " + mirror + mirrorpath + "/{" + folders + "} " + repodir - printf(cmd_) - a=commands.getoutput(cmd_) - if verbose_: printf(a) + """ Get packages on a repo, arch folder """ + return tuple( glob( repodir + "/" + repo_ + "/os/" + arch_ + "/" + expr ) ) + +def sync_all_repo(debug=verbose): + cmd=generate_rsync_command(rsync_list_command) + rsout=run_rsync(cmd) + pkgs=pkginfo_from_rsync_output(rsout) + generate_exclude_list_from_blacklist(pkgs,listado(blacklist),debug=False) + cmd=generate_rsync_command(rsync_update_command,blacklist_file=rsync_blacklist) + a=run_rsync(cmd) + cmd=generate_rsync_command(rsync_post_command,blacklist_file=rsync_blacklist) + b=run_rsync(cmd) + if debug: + printf(a) + printf(b) def get_from_desc(desc, var,db_tar_file=False): - """ Get a var from desc file """ - desc = desc.split("\n") - return desc[desc.index(var)+1] + """ Get a var from desc file """ + desc = desc.split("\n") + return desc[desc.index(var)+1] def get_info(repo_,arch_,db_tar_file=False,verbose_=verbose): - """ Makes a list of package name, file and license """ - info=list() - # Extract DB tar.gz - commands.getoutput("mkdir -p " + archdb) - if not db_tar_file: - db_tar_file = repodir + db(repo_,arch_) - if isfile(db_tar_file): - try: - db_open_tar = tarfile.open(db_tar_file, 'r:gz') - except tarfile.ReadError: - printf("No valid db_file %s" % db_tar_file) - return(tuple()) - else: - printf("No db_file %s" % db_tar_file) - return(tuple()) - for file in db_open_tar.getmembers(): - db_open_tar.extract(file, archdb) - db_open_tar.close() - # Get info from file - for dir_ in glob(archdb + "/*"): - if isdir(dir_) and isfile(dir_ + "/desc"): - pkg_desc_file = open(dir_ + "/desc", "r") - desc = pkg_desc_file.read() - pkg_desc_file.close() - info.append(( get_from_desc(desc,"%NAME%"), - dir_.split("/")[-1], - get_from_desc(desc,"%LICENSE%") )) - if verbose_: printf(info) - commands.getoutput("rm -r %s/*" % archdb) - return tuple(info) + """ Makes a list of package name, file and license """ + info=list() + # Extract DB tar.gz + commands.getoutput("mkdir -p " + archdb) + if not db_tar_file: + db_tar_file = repodir + db(repo_,arch_) + if isfile(db_tar_file): + try: + db_open_tar = tarfile.open(db_tar_file, 'r:gz') + except tarfile.ReadError: + printf("No valid db_file %s" % db_tar_file) + return(tuple()) + else: + printf("No db_file %s" % db_tar_file) + return(tuple()) + for file in db_open_tar.getmembers(): + db_open_tar.extract(file, archdb) + db_open_tar.close() + # Get info from file + for dir_ in glob(archdb + "/*"): + if isdir(dir_) and isfile(dir_ + "/desc"): + pkg_desc_file = open(dir_ + "/desc", "r") + desc = pkg_desc_file.read() + pkg_desc_file.close() + info.append(( get_from_desc(desc,"%NAME%"), + dir_.split("/")[-1], + get_from_desc(desc,"%LICENSE%") )) + if verbose_: printf(info) + commands.getoutput("rm -r %s/*" % archdb) + return tuple(info) def make_pending(repo_,arch_,info_): - """ Si los paquetes no están en blacklist ni whitelist y la licencia contiene "custom" los agrega a pending""" - search = tuple( listado(blacklist) + listado (whitelist) ) - if verbose: printf("blaclist + whitelist= " + str(search) ) - lista_=list() - for (name,pkg_,license_) in info_: - if "custom" in license_: - if name not in search: - lista_.append( (name, license_ ) ) - elif not name: - printf( pkg_ + " package has no %NAME% attibute " ) - if verbose: printf( lista_ ) - a=open( pending + "-" + repo_ + ".txt", "w" ).write( - "\n".join([name + ":" + license_ for (name,license_) in lista_]) ) + """ Si los paquetes no están en blacklist ni whitelist y la licencia contiene "custom" los agrega a pending""" + search = tuple( listado(blacklist) + listado (whitelist) ) + if verbose: printf("blaclist + whitelist= " + str(search) ) + lista_=list() + for (name,pkg_,license_) in info_: + if "custom" in license_: + if name not in search: + lista_.append( (name, license_ ) ) + elif not name: + printf( pkg_ + " package has no %NAME% attibute " ) + if verbose: printf( lista_ ) + a=open( pending + "-" + repo_ + ".txt", "w" ).write( + "\n".join([name + ":" + license_ for (name,license_) in lista_]) + "\n") def remove_from_blacklist(repo_,arch_,info_,blacklist_): - """ Check the blacklist and remove packages on the db""" - lista_=list() - pack_=list() - for (name_, pkg_, license_) in info_: - if name_ in blacklist_: - lista_.append(name_) - for p in packages(repo_,arch_,pkg_ + "*"): - pack_.append(p) - if lista_: - lista_=" ".join(lista_) - com_ = "repo-remove " + repodir + db(repo_,arch_) + " " + lista_ - printf(com_) - a = commands.getoutput(com_) - if verbose: printf(a) - if pack_: - pack_=" ".join(pack_) - com_="chmod a-r " + pack_ - printf(com_) - a=commands.getoutput(com_) - if verbose: printf(a) + """ Check the blacklist and remove packages on the db""" + lista_=list() + pack_=list() + for (name_, pkg_, license_) in info_: + if name_ in blacklist_: + lista_.append(name_) + for p in packages(repo_,arch_,pkg_ + "*"): + pack_.append(p) + if lista_: + lista_=" ".join(lista_) + com_ = "repo-remove " + repodir + db(repo_,arch_) + " " + lista_ + printf(com_) + a = commands.getoutput(com_) + if verbose: printf(a) + +def cleanup_nonfree_in_dir(directory,blacklisted_names): + pkgs=pkginfo_from_files_in_dir(directory) + for package in pkgs: + if package["name"] in blacklisted_names: + os.remove(package["location"]) def link(repo_,arch_,file_): - """ Makes a link in the repo for the package """ - cmd_="ln -f " + file_ + " " + repodir + "/" + repo_ + "/os/" + arch_ - a=commands.getoutput(cmd_) - if verbose: - printf(cmd_ + a) + """ Makes a link in the repo for the package """ + cmd_="ln -f " + file_ + " " + repodir + "/" + repo_ + "/os/" + arch_ + a=commands.getoutput(cmd_) + if verbose: + printf(cmd_ + a) def add_free_repo(verbose_=verbose): - for repo_ in repo_list: - for arch_ in arch_list: - lista_=list() - for file_ in glob(free_path + repo_ + "/os/" + arch_ + "/*.pkg.tar.*"): - lista_.append(file_) - link(repo_,arch_,file_) - for dir_ in other: - for file_ in glob(free_path + repo_ + "/os/" + dir_ + "/*.pkg.tar.*"): - lista_.append(file_) - link(repo_,arch_,file_) - if lista_: - lista_=" ".join(lista_) - if verbose: printf(lista_) - cmd_="repo-add " + repodir + db(repo_,arch_) + " " + lista_ - printf(cmd_) - a=commands.getoutput(cmd_) - if verbose: printf(a) + cmd_=os.path.join(home,"/usr/bin/sync-free") + printf(cmd_) + a=commands.getoutput(cmd_) + if verbose_: printf(a) + for repo_ in repo_list: + for arch_ in arch_list: + lista_=list() + for file_ in glob(freedir + repo_ + "/os/" + arch_ + "/*.pkg.tar.*"): + lista_.append(file_) + for dir_ in other: + for file_ in glob(freedir + repo_ + "/os/" + dir_ + "/*.pkg.tar.*"): + lista_.append(file_) + + printf(lista_) + + if lista_: + lista_=" ".join(lista_) + if verbose: printf(lista_) + cmd_="repo-add " + repodir + db(repo_,arch_) + " " + lista_ + printf(cmd_) + a=commands.getoutput(cmd_) + if verbose: printf(a) def get_licenses(verbose_=verbose): - """ Extract the license from packages in repo_,arch_ and in pending_ file""" - cmd_=home + "/usr/bin/get_license.sh" - printf(cmd_) - a=commands.getoutput(cmd_) - if verbose_: printf(a) + """ Extract the license from packages in repo_,arch_ and in pending_ file""" + cmd_=home + "/usr/bin/get_license.sh" + printf(cmd_) + a=commands.getoutput(cmd_) + if verbose_: printf(a) + +def generate_rsync_command(base_command, dir_list=(repo_list + dir_list), destdir=repodir, + source=mirror+mirrorpath, blacklist_file=False): + """ Generates an rsync command for executing it by combining all parameters. + + Parameters: + ---------- + base_command -> str + dir_list -> list or tuple + destdir -> str Path to dir, dir must exist. + source -> str The source for rsync + blacklist_file -> False or str Path to file, file must exist. + + Return: + ---------- + rsync_command -> str """ + from os.path import isfile, isdir + + if blacklist_file and not isfile(blacklist_file): + print(blacklist_file + " is not a file") + raise NonValidFile + + if not os.path.isdir(destdir): + print(destdir + " is not a directory") + raise NonValidDir + + dir_list="{" + ",".join(dir_list) + "}" + + if blacklist_file: + return " ".join((base_command, "--exclude-from="+blacklist_file, + os.path.join(source, dir_list), destdir)) + return " ".join((base_command, os.path.join(source, dir_list), destdir)) + +def run_rsync(command,debug=verbose): + """ Runs rsync and gets returns it's output """ + if debug: + printf("rsync_command: " + command) + return commands.getoutput(command) if __name__ == "__main__": - from time import time - start_time = time() - def minute(): - return str(round((time() - start_time)/60, 1)) - - printf(" Cleaning %s folder " % (tmp) ) - commands.getoutput("rm -r %s/*" % tmp) - printf(" Syncing repo") - sync_all_repo(True) - - printf(" Updating databases and pending files lists: minute %s \n" % minute() ) - for repo in repo_list: - for arch in arch_list: - printf( "\n" + repo + "-" + arch + "\n" ) - printf( "Get info: minute %s " % minute() ) - info=get_info(repo,arch) - printf( "Make pending: minute %s" % minute() ) - make_pending(repo,arch,info) - printf( "Update DB: minute %s" % minute() ) - remove_from_blacklist( - repo, arch, info, tuple( listado(blacklist) + listado(pending + "-" + repo + ".txt") ) ) - - printf("Adding Parabola Packages: minute %s\n" % minute() ) - add_free_repo(True) - - printf("Extracting licenses in pending: minute %s" % minute() ) - get_licenses() - - printf("\n\nDelay: %s minutes \n" % minute()) - + from time import time + start_time = time() + def minute(): + return str(round((time() - start_time)/60, 1)) + + printf(" Cleaning %s folder " % (tmp) ) + commands.getoutput("rm -r %s/*" % tmp) + printf(" Syncing repo") + sync_all_repo(True) + + printf(" Updating databases and pending files lists: minute %s \n" % minute() ) + for repo in repo_list: + for arch in arch_list: + printf( "\n" + repo + "-" + arch + "\n" ) + printf( "Get info: minute %s " % minute() ) + info=get_info(repo,arch) + printf( "Make pending: minute %s" % minute() ) + make_pending(repo,arch,info) + printf( "Update DB: minute %s" % minute() ) + remove_from_blacklist( + repo, arch, info, tuple( listado(blacklist) + listado(pending + "-" + repo + ".txt") ) ) + + printf("Adding Parabola Packages: minute %s\n" % minute() ) + add_free_repo(True) + + printf("Extracting licenses in pending: minute %s" % minute() ) + get_licenses() + + printf("\n\nDelay: %s minutes \n" % minute()) + diff --git a/test/blacklist_sample b/test/blacklist_sample new file mode 100644 index 0000000..2a02af6 --- /dev/null +++ b/test/blacklist_sample @@ -0,0 +1,2 @@ +alex:alex-libre: Aquí va un comentario +gmime22 ::Non free dependencies
\ No newline at end of file diff --git a/test/core.db.tar.gz b/test/core.db.tar.gz Binary files differnew file mode 100644 index 0000000..5eb2081 --- /dev/null +++ b/test/core.db.tar.gz diff --git a/test/depends b/test/depends new file mode 100644 index 0000000..7ff3ad4 --- /dev/null +++ b/test/depends @@ -0,0 +1,4 @@ +%DEPENDS% +glibc>=2.13 +zlib + diff --git a/test/desc b/test/desc new file mode 100644 index 0000000..abba644 --- /dev/null +++ b/test/desc @@ -0,0 +1,39 @@ +%FILENAME% +binutils-2.21-4-x86_64.pkg.tar.xz + +%NAME% +binutils + +%VERSION% +2.21-4 + +%DESC% +A set of programs to assemble and manipulate binary and object files + +%GROUPS% +base + +%CSIZE% +3412892 + +%ISIZE% +17571840 + +%MD5SUM% +4e666f87c78998f4839f33dc06d2043a + +%URL% +http://www.gnu.org/software/binutils/ + +%LICENSE% +GPL + +%ARCH% +x86_64 + +%BUILDDATE% +1297240369 + +%PACKAGER% +Allan McRae <allan@archlinux.org> + diff --git a/test/rsync_output_sample b/test/rsync_output_sample new file mode 100644 index 0000000..72d9cd0 --- /dev/null +++ b/test/rsync_output_sample @@ -0,0 +1,14 @@ +dr-xr-sr-x 4096 2010/09/11 11:37:10 . +-rw-r--r-- 11 2011/02/08 00:00:01 lastsync +drwxrwxr-x 15 2010/09/11 11:28:50 community-staging +drwxrwxr-x 30 2010/09/11 11:28:50 community-staging/os +drwxrwxr-x 8192 2011/02/07 17:00:01 community-staging/os/i686 +lrwxrwxrwx 52 2010/12/23 16:51:01 community-staging/os/i686/alex-2.3.4-1-i686.pkg.tar.xz -> ../../../pool/community/alex-2.3.4-1-i686.pkg.tar.xz +lrwxrwxrwx 27 2011/02/07 14:02:54 community-staging/os/i686/community-staging.db -> community-staging.db.tar.gz +-rw-rw-r-- 2237 2011/02/07 14:02:54 community-staging/os/i686/community-staging.db.tar.gz +-rw-rw-r-- 3209 2011/02/07 14:00:13 community-staging/os/i686/community-staging.db.tar.gz.old +drwxrwxr-x 15 2009/07/22 15:07:56 community +drwxrwxr-x 40 2009/08/04 15:57:42 community/os +drwxrwsr-x 36864 2011/02/03 05:00:01 community/os/any +-rw-rw-r-- 303336 2010/07/16 10:06:28 community/os/any/any2dvd-0.34-4-any.pkg.tar.xz +-rw-rw-r-- 221664 2010/03/28 15:55:48 community/os/x86_64/gmime22-2.2.26-1-x86_64.pkg.tar.xz diff --git a/test/test1.py b/test/test1.py deleted file mode 100644 index 13b5660..0000000 --- a/test/test1.py +++ /dev/null @@ -1,66 +0,0 @@ -# -*- encoding: utf-8 -*- -""" """ - -__author__ = "Joshua Ismael Haase Hernández <hahj87@gmail.com>" -__version__ = "$Revision: 1.1 $" -__date__ = "$Date: 2011/02/08 $" -__copyright__ = "Copyright (c) 2011 Joshua Ismael Haase Hernández" -__license__ = "GPL3+" - -from repm.config import * -from repm.filter import * -import unittest - -class KnownValues(unittest.TestCase): - directory_list=("drwxrwxr-x 15 2010/09/11 11:28:50 community-staging", - "drwxrwxr-x 30 2010/09/11 11:28:50 community-staging/os", - 'dr-xr-sr-x 4096 2010/09/11 11:37:10 .') - # (rsync_out, name, version, arch, release, location) - examples=( - ("lrwxrwxrwx 53 2011/01/31 01:52:06 community-testing/os/i686/apvlv-0.1.0-2-i686.pkg.tar.xz -> ../../../pool/community/apvlv-0.1.0-2-i686.pkg.tar.xz", "apvlv","0.1.0","i686", "2", "community-testing/os/i686/apvlv-0.1.0-2-i686.pkg.tar.xz"), - ("lrwxrwxrwx 56 2011/02/04 14:34:08 community-testing/os/i686/calibre-0.7.44-2-i686.pkg.tar.xz -> ../../../pool/community/calibre-0.7.44-2-i686.pkg.tar.xz","calibre","0.7.44","i686", "2", "community-testing/os/i686/calibre-0.7.44-2-i686.pkg.tar.xz"), - ("-rw-rw-r-- 5846249 2010/11/13 10:54:25 pool/community/abuse-0.7.1-1-x86_64.pkg.tar.gz", - "abuse","0.7.1","x86_64","1","pool/community/abuse-0.7.1-1-x86_64.pkg.tar.gz"), - ("-rw-rw-r-- 982768 2011/02/05 14:38:17 pool/community/acetoneiso2-2.3-2-i686.pkg.tar.xz", - "acetoneiso2","2.3","i686", "2", "pool/community/acetoneiso2-2.3-2-i686.pkg.tar.xz"), - ("-rw-rw-r-- 982764 2011/02/05 14:38:40 pool/community/acetoneiso2-2.3-2-x86_64.pkg.tar.xz", - "acetoneiso2","2.3","x86_64","2","pool/community/acetoneiso2-2.3-2-x86_64.pkg.tar.xz") - ) - - def generate_results(self, example_tuple, attr): - rsync_out, name, version, arch, release, location = example_tuple - return get_file_list_from_rsync_output(rsync_out)[0][attr], locals()[attr] - - def testDirectoryOutput(self): - """get_file_list_from_rsync_output should ignore directories""" - rsync_out="\n".join(self.directory_list) - result=get_file_list_from_rsync_output(rsync_out) - self.assertEqual(tuple(), result) - - def testNames(self): - for i in self.examples: - k,v = self.generate_results(example_tuple=i,attr="name") - self.assertEqual(k, v) - - def testVersions(self): - for i in self.examples: - k,v = self.generate_results(example_tuple=i,attr="version") - self.assertEqual(k, v) - - def testArchs(self): - for i in self.examples: - k,v = self.generate_results(example_tuple=i,attr="arch") - self.assertEqual(k, v) - - def testReleases(self): - for i in self.examples: - k,v = self.generate_results(example_tuple=i,attr="release") - self.assertEqual(k, v) - - def testLocations(self): - for i in self.examples: - k,v = self.generate_results(example_tuple=i,attr="location") - self.assertEqual(k, v) - -if __name__ == "__main__": - unittest.main() diff --git a/test/test_filter.py b/test/test_filter.py new file mode 100644 index 0000000..1906b87 --- /dev/null +++ b/test/test_filter.py @@ -0,0 +1,192 @@ +# -*- encoding: utf-8 -*- +""" """ + +__author__ = "Joshua Ismael Haase Hernández <hahj87@gmail.com>" +__version__ = "$Revision: 1.1 $" +__date__ = "$Date: 2011/02/08 $" +__copyright__ = "Copyright (c) 2011 Joshua Ismael Haase Hernández" +__license__ = "GPL3+" + +from repm.config import * +from repm.filter import * +import unittest + +class pkginfo_from_file_KnownValues(unittest.TestCase): + # (filename, name, version, release, arch) + # filename is location + known=( + ("community-testing/os/i686/inputattach-1.24-3-i686.pkg.tar.xz","inputattach","1.24","3","i686"), + ("community-testing/os/i686/ngspice-22-1-i686.pkg.tar.xz","ngspice","22","1","i686"), + ("community-testing/os/i686/tmux-1.4-2-i686.pkg.tar.xz","tmux","1.4","2","i686"), + ("community-testing/os/i686/tor-0.2.1.29-2-i686.pkg.tar.xz","tor","0.2.1.29","2","i686"), + ("../../../pool/community/tor-0.2.1.29-2-i686.pkg.tar.xz","tor","0.2.1.29","2","i686"), + ("community-testing/os/x86_64/inputattach-1.24-3-x86_64.pkg.tar.xz","inputattach","1.24","3","x86_64"), + ("../../../pool/community/inputattach-1.24-3-x86_64.pkg.tar.xz","inputattach","1.24","3","x86_64"), + ("tor-0.2.1.29-2-x86_64.pkg.tar.xz","tor","0.2.1.29","2","x86_64"), + ) + + def generate_results(self, example_tuple, attr): + location, name, version, release, arch = example_tuple + return pkginfo_from_filename(location)[attr], locals()[attr] + + def testReturnPackageObject(self): + for i in self.known: + location, name, version, release, arch = i + self.assertIsInstance(pkginfo_from_filename(location),Package) + + def testNames(self): + for i in self.known: + k,v = self.generate_results(example_tuple=i,attr="name") + self.assertEqual(k, v) + + def testVersions(self): + for i in self.known: + k,v = self.generate_results(example_tuple=i,attr="version") + self.assertEqual(k, v) + + def testArchs(self): + for i in self.known: + k,v = self.generate_results(example_tuple=i,attr="arch") + self.assertEqual(k, v) + + def testReleases(self): + for i in self.known: + k,v = self.generate_results(example_tuple=i,attr="release") + self.assertEqual(k, v) + + def testLocations(self): + for i in self.known: + k,v = self.generate_results(example_tuple=i,attr="location") + self.assertEqual(k, v) + +class pkginfo_from_file_BadInput(unittest.TestCase): + bad=("community-testing/os/i686/community-testing.db", + "community-testing/os/i686/community-testing.db.tar.gz", + "community-testing/os/i686/community-testing.db.tar.gz.old", + "community-testing/os/i686/community-testing.files", + "community-testing/os/i686/community-testing.files.tar.gz", + "community-testing/os/x86_64") + + def testBadInput(self): + for i in self.bad: + self.assertRaises(NonValidFile,pkginfo_from_filename,i) + +class pkginfoFromRsyncOutput(unittest.TestCase): + example_package_list=(Package(),Package(),Package()) + example_package_list[0].package_info={ "name" : "alex", + "version" : "2.3.4", + "release" : "1", + "arch" : "i686", + "license" : False, + "location": "community-staging/os/i686/alex-2.3.4-1-i686.pkg.tar.xz", + "depends" : False,} + example_package_list[1].package_info={ "name" : "any2dvd", + "version" : "0.34", + "release" : "4", + "arch" : "any", + "license" : False, + "location": "community/os/any/any2dvd-0.34-4-any.pkg.tar.xz", + "depends" : False,} + example_package_list[2].package_info={ "name" : "gmime22", + "version" : "2.2.26", + "release" : "1", + "arch" : "x86_64", + "license" : False, + "location": "community/os/x86_64/gmime22-2.2.26-1-x86_64.pkg.tar.xz", + "depends" : False,} + + try: + output_file = open("rsync_output_sample") + rsync_out= output_file.read() + output_file.close() + except IOError: print("There is no rsync_output_sample file") + + pkglist = pkginfo_from_rsync_output(rsync_out) + + def testOutputArePackages(self): + if not self.pkglist: + self.fail("not pkglist:" + str(self.pkglist)) + for pkg in self.pkglist: + self.assertIsInstance(pkg,Package) + + def testPackageInfo(self): + if not self.pkglist: + self.fail("Pkglist doesn't exist: " + str(self.pkglist)) + self.assertEqual(self.pkglist,self.example_package_list) + +class generateRsyncBlacklist(unittest.TestCase): + example_package_list=(Package(),Package(),Package()) + example_package_list[0].package_info={ "name" : "alex", + "version" : "2.3.4", + "release" : "1", + "arch" : "i686", + "license" : False, + "location": "community-staging/os/i686/alex-2.3.4-1-i686.pkg.tar.xz", + "depends" : False,} + example_package_list[1].package_info={ "name" : "any2dvd", + "version" : "0.34", + "release" : "4", + "arch" : "any", + "license" : False, + "location": "community/os/any/any2dvd-0.34-4-any.pkg.tar.xz", + "depends" : False,} + example_package_list[2].package_info={ "name" : "gmime22", + "version" : "2.2.26", + "release" : "1", + "arch" : "x86_64", + "license" : False, + "location": "community/os/x86_64/gmime22-2.2.26-1-x86_64.pkg.tar.xz", + "depends" : False,} + + def testListado(self): + self.assertEqual(listado("blacklist_sample"),["alex","gmime22"]) + + def testExcludeFiles(self): + a=generate_exclude_list_from_blacklist(self.example_package_list,listado("blacklist_sample"),debug=True) + b=[self.example_package_list[0]["location"],self.example_package_list[2]["location"]] + self.assertEqual(a,b) + +class pkginfo_from_descKnownValues(unittest.TestCase): + pkgsample=Package() + pkgsample.package_info={"name" : "binutils", + "version" : "2.21", + "release" : "4", + "arch" : "x86_64", + "license" : "GPL", + "location": "binutils-2.21-4-x86_64.pkg.tar.xz", + "depends" : False,} + pkggen=pkginfo_from_desc("desc") + def testPkginfoFromDesc(self): + if self.pkggen is None: + self.fail("return value is None") + self.assertEqual(self.pkgsample,self.pkggen) + +class pkginfo_from_db(unittest.TestCase): + archdb = os.path.join("./workdir") + example_package_list=(Package(),Package(),Package()) + example_package_list[0].package_info={ "name" : "acl", + "version" : "2.2.49", + "release" : "2", + "arch" : "x86_64", + "license" : ("LGPL",), + "location": "acl-2.2.49-2-x86_64.pkg.tar.xz" + "depends" : ("attr>=2.4.41"),} + example_package_list[1].package_info={ "name" : "glibc", + "version" : "2.13", + "release" : "4", + "arch" : "x86_64", + "license" : ("GPL","LGPL"), + "location": "glibc-2.13-4-x86_64.pkg.tar.xz" + "depends" : ("linux-api-headers>=2.6.37","tzdata",),} + example_package_list[2].package_info={ "name" : "", + "version" : "2.2.26", + "release" : "1", + "arch" : "x86_64", + "license" : False, + "location": "" + "depends" : False,} + + +if __name__ == "__main__": + unittest.main() + |