# Document retrieved from a search-engine cache. Original document address: http://www.stsci.edu/spst/UnixTransition/doc/dan_util.py
# Modified: Fri Apr 8 12:46:11 2016
# Indexed: Sun Apr 10 23:20:00 2016
# Encoding:
#
# Search terms: http www.astronomy.ru forum index.php topic 4644.0.html
#
# MODULE dan_util
#
#***********************************************************************
"""

**PURPOSE** --
A module for dealing with JWST DAN messages.

**DEVELOPER** --
Don Chance

**MODIFICATION HISTORY** --

o Initial implementation 11/26/13
"""
#***********************************************************************
try:
    # Default to using lxml if it's available.
    from lxml import etree as ET
except ImportError:
    # lxml is optional: fall back to the standard-library ElementTree.  Only
    # ImportError is caught so that unrelated failures are not hidden.
    import xml.etree.ElementTree as ET

__version__ = '16.01.13'

import os
import copy
import spss_sys_util
import shutil
import logging
import urllib
import jwst_time_util
import binascii
from spss_logging_util import Logger
from jwst_prd_util import PPS_xmlschema as xmlschema
from jwst_prd_util import get_latest_prd

# Skeleton XML document that dan_builder parses to start a new DAN.
# NOTE(review): in this copy the template holds only text values and blank
# lines -- the XML element markup appears to have been lost (presumably it was
# stripped when the page was cached).  dan_builder looks up DATE_TIME, DAN_ID,
# DAN_ENV, DAN_ORIGINATOR/COMPONENT, DAN_RECIPIENT/SUBSYSTEM, RETURN_URL and
# FILES in the parsed template, so the real template must define those
# elements.  Restore it from the upstream file before use.
DAN_TEMPLATE = """

OPS



PPS



FOS



true


"""

# strftime-style format: year, day-of-year, then time of day.
# Not referenced by the code visible in this file.
DAN_TIME_FORMAT = "%Y-%j/%H:%M:%S"

# DAN_PRODUCT_PATHS keys are Product IDs
# Values are relative paths (presumably directories on the DAN server -- TODO
# confirm).  Not referenced by the code visible in this file.
DAN_PRODUCT_PATHS = {'OSF': 'fosToPps/osf',
'PredictEph': 'fosToPps/predictEph',
'DSNContact': 'fosToPps/dsnContact',
'skManeuver': 'fosToPps/skManeuver'}

# Host name and account for the DAN server.  Neither is referenced by the
# code visible in this file.
# NOTE(review): the active account looks like a personal login while the
# commented-out one looks like a service account -- confirm which is intended.
DAN_SERVER = 'yoda.stsci.edu'
#DAN_SERVER_ACCOUNT = 'svc_ppssftp'
DAN_SERVER_ACCOUNT = 'chance'

# Root directory for received DANs.  The code below uses its 'holding',
# 'received_files/...' and 'dan_archive' subdirectories.
DAN_RECEIVE_DIR = '/data/jwst/scheduling/dan/received'

# Directory for sent DANs.  Not referenced by the code visible in this file.
DAN_SEND_DIR = '/data/jwst/scheduling/dan/sent'

# Maps an environment name to the code written in a DAN's DAN_ENV element.
DAN_ENV_DICT = {'operational': 'OPS',
'test': 'TEST',
'develop': 'DEV'}
# The legal DAN_ENV codes; dan_builder.set_env validates against these.
DAN_ENV = DAN_ENV_DICT.values()


def CRC32_from_file(filename):
    """Compute the cyclic redundancy check (CRC-32) for the input file.

    filename -- path of the file to checksum.

    Returns the CRC-32 of the file contents as an 8-character, zero-padded,
    upper-case hexadecimal string (e.g. "CBF43926").
    """
    crc = 0
    # The context manager closes the file handle deterministically; reading
    # in chunks avoids loading a large product into memory in one piece.
    with open(filename, 'rb') as stream:
        while True:
            chunk = stream.read(65536)
            if not chunk:
                break
            crc = binascii.crc32(chunk, crc)
    # Mask to 32 bits because crc32 may return a signed value on Python 2.
    return "%08X" % (crc & 0xFFFFFFFF)


