"""
Allocate apertures to targets.
Contains code originally written by 2021 Akamai intern, Brittany Ann
Ramos.
.. include:: ../include/links.rst
"""
from IPython import embed
import numpy
from scipy.optimize import linear_sum_assignment
from matplotlib import pyplot, patches
from sklearn.neighbors import KDTree
from .collisions import remove_collisions
[docs]def assign_apertures_plot(objx, objy, apx, apy, assigned_obj, assigned_ap, collision=1/6.,
ignore_obj=None, ignore_ap=None, zoom=0.75):
"""
Show a plot of the aperture assignments. I.e.:
.. code-block:: python
assigned_obj, assigned_ap = assign_apertures(objx, objy, apx, apy)
assign_apertures_plot(objx, objy, apx, apy, assigned_obj, assigned_ap)
Args:
objx (`numpy.ndarray`_):
Cartesian x coordinate in tangent plane projection of object
coordinates relative to the pointing center. Shape must be
1D and match ``objy``.
objy (`numpy.ndarray`_):
Cartesian y coordinate in tangent plane projection of object
coordinates relative to the pointing center. Shape must be
1D and match ``objx``.
apx (`numpy.ndarray`_):
Cartesian x coordinate in tangent plane projection of the
"home" aperture coordinates relative to the pointing center.
Shape must be 1D and match ``apy``.
apy (`numpy.ndarray`_):
Cartesian y coordinate in tangent plane projection of the
"home" aperture coordinates relative to the pointing center.
Shape must be 1D and match ``apx``.
assigned_obj (`numpy.ndarray`_):
Indices of the objects with assigned apertures. This is the first
object returned by :func:`~producer.allocate.assign_apertures`.
assigned_ap (`numpy.ndarray`_):
Indices of the apertures assigned to objects. This is the second
object returned by :func:`~producer.allocate.assign_apertures`.
collision (:obj:`float`, optional):
Aperture-to-aperture collision radius.
ignore_obj (`numpy.ndarray`_, optional):
An integer array with the indices of objects that were *ignored*
during assignment. If None, all objects were included in the
assignment attempt.
ignore_ap (`numpy.ndarray`_, optional):
An integer array with the indices of apertures that were *ignored*
during assignment. If None, all apertures were included in the
assignment attempt.
zoom (:obj:`float`, optional):
Factor to zoom in default limits of the plot relative to the nominal
set by the extent of the allocated targets. For example, ``zoom=2``
means to *decrease* the plot range by a factor of two; to zoom out,
use a zoom value that is less than 1.
"""
# Get the required width in each direction
Dx = (numpy.amax(objx[assigned_obj]) - numpy.amin(objx[assigned_obj]))/zoom
Dy = (numpy.amax(objy[assigned_obj]) - numpy.amin(objy[assigned_obj]))/zoom
# Force it to be square
D = max(Dx, Dy)
xlim = numpy.mean(objx[assigned_obj]) + numpy.array([-D/2, D/2])
ylim = numpy.mean(objy[assigned_obj]) + numpy.array([-D/2, D/2])
w,h = pyplot.figaspect(1)
fig = pyplot.figure(figsize=(1.5*w,1.5*h))
ax = fig.add_axes([0.1, 0.1, 0.8, 0.8])
ax.set_xlim(xlim)
ax.set_ylim(xlim)
ax.minorticks_on()
ax.grid(True, which='major', color='0.9', zorder=0, linestyle='-')
ax.tick_params(which='major', direction='in', length=8, top=True, right=True)
ax.tick_params(which='minor', direction='in', length=4, top=True, right=True)
for i, (_x, _y) in enumerate(zip(objx[assigned_obj], objy[assigned_obj])):
kwargs = {} # if i > 0 else {'label': 'Assigned Aper.'}
ax.add_patch(patches.Circle((_x,_y), radius=collision/2., facecolor='C0',
edgecolor='none', zorder=0, alpha=0.2, **kwargs))
if len(assigned_ap) != apx.size:
unassigned = numpy.setdiff1d(numpy.arange(apx.size), assigned_ap)
if ignore_ap is not None:
unassigned = numpy.setdiff1d(unassigned, ignore_ap)
for i, (_x, _y) in enumerate(zip(apx[unassigned], apy[unassigned])):
kwargs = {} # if i > 0 else {'label': 'Unassigned Aper.'}
ax.add_patch(patches.Circle((_x,_y), radius=collision/2., facecolor='C1',
edgecolor='none', zorder=0, alpha=0.2, **kwargs))
ax.scatter(objx[assigned_obj], objy[assigned_obj], marker='.', color='C0', s=30, lw=0,
label='Alloc. Target')
if len(assigned_obj) != objx.size:
unassigned = numpy.setdiff1d(numpy.arange(objx.size), assigned_obj)
if ignore_obj is not None:
unassigned = numpy.setdiff1d(unassigned, ignore_obj)
ax.scatter(objx[unassigned], objy[unassigned], marker='.', color='C1', s=30, lw=0,
label='Unalloc. Target')
# FOBOS field-of-view
ax.add_patch(patches.Circle((0.,0.), radius=10., facecolor='none', edgecolor='C3',
zorder=4))
ax.set_xlabel(r'$\xi$ [arcmin]')
ax.set_ylabel(r'$\eta$ [arcmin]')
# TODO: The legend really slows down the plot...
# ax.legend()
pyplot.show()
[docs]def assign_apertures(objx, objy, apx, apy, collision=1/6., patrol=2.3, ignore_obj=None,
ignore_ap=None, allocated_obj=None, allocated_ap=None):
"""
Assign free apertures to objects.
Coordinate units must be the same for all arguments. I.e., if the
object and aperture coordinates are in arcmin, the collision and
patrol radii must be also.
To avoid collisions with previously allocated objects, use
``allocated_obj``. Providing ``allocated_ap`` just removes these
apertures from consideration; i.e., the information of where the
apertures are located in the field is associated with
``allocated_obj``, not ``allocated_ap``. Providing one does not
mean you have to provide the other.
Objects and apertures can also be ignored using ``ignore_obj`` and
``ignore_fib``. Ignored objects/apertures are not considered during
assignment. These arguments are largely provided as book-keeping
convenience; i.e., the returned indices are based on the full input
arrays instead of a preselected subset passed to this function.
Args:
objx (`numpy.ndarray`_):
Cartesian x coordinate in tangent plane projection of object
coordinates relative to the pointing center. Shape must be
1D and match ``objy``.
objy (`numpy.ndarray`_):
Cartesian y coordinate in tangent plane projection of object
coordinates relative to the pointing center. Shape must be
1D and match ``objx``.
apx (`numpy.ndarray`_):
Cartesian x coordinate in tangent plane projection of the
"home" aperture coordinates relative to the pointing center.
Shape must be 1D and match ``apy``.
apy (`numpy.ndarray`_):
Cartesian y coordinate in tangent plane projection of the
"home" aperture coordinates relative to the pointing center.
Shape must be 1D and match ``apx``.
collision (:obj:`float`, optional):
Aperture-to-aperture collision radius.
patrol (:obj:`float`, optional):
Aperture patrol radius.
ignore_obj (`numpy.ndarray`_, optional):
An integer array with the indices of objects to *ignore*
during assignment. If None, all objects are included in
assignment attempt.
ignore_ap (`numpy.ndarray`_, optional):
An integer array with the indices of apertures to *ignore*
during assignment. If None, all apertures are included in
assignment attempt.
allocated_obj (`numpy.ndarray`_, optional):
An integer array with the indices of objects that have
already been allocated to targets in the field, as needed
to avoid collisions.
allocated_ap (`numpy.ndarray`_, optional):
An integer array with the indices of apertures that have
already been allocated to targets in the field. This is
only used in book-keeping; i.e., allocated apertures cannot
be reassigned and the returned assignment indices will match
the indices of the provided ``apx`` and ``apy`` vectors.
Returns:
:obj:`tuple`: Two `numpy.ndarray`_ objects with the indices of
the objects that have been assigned apertures, and the indices
of the associated apertures.
"""
# Check input
if objx.ndim > 1:
raise ValueError('Input object coordinates must be in 1D arrays.')
if objx.shape != objy.shape:
raise ValueError('Shape of input object coordinate arrays do not match.')
if apx.ndim > 1:
raise ValueError('Input aperture coordinates must be in 1D arrays.')
if apy.shape != apy.shape:
raise ValueError('Shape of input aperture coordinate arrays do not match.')
# Number of objects and apertures
nobj = objx.size
nap = apx.size
# Set the objects available for assignment and those already
# allocated.
_ignore = numpy.empty(0, dtype=int) if ignore_obj is None else ignore_obj.copy()
if allocated_obj is None:
prealloc_x = None
prealloc_y = None
else:
_ignore = numpy.unique(numpy.append(_ignore, allocated_obj))
prealloc_x = objx[allocated_obj]
prealloc_y = objy[allocated_obj]
_objx = objx.copy() if len(_ignore) == 0 else numpy.delete(objx, _ignore)
_objy = objy.copy() if len(_ignore) == 0 else numpy.delete(objy, _ignore)
obj_id = numpy.arange(nobj) if len(_ignore) == 0 else numpy.delete(numpy.arange(nobj), _ignore)
# Set the apertures available for assignment
_ignore = numpy.empty(0, dtype=int) if ignore_ap is None else ignore_ap.copy()
if allocated_ap is not None:
_ignore = numpy.unique(numpy.append(_ignore, allocated_ap))
_apx = apx.copy() if len(_ignore) == 0 else numpy.delete(apx, _ignore)
_apy = apy.copy() if len(_ignore) == 0 else numpy.delete(apy, _ignore)
ap_id = numpy.arange(nap) if len(_ignore) == 0 else numpy.delete(numpy.arange(nap), _ignore)
# Track the number of iterations required to avoid collisions
niter = 0
# Arrays to keep track of assigned apertures and objects
assigned_ap = numpy.empty(0, dtype=int)
assigned_obj = numpy.empty(0, dtype=int)
# Iteratively assign apertures to objects until either no objects
# are left or no apertures can be assigned
while _objx.size > 0 and _apx.size > 0:
# Build a KDTree and query the nearest nquery number of objects
# for each aperture
kdtree = KDTree(numpy.column_stack((_objx, _objy)))
nquery = min(20, _objx.size)
d, indx = kdtree.query(numpy.column_stack((_apx, _apy)), k=nquery)
# Setup the distance array for these objects and assign each
# aperture to a unique object. Distances for object-aperture
# pairs not returned by the KD-Tree query are set to a large
# number, effectively forcing them to be ignored by the
# linear-sum assignment function.
api = numpy.tile(numpy.arange(_apx.size), (nquery,1)).T
dist = numpy.full((_apx.size, _objx.size), 10*numpy.amax(d), dtype=float)
dist[api.flat,indx.flat] = d.flat
_a_ap, _a_obj = linear_sum_assignment(dist)
# Select the assignments that are within the patrol radius of
# the aperture and ignore the rest
indx = dist[_a_ap,_a_obj] < patrol
_a_ap = _a_ap[indx]
_a_obj = _a_obj[indx]
if _a_ap.size == 0:
# No apertures can be assigned within the patrol radius, so
# we're done!
break
# Thin the assigned coordinates of collided objects. This needs
# to account for all previously assigned apertures.
if len(assigned_ap) == 0 and prealloc_x is None:
not_collided = remove_collisions(_objx[_a_obj], _objy[_a_obj], collision)[1]
else:
if len(assigned_ap) == 0:
_pre_x = prealloc_x
_pre_y = prealloc_y
elif prealloc_x is None:
_pre_x = objx[assigned_obj]
_pre_y = objy[assigned_obj]
else:
_pre_x = numpy.append(prealloc_x, objx[assigned_obj])
_pre_y = numpy.append(prealloc_y, objy[assigned_obj])
npre = _pre_x.size
not_collided = remove_collisions(numpy.append(_pre_x, _objx[_a_obj]),
numpy.append(_pre_y, _objy[_a_obj]),
collision)[1][npre:]
# Add uncollided assignments to list of assigned apertures and
# objects
assigned_ap = numpy.append(assigned_ap, ap_id[_a_ap[not_collided]])
assigned_obj = numpy.append(assigned_obj, obj_id[_a_obj[not_collided]])
# Remove the successfully assigned apertures from those available
# to be reassigned
_apx = numpy.delete(_apx, _a_ap[not_collided])
_apy = numpy.delete(_apy, _a_ap[not_collided])
ap_id = numpy.delete(ap_id, _a_ap[not_collided])
# Remove the objects selected in the assignment, regardless of
# whether or not they collide. I.e., objects that collide with
# currently assigned apertures will continue to collide!
_objx = numpy.delete(_objx, _a_obj)
_objy = numpy.delete(_objy, _a_obj)
obj_id = numpy.delete(obj_id, _a_obj)
niter += 1
# TODO: Print a report?
return assigned_obj, assigned_ap