diff options
| -rw-r--r-- | config.py | 3 | ||||
| -rw-r--r-- | filter.py | 165 | ||||
| -rw-r--r-- | pato2.py | 339 | ||||
| -rw-r--r-- | test/blacklist_sample | 2 | ||||
| -rw-r--r-- | test/core.db.tar.gz | bin | 0 -> 1345 bytes | |||
| -rw-r--r-- | test/depends | 4 | ||||
| -rw-r--r-- | test/desc | 39 | ||||
| -rw-r--r-- | test/rsync_output_sample | 14 | ||||
| -rw-r--r-- | test/test1.py | 66 | ||||
| -rw-r--r-- | test/test_filter.py | 192 | 
10 files changed, 547 insertions, 277 deletions
| @@ -32,6 +32,9 @@ for var in boolvars:      else:          print('%s is not True or False' % var) +# Rsync commands +rsync_list_command="rsync -a --no-motd --list-only " +  # Classes and Exceptions  class NonValidFile(ValueError): pass  class NonValidDir(ValueError): pass @@ -1,56 +1,66 @@ -#! /usr/bin/python + #! /usr/bin/python  #-*- encoding: utf-8 -*- -import commands -import os -import re +from glob import glob  from repm.config import *  from repm.pato2 import * -rsync_list_command="rsync -a --no-motd --list-only " +def pkginfo_from_filename(filename): +    """ Generates a Package object with info from a filename, +    filename can be relative or absolute  +     +    Parameters: +    ---------- +    filename -> str         Must contain .pkg.tar. -def generate_rsync_command(base_command, dir_list, destdir=repodir, mirror_name=mirror, -                           mirror_path=mirrorpath, blacklist_file=False): -    """ Generates an rsync command for executing it by combining all parameters. +    Returns: +    ---------- +    pkg -> Package object""" +    if ".pkg.tar." not in filename: +        raise NonValidFile +    pkg = Package() +    pkg["location"] = filename +    fileattrs = os.path.basename(filename).split("-") +    pkg["arch"] = fileattrs.pop(-1).split(".")[0] +    pkg["release"] = fileattrs.pop(-1) +    pkg["version"] = fileattrs.pop(-1) +    pkg["name"] = "-".join(fileattrs) +    return pkg + +def pkginfo_from_desc(filename): +    """ Returns pkginfo from desc file.      Parameters:      ---------- -    base_command   -> str -    mirror_name    -> str -    mirror_path    -> str -    dir_list       -> list or tuple -    destdir        -> str                  Path to dir, dir must exist. -    blacklist_file -> False or str         Path to file, file must exist. +    filename -> str          File must exist -    Return: +    Returns:      ---------- -    rsync_command -> str """ -    from os.path import isfile, isdir - -    if blacklist_file and not isfile(blacklist_file): -        print(blacklist_file + " is not a file") +    pkg -> Package object""" +    if not os.path.isfile(filename):          raise NonValidFile - -    if not os.path.isdir(destdir): -        print(destdir + " is not a directory") -        raise NonValidDir - -    dir_list="{" + ",".join(dir_list) + "}" - -    if blacklist_file: -        return " ".join((base_command, "--exclude-from-file="+blacklist_file, -                        mirror_name + mirror_path + dir_list, destdir)) -    return " ".join((base_command, mirror_name + mirror_path + dir_list, destdir)) - -def run_rsync(base_for_rsync=rsync_list_command, dir_list_for_rsync=(repo_list + dir_list), -              debug=verbose): -    """ Runs rsync and gets returns it's output """ -    cmd = str(generate_rsync_command(rsync_list_command, (repo_list + dir_list))) -    if debug: -        printf("rsync_command" + cmd) -    return commands.getoutput(cmd) - -def get_file_list_from_rsync_output(rsync_output): -    """ Generates a list of packages and versions from an rsync output using --list-only --no-motd. +    try: +        f=open(filename) +        info=f.read().rsplit() +    finally: +        f.close() +    pkg = Package() +    info_map={"name"    :("%NAME%"    , None), +              "version" :("%VERSION%" , 0    ), +              "release" :("%VERSION%" , 1    ), +              "arch"    :("%ARCH%"    , None), +              "license" :("%LICENSE%" , None), +              "location":("%FILENAME%", None),} + +    for key in info_map.keys(): +        field,pos=info_map[key] +        pkg[key]=info[info.index(field)+1] +        if pos is not None: +            pkg[key]=pkg[key].split("-")[pos] +    return pkg + +def pkginfo_from_rsync_output(rsync_output): +    """ Generates a list of packages and versions from an rsync output +    wich uses --list-only and --no-motd options.      Parameters:      ---------- @@ -59,34 +69,54 @@ def get_file_list_from_rsync_output(rsync_output):      Returns:      ----------      package_list -> tuple        Contains Package objects. """ -    a=list() - -    def directory(line): -        pass      def package_or_link(line):          """ Take info out of filename """          location_field = 4 -        pkg = Package() -        pkg["location"] = line.rsplit()[location_field] -        fileattrs = pkg["location"].split("/")[-1].split("-") -        pkg["arch"] = fileattrs.pop(-1).split(".")[0] -        pkg["release"] = fileattrs.pop(-1) -        pkg["version"] = fileattrs.pop(-1) -        pkg["name"] = "-".join(fileattrs) -        return pkg -                 -    options = { "d": directory, +        return pkginfo_from_filename(line.rsplit()[location_field]) + +    def do_nothing(): +        pass + +    options = { "d": do_nothing,                  "l": package_or_link, -                "-": package_or_link} +                "-": package_or_link, +                " ": do_nothing} + +    package_list=list() +     +    lines=[x for x in rsync_output.split("\n") if ".pkg.tar" in x] + +    for line in lines: +        pkginfo=options[line[0]](line) +        if pkginfo: +            package_list.append(pkginfo) + +    return tuple(package_list) + +def pkginfo_from_files_in_dir(directory): +    """ Returns pkginfo from filenames of packages in dir +    wich has .pkg.tar. on them  +     +    Parameters: +    ---------- +    directory -> str          Directory must exist -    for line in rsync_output.split("\n"): -        if ".pkg.tar" in line: -            pkginfo=options[line[0]](line) -            if pkginfo: -                a.append(pkginfo) +    Returns: +    ---------- +    package_list -> tuple     Contains Package objects """ +    package_list=list() -    return tuple(a) +    if not os.path.isdir(directory): +        raise NonValidDir + +    for filename in glob(os.path.join(directory,"*")): +        if ".pkg.tar." in filename: +            package_list.append(pkginfo_from_filename(filename)) +    return tuple(package_list) + +def pkginfo_from_db(path_to_db): +    """ """  def generate_exclude_list_from_blacklist(packages_iterable, blacklisted_names,                                           exclude_file=rsync_blacklist, debug=verbose): @@ -111,8 +141,7 @@ def generate_exclude_list_from_blacklist(packages_iterable, blacklisted_names,              a.append(package["location"])      if debug: -        printf(a) -     +        return a      try:          fsock = open(exclude_file,"w")          try: @@ -121,4 +150,8 @@ def generate_exclude_list_from_blacklist(packages_iterable, blacklisted_names,              fsock.close()      except IOError:          printf("%s wasnt written" % blacklist_file) -         + +if __name__ == "__main__": +    a=run_rsync(rsync_list_command) +    packages=pkginfo_from_rsync_output(a) +    generate_exclude_list_from_blacklist(packages,listado(blacklist)) @@ -25,175 +25,224 @@  """  from repm.config import * - +from repm.filter import *  import tarfile  from glob import glob  from os.path import isdir, isfile  def printf(text,output_=output): -	"""Guarda el texto en la variable log y puede imprimir en pantalla.""" -	log_file = open(logname, 'a') -	log_file.write("\n" + str(text) + "\n") -	log_file.close() -	if output_: print (str(text) + "\n") +    """Guarda el texto en la variable log y puede imprimir en pantalla.""" +    log_file = open(logname, 'a') +    log_file.write("\n" + str(text) + "\n") +    log_file.close() +    if output_: print (str(text) + "\n")  def listado(filename_): -	"""Obtiene una lista de paquetes de un archivo.""" -	archivo = open(filename_,"r") -	lista   = archivo.read().split("\n") -	archivo.close() -	return [pkg.split(":")[0] for pkg in lista if pkg] +    """Obtiene una lista de paquetes de un archivo.""" +    archivo = open(filename_,"r") +    lista   = archivo.read().split("\n") +    archivo.close() +    return [pkg.split(":")[0].rstrip() for pkg in lista if pkg]  def db(repo_,arch_): -	"""Construye un nombre para sincronizar una base de datos.""" -	return "/%s/os/%s/%s.db.tar.gz" % (repo_, arch_, repo_) +    """Construye un nombre para sincronizar una base de datos.""" +    return "/%s/os/%s/%s.db.tar.gz" % (repo_, arch_, repo_)  def packages(repo_, arch_, expr="*"): -	""" Get packages on a repo, arch folder """ -	return tuple( glob( repodir + "/" + repo_ + "/os/" + arch_ + "/" + expr ) ) - -def sync_all_repo(verbose_=verbose): -	folders = ",".join(repo_list + dir_list) -	cmd_ = "rsync -av --delete-after --delay-updates " + mirror + mirrorpath + "/{" + folders + "} " + repodir -	printf(cmd_) -	a=commands.getoutput(cmd_) -	if verbose_: printf(a) +    """ Get packages on a repo, arch folder """ +    return tuple( glob( repodir + "/" + repo_ + "/os/" + arch_ + "/" + expr ) ) + +def sync_all_repo(debug=verbose): +    cmd=generate_rsync_command(rsync_list_command) +    rsout=run_rsync(cmd) +    pkgs=pkginfo_from_rsync_output(rsout) +    generate_exclude_list_from_blacklist(pkgs,listado(blacklist),debug=False) +    cmd=generate_rsync_command(rsync_update_command,blacklist_file=rsync_blacklist) +    a=run_rsync(cmd) +    cmd=generate_rsync_command(rsync_post_command,blacklist_file=rsync_blacklist) +    b=run_rsync(cmd) +    if debug: +        printf(a) +        printf(b)  def get_from_desc(desc, var,db_tar_file=False): -	""" Get a var from desc file """ -	desc = desc.split("\n") -	return desc[desc.index(var)+1] +    """ Get a var from desc file """ +    desc = desc.split("\n") +    return desc[desc.index(var)+1]  def get_info(repo_,arch_,db_tar_file=False,verbose_=verbose): -	""" Makes a list of package name, file and license """ -	info=list() -	# Extract DB tar.gz     -	commands.getoutput("mkdir -p " + archdb) -	if not db_tar_file: -		db_tar_file = repodir + db(repo_,arch_) -	if isfile(db_tar_file): -		try: -			db_open_tar = tarfile.open(db_tar_file, 'r:gz') -		except tarfile.ReadError: -			printf("No valid db_file %s" % db_tar_file) -			return(tuple()) -	else: -		printf("No db_file %s" % db_tar_file) -		return(tuple()) -	for file in db_open_tar.getmembers(): -		db_open_tar.extract(file, archdb) -	db_open_tar.close() -	# Get info from file -	for dir_ in glob(archdb + "/*"): -		if isdir(dir_) and isfile(dir_ + "/desc"): -			pkg_desc_file = open(dir_ + "/desc", "r") -			desc = pkg_desc_file.read() -			pkg_desc_file.close() -			info.append((  get_from_desc(desc,"%NAME%"), -				       dir_.split("/")[-1], -				       get_from_desc(desc,"%LICENSE%")  )) -	if verbose_: printf(info) -	commands.getoutput("rm -r %s/*"  % archdb) -	return tuple(info) +    """ Makes a list of package name, file and license """ +    info=list() +    # Extract DB tar.gz     +    commands.getoutput("mkdir -p " + archdb) +    if not db_tar_file: +        db_tar_file = repodir + db(repo_,arch_) +    if isfile(db_tar_file): +        try: +            db_open_tar = tarfile.open(db_tar_file, 'r:gz') +        except tarfile.ReadError: +            printf("No valid db_file %s" % db_tar_file) +            return(tuple()) +    else: +        printf("No db_file %s" % db_tar_file) +        return(tuple()) +    for file in db_open_tar.getmembers(): +        db_open_tar.extract(file, archdb) +    db_open_tar.close() +    # Get info from file +    for dir_ in glob(archdb + "/*"): +        if isdir(dir_) and isfile(dir_ + "/desc"): +            pkg_desc_file = open(dir_ + "/desc", "r") +            desc = pkg_desc_file.read() +            pkg_desc_file.close() +            info.append((  get_from_desc(desc,"%NAME%"), +                       dir_.split("/")[-1], +                       get_from_desc(desc,"%LICENSE%")  )) +    if verbose_: printf(info) +    commands.getoutput("rm -r %s/*"  % archdb) +    return tuple(info)  def make_pending(repo_,arch_,info_): -	""" Si los paquetes no están en blacklist ni whitelist y la licencia contiene "custom" los agrega a pending""" -	search = tuple( listado(blacklist) + listado (whitelist) ) -	if verbose: printf("blaclist + whitelist= " + str(search) ) -	lista_=list() -	for (name,pkg_,license_) in info_: -		if "custom" in license_: -			if name not in search: -				lista_.append( (name, license_ ) ) -		elif not name: -			printf( pkg_ + " package has no %NAME% attibute " ) -	if verbose: printf( lista_ ) -	a=open( pending + "-" + repo_ + ".txt", "w" ).write( -		"\n".join([name + ":" + license_ for (name,license_) in lista_]) ) +    """ Si los paquetes no están en blacklist ni whitelist y la licencia contiene "custom" los agrega a pending""" +    search = tuple( listado(blacklist) + listado (whitelist) ) +    if verbose: printf("blaclist + whitelist= " + str(search) ) +    lista_=list() +    for (name,pkg_,license_) in info_: +        if "custom" in license_: +            if name not in search: +                lista_.append( (name, license_ ) ) +        elif not name: +            printf( pkg_ + " package has no %NAME% attibute " ) +    if verbose: printf( lista_ ) +    a=open( pending + "-" + repo_ + ".txt", "w" ).write( +        "\n".join([name + ":" + license_ for (name,license_) in lista_]) + "\n")  def remove_from_blacklist(repo_,arch_,info_,blacklist_): -	""" Check the blacklist and remove packages on the db""" -	lista_=list() -	pack_=list() -	for (name_, pkg_, license_) in info_: -		if name_ in blacklist_: -			lista_.append(name_) -			for p in packages(repo_,arch_,pkg_ + "*"): -				pack_.append(p) -	if lista_: -		lista_=" ".join(lista_) -		com_ =  "repo-remove " + repodir + db(repo_,arch_) + " " + lista_  -		printf(com_) -		a = commands.getoutput(com_)  -		if verbose: printf(a) -	if pack_: -		pack_=" ".join(pack_) -		com_="chmod a-r " + pack_ -		printf(com_) -		a=commands.getoutput(com_) -		if verbose: printf(a) +    """ Check the blacklist and remove packages on the db""" +    lista_=list() +    pack_=list() +    for (name_, pkg_, license_) in info_: +        if name_ in blacklist_: +            lista_.append(name_) +            for p in packages(repo_,arch_,pkg_ + "*"): +                pack_.append(p) +    if lista_: +        lista_=" ".join(lista_) +        com_ =  "repo-remove " + repodir + db(repo_,arch_) + " " + lista_  +        printf(com_) +        a = commands.getoutput(com_)  +        if verbose: printf(a) + +def cleanup_nonfree_in_dir(directory,blacklisted_names): +    pkgs=pkginfo_from_files_in_dir(directory) +    for package in pkgs: +        if package["name"] in blacklisted_names: +            os.remove(package["location"])  def link(repo_,arch_,file_): -	""" Makes a link in the repo for the package """ -	cmd_="ln -f " + file_ + " " + repodir + "/" + repo_ + "/os/" + arch_ -	a=commands.getoutput(cmd_) -	if verbose: -		printf(cmd_ + a) +    """ Makes a link in the repo for the package """ +    cmd_="ln -f " + file_ + " " + repodir + "/" + repo_ + "/os/" + arch_ +    a=commands.getoutput(cmd_) +    if verbose: +        printf(cmd_ + a)  def add_free_repo(verbose_=verbose): -	for repo_ in repo_list: -		for arch_ in arch_list: -			lista_=list() -			for file_ in glob(free_path + repo_ + "/os/" + arch_ + "/*.pkg.tar.*"): -				lista_.append(file_) -				link(repo_,arch_,file_) -			for dir_ in other: -				for file_ in glob(free_path + repo_ + "/os/" + dir_ + "/*.pkg.tar.*"): -					lista_.append(file_) -					link(repo_,arch_,file_) -			if lista_: -				lista_=" ".join(lista_) -				if verbose: printf(lista_) -				cmd_="repo-add " + repodir + db(repo_,arch_) + " " + lista_  -				printf(cmd_) -				a=commands.getoutput(cmd_) -				if verbose: printf(a) +    cmd_=os.path.join(home,"/usr/bin/sync-free") +    printf(cmd_) +    a=commands.getoutput(cmd_) +    if verbose_: printf(a) +    for repo_ in repo_list: +        for arch_ in arch_list: +            lista_=list() +            for file_ in glob(freedir + repo_ + "/os/" + arch_ + "/*.pkg.tar.*"): +                lista_.append(file_) +            for dir_ in other: +                for file_ in glob(freedir + repo_ + "/os/" + dir_ + "/*.pkg.tar.*"): +                    lista_.append(file_) + +            printf(lista_) + +            if lista_: +                lista_=" ".join(lista_) +                if verbose: printf(lista_) +                cmd_="repo-add " + repodir + db(repo_,arch_) + " " + lista_  +                printf(cmd_) +                a=commands.getoutput(cmd_) +                if verbose: printf(a)  def get_licenses(verbose_=verbose): -	""" Extract the license from packages in repo_,arch_ and in pending_ file""" -	cmd_=home + "/usr/bin/get_license.sh" -	printf(cmd_) -	a=commands.getoutput(cmd_) -	if verbose_: printf(a) +    """ Extract the license from packages in repo_,arch_ and in pending_ file""" +    cmd_=home + "/usr/bin/get_license.sh" +    printf(cmd_) +    a=commands.getoutput(cmd_) +    if verbose_: printf(a) + +def generate_rsync_command(base_command, dir_list=(repo_list + dir_list), destdir=repodir, +                           source=mirror+mirrorpath, blacklist_file=False): +    """ Generates an rsync command for executing it by combining all parameters. +     +    Parameters: +    ---------- +    base_command   -> str +    dir_list       -> list or tuple +    destdir        -> str                  Path to dir, dir must exist. +    source         -> str                  The source for rsync +    blacklist_file -> False or str         Path to file, file must exist. +     +    Return: +    ---------- +    rsync_command -> str """ +    from os.path import isfile, isdir + +    if blacklist_file and not isfile(blacklist_file): +        print(blacklist_file + " is not a file") +        raise NonValidFile + +    if not os.path.isdir(destdir): +        print(destdir + " is not a directory") +        raise NonValidDir + +    dir_list="{" + ",".join(dir_list) + "}" + +    if blacklist_file: +        return " ".join((base_command, "--exclude-from="+blacklist_file, +                        os.path.join(source, dir_list), destdir)) +    return " ".join((base_command, os.path.join(source, dir_list), destdir)) + +def run_rsync(command,debug=verbose): +    """ Runs rsync and gets returns it's output """ +    if debug: +        printf("rsync_command: " + command) +    return commands.getoutput(command)  if __name__ == "__main__": -	from time import time -	start_time = time() -	def minute(): -		return str(round((time() - start_time)/60, 1)) -	 -	printf(" Cleaning %s folder " % (tmp) ) -	commands.getoutput("rm -r %s/*" % tmp) -	printf(" Syncing repo") -	sync_all_repo(True) - -	printf(" Updating databases and pending files lists: minute %s \n" % minute() ) -	for repo in repo_list: -		for arch in arch_list: -			printf( "\n" + repo + "-" + arch + "\n" ) -			printf( "Get info: minute %s "  % minute()  ) -			info=get_info(repo,arch) -			printf( "Make pending: minute %s"  % minute()  ) -			make_pending(repo,arch,info) -			printf( "Update DB: minute %s"  % minute()  ) -			remove_from_blacklist( -				repo, arch, info, tuple( listado(blacklist) + listado(pending + "-" + repo + ".txt") ) ) - -	printf("Adding Parabola Packages: minute %s\n" % minute() ) -	add_free_repo(True) -	 -	printf("Extracting licenses in pending: minute %s" % minute() ) -	get_licenses() -	 -	printf("\n\nDelay: %s minutes \n" % minute()) -	 +    from time import time +    start_time = time() +    def minute(): +        return str(round((time() - start_time)/60, 1)) +     +    printf(" Cleaning %s folder " % (tmp) ) +    commands.getoutput("rm -r %s/*" % tmp) +    printf(" Syncing repo") +    sync_all_repo(True) + +    printf(" Updating databases and pending files lists: minute %s \n" % minute() ) +    for repo in repo_list: +        for arch in arch_list: +            printf( "\n" + repo + "-" + arch + "\n" ) +            printf( "Get info: minute %s "  % minute()  ) +            info=get_info(repo,arch) +            printf( "Make pending: minute %s"  % minute()  ) +            make_pending(repo,arch,info) +            printf( "Update DB: minute %s"  % minute()  ) +            remove_from_blacklist( +                repo, arch, info, tuple( listado(blacklist) + listado(pending + "-" + repo + ".txt") ) ) + +    printf("Adding Parabola Packages: minute %s\n" % minute() ) +    add_free_repo(True) +     +    printf("Extracting licenses in pending: minute %s" % minute() ) +    get_licenses() +     +    printf("\n\nDelay: %s minutes \n" % minute()) +     diff --git a/test/blacklist_sample b/test/blacklist_sample new file mode 100644 index 0000000..2a02af6 --- /dev/null +++ b/test/blacklist_sample @@ -0,0 +1,2 @@ +alex:alex-libre: Aquí va un comentario +gmime22 ::Non free dependencies
\ No newline at end of file diff --git a/test/core.db.tar.gz b/test/core.db.tar.gzBinary files differ new file mode 100644 index 0000000..5eb2081 --- /dev/null +++ b/test/core.db.tar.gz diff --git a/test/depends b/test/depends new file mode 100644 index 0000000..7ff3ad4 --- /dev/null +++ b/test/depends @@ -0,0 +1,4 @@ +%DEPENDS% +glibc>=2.13 +zlib + diff --git a/test/desc b/test/desc new file mode 100644 index 0000000..abba644 --- /dev/null +++ b/test/desc @@ -0,0 +1,39 @@ +%FILENAME% +binutils-2.21-4-x86_64.pkg.tar.xz + +%NAME% +binutils + +%VERSION% +2.21-4 + +%DESC% +A set of programs to assemble and manipulate binary and object files + +%GROUPS% +base + +%CSIZE% +3412892 + +%ISIZE% +17571840 + +%MD5SUM% +4e666f87c78998f4839f33dc06d2043a + +%URL% +http://www.gnu.org/software/binutils/ + +%LICENSE% +GPL + +%ARCH% +x86_64 + +%BUILDDATE% +1297240369 + +%PACKAGER% +Allan McRae <allan@archlinux.org> + diff --git a/test/rsync_output_sample b/test/rsync_output_sample new file mode 100644 index 0000000..72d9cd0 --- /dev/null +++ b/test/rsync_output_sample @@ -0,0 +1,14 @@ +dr-xr-sr-x        4096 2010/09/11 11:37:10 . +-rw-r--r--          11 2011/02/08 00:00:01 lastsync +drwxrwxr-x          15 2010/09/11 11:28:50 community-staging +drwxrwxr-x          30 2010/09/11 11:28:50 community-staging/os +drwxrwxr-x        8192 2011/02/07 17:00:01 community-staging/os/i686 +lrwxrwxrwx          52 2010/12/23 16:51:01 community-staging/os/i686/alex-2.3.4-1-i686.pkg.tar.xz -> ../../../pool/community/alex-2.3.4-1-i686.pkg.tar.xz +lrwxrwxrwx          27 2011/02/07 14:02:54 community-staging/os/i686/community-staging.db -> community-staging.db.tar.gz +-rw-rw-r--        2237 2011/02/07 14:02:54 community-staging/os/i686/community-staging.db.tar.gz +-rw-rw-r--        3209 2011/02/07 14:00:13 community-staging/os/i686/community-staging.db.tar.gz.old +drwxrwxr-x          15 2009/07/22 15:07:56 community +drwxrwxr-x          40 2009/08/04 15:57:42 community/os +drwxrwsr-x       36864 2011/02/03 05:00:01 community/os/any +-rw-rw-r--      303336 2010/07/16 10:06:28 community/os/any/any2dvd-0.34-4-any.pkg.tar.xz +-rw-rw-r--      221664 2010/03/28 15:55:48 community/os/x86_64/gmime22-2.2.26-1-x86_64.pkg.tar.xz diff --git a/test/test1.py b/test/test1.py deleted file mode 100644 index 13b5660..0000000 --- a/test/test1.py +++ /dev/null @@ -1,66 +0,0 @@ -# -*- encoding: utf-8 -*- -""" """ - -__author__ = "Joshua Ismael Haase Hernández <hahj87@gmail.com>" -__version__ = "$Revision: 1.1 $" -__date__ = "$Date: 2011/02/08 $" -__copyright__ = "Copyright (c) 2011 Joshua Ismael Haase Hernández" -__license__ = "GPL3+" - -from repm.config import * -from repm.filter import * -import unittest - -class KnownValues(unittest.TestCase): -    directory_list=("drwxrwxr-x          15 2010/09/11 11:28:50 community-staging", -                    "drwxrwxr-x          30 2010/09/11 11:28:50 community-staging/os", -                    'dr-xr-sr-x        4096 2010/09/11 11:37:10 .') -    # (rsync_out, name, version, arch, release, location) -    examples=( -        ("lrwxrwxrwx          53 2011/01/31 01:52:06 community-testing/os/i686/apvlv-0.1.0-2-i686.pkg.tar.xz -> ../../../pool/community/apvlv-0.1.0-2-i686.pkg.tar.xz", "apvlv","0.1.0","i686", "2", "community-testing/os/i686/apvlv-0.1.0-2-i686.pkg.tar.xz"), -        ("lrwxrwxrwx          56 2011/02/04 14:34:08 community-testing/os/i686/calibre-0.7.44-2-i686.pkg.tar.xz -> ../../../pool/community/calibre-0.7.44-2-i686.pkg.tar.xz","calibre","0.7.44","i686", "2", "community-testing/os/i686/calibre-0.7.44-2-i686.pkg.tar.xz"), -        ("-rw-rw-r--     5846249 2010/11/13 10:54:25 pool/community/abuse-0.7.1-1-x86_64.pkg.tar.gz", -         "abuse","0.7.1","x86_64","1","pool/community/abuse-0.7.1-1-x86_64.pkg.tar.gz"), -        ("-rw-rw-r--      982768 2011/02/05 14:38:17 pool/community/acetoneiso2-2.3-2-i686.pkg.tar.xz", -         "acetoneiso2","2.3","i686", "2", "pool/community/acetoneiso2-2.3-2-i686.pkg.tar.xz"), -        ("-rw-rw-r--      982764 2011/02/05 14:38:40 pool/community/acetoneiso2-2.3-2-x86_64.pkg.tar.xz", -         "acetoneiso2","2.3","x86_64","2","pool/community/acetoneiso2-2.3-2-x86_64.pkg.tar.xz") -        ) - -    def generate_results(self, example_tuple, attr): -        rsync_out, name, version, arch, release, location = example_tuple -        return get_file_list_from_rsync_output(rsync_out)[0][attr], locals()[attr] -     -    def testDirectoryOutput(self): -        """get_file_list_from_rsync_output should ignore directories""" -        rsync_out="\n".join(self.directory_list) -        result=get_file_list_from_rsync_output(rsync_out) -        self.assertEqual(tuple(), result) - -    def testNames(self): -        for i in self.examples: -            k,v = self.generate_results(example_tuple=i,attr="name") -            self.assertEqual(k, v) - -    def testVersions(self): -        for i in self.examples: -            k,v = self.generate_results(example_tuple=i,attr="version") -            self.assertEqual(k, v) - -    def testArchs(self): -        for i in self.examples: -            k,v = self.generate_results(example_tuple=i,attr="arch") -            self.assertEqual(k, v) - -    def testReleases(self): -        for i in self.examples: -            k,v = self.generate_results(example_tuple=i,attr="release") -            self.assertEqual(k, v) - -    def testLocations(self): -        for i in self.examples: -            k,v = self.generate_results(example_tuple=i,attr="location") -            self.assertEqual(k, v) -       -if __name__ == "__main__": -    unittest.main() diff --git a/test/test_filter.py b/test/test_filter.py new file mode 100644 index 0000000..1906b87 --- /dev/null +++ b/test/test_filter.py @@ -0,0 +1,192 @@ +# -*- encoding: utf-8 -*- +""" """ + +__author__ = "Joshua Ismael Haase Hernández <hahj87@gmail.com>" +__version__ = "$Revision: 1.1 $" +__date__ = "$Date: 2011/02/08 $" +__copyright__ = "Copyright (c) 2011 Joshua Ismael Haase Hernández" +__license__ = "GPL3+" + +from repm.config import * +from repm.filter import * +import unittest + +class pkginfo_from_file_KnownValues(unittest.TestCase): +    # (filename, name, version, release, arch) +    # filename is location +    known=( +        ("community-testing/os/i686/inputattach-1.24-3-i686.pkg.tar.xz","inputattach","1.24","3","i686"), +        ("community-testing/os/i686/ngspice-22-1-i686.pkg.tar.xz","ngspice","22","1","i686"), +        ("community-testing/os/i686/tmux-1.4-2-i686.pkg.tar.xz","tmux","1.4","2","i686"), +        ("community-testing/os/i686/tor-0.2.1.29-2-i686.pkg.tar.xz","tor","0.2.1.29","2","i686"), +        ("../../../pool/community/tor-0.2.1.29-2-i686.pkg.tar.xz","tor","0.2.1.29","2","i686"), +        ("community-testing/os/x86_64/inputattach-1.24-3-x86_64.pkg.tar.xz","inputattach","1.24","3","x86_64"), +        ("../../../pool/community/inputattach-1.24-3-x86_64.pkg.tar.xz","inputattach","1.24","3","x86_64"), +        ("tor-0.2.1.29-2-x86_64.pkg.tar.xz","tor","0.2.1.29","2","x86_64"), +        ) + +    def generate_results(self, example_tuple, attr): +        location, name, version, release, arch = example_tuple +        return pkginfo_from_filename(location)[attr], locals()[attr] + +    def testReturnPackageObject(self): +        for i in self.known: +            location, name, version, release, arch = i +            self.assertIsInstance(pkginfo_from_filename(location),Package) + +    def testNames(self): +        for i in self.known: +            k,v = self.generate_results(example_tuple=i,attr="name") +            self.assertEqual(k, v) + +    def testVersions(self): +        for i in self.known: +            k,v = self.generate_results(example_tuple=i,attr="version") +            self.assertEqual(k, v) + +    def testArchs(self): +        for i in self.known: +            k,v = self.generate_results(example_tuple=i,attr="arch") +            self.assertEqual(k, v) + +    def testReleases(self): +        for i in self.known: +            k,v = self.generate_results(example_tuple=i,attr="release") +            self.assertEqual(k, v) + +    def testLocations(self): +        for i in self.known: +            k,v = self.generate_results(example_tuple=i,attr="location") +            self.assertEqual(k, v) + +class pkginfo_from_file_BadInput(unittest.TestCase): +    bad=("community-testing/os/i686/community-testing.db", +         "community-testing/os/i686/community-testing.db.tar.gz", +         "community-testing/os/i686/community-testing.db.tar.gz.old", +         "community-testing/os/i686/community-testing.files", +         "community-testing/os/i686/community-testing.files.tar.gz", +         "community-testing/os/x86_64") + +    def testBadInput(self): +        for i in self.bad: +            self.assertRaises(NonValidFile,pkginfo_from_filename,i) + +class pkginfoFromRsyncOutput(unittest.TestCase): +    example_package_list=(Package(),Package(),Package()) +    example_package_list[0].package_info={ "name"    : "alex", +                                           "version" : "2.3.4", +                                           "release" : "1", +                                           "arch"    : "i686", +                                           "license" : False, +                                           "location": "community-staging/os/i686/alex-2.3.4-1-i686.pkg.tar.xz", +                                           "depends" : False,} +    example_package_list[1].package_info={ "name"    : "any2dvd", +                                           "version" : "0.34", +                                           "release" : "4", +                                           "arch"    : "any", +                                           "license" : False, +                                           "location": "community/os/any/any2dvd-0.34-4-any.pkg.tar.xz", +                                           "depends" : False,} +    example_package_list[2].package_info={ "name"    : "gmime22", +                                           "version" : "2.2.26", +                                           "release" : "1", +                                           "arch"    : "x86_64", +                                           "license" : False, +                                           "location": "community/os/x86_64/gmime22-2.2.26-1-x86_64.pkg.tar.xz", +                                           "depends" : False,} + +    try: +        output_file = open("rsync_output_sample") +        rsync_out= output_file.read() +        output_file.close() +    except IOError: print("There is no rsync_output_sample file") + +    pkglist = pkginfo_from_rsync_output(rsync_out) + +    def testOutputArePackages(self): +        if not self.pkglist: +            self.fail("not pkglist:" + str(self.pkglist)) +        for pkg in self.pkglist: +            self.assertIsInstance(pkg,Package) + +    def testPackageInfo(self):  +        if not self.pkglist: +            self.fail("Pkglist doesn't exist: " + str(self.pkglist)) +        self.assertEqual(self.pkglist,self.example_package_list) + +class generateRsyncBlacklist(unittest.TestCase): +    example_package_list=(Package(),Package(),Package()) +    example_package_list[0].package_info={ "name"    : "alex", +                                           "version" : "2.3.4", +                                           "release" : "1", +                                           "arch"    : "i686", +                                           "license" : False, +                                           "location": "community-staging/os/i686/alex-2.3.4-1-i686.pkg.tar.xz", +                                           "depends" : False,} +    example_package_list[1].package_info={ "name"    : "any2dvd", +                                           "version" : "0.34", +                                           "release" : "4", +                                           "arch"    : "any", +                                           "license" : False, +                                           "location": "community/os/any/any2dvd-0.34-4-any.pkg.tar.xz", +                                           "depends" : False,} +    example_package_list[2].package_info={ "name"    : "gmime22", +                                           "version" : "2.2.26", +                                           "release" : "1", +                                           "arch"    : "x86_64", +                                           "license" : False, +                                           "location": "community/os/x86_64/gmime22-2.2.26-1-x86_64.pkg.tar.xz", +                                           "depends" : False,} + +    def testListado(self): +        self.assertEqual(listado("blacklist_sample"),["alex","gmime22"]) + +    def testExcludeFiles(self): +        a=generate_exclude_list_from_blacklist(self.example_package_list,listado("blacklist_sample"),debug=True) +        b=[self.example_package_list[0]["location"],self.example_package_list[2]["location"]] +        self.assertEqual(a,b) + +class pkginfo_from_descKnownValues(unittest.TestCase): +    pkgsample=Package() +    pkgsample.package_info={"name"    : "binutils", +                            "version" : "2.21", +                            "release" : "4", +                            "arch"    : "x86_64", +                            "license" : "GPL", +                            "location": "binutils-2.21-4-x86_64.pkg.tar.xz", +                            "depends" : False,} +    pkggen=pkginfo_from_desc("desc") +    def testPkginfoFromDesc(self): +        if self.pkggen is None: +            self.fail("return value is None") +        self.assertEqual(self.pkgsample,self.pkggen) +         +class pkginfo_from_db(unittest.TestCase): +    archdb = os.path.join("./workdir") +    example_package_list=(Package(),Package(),Package()) +    example_package_list[0].package_info={ "name"    : "acl", +                                           "version" : "2.2.49", +                                           "release" : "2", +                                           "arch"    : "x86_64", +                                           "license" : ("LGPL",), +                                           "location": "acl-2.2.49-2-x86_64.pkg.tar.xz" +                                           "depends" : ("attr>=2.4.41"),} +    example_package_list[1].package_info={ "name"    : "glibc", +                                           "version" : "2.13", +                                           "release" : "4", +                                           "arch"    : "x86_64", +                                           "license" : ("GPL","LGPL"), +                                           "location": "glibc-2.13-4-x86_64.pkg.tar.xz" +                                           "depends" : ("linux-api-headers>=2.6.37","tzdata",),} +    example_package_list[2].package_info={ "name"    : "", +                                           "version" : "2.2.26", +                                           "release" : "1", +                                           "arch"    : "x86_64", +                                           "license" : False, +                                           "location": "" +                                           "depends" : False,}     +     +     +if __name__ == "__main__": +    unittest.main() + | 
