Документ взят из кэша поисковой машины. Адрес оригинального документа : http://www.mrao.cam.ac.uk/~bn204/alma/memo-infer-588/air.py
Дата изменения: Fri Aug 10 20:17:35 2012
Дата индексирования: Tue Oct 2 10:26:14 2012
Кодировка:
"""
Bojan Nikolic ,
Initial version December 2008
Revised 2009

The driver module for all libAIR functionality and retrieval of
atmospheric parameters.

Do not put into this file:
- Testing routines
- Input/Output or plotting
"""
import math

import numpy

import setup

setup.pyair_minV("0.9")
import pyair

setup.pybnmin1_minV("1.4")
import pybnmin1

from bnmin1utils import SetIC, ChainToArray

MK_weak_pr=( ("n", 0, 10),
("T", 200, 320),
("P", 100, 650))

MK_flat_pr=( ("n", 0, 5),
("T", 260, 280),
("P", 530, 610))

MK_tight_pr=( ("n", 0, 5),
("T", 260, 280),
("P", 570, 590))

PdB_weak_pr=( ("n", 0, 10),
("T", 200, 300),
("P", 100, 1000))

PdB_flat_pr=( ("n", 0, 10),
("T", 250, 280),
("P", 500, 750))

PdB_tight_pr=( ("n", 0, 10),
("T", 260, 280),
("P", 680, 710))

PdB_tightT_pr=( ("n", 0, 10),
("T", 268, 274),
("P", 680, 710))

OSF_flat_pr=( ("n", 0, 10),
("T", 250, 285),
("P", 500, 750))

def applyPriors(obsmod,
prior=None,
log_pr=False):
"""
Apply prior information to a model -- supports flat or log-flat
priors

:param obsmod: The base model

:param prior: List of lists defining the prior ranges

:param log_pr: If true, use log-priors

"""
if prior is None:
return obsmod
else:
obsmod.thisown=False
if log_pr :
po=pybnmin1.LogFlatPriors(obsmod)
else:
po=pybnmin1.IndependentFlatPriors(obsmod)
for pname, plow, phigh in prior:
po.AddPrior(pname, plow, phigh)
return po

_radiomap= {"DickeProto" : pyair.ALMADickeProto,
"Prod" : pyair.ALMAProd,
"IRAM22GHz" : pyair.IRAM22GHz }

def mkModel(coupling=None,
TSpill=275,
za=0,
radiom="DickeProto"):
"""
Make a libAIR model based on user inputs

:param za: Zenith angle for the plane-parrelel model
"""
wm=pyair.SingleLayerWater(_radiomap[radiom],
pyair.PartTable,
pyair.AirCont)
wm.thisown=False
if coupling is not None:
wm=pyair.CouplingModel(wm)
wm.setSpill(coupling,
TSpill)
wm.thisown=False
wm=pyair.PPDipModel(wm)
wm.setZA(za)
return wm

def AbsRetrieve(err=1.0,
d=None,
ic=[1.0, 270, 600],
nsample=2000000,
prior=None,
coupling=None,
TSpill=275,
el=None,
PSigma=0.1,
TSigma=0.1,
nsigma=0.001,
sampler="metropolis",
log_pr=True,
**kwargs):
"""
Absolute retrieval from brightness temperatures only. For testing
& comparison to the joint analysis.

:param err: Gaussian error on each measurement

:param d: The observed data. If not supplied (is None) then the
model for the initial condition point (ic) is used. The list is in
[ch1, ch2, ch3, ch4] format.

:param ic: The initial condition to start the chain from

:param nsample: Number of proposals to make

:param coupling: The coupling of the WVR to the sky

:param el: Elevation in radian. If None assume pointing straight
up

:param sampler: Allow different samplers to be run. Experimenatal
only feature for now

"""
if el is None:
za=0
else:
za=math.pi*0.5-el
wm=mkModel(coupling=coupling,
TSpill=TSpill,
za=za,
**kwargs)
ll=pyair.AbsNormMeasure(wm)
ll.thermNoise=pybnmin1.DoubleVector([err]*4)
SetIC(ll, ic)
if d is None:
ll.modelObs()
else:
ll.obs= pybnmin1.DoubleVector(d)
print "Simulated points is ", list(ll.obs)
ll=applyPriors(ll,
prior=prior,
log_pr=log_pr)

if sampler is "metropolis":
mm=pybnmin1.MetropolisMCMC(ll,
[nsigma, TSigma, PSigma],
33)
elif sampler is "nested":
ss=pybnmin1.ListDV()
pybnmin1.startSetDirect(ll,
200,
ss)
mm=pybnmin1.NestedS(ll,
ss,
[nsigma, TSigma, PSigma],
33)

return mm.sample(nsample)
else:
raise "unkown sampler"