def parse_file_spec(element):
    """Parse the FILE_SPEC subelement from a DAN.

    element -- the FILE_SPEC element.

    Returns a tuple of URL, TYPE, SIZE, CHECKSUM.  URL, TYPE and CHECKSUM are
    the text of the corresponding child elements; SIZE is converted to an
    int.  An entry is None when its child element is missing (or, for SIZE,
    has no text).

    Raises ValueError if SIZE is present but is not an integer.
    """
    def _child_text(tag):
        node = element.find(tag)
        # Compare against None explicitly: an Element without children is
        # falsy, so "if element.find(tag):" would skip every leaf element.
        if node is None:
            return None
        return node.text

    url = _child_text('URL')
    Type = _child_text('TYPE')
    size_text = _child_text('SIZE')
    size = None
    if size_text and size_text.strip():
        size = int(size_text)
    checksum = _child_text('CHECKSUM')
    return url, Type, size, checksum


def dan_product_factory(localpath,
                        url,
                        product_type='',
                        size=0,
                        checksum=0,
                        logfile=None,
                        logging_level=logging.INFO,
                        email=None):
    """Create the product object matching a DAN product type.

    localpath -- path of the retrieved file on the local disk.
    url -- URL the file was retrieved from.
    product_type -- one of 'OSF', 'PredictEph', 'DSNContact', 'skManeuver'.
    size, checksum -- values from the DAN; a falsy value skips that check.
    logfile, logging_level, email -- passed through to the product's
        constructor.

    Returns the new product object, or None if product_type is not one of
    the known types.
    """
    if product_type == 'OSF':
        product_class = OSF_product
    elif product_type == 'PredictEph':
        product_class = PredictEph_product
    elif product_type == 'DSNContact':
        product_class = DSNContact_product
    elif product_type == 'skManeuver':
        product_class = skManeuver_product
    else:
        return None
    # The product constructors take an email argument after logging_level;
    # leaving it out made every known product type fail with a TypeError.
    return product_class(localpath, url, product_type, size, checksum,
                         logfile, logging_level, email)


class dan_base(Logger):
    """Base class for DAN objects: sets up logging and provides validation.
    """
    def __init__(self, logfile=None, logging_level=logging.INFO, email=None):
        super(dan_base, self).__init__(logfile, logging_level, email)

    def validate(self, prd_name=None, override_name='NONE', dbserver='', dbname='', sql_schema=''):
        """Check self.dan_etree against the 'DAN' schema.

        dbserver -- taken from the DSQUERY environment variable when empty.
        dbname -- taken from the PPS_DB_NAME environment variable when empty.
        prd_name -- looked up with get_latest_prd() when empty.
        override_name, sql_schema -- passed through to the schema lookup.

        Returns whatever the schema object's validate() returns.
        """
        dbserver = dbserver or os.environ.get('DSQUERY')
        dbname = dbname or os.environ.get('PPS_DB_NAME')
        prd_name = prd_name or get_latest_prd(dbserver, dbname, sql_schema)
        dan_schema = xmlschema('DAN', prd_name, override_name,
                               dbserver, dbname, sql_schema, log=self.log)
        return dan_schema.validate(self.dan_etree)


