# Document retrieved from a search-engine cache. Original document address: http://www.stsci.edu/spst/UnixTransition/doc/archive_util.py
# Modified: Fri Feb 28 14:46:08 2014
# Indexed: Sat Mar 1 16:16:40 2014
# Encoding:
#
# Search terms: http astrokuban.info astrokuban
#
#MODULE archive_util
#
#***********************************************************************
"""

**PURPOSE** --
A module for PASS archival related utilities.

**DEVELOPER** --
Greg Wenzel

**MODIFICATION HISTORY** --

o initial implementation GWW 11/20/03
o add add_files function drc 3/5/12
o avoid collisions with other processes by moving the archive file
from a temporary location. drc 4/4/13
"""
#***********************************************************************
import configure_util
import spss_sys_util
import zipfile
import os
import fnmatch
import cPickle
import exceptions
import glob

__version__ = "4/4/13"

# Root directory of the PASS archive, as returned by spss_sys_util.resolver.
PASS_ARCHIVE_DIR = spss_sys_util.resolver("PASS_ARCHIVE_DIR")

# Directory under the archive root in which zip files are built before
# being renamed to their final location.
WORKING_DIR = os.path.join(PASS_ARCHIVE_DIR, 'temp')

# Create the working directory on first use.  Attempt the mkdir and inspect
# the failure instead of testing first: another process importing this
# module at the same moment may create the directory between a test and
# the mkdir, which must not be treated as an error.
try:
    os.mkdir(WORKING_DIR)
except OSError:
    if not os.path.isdir(WORKING_DIR):
        raise

class ArchiveError(Exception):
    """Raised when an archive or extract request selects no files."""

class pass_archive(zipfile.ZipFile):
    """A zipfile.ZipFile specialised for PASS archive files.

    Adds helpers to store pickles together with the files they reference,
    to query the pickles held in an archive, and to restore them to disk.
    """

    @staticmethod
    def _archive_name(file_path):
        """Return the member name ZipFile.write() stores for file_path.

        ZipFile.write() drops any drive letter and leading path separators
        and records names with forward slashes, so a file-system path has
        to be converted before it can be compared against namelist().
        """
        arcname = os.path.normpath(os.path.splitdrive(file_path)[1])
        while arcname[:1] in (os.sep, os.altsep):
            arcname = arcname[1:]
        if os.sep != '/':
            arcname = arcname.replace(os.sep, '/')
        return arcname

    def _restore_member(self, member_name, out_path):
        """Write archive member member_name to out_path, creating directories."""
        directory_path = os.path.dirname(out_path)
        # dirname is '' for a bare file name and os.makedirs('') raises.
        if directory_path and not os.path.exists(directory_path):
            os.makedirs(directory_path)
        # Read first so a missing member does not leave an empty file behind.
        data = self.read(member_name)
        out_file = open(out_path, 'wb')
        try:
            out_file.write(data)
        finally:
            out_file.close()

    def get_list_of_pickle_paths(self, pickle_filter='*'):
        """Return the archive member names of pickles matching pickle_filter.

        Any extension on pickle_filter is replaced by ".pickle" before the
        archive is searched with glob().
        """
        pickle_filter = "%s.pickle" % os.path.splitext(pickle_filter)[0]
        return self.glob(pickle_filter)

    def get_pickled_object(self, pickle_path):
        """Return the object unpickled from archive member pickle_path.

        Raises RuntimeError unless the archive was opened in mode "r" or "a".
        """
        if self.mode not in ['r', 'a']:
            raise RuntimeError('get_pickled_object() requires mode "r" or "a"')
        # NOTE(review): unpickling can execute arbitrary code; only open
        # archives that come from a trusted source.
        return cPickle.loads(self.read(pickle_path))

    def get_pickle_info(self, pickle_path):
        """Return the (name, create time, comment) tuple for a zipped pickle.

        The name is the base name of pickle_path without its extension.
        When the unpickled object is neither a configure_util.configured_file
        nor a configure_util.pickled_product a warning is printed and None
        is returned.

        Raises RuntimeError unless the archive was opened in mode "r" or "a".
        """
        if self.mode not in ['r', 'a']:
            raise RuntimeError('get_pickle_info() requires mode "r" or "a"')
        pickle_object = self.get_pickled_object(pickle_path)
        pickle_name = os.path.splitext(os.path.basename(pickle_path))[0]
        if isinstance(pickle_object, (configure_util.configured_file,
                                      configure_util.pickled_product)):
            return (pickle_name, pickle_object.get_create_time(),
                    pickle_object.get_comment())
        print('WARNING: %s is not a valid configured file or pickled product'
              % pickle_name)
        # There is no info tuple for this object; the previous code fell
        # through to "return tup" with tup never assigned.
        return None

    def get_pickle_info_list(self, pickle_filter='*'):
        """Return the info tuples of all zipped pickles matching pickle_filter.

        Pickles for which get_pickle_info() has no info (it returns None
        after printing its warning) are left out of the list.

        Raises RuntimeError unless the archive was opened in mode "r" or "a".
        """
        if self.mode not in ['r', 'a']:
            raise RuntimeError('get_pickle_info_list() requires mode "r" or "a"')
        info_list = []
        for pickle_path in self.get_list_of_pickle_paths(pickle_filter):
            info = self.get_pickle_info(pickle_path)
            if info is not None:
                info_list.append(info)
        return info_list

    def add_pickle(self, pickle_path):
        """Add a pickle and the files it references to the archive.

        The files are those matched (through spss_sys_util.glob) by the
        glob strings the pickled object returns from
        get_pass_archive_glob_list().  Files already present in the archive
        are not written a second time.

        Raises RuntimeError if the archive was opened in mode "r" and
        ArchiveError if the glob strings select no files.
        """
        if self.mode == 'r':
            raise RuntimeError('add_pickle() requires mode "w" or "a"')

        pickle_path = os.path.normpath(pickle_path)
        pickled_object = configure_util.get_pickled_object(pickle_path)

        archive_list = []
        for glob_string in pickled_object.get_pass_archive_glob_list():
            archive_list.extend(spss_sys_util.glob(glob_string))
        if not archive_list:
            error_text = 'No files selected for archival!\n for pickle:%s' \
                         % pickle_path
            raise ArchiveError(error_text)

        # Compare stored member names, not file-system paths: an absolute
        # path never appears verbatim in namelist().  Building the set once
        # also avoids rescanning the member list for every file.
        archived = set(self.namelist())
        for file_path in archive_list + [pickle_path]:
            file_path = os.path.normpath(file_path)
            member_name = self._archive_name(file_path)
            if member_name not in archived:
                self.write(file_path)
                archived.add(member_name)

    def add_files(self, file_list):
        """Add every path in file_list to the archive."""
        for file_path in file_list:
            self.write(os.path.normpath(file_path))

    def glob(self, glob_string):
        """Return the archive member names matching glob_string.

        Matching is done with fnmatch.fnmatch against the normalised
        pattern, in the order the names appear in the archive.
        """
        pattern = os.path.normpath(glob_string)
        return [member_name for member_name in self.namelist()
                if fnmatch.fnmatch(member_name, pattern)]

    def extract_archived_pickle(self, pickle_path, pickle_out_dir=None):
        """Extract an archived pickle and the archived files it references.

        pickle_path is the member name of the pickle inside the archive.
        The pickle is written to pickle_out_dir under its base name when
        pickle_out_dir is given, otherwise to pickle_path itself.  The
        referenced files are restored to the paths recorded as their member
        names.  Files that already exist on disk are left untouched.

        Raises RuntimeError unless the archive was opened in mode "r" or "a"
        and ArchiveError if the pickle's glob strings match no members.
        """
        if self.mode not in ['r', 'a']:
            raise RuntimeError('extract_archived_pickle() requires mode "r" or "a"')
        if pickle_out_dir:
            pickle_out_path = os.path.join(pickle_out_dir,
                                           os.path.basename(pickle_path))
        else:
            pickle_out_path = pickle_path

        if not os.path.exists(pickle_out_path):
            # Read the member named pickle_path and write it to the output
            # location; the two names used to be swapped, which broke every
            # call that passed pickle_out_dir.
            self._restore_member(pickle_path, pickle_out_path)
        extract_object = self.get_pickled_object(pickle_path)

        archive_list = []
        for glob_string in extract_object.get_pass_archive_glob_list():
            archive_list.extend(self.glob(glob_string))
        if not archive_list:
            error_text = 'No files selected for extraction!\n for pickle:%s' \
                         % pickle_path
            raise ArchiveError(error_text)

        for file_path in archive_list:
            if not os.path.exists(file_path):
                print('restoring: %s' % file_path)
                self._restore_member(file_path, file_path)

