"""
Construct a set of observations by assigning fibers to targetsAllocate apertures to targets.
Contains code originally written by 2021 Akamai intern, Brittany Ann
Ramos.
.. include:: ../include/links.rst
"""
import io
import sys
import time
import warnings
from pathlib import Path
from configparser import ConfigParser
from IPython import embed
import numpy
from astropy import table
from .deploy import FOBOSApertures
from .allocate import assign_apertures
from .targets import parse_targets
# TODO:
# - Set fraction to assign to sky
# - Write method that assigns sky fibers, guide bundles, calibration
# bundles
[docs]def report_configurations(n_in_fov, obs_obj, obs_nap, obs_ap, obs_mode):
"""
Construct a report of the observation configurations.
"""
tab = table.Table()
# Number of observations
nobs = len(obs_obj)
# Pointing number
tab['Pointing'] = numpy.arange(nobs)+1
# Number of available apertures in each observation
tab['Avail'] = numpy.array(obs_nap)
# Number of allocations in each observation
tab['Alloc'] = numpy.array([len(o) for o in obs_obj])
# Fractional growth of observed targets
tab['Compl'] = numpy.cumsum(tab['Alloc'])/n_in_fov
tab['Compl'].format = '.4f'
# Fraction of all apertures assigned
tab['Eff'] = tab['Alloc']/tab['Avail']
tab['Eff'].format = '.4f'
# Mean fraction of assigned apertures for this and all previous observations
tab['MeanEff'] = numpy.cumsum(tab['Eff'])/(numpy.arange(nobs)+1)
tab['MeanEff'].format = '.4f'
print(f'Total number of targets available: {n_in_fov}')
print(f'Total number of pointings: {nobs}')
print('')
tab.write(sys.stdout, format='ascii.fixed_width_two_line', delimiter=' ')
[docs]def write_configurations(root, ra, dec, center, obs_obj, obs_ap, obs_mode, objid=None, path=None,
ndig=None, tight=False, target_file=None, ra_c=None, dec_c=None):
"""
Write a set of configuration files for each FOBOS observation.
Args:
root (:obj:`str`):
The root name for all output files.
ra (`numpy.ndarray`_):
Right ascension coordinates for all considered objects.
dec (`numpy.ndarray`_):
Declination coordinates for all considered objects. Shape must
match ``ra``.
center (:obj:`tuple`):
RA and DEC coordinates for the FOBOS pointing center.
obs_obj (:obj:`list`):
List of `numpy.ndarray`_ objects identifying the indices of the
objects observed from the provided list of coordinates. The number
of items in the list sets the number of revisits to the same
pointing. This is the same as the second object returned by
:func:`~producer.plan.configure_observations`.
obs_ap (:obj:`list`):
List of `numpy.ndarray`_ objects identifying the indices of the
FOBOS apertures used for each object observed. List length must
match ``obs_obj``. This is the same as the fourth object returned
by :func:`~producer.plan.configure_observations`. The aperture
indices must match indices when instantiating a
:class:`~producer.deploy.FOBOSApertures` object in the specified
mode (``obs_mode``).
obs_mode (:obj:`list`):
List of `numpy.ndarray`_ objects identifying the FOBOS mode; see
:class:`~producer.deploy.FOBOSApertures`. List length must match
``obs_obj``. This is the same as the last object returned by
:func:`~producer.plan.configure_observations`.
objid (`numpy.ndarray`_, optional):
An array with identifiers for each object. Each array element must
convert directly to a string. Uniqueness is not checked. Shape
must match ``ra``. If None, just set to the 0-indexed array index.
path (:obj:`str`, optional):
Root path for all output files. If None, either set to the parent
path provided by ``root`` or set to the current directory.
ndig (:obj:`int`, optional):
Number of digits to use for the observation number in the output
file names. If None, this is set by the number of configurations to
write. E.g., 9 observations or less yield ``ndig=1``, 10-99
observations yield ``ndig=2``, etc.
tight (:obj:`bool`, optional):
Output the configuration in "tight" format, where unallocated
apertures are not included.
target_file (:obj:`str`, optional):
Name of the file with the original targets. If provided, will be
included in header of output configuration files.
ra_c (:obj:`int`, optional):
1-indexed column number with the RA coordinates in ``target_file``.
Ignored if ``target_file`` is None.
dec_c (:obj:`int`, optional):
1-indexed column number with the DEC coordinates in ``target_file``.
Ignored if ``target_file`` is None.
"""
# Check the coordinate arrays
if ra.ndim > 1:
raise ValueError('Input coordinate arrays must be 1D.')
if dec.shape != ra.shape:
raise ValueError('Shape of coordinate arrays differ.')
nobj = ra.size
objid_type = 'name'
if objid is None:
objid = numpy.arange(nobj).astype(str)
objid_type = 'index'
if objid.shape != ra.shape:
raise ValueError('Object ID array does not match shape of coordinate arrays.')
objidlen = numpy.amax([len(o) for o in objid])
_center = numpy.atleast_1d(center).ravel()
if _center.size != 2:
raise ValueError('Center coordinates must provide RA and DEC only.')
# Check the observation lists
nobs = len(obs_obj)
if len(obs_ap) != nobs:
raise ValueError(f'Incorrect number of aperture arrays; expected {nobs}, got '
f'{len(obs_ap)}.')
if len(obs_mode) != nobs:
raise ValueError(f'Incorrect number of instrument modes; expected {nobs}, got '
f'{len(obs_mode)}.')
for indx in obs_obj:
if numpy.any(indx >= nobj):
raise ValueError('Object selection indices out of bounds of coordinate arrays.')
# Set the file name number of digits
if ndig is None:
ndig = int(numpy.ceil(numpy.log10(nobs+1)))
# Construct and check the output root
_root = Path(root).resolve()
if _root.is_dir():
raise ValueError('Provided root is a directory. Must provide leading name of files.')
if path is None:
_path = _root.parent
_root = _root.name
else:
_path = Path(path).resolve()
if not _path.is_dir():
warnings.warn(f'Request path, {path}, currently does not exist and will be created.')
_root = _root.name
for i in range(nobs):
ofile = str(_path / f'{_root}_{i+1:0{ndig}}.db')
ap = FOBOSApertures(mode=obs_mode[i])
if numpy.any(obs_ap[i] >= ap.nap):
raise ValueError(f'Aperture selection indices for observation {i+1} are invalid.')
cfg = ConfigParser()
cfg['FOBOS_MODE'] = {'version': ap.version,
'mode': ','.join(ap.mode.astype(str)),
'baseline': ap.baseline,
'design': ap.config
}
cfg['TARGETS'] = {'source': 'Unknown' if target_file is None else target_file,
'ra_col': 'None' if ra_c is None else ra_c,
'dec_col': 'None' if dec_c is None else dec_c,
'id_type': objid_type,
'allocated': len(obs_obj[i])
}
cfg['POINTING'] = {'ra': f'{_center[0]:12.8f}',
'dec': f'{_center[1]:12.8f}'
}
with io.StringIO() as f:
cfg.write(f)
cfg_str = f.getvalue()
header_text = 'FOBOS configuration file\n\n' \
f'Generated: {time.strftime("%a %d %b %Y %H:%M:%S", time.localtime())}\n\n'
header_text += '-'*70 + '\n\n'
header_text += cfg_str
header_text += '-'*70 + '\n\n'
header_text += 'Columns are:\n' \
' MID - Module ID number\n' \
' SID - Spectrograph ID number\n' \
' BID - Starbug ID number\n' \
' Type - Starbug payload type: (0) single-fiber, (1) 37-fiber \n' \
' IFU, (3) imaging bundle, (4) flux-calibration bundle.\n' \
' OBJID - Object ID \n' \
' RA - Target right ascension (decimal degrees)\n' \
' Dec - Target declination (decimal degrees)\n\n' \
f'{"MID":>3} {"SID":>3} {"BID":>3} {"Type":>4} ' \
f'{"OBJID":>{objidlen}} {"RA":>12} {"DEC":>11}'
tab = empty_configuration_table(nrows=ap.nap, objidlen=objidlen)
# NOTE: The syntax "tab['MID'][:]" is needed here so that the table
# column doesn't loose it's other attributes (format, description, etc.)
tab['MID'][:] = ap.mid
tab['SID'][:] = ap.spc
tab['BID'][:] = ap.bid
tab['Type'][:] = ap.payload
tab['OBJID'][:] = 'None'
tab['OBJID'][obs_ap[i]] = objid[obs_obj[i]]
tab['RA'][obs_ap[i]] = ra[obs_obj[i]]
tab['DEC'][obs_ap[i]] = dec[obs_obj[i]]
if tight:
indx = tab['OBJID'] != 'None'
tab = tab[indx]
print(f'Writing: {ofile}')
with open(ofile, 'w') as f:
for l in header_text.split('\n'):
f.write(f'# {l}\n')
tab.write(f, format='ascii.fixed_width_no_header', delimiter=' ')
[docs]def empty_configuration_table(nrows=None, objidlen=20):
"""
Construct an empty FOBOS configuration table.
Args:
nrows (:obj:`int`, optional):
Number of table rows. If None, initialized to 0.
Returns:
`astropy.table.Table`_: Empty configuration table.
"""
length = 0 if nrows is None else nrows
_objidlen = max(5, objidlen)
return table.Table([table.Column(name='MID', dtype=int, length=length,
description='Module ID Number', format='>3'),
table.Column(name='SID', dtype=int, length=length,
description='Spectrograph ID Number', format='>3'),
table.Column(name='BID', dtype=int, length=length,
description='Starbug ID Number', format='>3'),
table.Column(name='Type', dtype=int, length=length,
description='Starbug payload type', format='>4'),
table.Column(name='OBJID', dtype=f'<U{_objidlen}', length=length,
description='Object identifier', format=f'>{_objidlen}'),
table.Column(name='RA', dtype=float, length=length,
description='Object right ascension', format='12.8f'),
table.Column(name='DEC', dtype=float, length=length,
description='Object declination', format='11.8f')
])
[docs]def parse_configuration(config_file, parse_source=False):
"""
Parse the FOBOS configuration from a configuration file.
Parameters
----------
config_file : :obj:`str`
Name of the configuration file.
parse_source : :obj:`bool`, optional
If possible given the data within the configuration, re-read and return
the full target list from the target source file, instead of only those
with allocated apertures.
Returns
-------
objid : `numpy.ndarray`_
1d array with the target ID numbers
ra : `numpy.ndarray`_
Target right ascension in decimal degrees
dec : `numpy.ndarray`_
Target declination in decimal degrees
center : :obj:`tuple`
Two-tuple with the coordinates of the pointing center
assigned_obj : `numpy.ndarray`_
Integer array with the array index in, e.g., ``objid`` with the target
that has been assigned a FOBOS aperture.
ap : :class:`~producer.deploy.FOBOSApertures`
Aperture deployment instance.
assigned_ap : `numpy.ndarray`_
Integer array with the indices of the apertures in ``ap`` that have been
assigned to a target.
"""
_cfg = Path(config_file).resolve()
if not _cfg.is_file():
raise FileNotFoundError(f'{str(_cfg)} does not exist!')
# Read the data table
db = numpy.genfromtxt(str(_cfg), dtype=str)
mid, sid, bid, payload = db.T[:4].astype(int)
objid = db[:,4]
ra, dec = db.T[5:].astype(float)
# All of this is to just parse the configuration...
with open(str(_cfg), 'r') as f:
lines = numpy.asarray(f.readlines())
lines = lines[numpy.array([l[0] == '#' for l in lines])]
lines = [l[2:].strip() for l in lines]
for i in range(len(lines)):
if len(lines[i]) > 0 and lines[i] == '-'*len(lines[i]):
i += 1
break
for j in range(len(lines)-1, -1, -1):
if len(lines[j]) > 0 and lines[j] == '-'*len(lines[j]):
break
# Read the configuration parameters
cfg = ConfigParser()
cfg.read_string('\n'.join(lines[i:j]))
# Instantiate the aperture deployment object
if cfg['FOBOS_MODE'].get('version') != FOBOSApertures.version:
raise ValueError('FOBOS aperture-deployment version has changed!')
mode = numpy.array(cfg['FOBOS_MODE'].get('mode').split(',')).astype(int)
baseline = cfg['FOBOS_MODE'].getboolean('baseline')
design = cfg['FOBOS_MODE'].getint('design')
ap = FOBOSApertures(mode=mode, baseline=baseline, config=design)
# Attempt to parse original target file?
target_file = None
if parse_source:
target_file = Path(cfg['TARGETS'].get('source')).resolve()
if not target_file.is_file():
warnings.warn(f'Original target source file does not exist: {str(target_file)}')
target_file = None
ra_c = cfg['TARGETS'].get('ra_col')
dec_c = cfg['TARGETS'].get('dec_col')
if 'None' in [ra_c, dec_c]:
warnings.warn('RA and/or DEC columns not defined. Not re-reading source catalog.')
target_file = None
else:
ra_c = int(ra_c)
dec_c = int(dec_c)
# Get the pointing center
center = (cfg['POINTING'].getfloat('ra'), cfg['POINTING'].getfloat('dec'))
# Find the assigned apertures indices in the
# TODO: There needs to be a better way to do this...
assigned_ap = numpy.array([numpy.where((m == ap.mid) & (b == ap.bid))[0][0]
for m, b in zip(mid, bid)])
if target_file is None:
# Only returning the allocated list, so we're done
return objid.astype(int), ra, dec, center, numpy.arange(assigned_ap.size), ap, assigned_ap
if cfg['TARGETS'].get('id_type') != 'index':
raise NotImplementedError('Can only handle object ID that are the index of the target '
'in the source file.')
# Re-read the target catalog
full_ra, full_dec, _ = parse_targets(target_file, ra_c=ra_c, dec_c=dec_c)
return numpy.arange(full_ra.size), full_ra, full_dec, center, objid.astype(int), \
ap, assigned_ap