class dan_product_file(dan_base):
    """A file delivered by a DAN, with the metadata the DAN gave for it.

    Subclasses override process() to load the file into the system.
    """
    def __init__(self, path, url, product_type, size, checksum,
                 logfile=None, logging_level=logging.INFO, email=None):
        """
        path -- local path of the retrieved file.
        url -- URL the file was retrieved from.
        product_type -- the DAN product type.
        size -- expected size in bytes; a falsy value skips the size check.
        checksum -- expected CRC32 as a hex string; a falsy value skips the
            checksum check.
        logfile, logging_level, email -- passed to the logging base class.
        """
        super(dan_product_file, self).__init__(logfile, logging_level, email)
        self.path = path
        # Store the URL argument (the original stored the urllib module).
        self.url = url
        self.product_type = product_type
        self.size = size
        self.checksum = checksum
        # Set to True by a failed check; archive() then files this product
        # under the error directory.
        self.error = False

    def check_checksum(self):
        """Check the calculated CRC32 checksum against that provided by the DAN.

        Returns True if the check passes (or if the DAN gave no checksum).
        Otherwise, returns False and sets self.error.
        """
        if not self.checksum:
            self.log.info('No DAN checksum. Checksum check skipped...')
            return True
        cs = CRC32_from_file(self.path)
        self.log.info('Calculated checksum: %s' % cs)
        self.log.info('DAN checksum value: %s' % self.checksum)
        try:
            expected = int(str(self.checksum).strip(), 16)
        except ValueError:
            # A checksum that is not hexadecimal can never match; treat it as
            # a failed check rather than aborting the caller.
            expected = None
        if expected is not None and int(cs, 16) == expected:
            self.log.info('Checksum check passes!')
            return True
        self.log.error('Checksum check fails!')
        self.error = True
        return False

    def check_size(self):
        """Check the file size against that provided by the DAN.

        Returns True if the check passes (or if the DAN gave no size).
        Otherwise, returns False and sets self.error.
        """
        if not self.size:
            self.log.info('No DAN size. Size check skipped...')
            return True
        fs = os.path.getsize(self.path)
        self.log.info("File size: %i" % fs)
        self.log.info("DAN size value: %i" % self.size)
        if fs == self.size:
            self.log.info('Size check passes!')
            return True
        self.log.error('Size check fails!')
        self.error = True
        return False

    def archive(self, dan_env='ops'):
        """Copy the file into the receive area, then delete the original.

        dan_env selects the destination under DAN_RECEIVE_DIR/received_files:
        'ops' -> 'error' if a check failed, else 'processed/<product_type>';
        'test' or 'dev' -> 'test'; anything else -> 'unprocessed'.

        NOTE(review): the comparison is case-sensitive and expects lower
        case, while the module's DAN_ENV codes are upper case -- confirm what
        callers pass.
        """
        if dan_env == 'ops':
            if self.error:
                destination = os.path.join(DAN_RECEIVE_DIR, 'received_files/error')
            else:
                destination = os.path.join(DAN_RECEIVE_DIR, 'received_files/processed/', self.product_type)
        elif dan_env in ('test', 'dev'):
            destination = os.path.join(DAN_RECEIVE_DIR, 'received_files/test')
        else:
            destination = os.path.join(DAN_RECEIVE_DIR, 'received_files/unprocessed')
        shutil.copy(self.path, destination)
        os.remove(self.path)

    def process(self):
        """Load the product into the system.  No-op here; see subclasses."""
        pass


class OSF_product(dan_product_file):
    """Product file of DAN type 'OSF'."""

    def process(self, baseline=''):
        """Load the file by running the observatory_status command.

        baseline -- when non-empty, added to the command as -baseline=<value>.

        The command output is logged as an error if the returned status is
        truthy, and as info otherwise.
        """
        if baseline:
            command_line = 'observatory_status -load %s -baseline=%s' % (self.path, baseline)
        else:
            command_line = 'observatory_status -load %s' % self.path
        status, output = spss_sys_util.command(command_line)
        report = self.log.error if status else self.log.info
        report(output)


class PredictEph_product(dan_product_file):
    """Product file of DAN type 'PredictEph'."""

    def process(self, replace=False):
        """Load the file by running the ephem command.

        replace -- selects the -replace option when true, -noreplace
            otherwise.

        The command output is logged as an error if the returned status is
        truthy, and as info otherwise.
        """
        if replace:
            template = 'ephem -create_jwst -replace %s'
        else:
            template = 'ephem -create_jwst -noreplace %s'
        status, output = spss_sys_util.command(template % self.path)
        report = self.log.error if status else self.log.info
        report(output)


class DSNContact_product(dan_product_file):
    """Product file of DAN type 'DSNContact'."""

    def process(self, replace=False):
        """Load the file by running the dsn command.

        replace -- selects the -replace option when true, -noreplace
            otherwise.

        The command output is logged as an error if the returned status is
        truthy, and as info otherwise.
        """
        if replace:
            template = 'dsn -file -replace %s'
        else:
            template = 'dsn -file -noreplace %s'
        status, output = spss_sys_util.command(template % self.path)
        report = self.log.error if status else self.log.info
        report(output)


