"""Manage pre-computed PDR models"""
import itertools
import collections
from copy import deepcopy
import numpy as np
from astropy.table import Table, Column, unique, vstack
import astropy.units as u
from .pdrutils import get_table,model_dir, _OBS_UNIT_
from .measurement import Measurement
[docs]class ModelSet(object):
"""Class for computed PDR Model Sets. :class:`ModelSet` provides interface to a directory containing the model FITS files and the ability to query details about.
:param name: identifying name, e.g., 'wk2006'
:type name: str
:param z: metallicity in solar units.
:type z: float
:param medium: medium type, e.g. 'constant density', 'clumpy', 'non-clumpy'
:type medium: str
:param mass: maximum clump mass (for KosmaTau models). Default:None (appropriate for Wolfire/Kaufman models)
:type float:
:params modelsetinfo: For adding user specified ModelSet, this parameter specifies the file with information about the ModelSet. It can be a pathname to an externalt tabular file in an astropy-recognized or an astropy Table object. If an external tabular file, its format should be give by the `format` keyword. The columns are:
['PDR code','name', 'version','path','filename','medium','z','mass','description']
`z` and `mass` should be floats, the rest should be strings.
The 'name`, `version`, `medium`, `z`, and `mass` columns contain the values available in the input ModelSet as described above. `PDR code` is the originator of the code e.g., "KOSMA-tau", `version` is the code version, `path` is the full-qualified pathname to the FITS files, `filename` is the tabular file which contains descriptions of individual FITS files. This must also be in the format specified by the `format` keyword. `description` is a description of the input ModelSet.
:type modelsetinfo: str or :class:~astropy.Table
:format: If `modelsetinfo` is a file, give the format of file. Must be `an astropy recognized Table format. <https://docs.astropy.org/en/stable/io/unified.html#built-in-readers-writers>`_ e.g., ascii, ipac, votable. Default is `IPAC format <https://docs.astropy.org/en/stable/api/astropy.io.ascii.Ipac.html#astropy.io.ascii.Ipac>`_
:type format: str
:raises ValueError: If model set not recognized/found.
"""
#@ToDo replace with kwargs?
def __init__(self,name,z,medium="constant density",mass=None,modelsetinfo=None,format='ipac'):
if modelsetinfo is None:
#get the package default
self._all_models = get_table("all_models.tab")
elif type(modelsetinfo) is str:
self._all_models = Table.read(modelsetinfo,format=format)
else: # must be an Astropy Table
self._all_models = deepcopy(modelsetinfo)
self._all_models.add_index("name")
possible = dict()
if name not in self._all_models["name"]:
raise ValueError(f'Unrecognized model {name:s}. Choices are: {list(self._all_models["name"])}')
if np.all(self._all_models.loc[name]["mass"].mask):
matching_rows = np.where((self._all_models["z"]==z) &
(self._all_models["medium"]==medium))
possible["mass"] = None
else:
matching_rows = np.where((self._all_models["z"]==z) &
(self._all_models["medium"]==medium) &
(self._all_models["mass"] == mass))
possible["mass"] = self._all_models.loc[name]["mass"]
for key in ["z", "medium"]:
possible[key]= self._all_models.loc[name][key]
# ugh, possible[] resulting from above can be a Python native or a Column.
# If only one row matches it will be a native, otherwise it will be a Column,
# so we have to check if it is a Column or not, so that we can successfully
# import numberscreate a numpy array.
for i in possible:
if possible[i] is None:
continue
if isinstance(possible[i],Column):
# convert Column to np.array
possible[i] = sorted(set(np.array(possible[i])))
else:
# convert native to np.array
possible[i] = sorted(set(np.array([possible[i]])))
#print("possible:",possible)
if mass is None and possible['mass'] is not None:
raise ValueError(f'mass value is required for model {name:s}. Allowed values are {possible["mass"]}')
if matching_rows[0].size == 0:
msg = f"Requested ModelSet not found in {name:s}. Check your input values. Allowed z are {possible['z']}. Allowed medium are {possible['medium']}."
if possible['mass'] is not None:
msg = msg + f" Allowed mass are {possible['mass']}."
raise ValueError(msg)
self._tabrow = self._all_models[matching_rows].loc[name]
self._table = get_table(path=self._tabrow["path"],filename=self._tabrow["filename"],format=format)
self._table.add_index("ratio")
self._set_identifiers()
self._set_ratios()
self._default_unit = dict()
self._default_unit["ratio"] = u.dimensionless_unscaled
self._default_unit["intensity"] = _OBS_UNIT_
self._default_unit["emissivity"] = "erg / (cm3 ion s)"
self._user_added_models = dict()
@property
def description(self):
"""The description of this model
:rtype: str
"""
return self._tabrow["description"]+", Z=%2.1f"%self.z
@property
def code(self):
"""The PDR code that created this ModelSet, e.g. 'KOSMA-tau'
:rtype: str
"""
return self._tabrow["PDR code"]
@property
def name(self):
"""The name of this ModelSet
:rtype: str
"""
return self._tabrow["name"]
@property
def version(self):
"""The version of this ModelSet
:rtype: str
"""
return self._tabrow["version"]
@property
def medium(self):
"""The medium type of this model, e.g. 'constant density', 'clumpy', 'non-clumpy'
:rtype: str
"""
return self._tabrow["medium"]
@property
def z(self):
"""The metallicity of this ModelSet
:rtype: float
"""
return self.metallicity
@property
def metallicity(self):
"""The metallicity of this ModelSet
:rtype: float
"""
return self._tabrow["z"]
@property
def table(self):
"""The table containing details of the models in this ModelSet.
:rtype: :class:`astropy.table.Table`
"""
return self._table
@property
def identifiers(self):
"""Table of lines and continuum that are included in ratio models of this ModelSet. Only lines and continuum that are part of ratios are included in this list. For a separate list of line and continuum intensity models see :meth:`~pdrtpy.modelset.ModelSet.supported_intensities`.
:rtype: :class:`astropy.table.Table`
"""
return self._identifiers
@property
def supported_intensities(self):
"""Table of lines and continuum that are covered by this ModelSet and have models separate from
the any ratio model they might be in.
:rtype: :class:`astropy.table.Table`
"""
return self._supported_lines
@property
def supported_ratios(self):
"""The emission ratios that are covered by this ModelSet
:rtype: :class:`astropy.table.Table`
"""
return self._supported_ratios
@property
def user_added_models(self):
"""Show which models have been added to this ModelSet by the user
:rtype: list
"""
return list(self._user_added_models.keys())
[docs] def ratiocount(self,m):
"""The number of valid ratios in this ModelSet, given a list of observation (:class:`~pdrtpy.measurement.Measurement`) identifiers.
:param m: list of string :class:`~pdrtpy.measurement.Measurement` IDs, e.g. ["CII_158","OI_145","FIR"]
:type m: list
:returns: The number of model ratios found for the given list of measurement IDs
:rtype: int
"""
return len(self._get_ratio_elements(m))
[docs] def find_pairs(self,m):
"""Find the valid model ratios labels in this ModelSet for a given list of measurement IDs
:param m: list of string `Measurement` IDs, e.g. ["CII_158","OI_145","FIR"]
:type m: list
:returns: An iterator of model ratios labels for the given list of measurement IDs
:rtype: iterator
"""
if not isinstance(m, collections.abc.Iterable) or isinstance(m, (str, bytes)) :
raise Exception("m must be an array of strings")
for q in itertools.product(m,m):
if q[0] == "FIR" and (q[1] == "OI_145" or q[1] == "OI_63") and "CII_158" in m:
s = q[1] + "+CII_158/" + q[0]
else:
s = q[0]+"/"+q[1]
if s in self.table["ratio"]:
yield s
[docs] def find_files(self,m,ext="fits"):
"""Find the valid model ratios files in this ModelSet for a given list of measurement IDs. See :meth:`~pdrtpy.measurement.Measurement.id`
:param m: list of string :class:`~pdrtpy.measurement.Measurement` IDs, e.g. ["CII_158","OI_145","FIR"]
:type m: list
:param ext: file extension. Default: "fits"
:type ext: str
:returns: An iterator of model ratio files for the given list of Measurement IDs
:rtype: iterator
"""
if not isinstance(m, collections.abc.Iterable) or isinstance(m, (str, bytes)):
raise Exception("m must be an array of strings")
for q in itertools.product(m,m):
# must deal with OI+CII/FIR models. Note we must check for FIR first, since
# if you check q has OI,CII and m has FIR order you'll miss OI/CII.
if q[0] == "FIR" and (q[1] == "OI_145" or q[1] == "OI_63") and "CII_158" in m:
#print("SPECIAL doing %s %s"%(q[0],q[1]))
s = q[1] + "+CII_158/" + q[0]
else:
#print("doing %s %s"%(q[0],q[1]))
s = q[0]+"/"+q[1]
if s in self.table["ratio"]:
fullpath = self._tabrow["path"]+self.table.loc[s]["filename"]+"."+ext
tup = (s,fullpath)
yield tup
[docs] def model_ratios(self,m):
'''Return the model ratios that match the input Measurement ID list. You must provide at least 2 Measurements IDs
:param m: list of string :class:`~pdrtpy.measurement.Measurement` IDs, e.g., ["CII_158","OI_145","FIR"]
:type m: list
:returns: list of string identifiers of ratios IDs, e.g., ['OI_145/CII_158', 'OI_145+CII_158/FIR']
:rtype: list
'''
ratios = list()
if len(m) < 2:
raise Exception("m most contain at least two strings")
for p in self.find_files(m):
ratios.append(p[0])
return ratios
[docs] def model_intensities(self,m):
'''Return the model intensities in this ModelSet that match the input Measurement ID list.
This method will return the intersection of the input list and the list of supported lines.
:param m: list of string :class:`~pdrtpy.measurement.Measurement` IDs, e.g., ["CII_158","OI_145","CS_21"]
:type m: list
:returns: list of string identifiers of ratios IDs, e.g., ['CII_158','OI_145']
:rtype: list
'''
# get intersection of input list and supported lines
return list(set(m) & set(self._supported_lines["intensity label"]))
[docs] def get_model(self,identifier,unit=None,ext="fits"):
'''Get a specific model by its identifier
:param identifier: a :class:`~pdrtpy.measurement.Measurement` ID. It can be an intensity or a ratio, e.g., "CII_158","CI_609/FIR".
:type identifier: str
:returns: The model matching the identifier
:rtype: :class:`~pdrtpy.measurement.Measurement`
:raises: KeyError if identifier not found in this ModelSet
'''
if identifier in self._user_added_models:
return self._user_added_models[identifier]
if identifier not in self.table["ratio"]:
raise KeyError(f"{identifier} is not in this ModelSet")
_filename = self.table.loc[identifier]["filename"] +"."+ext
d = model_dir()
_thefile = d+self._tabrow["path"] + _filename
_title = self._table.loc[identifier]['title']
# @TODO Fix this: see issues 66 & 67
if unit is None or unit == "":
if identifier == "TS":
#print("Setting unit to K")
unit = "K"
modeltype = "intensity" #??'temperature'
# make a guess at the unit
elif '/' in identifier:
unit = self._default_unit["ratio"]
modeltype = "ratio"
else:
unit = self._default_unit['intensity']
modeltype = "intensity" #this is wrong for emissivity modeltypes
else:
if unit == u.dimensionless_unscaled:
modeltype = "ratio"
else:
modeltype = "intensity"
#print("Unit = ",unit)
_model = Measurement.read(_thefile,title=_title,unit=unit,identifier=identifier)
#if _model.unit=="":
# _model.unit = u.Unit("adu")#self._default_unit["ratio"]
_wcs = _model.wcs
if "MODELTYP" not in _model.header:
_model.header["MODELTYP"] = modeltype
_model.modeltype = modeltype
#@todo this is messy. clean up by doing if wcs.. first?
if self.is_wk2006 or self.name == "smc":
# fix WK2006 model headerslisthd
if _wcs.wcs.cunit[0] == "":
_model.header["CUNIT1"] = "cm^-3"
_wcs.wcs.cunit[0] = u.Unit("cm^-3")
else:
_model.header["CUNIT1"] = str(_wcs.wcs.cunit[0])
if _wcs.wcs.cunit[1] == "":
_model.header["CUNIT2"] = "Habing"
# Raises UnitScaleError:
# "The FITS unit format is not able to represent scales that are not powers of 10. Multiply your data by 1.600000e-03."
# This causes all sorts of downstream problems. Workaround in LineRatioFit.read_models().
#_model.wcs.wcs.cunit[1] = habing_unit
elif self.code == "KOSMA-tau":
# fix KosmaTau model headers
if _wcs.wcs.cunit[0] == "":
_model.header["CUNIT1"] = "cm^-3"
_wcs.wcs.cunit[0] = u.Unit("cm^-3")
else:
_model.header["CUNIT1"] = str(_wcs.wcs.cunit[0])
if _wcs.wcs.cunit[1] == "":
_model.header["CUNIT2"] = "Draine"
else:
_model.header["CUNIT2"] = str(_wcs.wcs.cunit[1])
else:
# copy wcs cunit to header. used later.
_model.header["CUNIT1"] = str(_wcs.wcs.cunit[0])
_model.header["CUNIT2"] = str(_wcs.wcs.cunit[1])
return _model
[docs] def get_models(self,identifiers,model_type="ratio",ext="fits"):
'''get the models from thie ModelSet that match the input list of identifiers
:param identifiers: list of string :class:`~pdrtpy.measurement.Measurement` IDs, e.g., ["CII_158","OI_145","CS_21"]
:type identifiers: list
:param model_type: indicates which type of model is requested one of 'ratio' or 'intensity'
:type model_type: str
:returns: The matching models as a list of :class:`~pdrtpy.measurement.Measurement`.
:rtype: list
:raises: KeyError if identifiers not found in this ModelSet
'''
#if identifier not in self._identifiers["ID"]:
# raise Exception("There is no model in ModelSet %s with the identifier %s"%(self.name,identifier))
if model_type != "intensity" and model_type != "ratio" and model_type != "both":
raise ValueError("Unrecognized model_type: must be one of 'intensity', 'ratio', or 'both'")
models=dict()
a = list()
self._table.remove_indices('ratio')
self._table.add_index('ratio')
if model_type == "intensity" or model_type == "both":
a.extend(self.model_intensities(identifiers))
if model_type == "ratio" or model_type == "both":
a.extend(self.model_ratios(identifiers))
if model_type == "intensity" or model_type == "ratio":
_unit = self._default_unit[model_type]
else:
_unit = None
for k in a:
if k == "TS":
#kluge. we need to support model_type = "temperature"
models[k] = self.get_model(k,unit="K",ext=ext)
else:
models[k] = self.get_model(k,unit=_unit,ext=ext)
return models
[docs] def add_model(self,identifier,model,title,overwrite=False):
r"""Add your own model to this ModelSet.
:param identifier: a :class:`~pdrtpy.measurement.Measurement` ID. It can be an intensity or a ratio, e.g., "CII_158","CI_609/FIR".
:type identifier: str
:param model: the model to add. If a string, this must be the fully-qualified path of a FITS file. If a :class:`~pdrtpy.measurement.Measurement` it must have the same CTYPEs and CUNITs as the models in the ModelSet(?).
:type model: str or :class:`~pdrtpy.measurement.Measurement`
:param title: A formatted string (e.g., LaTeX) describing this observation that can be used for plotting. Python r-strings are accepted, e.g., r'$^{13}$CO(3-2)' would give :math:`^{13}{\rm CO(3-2)}`.
:type title: str
:param overwrite: Whether to overwrite the model if the identifier already exists in the ModelSet or has been previously added. Default: False
:type overwrite: bool
"""
if identifier not in self.table["ratio"] and identifier not in self._user_added_models:
self._really_add_model(identifier,model,title)
elif identifier in self._user_added_models and not overwrite:
raise Exception(f"{identifier} was previously added to this ModelSet. If you wish to overwrite it, use overwrite=True")
elif identifier in self.table["ratio"] and not overwrite:
raise Exception(f"{identifier} is already in the {self.name} ModelSet. If you wish to overwrite it, use overwrite=True")
else:
#print(f"Overwriting {identifier}.")
self._really_add_model(identifier,model,title)
def _really_add_model(self,identifier,model,title):
print("Adding user model %s"%identifier)
if type(model) is str:
m = Measurement.read(model,identifier=identifier)
else:
m = model
m._title = title
self._user_added_models[identifier] = m
if "/" in identifier: # it's a ratio
if identifier in self._supported_ratios["ratio label"]:
# ack, there should be a way just to replace title but I can't get Table.loc to work.
index = np.where(self._supported_ratios["ratio label"] == identifier)[0][0]
self._supported_ratios.remove_row(index)
self._supported_ratios.add_row([title,identifier])
numerator, denominator = identifier.split('/')
fakefilename = "user-"+numerator.replace("_","")+"_"+denominator.replace("_","")
else:
if identifier in self._supported_lines["intensity label"]:
index = np.where(self._supported_lines["intensity label"] == identifier)[0][0]
self._supported_lines.remove_row(index)
self._supported_lines.add_row([title,identifier])
numerator = identifier
denominator = 1
fakefilename = "user-"+numerator.replace("_","")
#numerator denominator ratio filename z title
self.table.add_row([numerator, denominator, identifier, fakefilename, self.z, title])
def _find_ratio_elements(self,m):
# TODO handle case of OI+CII/FIR so it is not special cased in lineratiofit.py
"""Find the valid model numerator,denominator pairs in this ModelSet for a given list of measurement IDs. See :meth:`~pdrtpy.measurement.Measurement.id`
:param m: list of string :class:`~pdrtpy.measurement.Measurement` IDs, e.g. ["CII_158","OI_145","FIR"]
:type m: list
:returns: iterator -- An dictionary iterator where key is numerator and value is denominator
"""
if not isinstance(m, collections.abc.Iterable) or isinstance(m, (str, bytes)) :
raise Exception("m must be an array of strings")
for q in itertools.product(m,m):
s = q[0]+"/"+q[1]
z = dict()
if s in self.table["ratio"]:
z={"numerator":self.table.loc[s]["numerator"],
"denominator":self.table.loc[s]["denominator"]}
yield z
def _get_ratio_elements(self,m):
"""Get the valid model numerator,denominator pairs in this ModelSet for a given list of measurement IDs. See :meth:`~pdrtpy.measurement.Measurement.id`
:param m: list of string :class:`~pdrtpy.measurement.Measurement` IDs, e.g. ["CII_158","OI_145","FIR"]
:type m: list
:returns: dict -- A dictionary where key is numerator and value is denominator
"""
if not isinstance(m, collections.abc.Iterable) or isinstance(m, (str, bytes)) :
raise Exception("m must be an array of strings")
k = list()
for q in itertools.product(m,m):
s = q[0]+"/"+q[1]
if s in self.table["ratio"]:
z={"numerator":self.table.loc[s]["numerator"],
"denominator":self.table.loc[s]["denominator"]}
k.append(z)
self._get_oi_cii_fir(m,k)
return k
def _get_oi_cii_fir(self,m,k):
"""Utility method for determining ratio elements, to handle special
case of ([O I] 63 micron + [C II] 158 micron)/I_FIR.
:param m: list of string :class:`~pdrtpy.measurement.Measurement` IDs, e.g. ["CII_158","OI_145","FIR"]
:type m: list
:param k: list of existing ratios to append to
:type k: list
"""
if "CII_158" in m and "FIR" in m:
if "OI_63" in m:
num = "OI_63+CII_158"
den = "FIR"
#lab="OI_63+CII_158/FIR"
z = {"numerator":num,"denominator":den}
k.append(z)
if "OI_145" in m:
num = "OI_145+CII_158"
den = "FIR"
#lab="OI_145+CII_158/FIR"
z = {"numerator":num,"denominator":den}
k.append(z)
def _set_ratios(self):
"""make a useful table of ratios covered by this model"""
self._supported_ratios = Table( [ self.table["title"], self.table["denominator"], self.table["ratio"] ],copy=True)
matching_rows = np.where(self._supported_ratios["denominator"]=="1")[0]
self._supported_lines = Table(self._supported_ratios[matching_rows],copy=True)
self._supported_lines.remove_column("denominator")
self._supported_ratios.remove_rows(matching_rows)
self._supported_ratios['title'].unit = None
self._supported_ratios['ratio'].unit = None
self._supported_ratios.remove_column("denominator")
self._supported_ratios.rename_column("ratio","ratio label")
self._supported_lines.rename_column("ratio","intensity label")
def _set_identifiers(self):
"""make a useful table of identifiers of lines covered by ratios in this ModelSet"""
# remove the single line intensity models from the list.
matching_rows = np.where((self._table['denominator'] != "1"))[0]
n=deepcopy(self._table['numerator'][matching_rows])
n.name = 'ID'
d=deepcopy(self._table['denominator'][matching_rows])
d.name='ID'
t1 = Table([self._table['title'][matching_rows],n],copy=True)
# discard the summed fluxes as user would input them individually
for id in ['OI_145+CII_158','OI_63+CII_158']:
a = np.where(t1['ID']==id)[0]
for z in a:
t1.remove_row(z)
# now remove denominator from title (everything from / onwards)
for i in range(len(t1['title'])):
if '/' in t1['title'][i]:
t1['title'][i] = t1['title'][i][0:t1['title'][i].index('/')]
t2 = Table([self._table['title'][matching_rows],d],copy=True)
# remove numerator from title (everything before and including /)
for i in range(len(t2['title'])):
if '/' in t2['title'][i]:
t2['title'][i] = t2['title'][i][t2['title'][i].index('/')+1:]
t = vstack([t1,t2])
t = unique(t,keys=['ID'],keep='first',silent=True)
t['title'].unit = None
t['ID'].unit = None
t.rename_column('title','canonical name')
self._identifiers = t
@property
def is_wk2006(self):
"""method to indicate this is a wk2006 model, to deal with quirks
of that modelset
:returns: True if it is.
"""
return self.name == "wk2006"
# ============= Static Methods =============
[docs] @staticmethod
def list():
"""Print the names and descriptions of available ModelSets (not just this one) """
ModelSet.all_sets().pprint_all(align="<")
[docs] @staticmethod
def all_sets():
"""Return a table of the names and descriptions of available ModelSets (not just this one)
:rtype: :class:`~astropy.table.Table`
"""
t = get_table("all_models.tab")
t.remove_column("path")
t.remove_column("filename")
return t