"""Manage pre-computed PDR models"""
import collections
import itertools
from copy import deepcopy
import astropy.units as u
import numpy as np
from astropy.table import Column, Table, unique, vstack
from .measurement import Measurement
from .pdrutils import _OBS_UNIT_, get_table, model_dir
[docs]
class ModelSet(object):
"""Class for computed PDR Model Sets. :class:`ModelSet` provides interface to a directory containing the model FITS files and the ability to query details about.
:param name: identifying name, e.g., 'wk2006'
:type name: str
:param z: metallicity in solar units.
:type z: float
:param medium: medium type, e.g. 'constant density', 'clumpy', 'non-clumpy'
:type medium: str
:param losangle: the line-of-sight inclination angle (aka viewing angle), losangle=0 is face-on, losangle=90 is edge-on. Valid values are 0, 30, 45, 60, 75, 90.
:type losangle: float
:param mass: maximum clump mass (for KosmaTau models). Default:None (appropriate for Wolfire/Kaufman models)
:type mass: float
:param modelsetinfo: For adding user specified ModelSet, this parameter
specifies the file with information about the ModelSet. It can be a
pathname to an externalt tabular file in an astropy-recognized or an
astropy Table object. If an external tabular file, its format should
be give by the `format` keyword. The columns are:
.. code-block:: python
['PDRcode','name', 'version','path','filename','medium','z','inc', 'mass','description']
`z`, 'inc', and `mass` should be numbers, the rest should be strings.
The `name`, `version`, `medium`, `z`, 'inc', and `mass` columns contain
the values available in the input ModelSet as described above. `PDR
code` is the originator of the code e.g., "KOSMA-tau", `version`
is the code version, `path` is the full-qualified pathname to
the FITS files, `filename` is the tabular file which contains
descriptions of individual FITS files. This must also be in
the format specified by the `format` keyword. `description`
is a description of the input ModelSet.
:type modelsetinfo: str or :class:`~astropy.table.Table`
:param format: If `modelsetinfo` is a file, give the
format of file. Must be `an astropy recognized Table format.
<https://docs.astropy.org/en/stable/io/unified.html#built-in-readers-writers>`_
e.g., ascii, ipac, votable. Default is `IPAC format
<https://docs.astropy.org/en/stable/api/astropy.io.ascii.Ipac.html#astropy.io.ascii.Ipac>`_
:type format: str
:raises ValueError: If model set not recognized/found.
"""
def __init__(
self,
name,
z,
medium="constant density",
losangle=0,
mass=None,
modelsetinfo=None,
format="ipac",
debug=False,
):
if modelsetinfo is None:
# get the package default
self._all_models = get_table("all_models.tab")
elif type(modelsetinfo) is str:
self._all_models = Table.read(modelsetinfo, format=format)
else: # must be an Astropy Table
self._all_models = deepcopy(modelsetinfo)
self._all_models.add_index("name")
possible = dict()
if name not in self._all_models["name"]:
raise ValueError(
f'Unrecognized PDR model code {name:s}. Choices are: {set(list(self._all_models["name"]))}'
)
if np.all(self._all_models.loc[name]["mass"].mask):
matching_rows = np.where(
(self._all_models["z"] == z)
& (self._all_models["medium"] == medium)
& (self._all_models["losangle"] == losangle)
)
possible["mass"] = None
else: # Kosma-tau
matching_rows = np.where(
(self._all_models["z"] == z)
& (self._all_models["medium"] == medium)
& (self._all_models["mass"] == mass)
)
possible["mass"] = self._all_models.loc[name]["mass"]
for key in ["z", "medium", "losangle"]:
possible[key] = self._all_models.loc[name][key]
# ugh, possible[] resulting from above can be a Python native or a Column.
# If only one row matches it will be a native, otherwise it will be a Column,
# so we have to check if it is a Column or not, so that we can successfully
# import numberscreate a numpy array.
for j in possible:
if possible[j] is None:
continue
if isinstance(possible[j], Column):
# convert Column to np.array
possible[j] = sorted(set(np.array(possible[j])))
else:
# convert native to np.array
possible[j] = sorted(set(np.array([possible[j]])))
if mass is None and possible["mass"] is not None:
raise ValueError(f'mass value is required for model {name:s}. Allowed values are {possible["mass"]}')
if matching_rows[0].size == 0:
msg = (
f"Requested ModelSet not found in {name:s}. Check your input values. Allowed z are {possible['z']}. "
f" Allowed medium are {possible['medium']}. Allowed losangle are {possible['losangle']}."
)
if possible["mass"] is not None:
msg = msg + f" Allowed mass are {possible['mass']}."
raise ValueError(msg)
self._tabrow = self._all_models[matching_rows].loc[name]
tpath = self._tabrow["path"]
self._table = get_table(path=tpath, filename=self._tabrow["filename"], format=format)
self._table.add_index("ratio")
self._set_identifiers()
self._set_ratios()
self._default_unit = dict()
self._default_unit["ratio"] = u.dimensionless_unscaled
self._default_unit["intensity"] = _OBS_UNIT_
self._default_unit["emissivity"] = "erg / (cm3 ion s)"
self._user_added_models = dict()
@property
def description(self):
"""The description of this model
:rtype: str
"""
s = f", Z={self.z:2.1f}, losangle={self.losangle:d}"
if self.avlos is not None:
s+=f", avlos={self.avlos:d}"
return self._tabrow["description"] + s
@property
def code(self):
"""The PDR code that created this ModelSet, e.g. 'KOSMA-tau'
:rtype: str
"""
return self._tabrow["PDRcode"]
@property
def name(self):
"""The name of this ModelSet
:rtype: str
"""
return self._tabrow["name"]
@property
def version(self):
"""The version of this ModelSet
:rtype: str
"""
return self._tabrow["version"]
@property
def medium(self):
"""The medium type of this model, e.g. 'constant density', 'clumpy', 'non-clumpy'
:rtype: str
"""
return self._tabrow["medium"]
@property
def z(self):
"""The metallicity of this ModelSet
:rtype: float
"""
return self.metallicity
@property
def metallicity(self):
"""The metallicity of this ModelSet
:rtype: float
"""
return self._tabrow["z"]
@property
def losangle(self):
"""The inclination (viewing) angle with respect to the line of sight, in degrees. A value of 0 means face-on.
:rtype: float
"""
return self._tabrow["losangle"]
@property
def viewangle(self):
"""The viewing angle (inclination) with respect to the line of sight, in degrees. A value of 0 means face-on.
:rtype: float
"""
return self.losangle
@property
def avperp(self):
"""The PDR thickness for inclined viewing angle models, expressed in visual magnitudes. A value of 0 indicates a face-on model, for which `avperp` is undefined.
:rtype: float
"""
pass
@property
def avlos(self): # @todo this might be model specific
"""The visual extinction along the line of sight, express in magnitudes.
:rtype: float
"""
pass
@property
def table(self):
"""The table containing details of the models in this ModelSet.
:rtype: :class:`astropy.table.Table`
"""
return self._table
@property
def identifiers(self):
"""Table of lines and continuum that are included in ratio models of this ModelSet. Only lines and continuum that are part of ratios are included in this list. For a separate list of line and continuum intensity models see :meth:`~pdrtpy.modelset.ModelSet.supported_intensities`.
:rtype: :class:`astropy.table.Table`
"""
return self._identifiers
@property
def supported_intensities(self):
"""Table of lines and continuum that are covered by this ModelSet and have models separate from
the any ratio model they might be in.
:rtype: :class:`astropy.table.Table`
"""
return self._supported_lines
@property
def supported_ratios(self):
"""The emission ratios that are covered by this ModelSet
:rtype: :class:`astropy.table.Table`
"""
return self._supported_ratios
@property
def user_added_models(self):
"""Show which models have been added to this ModelSet by the user
:rtype: list
"""
return list(self._user_added_models.keys())
[docs]
def ratiocount(self, m):
"""The number of valid ratios in this ModelSet, given a list of observation (:class:`~pdrtpy.measurement.Measurement`) identifiers.
:param m: list of string :class:`~pdrtpy.measurement.Measurement` IDs, e.g. ["CII_158","OI_145","FIR"]
:type m: list
:returns: The number of model ratios found for the given list of measurement IDs
:rtype: int
"""
return len(self._get_ratio_elements(m))
[docs]
def find_pairs(self, m):
"""Find the valid model ratios labels in this ModelSet for a given list of measurement IDs
:param m: list of string `Measurement` IDs, e.g. ["CII_158","OI_145","FIR"]
:type m: list
:returns: An iterator of model ratios labels for the given list of measurement IDs
:rtype: iterator
"""
if not isinstance(m, collections.abc.Iterable) or isinstance(m, (str, bytes)):
raise Exception("m must be an array of strings")
for q in itertools.product(m, m):
if q[0] == "FIR" and (q[1] == "OI_145" or q[1] == "OI_63") and "CII_158" in m:
s = q[1] + "+CII_158/" + q[0]
else:
s = q[0] + "/" + q[1]
if s in self.table["ratio"]:
yield s
[docs]
def find_files(self, m, ext="fits"):
"""Find the valid model ratios files in this ModelSet for a given list of measurement IDs. See :meth:`~pdrtpy.measurement.Measurement.id`
:param m: list of string :class:`~pdrtpy.measurement.Measurement` IDs, e.g. ["CII_158","OI_145","FIR"]
:type m: list
:param ext: file extension. Default: "fits"
:type ext: str
:returns: An iterator of model ratio files for the given list of Measurement IDs
:rtype: iterator
"""
if not isinstance(m, collections.abc.Iterable) or isinstance(m, (str, bytes)):
raise Exception("m must be an array of strings")
for q in itertools.product(m, m):
# must deal with OI+CII/FIR models. Note we must check for FIR first, since
# if you check q has OI,CII and m has FIR order you'll miss OI/CII.
if q[0] == "FIR" and (q[1] == "OI_145" or q[1] == "OI_63") and "CII_158" in m:
s = q[1] + "+CII_158/" + q[0]
else:
s = q[0] + "/" + q[1]
if s in self.table["ratio"]:
fullpath = self._tabrow["path"] + self.table.loc[s]["filename"] + "." + ext
tup = (s, fullpath)
yield tup
[docs]
def model_ratios(self, m):
"""Return the model ratios that match the input Measurement ID list. You must provide at least 2 Measurements IDs
:param m: list of string :class:`~pdrtpy.measurement.Measurement` IDs, e.g., ["CII_158","OI_145","FIR"]
:type m: list
:returns: list of string identifiers of ratios IDs, e.g., ['OI_145/CII_158', 'OI_145+CII_158/FIR']
:rtype: list
"""
ratios = list()
if len(m) < 2:
raise Exception("m most contain at least two strings")
for p in self.find_files(m):
ratios.append(p[0])
return ratios
[docs]
def model_intensities(self, m):
"""Return the model intensities in this ModelSet that match the input Measurement ID list.
This method will return the intersection of the input list and the list of supported lines.
:param m: list of string :class:`~pdrtpy.measurement.Measurement` IDs, e.g., ["CII_158","OI_145","CS_21"]
:type m: list
:returns: list of string identifiers of ratios IDs, e.g., ['CII_158','OI_145']
:rtype: list
"""
# get intersection of input list and supported lines
return list(set(m) & set(self._supported_lines["intensity label"]))
[docs]
def get_model(self, identifier, unit=None, ext="fits"):
"""Get a specific model by its identifier
:param identifier: a :class:`~pdrtpy.measurement.Measurement` ID. It can be an intensity or a ratio, e.g., "CII_158","CI_609/FIR".
:type identifier: str
:returns: The model matching the identifier
:rtype: :class:`~pdrtpy.measurement.Measurement`
:raises KeyError: if identifier not found in this ModelSet
"""
if identifier in self._user_added_models:
return self._user_added_models[identifier]
if identifier not in self.table["ratio"]:
raise KeyError(f"{identifier} is not in this ModelSet")
_filename = self.table.loc[identifier]["filename"] + "." + ext
d = model_dir()
_thefile = d + self._tabrow["path"] + _filename
_title = self._table.loc[identifier]["title"]
# @TODO Fix this: see issues 66 & 67
if unit is None or unit == "":
if identifier == "TS":
unit = "K"
modeltype = "intensity" # ??'temperature'
# make a guess at the unit
elif "/" in identifier:
unit = self._default_unit["ratio"]
modeltype = "ratio"
else:
unit = self._default_unit["intensity"]
modeltype = "intensity" # this is wrong for emissivity modeltypes
else:
if unit == u.dimensionless_unscaled:
modeltype = "ratio"
else:
modeltype = "intensity"
_model = Measurement.read(_thefile, title=_title, unit=unit, identifier=identifier)
# if _model.unit=="":
# _model.unit = u.Unit("adu")#self._default_unit["ratio"]
_wcs = _model.wcs
if "MODELTYP" not in _model.header:
_model.header["MODELTYP"] = modeltype
_model.modeltype = modeltype
# @todo this is messy. clean up by doing if wcs.. first?
if self.is_wk2006 or self.name == "smc":
# fix WK2006 model headerslisthd
if _wcs.wcs.cunit[0] == "":
_model.header["CUNIT1"] = "cm^-3"
_wcs.wcs.cunit[0] = u.Unit("cm^-3")
else:
_model.header["CUNIT1"] = str(_wcs.wcs.cunit[0])
if _wcs.wcs.cunit[1] == "":
_model.header["CUNIT2"] = "Habing"
# Raises UnitScaleError:
# "The FITS unit format is not able to represent scales that are not powers of 10. Multiply your data by 1.600000e-03."
# This causes all sorts of downstream problems. Workaround in LineRatioFit.read_models().
# _model.wcs.wcs.cunit[1] = habing_unit
elif self.code == "KOSMA-tau":
# fix KosmaTau model headers
if _wcs.wcs.cunit[0] == "":
_model.header["CUNIT1"] = "cm^-3"
_wcs.wcs.cunit[0] = u.Unit("cm^-3")
else:
_model.header["CUNIT1"] = str(_wcs.wcs.cunit[0])
if _wcs.wcs.cunit[1] == "":
_model.header["CUNIT2"] = "Draine"
else:
_model.header["CUNIT2"] = str(_wcs.wcs.cunit[1])
else:
# copy wcs cunit to header. used later.
_model.header["CUNIT1"] = str(_wcs.wcs.cunit[0])
_model.header["CUNIT2"] = str(_wcs.wcs.cunit[1])
return _model
[docs]
def get_models(self, identifiers, model_type="ratio", ext="fits"):
"""get the models from thie ModelSet that match the input list of identifiers
:param identifiers: list of string :class:`~pdrtpy.measurement.Measurement` IDs, e.g., ["CII_158","OI_145","CS_21"]
:type identifiers: list
:param model_type: indicates which type of model is requested one of 'ratio' or 'intensity'
:type model_type: str
:returns: The matching models as a list of :class:`~pdrtpy.measurement.Measurement`.
:rtype: list
:raises KeyError: if identifiers not found in this ModelSet
"""
# if identifier not in self._identifiers["ID"]:
# raise Exception("There is no model in ModelSet %s with the identifier %s"%(self.name,identifier))
if model_type != "intensity" and model_type != "ratio" and model_type != "both":
raise ValueError("Unrecognized model_type: must be one of 'intensity', 'ratio', or 'both'")
models = dict()
a = list()
self._table.remove_indices("ratio")
self._table.add_index("ratio")
if model_type == "intensity" or model_type == "both":
a.extend(self.model_intensities(identifiers))
if model_type == "ratio" or model_type == "both":
a.extend(self.model_ratios(identifiers))
if model_type == "intensity" or model_type == "ratio":
_unit = self._default_unit[model_type]
else:
_unit = None
for k in a:
if k == "TS":
# kluge. we need to support model_type = "temperature"
models[k] = self.get_model(k, unit="K", ext=ext)
else:
models[k] = self.get_model(k, unit=_unit, ext=ext)
return models
[docs]
def add_model(self, identifier, model, title, overwrite=False):
r"""Add your own model to this ModelSet.
:param identifier: a :class:`~pdrtpy.measurement.Measurement` ID. It can be an intensity or a ratio, e.g., "CII_158","CI_609/FIR".
:type identifier: str
:param model: the model to add. If a string, this must be the fully-qualified path of a FITS file. If a :class:`~pdrtpy.measurement.Measurement` it must have the same CTYPEs and CUNITs as the models in the ModelSet(?).
:type model: str or :class:`~pdrtpy.measurement.Measurement`
:param title: A formatted string (e.g., LaTeX) describing this observation that can be used for plotting. Python r-strings are accepted, e.g., r'$^{13}$CO(3-2)' would give :math:`^{13}{\rm CO(3-2)}`.
:type title: str
:param overwrite: Whether to overwrite the model if the identifier already exists in the ModelSet or has been previously added. Default: False
:type overwrite: bool
"""
if identifier not in self.table["ratio"] and identifier not in self._user_added_models:
self._really_add_model(identifier, model, title)
elif identifier in self._user_added_models and not overwrite:
raise Exception(
f"{identifier} was previously added to this ModelSet. If you wish to overwrite it, use overwrite=True"
)
elif identifier in self.table["ratio"] and not overwrite:
raise Exception(
f"{identifier} is already in the {self.name} ModelSet. If you wish to overwrite it, use overwrite=True"
)
else:
# print(f"Overwriting {identifier}.")
self._really_add_model(identifier, model, title)
def _really_add_model(self, identifier, model, title):
print("Adding user model %s" % identifier)
if type(model) is str:
m = Measurement.read(model, identifier=identifier)
else:
m = model
m._title = title
self._user_added_models[identifier] = m
if "/" in identifier: # it's a ratio
if identifier in self._supported_ratios["ratio label"]:
# ack, there should be a way just to replace title but I can't get Table.loc to work.
index = np.where(self._supported_ratios["ratio label"] == identifier)[0][0]
self._supported_ratios.remove_row(index)
self._supported_ratios.add_row([title, identifier])
numerator, denominator = identifier.split("/")
fakefilename = "user-" + numerator.replace("_", "") + "_" + denominator.replace("_", "")
else:
if identifier in self._supported_lines["intensity label"]:
index = np.where(self._supported_lines["intensity label"] == identifier)[0][0]
self._supported_lines.remove_row(index)
self._supported_lines.add_row([title, identifier])
numerator = identifier
denominator = 1
fakefilename = "user-" + numerator.replace("_", "")
# numerator denominator ratio filename z title
self.table.add_row([numerator, denominator, identifier, fakefilename, self.z, title, str(self.losangle)])
def _find_ratio_elements(self, m):
# TODO handle case of OI+CII/FIR so it is not special cased in lineratiofit.py
"""Find the valid model numerator,denominator pairs in this ModelSet for a given list of measurement IDs. See :meth:`~pdrtpy.measurement.Measurement.id`
:param m: list of string :class:`~pdrtpy.measurement.Measurement` IDs, e.g. ["CII_158","OI_145","FIR"]
:type m: list
:returns: iterator -- An dictionary iterator where key is numerator and value is denominator
"""
if not isinstance(m, collections.abc.Iterable) or isinstance(m, (str, bytes)):
raise Exception("m must be an array of strings")
for q in itertools.product(m, m):
s = q[0] + "/" + q[1]
z = dict()
if s in self.table["ratio"]:
z = {"numerator": self.table.loc[s]["numerator"], "denominator": self.table.loc[s]["denominator"]}
yield z
def _get_ratio_elements(self, m):
"""Get the valid model numerator,denominator pairs in this ModelSet for a given list of measurement IDs. See :meth:`~pdrtpy.measurement.Measurement.id`
:param m: list of string :class:`~pdrtpy.measurement.Measurement` IDs, e.g. ["CII_158","OI_145","FIR"]
:type m: list
:returns: dict -- A dictionary where key is numerator and value is denominator
"""
if not isinstance(m, collections.abc.Iterable) or isinstance(m, (str, bytes)):
raise Exception("m must be an array of strings")
k = list()
for q in itertools.product(m, m):
s = q[0] + "/" + q[1]
if s in self.table["ratio"]:
z = {"numerator": self.table.loc[s]["numerator"], "denominator": self.table.loc[s]["denominator"]}
k.append(z)
self._get_oi_cii_fir(m, k)
return k
def _get_oi_cii_fir(self, m, k):
"""Utility method for determining ratio elements, to handle special
case of ([O I] 63 micron + [C II] 158 micron)/I_FIR.
:param m: list of string :class:`~pdrtpy.measurement.Measurement` IDs, e.g. ["CII_158","OI_145","FIR"]
:type m: list
:param k: list of existing ratios to append to
:type k: list
"""
if "CII_158" in m and "FIR" in m:
if "OI_63" in m:
num = "OI_63+CII_158"
den = "FIR"
# lab="OI_63+CII_158/FIR"
z = {"numerator": num, "denominator": den}
k.append(z)
if "OI_145" in m:
num = "OI_145+CII_158"
den = "FIR"
# lab="OI_145+CII_158/FIR"
z = {"numerator": num, "denominator": den}
k.append(z)
def _set_ratios(self):
"""make a useful table of ratios covered by this model"""
self._supported_ratios = Table([self.table["title"], self.table["denominator"], self.table["ratio"]], copy=True)
matching_rows = np.where(self._supported_ratios["denominator"] == "1")[0]
self._supported_lines = Table(self._supported_ratios[matching_rows], copy=True)
self._supported_lines.remove_column("denominator")
self._supported_ratios.remove_rows(matching_rows)
self._supported_ratios["title"].unit = None
self._supported_ratios["ratio"].unit = None
self._supported_ratios.remove_column("denominator")
self._supported_ratios.rename_column("ratio", "ratio label")
self._supported_lines.rename_column("ratio", "intensity label")
def _set_identifiers(self):
"""make a useful table of identifiers of lines covered by ratios in this ModelSet"""
# remove the single line intensity models from the list.
matching_rows = np.where((self._table["denominator"] != "1"))[0]
n = deepcopy(self._table["numerator"][matching_rows])
n.name = "ID"
d = deepcopy(self._table["denominator"][matching_rows])
d.name = "ID"
t1 = Table([self._table["title"][matching_rows], n], copy=True)
# discard the summed fluxes as user would input them individually
for id in ["OI_145+CII_158", "OI_63+CII_158"]:
a = np.where(t1["ID"] == id)[0]
for z in a:
t1.remove_row(z)
# now remove denominator from title (everything from / onwards)
for i in range(len(t1["title"])):
if "/" in t1["title"][i]:
t1["title"][i] = t1["title"][i][0 : t1["title"][i].index("/")]
t2 = Table([self._table["title"][matching_rows], d], copy=True)
# remove numerator from title (everything before and including /)
for i in range(len(t2["title"])):
if "/" in t2["title"][i]:
t2["title"][i] = t2["title"][i][t2["title"][i].index("/") + 1 :]
t = vstack([t1, t2])
t = unique(t, keys=["ID"], keep="first", silent=True)
t["title"].unit = None
t["ID"].unit = None
t.rename_column("title", "canonical name")
self._identifiers = t
@property
def is_wk2006(self):
"""method to indicate this is a wk2006 model, to deal with quirks
of that modelset
:returns: True if it is.
"""
return self.name == "wk2006"
@property
def is_wk2020(self):
"""method to indicate this is a wk2020 model, to deal with quirks
of that modelset
:returns: True if it is.
"""
return self.name == "wk2020"
# ============= Static Methods =============
[docs]
@staticmethod
def list():
"""Print the names and descriptions of available ModelSets (not just this one)"""
ModelSet.all_sets().pprint_all(align="<")
[docs]
@staticmethod
def all_sets(debug=False):
"""Return a table of the names and descriptions of available ModelSets (not just this one)
:rtype: :class:`~astropy.table.Table`
"""
t = get_table("all_models.tab")
t.remove_column("path")
t.remove_column("filename")
return t