class skManeuver_product(dan_product_file):
    """Product file of DAN type 'skManeuver'."""

    def process(self):
        """Load the file by running the station_keeping command.

        The command output is logged as an error if the returned status is
        truthy, and as info otherwise.
        """
        command_line = 'station_keeping -load %s' % self.path
        status, output = spss_sys_util.command(command_line)
        report = self.log.error if status else self.log.info
        report(output)


class dan_parser(dan_base):
    """Parse an input DAN, retrieve the files it lists and build its DAN_ACK.

    The DAN_ACK starts as a deep copy of the DAN with the root tag renamed to
    "DAN_ACK".  retrieve_files() records a STATUS attribute on each FILE_SPEC
    element of that copy.
    """
    def __init__(self, dan="", sftp=None, logfile=None, logging_level=logging.INFO, email=None):
        """
        dan -- path of a DAN file, a file-like object, or a string of DAN XML.
        sftp -- optional connection object used for 'sftp' URLs; this class
            calls its get, put, chdir and delete methods.
        logfile, logging_level, email -- passed to the logging base class.

        Raises ValueError if dan is empty.
        """
        # Set up logging...
        super(dan_parser, self).__init__(logfile, logging_level, email)
        # Determine if dan is a file-like object, a file path or a string.
        # File-like objects are tested first because os.path.isfile() does
        # not accept them.
        self.dan_filepath = ''
        if hasattr(dan, 'read'):
            # the dan is a file-like object
            self.dan_etree = ET.parse(dan)
        elif dan and os.path.isfile(dan):
            # the dan is a file path
            self.dan_filepath = os.path.abspath(dan)
            self.dan_etree = ET.parse(dan)
        elif dan:
            # the dan is a string
            self.dan_etree = ET.ElementTree(ET.fromstring(dan))
        else:
            raise ValueError('dan must be a file path, a file-like object or an XML string.')
        # Initialize the DAN_ACK
        self.ack_etree = copy.deepcopy(self.dan_etree)
        self.ack_etree.getroot().tag = "DAN_ACK"
        self.ack_filepath = ''  # filled in when the ACK file is created

        self.sftp = sftp

    def write_ack(self, filename=''):
        """Write the DAN_ACK XML to a file.

        filename -- where to write.  When empty and the DAN came from a file,
            the name is the DAN file path with 'DAN' replaced by 'ACK' in the
            file name.

        Returns the path written, which is also stored in self.ack_filepath.
        """
        if not filename and self.dan_filepath:
            # Substitute only within the file name so that a directory whose
            # name contains 'DAN' is left alone.
            dan_dir, dan_name = os.path.split(self.dan_filepath)
            ack_name = dan_name.replace('DAN', 'ACK')
            if ack_name == dan_name:
                # No 'DAN' in the name: add a suffix instead, so the ACK is
                # not written over the DAN itself.
                stem, extension = os.path.splitext(dan_name)
                ack_name = stem + '_ACK' + extension
            filename = os.path.join(dan_dir, ack_name)
        self.ack_etree.write(filename)
        self.ack_filepath = filename
        return filename

    def _recipient_owns_files(self):
        """Return True if FILES/RECIPIENT_OWNERSHIP is 'true' or '1'."""
        ownership = self.ack_etree.find('FILES/RECIPIENT_OWNERSHIP')
        # find() returns None when the element is absent; the element itself
        # must not be truth-tested because childless elements are falsy.
        if ownership is None or not ownership.text:
            return False
        return ownership.text.strip().lower() in ('true', '1')

    def _delete_source(self, scheme, path):
        """Delete a received file at its source ('file' and 'sftp' only)."""
        try:
            if scheme == 'file':
                os.remove(path)
                self.log.info('%s deleted.' % path)
            elif scheme == 'sftp':
                self.sftp.delete(path)
                self.log.info('%s deleted.' % path)
        except Exception:
            # The file is already safely received; log and carry on.
            self.log.error(spss_sys_util.get_traceback_string())

    def retrieve_files(self):
        """Retrieve the files listed in the DAN.

        Each file is transferred to "<name>.part" in the 'holding' directory
        under DAN_RECEIVE_DIR and renamed to "<name>" once the transfer has
        succeeded.  The source is then deleted if the DAN gives ownership to
        the recipient ('file' and 'sftp' schemes only).

        Returns a list with one entry per received file: the object returned
        by dan_product_factory(), which is None for an unknown product type.

        Update the DAN_ACK XML accordingly: each FILE_SPEC gets a STATUS
        attribute of RECEIVED, UNAVAILABLE, INVALID_DAN, BAD_SIZE or
        BAD_CHECKSUM.
        """
        retrieved_file_list = []
        owns_files = self._recipient_owns_files()
        for File_element in self.ack_etree.findall('FILES/FILE_SPEC'):
            url, product_type, size, checksum = parse_file_spec(File_element)
            if not url or ':' not in url:
                self.log.error('FILE_SPEC has no usable URL: %r' % url)
                File_element.set('STATUS', 'INVALID_DAN')
                continue
            # Split on the first colon only; an https URL may contain more
            # (e.g. a port number).
            scheme, path = url.split(':', 1)
            newpath = os.path.join(DAN_RECEIVE_DIR, 'holding', os.path.basename(path))
            partpath = newpath + '.part'
            status = 'RECEIVED'
            if scheme == 'file':
                if os.path.isfile(path):
                    try:
                        shutil.copy(path, partpath)
                    except Exception:
                        status = 'UNAVAILABLE'
                        self.log.error(spss_sys_util.get_traceback_string())
                else:
                    self.log.error('%s is not a file.' % path)
                    status = 'UNAVAILABLE'
            elif scheme == 'https':
                try:
                    urllib.urlretrieve(url, partpath)
                except Exception:
                    status = 'UNAVAILABLE'
                    self.log.error(spss_sys_util.get_traceback_string())
            elif scheme == 'sftp':
                if self.sftp:
                    try:
                        self.sftp.get(path, partpath)
                    except Exception:
                        status = 'UNAVAILABLE'
                        self.log.error(spss_sys_util.get_traceback_string())
                else:
                    self.log.error('no SFTP connection with which to download file.')
                    status = 'UNAVAILABLE'
            else:
                self.log.error('unknown file transfer scheme: %s' % scheme)
                status = 'INVALID_DAN'
            File_element.set('STATUS', status)
            if status != 'RECEIVED':
                continue

            os.rename(partpath, newpath)
            self.log.info('file %s received.' % newpath)
            product = dan_product_factory(newpath, url, product_type, size, checksum)
            retrieved_file_list.append(product)
            if owns_files:
                self._delete_source(scheme, path)
            if product:
                if not product.check_size():
                    File_element.set('STATUS', 'BAD_SIZE')
                if not product.check_checksum():
                    File_element.set('STATUS', 'BAD_CHECKSUM')
        return retrieved_file_list

    def return_dan_ack(self):
        """Send back the DAN_ACK XML file.

        The ACK is written locally with write_ack() and then delivered to the
        DAN's RETURN_URL by file copy, https POST or sftp put, depending on
        the URL scheme.  Nothing is sent if the DAN has no RETURN_URL.
        """
        return_element = self.ack_etree.find('RETURN_URL')
        if return_element is None or not return_element.text:
            self.log.info('No address for acknowledgement. No DAN_ACK sent.')
            return
        return_url = return_element.text.strip()
        # partition() never fails: without a colon the whole URL lands in
        # scheme and is reported below as an unknown scheme.
        scheme, _, path = return_url.partition(':')
        self.log.info("RETURN_URL: %s" % return_url)
        ack_filename = self.write_ack()
        if scheme == 'file':
            if os.path.isdir(path):
                # Deliver a copy under the ACK's base name.  Joining the full
                # (absolute) local path would just give the local path back.
                p = os.path.join(path, os.path.basename(ack_filename))
                if os.path.abspath(p) != os.path.abspath(ack_filename):
                    shutil.copy(ack_filename, p)
                self.log.info('dan_ack sent (via file copy): %s' % p)
            else:
                self.log.error('dan_ack failure! File scheme specified, but %s is not a directory!' % path)
        elif scheme == 'https':
            import requests
            with open(ack_filename, 'rb') as ack_file:
                requests.post(return_url, files={ack_filename: ack_file})
            self.log.info('dan_ack sent (via https): %s' % ack_filename)
        elif scheme == 'sftp':
            if self.sftp:
                self.sftp.chdir(path)
                self.sftp.put(ack_filename)
                self.log.info('dan_ack sent (via sftp): %s' % ack_filename)
            else:
                self.log.error('No ACK sent -- no SFTP connection exists!')
        else:
            self.log.error('unknown RETURN_URL scheme: %s' % scheme)
            self.log.error('No ACK sent.')

    def rdel(self, remote_dan_path):
        """Delete the DAN from the server.
        """
        if self.sftp:
            self.sftp.delete(remote_dan_path)
            self.log.info('%s deleted.' % remote_dan_path)
        else:
            self.log.info('rdel called, but no SFTP connection exists.')

    def _archive_file(self, filepath, archive_dir):
        """Copy filepath into archive_dir, then delete the original."""
        destination = os.path.join(archive_dir, os.path.basename(filepath))
        shutil.copy(filepath, destination)
        self.log.info('%s copied to %s.' % (filepath, destination))
        os.remove(filepath)
        self.log.info('%s deleted.' % filepath)

    def archive(self):
        """Move the DAN (and DAN ACK) to the archive directory.

        Only files that exist locally are moved: the DAN when it was read
        from a file path, the ACK once it has been written.
        """
        dan_archive_dir = os.path.join(DAN_RECEIVE_DIR, 'dan_archive')
        if self.dan_filepath:
            self._archive_file(self.dan_filepath, dan_archive_dir)
        if self.ack_filepath:
            self._archive_file(self.ack_filepath, dan_archive_dir)