def archive_pickle(pickle_path,archive_dir=PASS_ARCHIVE_DIR):
    """Archive a pickle and its associated files into archive_dir.

    The zip file is named after the pickle (extension replaced by ".zip").
    It is built in WORKING_DIR and then renamed into archive_dir, so other
    processes do not see a partially written archive.

    Returns the path of the finished zip file.  If building the archive
    fails, the partial file is removed from WORKING_DIR and the exception
    is re-raised.
    """
    pickle_path = os.path.normpath(pickle_path)

    zname = "%s.zip" % os.path.splitext(os.path.basename(pickle_path))[0]

    temp_file_path = os.path.normpath(os.path.join(WORKING_DIR, zname))
    zip_file_path = os.path.normpath(os.path.join(archive_dir, zname))

    archive_file = pass_archive(temp_file_path, 'w', zipfile.ZIP_DEFLATED)
    try:
        try:
            archive_file.add_pickle(pickle_path)
        finally:
            # Always release the file handle, even when add_pickle raises.
            archive_file.close()
    except Exception:
        # Do not leave a half-built archive in the working directory.
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)
        raise
    os.rename(temp_file_path, zip_file_path)
    return zip_file_path

def archive_files(glob_string, archive_dir=os.path.join(PASS_ARCHIVE_DIR,'add')):
    """Archive the files matching glob_string into archive_dir.

    The zip file is named after the first matching file (extension
    replaced by ".zip").  It is built in WORKING_DIR and then renamed into
    archive_dir, so other processes do not see a partially written archive.

    Returns the path of the finished zip file, or None when glob_string
    matches no files.  If building the archive fails, the partial file is
    removed from WORKING_DIR and the exception is re-raised.
    """
    glist = glob.glob(glob_string)
    if not glist:
        # Nothing to archive; make the "no result" outcome explicit.
        return None

    zname = "%s.zip" % os.path.splitext(os.path.basename(glist[0]))[0]
    temp_file_path = os.path.normpath(os.path.join(WORKING_DIR, zname))
    zip_file_path = os.path.normpath(os.path.join(archive_dir, zname))

    archive_file = pass_archive(temp_file_path, 'w', zipfile.ZIP_DEFLATED)
    try:
        try:
            archive_file.add_files(glist)
        finally:
            # Always release the file handle, even when add_files raises.
            archive_file.close()
    except Exception:
        # Do not leave a half-built archive in the working directory.
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)
        raise
    os.rename(temp_file_path, zip_file_path)
    return zip_file_path