if coupling is not None:
mm.getbyname("coupling").dofit=0

return ChainToArray(mm.sample(nsample))

def dipRetrieve(elv,
tskyv,
err=1.0,
ic=[1.0, 270, 600, 0.99, 280],
nsample=200000,
fitTTerm=False,
PSigma=0.1,
TSigma=0.1,
nsigma=0.001,
Csigma=0.005,
prior=None,
radiom="Prod"
):
"""
Retrieve model parameters from a sky-dip measurement

:param err: Random error on each brighntess measured, in Kelvin,
assumed independent and gaussian
"""
wm=mkModel(coupling=ic[3],
TSpill=ic[4],
za=0,
radiom=radiom)
ll=pyair.DipNormMeasure(wm)
SetIC(ll, ic)
for el,tsky in zip(elv,tskyv):
ll.addObs(math.pi/2 - el,
pyair.doubleV(tsky))
ll.thermNoise=pybnmin1.DoubleVector([err]*4)
ll=applyPriors(ll,
prior=prior)
sigmas=[nsigma, TSigma, PSigma, Csigma]
if fitTTerm:
sigmas += [0.1]
mm=pybnmin1.MetropolisMCMC(ll,
sigmas,
33)
if fitTTerm:
mm.getbyname("TTerm").dofit=1
return ChainToArray(mm.sample(nsample))

def pathRetrieve(ic=[1.0, 270, 600],
Terr=1.0,
dTdLerr=0.02,
prior=None,
PSigma=0.1,
TSigma=0.1,
nsigma=0.001,
nsample=200000,
Tb_obs=None,
dT_dL=None,
coupling=None,
TSpill=275,
za=None
):
"""
Retrieval from absolute temperatures and the path/temp correlation

:param Terr: Error on the temperature measurement (K)

:param dTdLerr: Error on the dt/dl measurement as a fraction (unitless)

:param Tb_obs: Actual observed sky brightness instead of the simulated brightness

:param dT_dL: Acutal obsereved coefficients instead of the
simulated coefficients

"""
wm=mkModel(coupling=coupling,
TSpill=TSpill,
za=za)

ll=pyair.PathMeasure(wm)
SetIC(ll, ic)
if Tb_obs is None:
ll.modelObs()
else:
ll.Tb_obs=pybnmin1.DoubleVector(Tb_obs)
ll.dT_dL=pybnmin1.DoubleVector(dT_dL)

ll.Tb_sigma.thermNoise=pybnmin1.DoubleVector([Terr]*4)
if type(dTdLerr) is not list:
ll.dT_dL_sigma.thermNoise=pybnmin1.DoubleVector(numpy.fabs(numpy.array(ll.dT_dL))*dTdLerr)
else:
ll.dT_dL_sigma.thermNoise=pybnmin1.DoubleVector(numpy.fabs(numpy.array(ll.dT_dL))*numpy.array(dTdLerr))

print "Simulated points is ", list(ll.Tb_obs), list(ll.dT_dL)
ll=applyPriors(ll,
prior=prior,
log_pr=True)

mm=pybnmin1.MetropolisMCMC(ll,
[nsigma, TSigma, PSigma],
33)

if coupling is not None:
mm.getbyname("coupling").dofit=0

return ChainToArray(mm.sample(nsample))




def calcdTdL(pchain,
coupling=None,
TSpill=275,
el=math.pi*0.5,
**kwargs):
"""
Calculate dTdL from a chain of model parameters

:param pchain: Chain of model parameter values

:param el: Elevation at which to calculate the coefficients

:returns: Chain of the four dTdL coefficients
"""
wm=mkModel(coupling=coupling,
TSpill=TSpill,
za=math.pi*0.5-el,
**kwargs)
res=[]
for p in pchain:
SetIC(wm,
p)
v=pybnmin1.DoubleVector([0,0,0,0])
wm.dTdL_ND(v)
res.append(list(v))
return numpy.array(res)


def mkMeasureSys(rf,
bw=1.0,
full=False):
"""
Make a measurement system

"""
radio=pyair.MkSSBRadio(rf,bw)
if full:
sl=pyair.ISingleLayerWater(radio.getFGrid(),
pyair.WaterData.LALL)
else:
sl=pyair.ISingleLayerWater(radio.getFGrid(),
pyair.WaterData.L183)
sl.n=1
sl.P=550
sl.T=270
sl.thisown=False
radio.thisown=False
return pyair.MeasureSys(sl,radio), sl


def mkSingleLayerC(fmin,fmax,fstep):
"""
Create a signle layer object with regular grid

Primarily for plotting
"""
fgrid=numpy.arange(fmin,fmax,fstep)
sl=pyair.ISingleLayerWater(pyair.doubleV(fgrid),
pyair.WaterData.LALL)
sl.n=1
sl.P=550
sl.T=270
return sl