class dan_builder(Logger):
    """A class for the construction of a new DAN file.

    The DAN starts from DAN_TEMPLATE; the set_* methods fill in its elements
    and add_file() appends one FILE_SPEC per file.
    """
    def __init__(self,
                 product_id,
                 source_id='PPS',
                 dest_id='FOS',
                 scheme='sftp',
                 logfile=None,
                 logging_level=logging.INFO,
                 dist=None):
        """
        product_id, source_id, dest_id -- used to build the default DAN ID.
        scheme -- transfer scheme for file URLs: 'file', 'https' or 'sftp'.
        logfile, logging_level, dist -- passed to the logging base class.

        Raises ValueError if scheme is not one of the known schemes.
        """
        # Set up logging...
        super(dan_builder, self).__init__(logfile, logging_level, dist)

        self.dan_etree = ET.ElementTree(ET.fromstring(DAN_TEMPLATE))
        t = jwst_time_util.jwst_time()
        self.product_id = product_id
        self.source_id = source_id
        self.dest_id = dest_id
        # Empty until set_filename() or write() is called; get_filename()
        # relies on the attribute existing.
        self.filename = ''
        # set_scheme() stores the value itself and returns None, so its
        # result must not be assigned back to self.scheme.
        self.set_scheme(scheme)
        self.set_datetime(t)
        self.set_danid()
        self.file_count = 0

    def set_filename(self, filename=None):
        """Set the DAN file name; defaults to "<DAN ID>_DAN.xml"."""
        if not filename:
            filename = self.get_danid() + "_DAN.xml"
        self.filename = filename

    def get_filename(self):
        """Return the DAN file name, setting the default one if unset."""
        if not self.filename:
            self.set_filename()
        return self.filename

    def write(self, filename=None):
        """Write the DAN XML to filename (default: see set_filename)."""
        self.set_filename(filename)
        self.dan_etree.write(self.filename)

    def set_datetime(self, time=None):
        """Set the creation time (default: now) and the DATE_TIME element.

        NOTE(review): DATE_TIME is written as '%Y/%m/%d %H:%M:%S', which
        differs from the module's DAN_TIME_FORMAT -- confirm which one the
        DAN schema expects.
        """
        if time:
            self.create_time = jwst_time_util.jwst_time(time)
        else:
            self.create_time = jwst_time_util.jwst_time()
        self.dan_etree.find('DATE_TIME').text = self.create_time.strftime('%Y/%m/%d %H:%M:%S')

    def set_danid(self, danid=None):
        '''Set the DAN_ID element to the input value.

        If no value is passed in, the ID is built as
        "<source_id>-<dest_id>_<product_id>_<creation time>", with the
        creation time formatted as year-dayofyear "T" hhmmss.
        '''
        if not danid:
            danid = (self.source_id + '-' + self.dest_id + '_'
                     + self.product_id + '_' + self.create_time.strftime('%Y-%jT%H%M%S'))
        self.dan_etree.find('DAN_ID').text = danid

    def get_danid(self):
        """Return the DAN_ID text, setting the default ID first if empty."""
        if not self.dan_etree.find('DAN_ID').text:
            self.set_danid()
        return self.dan_etree.find('DAN_ID').text

    def set_env(self, dan_env):
        """Set the DAN_ENV element (case-insensitive input).

        Raises ValueError if the upper-cased value is not in DAN_ENV.
        """
        env_code = str(dan_env).upper()
        if env_code not in DAN_ENV:
            raise ValueError('dan_env set to %s. Legal values are OPS, TEST and DEV.' % env_code)

        self.dan_etree.find('DAN_ENV').text = env_code

    def set_component(self, component):
        """Set the DAN_ORIGINATOR/COMPONENT element."""
        self.dan_etree.find('DAN_ORIGINATOR/COMPONENT').text = str(component)

    def set_recipient(self, recipient):
        """Set the DAN_RECIPIENT/SUBSYSTEM element."""
        self.dan_etree.find('DAN_RECIPIENT/SUBSYSTEM').text = str(recipient)

    def set_return_url(self, return_url):
        """Set the RETURN_URL element."""
        self.dan_etree.find('RETURN_URL').text = str(return_url)

    def set_scheme(self, scheme):
        """Set the transfer scheme used for file URLs.

        Raises ValueError unless scheme is 'file', 'https' or 'sftp'.
        """
        if scheme not in ('file', 'https', 'sftp'):
            raise ValueError('unknown transfer scheme: %s' % scheme)
        self.scheme = scheme

    def get_scheme(self):
        """Return the transfer scheme."""
        return self.scheme

    def add_file(self, path, filetype=""):
        """Append a FILE_SPEC element for the file at path.

        The FILE_SPEC gets a URL (built from the scheme and path), an
        optional TYPE, the SIZE in bytes and the CRC32 CHECKSUM.

        path -- path of an existing, readable file.
        filetype -- product type for the TYPE element; omitted when empty.
        """
        url = path_to_url(self.scheme, path)
        # Read the file before touching the tree, so that an unreadable file
        # does not leave a half-built FILE_SPEC or a wrong file count behind.
        size_text = str(os.path.getsize(path))
        crc32_text = CRC32_from_file(path)

        self.file_count += 1
        files_element = self.dan_etree.find('FILES')
        file_spec_element = ET.SubElement(files_element, 'FILE_SPEC')
        url_element = ET.SubElement(file_spec_element, 'URL')
        url_element.text = url
        if filetype:
            type_element = ET.SubElement(file_spec_element, 'TYPE')
            type_element.text = filetype
        size_element = ET.SubElement(file_spec_element, 'SIZE')
        size_element.text = size_text
        crc32_element = ET.SubElement(file_spec_element, 'CHECKSUM')
        crc32_element.text = crc32_text


def path_to_url(scheme, path):
    """Build a DAN file URL from a transfer scheme and a path.

    'sftp' gives "sftp:<path>", 'https' gives "https://<path>" and any other
    scheme gives "file:<path>".
    """
    if scheme == 'sftp':
        return 'sftp:' + path
    if scheme == 'https':
        return 'https://' + path
    return 'file:' + path


def url_to_path(url):
    """Return the path part of a DAN file URL.

    The scheme (everything up to the first colon) is removed, along with a
    leading "//" if one follows it.  This inverts path_to_url() for all three
    schemes: "sftp:/a/b" -> "/a/b", "file:///a/b" -> "/a/b" and
    "https://host/a" -> "host/a".  A string without a colon is returned
    unchanged.
    """
    remainder = url.split(':', 1)[-1]
    # Only strip the two slashes when they are really there; the sftp and
    # file URLs built by path_to_url() have none, and blindly dropping two
    # characters would eat the start of the path.
    if remainder.startswith('//'):
        remainder = remainder[2:]
    return remainder


#class dan_manager(Logger):
# #DAN_RECEIVE_DIR = os.path.abspath(spss_sys_util.resolver('DAN_RECEIVE_DIR'))
#
# def __init__(self):
# self.destination = ''