Source code for pymatgen.optimization.linear_assignment_numpy

# coding: utf-8
# Copyright (c) Pymatgen Development Team.
# Distributed under the terms of the MIT License.


"""
This module contains an algorithm to solve the Linear Assignment Problem.
It has the same functionality as linear_assignment.pyx, but is much slower
as it is vectorized in numpy rather than cython
"""

__author__ = "Will Richards"
__copyright__ = "Copyright 2011, The Materials Project"
__version__ = "1.0"
__maintainer__ = "Will Richards"
__email__ = "wrichards@mit.edu"
__date__ = "Jan 28, 2013"

import numpy as np


[docs]class LinearAssignment: """ This class finds the solution to the Linear Assignment Problem. It finds a minimum cost matching between two sets, given a cost matrix. This class is an implementation of the LAPJV algorithm described in: R. Jonker, A. Volgenant. A Shortest Augmenting Path Algorithm for Dense and Sparse Linear Assignment Problems. Computing 38, 325-340 (1987) .. attribute: min_cost: The minimum cost of the matching .. attribute: solution: The matching of the rows to columns. i.e solution = [1, 2, 0] would match row 0 to column 1, row 1 to column 2 and row 2 to column 0. Total cost would be c[0, 1] + c[1, 2] + c[2, 0] """ def __init__(self, costs, epsilon=1e-6): """ Args: costs: The cost matrix of the problem. cost[i,j] should be the cost of matching x[i] to y[j]. The cost matrix may be rectangular epsilon: Tolerance for determining if solution vector is < 0 """ self.orig_c = np.array(costs, dtype=np.float64) self.nx, self.ny = self.orig_c.shape self.n = self.ny self._inds = np.arange(self.n) self.epsilon = abs(epsilon) # check that cost matrix is square if self.nx > self.ny: raise ValueError("cost matrix must have at least as many columns as rows") if self.nx == self.ny: self.c = self.orig_c else: # Can run into precision issues if np.max is used as the fill value (since a # value of this size doesn't necessarily end up in the solution). A value # at least as large as the maximin is, however, guaranteed to appear so it # is a safer choice. The fill value is not zero to avoid choosing the extra # rows in the initial column reduction step self.c = np.full((self.n, self.n), np.max(np.min(self.orig_c, axis=1))) self.c[:self.nx] = self.orig_c # initialize solution vectors self._x = np.zeros(self.n, dtype=np.int) - 1 self._y = self._x.copy() # if column reduction doesn't find a solution, augment with shortest # paths until one is found if self._column_reduction(): self._augmenting_row_reduction() # initialize the reduced costs self._update_cred() while -1 in self._x: self._augment() self.solution = self._x[:self.nx] self._min_cost = None @property def min_cost(self): """ Returns the cost of the best assignment """ if self._min_cost: return self._min_cost self._min_cost = np.sum(self.c[np.arange(self.nx), self.solution]) return self._min_cost def _column_reduction(self): """ Column reduction and reduction transfer steps from LAPJV algorithm """ # assign each column to its lowest cost row, ensuring that only row # or column is assigned once i1, j = np.unique(np.argmin(self.c, axis=0), return_index=True) self._x[i1] = j # if problem is solved, return if len(i1) == self.n: return False self._y[j] = i1 # reduction_transfer # tempc is array with previously assigned matchings masked self._v = np.min(self.c, axis=0) tempc = self.c.copy() tempc[i1, j] = np.inf mu = np.min(tempc[i1, :] - self._v[None, :], axis=1) self._v[j] -= mu return True def _augmenting_row_reduction(self): """ Augmenting row reduction step from LAPJV algorithm """ unassigned = np.where(self._x == -1)[0] for i in unassigned: for _ in range(self.c.size): # Time in this loop can be proportional to 1/epsilon # This step is not strictly necessary, so cutoff early # to avoid near-infinite loops # find smallest 2 values and indices temp = self.c[i] - self._v j1 = np.argmin(temp) u1 = temp[j1] temp[j1] = np.inf j2 = np.argmin(temp) u2 = temp[j2] if u1 < u2: self._v[j1] -= u2 - u1 elif self._y[j1] != -1: j1 = j2 k = self._y[j1] if k != -1: self._x[k] = -1 self._x[i] = j1 self._y[j1] = i i = k if k == -1 or abs(u1 - u2) < self.epsilon: break def _update_cred(self): """ Updates the reduced costs with the values from the dual solution """ ui = self.c[self._inds, self._x] - self._v[self._x] self.cred = self.c - ui[:, None] - self._v[None, :] def _augment(self): """ Finds a minimum cost path and adds it to the matching """ # build a minimum cost tree _pred, _ready, istar, j, mu = self._build_tree() # update prices self._v[_ready] += self._d[_ready] - mu # augment the solution with the minimum cost path from the # tree. Follows an alternating path along matched, unmatched # edges from X to Y while True: i = _pred[j] self._y[j] = i k = j j = self._x[i] self._x[i] = k if i == istar: break self._update_cred() def _build_tree(self): """ Builds the tree finding an augmenting path. Alternates along matched and unmatched edges between X and Y. The paths are stored in _pred (new predecessor of nodes in Y), and self._x and self._y """ # find unassigned i* istar = np.argmin(self._x) # compute distances self._d = self.c[istar] - self._v _pred = np.zeros(self.n, dtype=np.int) + istar # initialize sets # READY: set of nodes visited and in the path (whose price gets # updated in augment) # SCAN: set of nodes at the bottom of the tree, which we need to # look at # T0DO: unvisited nodes _ready = np.zeros(self.n, dtype=np.bool) _scan = np.zeros(self.n, dtype=np.bool) _todo = np.zeros(self.n, dtype=np.bool) + True while True: # populate scan with minimum reduced distances if True not in _scan: mu = np.min(self._d[_todo]) _scan[self._d == mu] = True _todo[_scan] = False j = np.argmin(self._y * _scan) if self._y[j] == -1 and _scan[j]: return _pred, _ready, istar, j, mu # pick jstar from scan (scan always has at least 1) _jstar = np.argmax(_scan) # pick i associated with jstar i = self._y[_jstar] _scan[_jstar] = False _ready[_jstar] = True # find shorter distances newdists = mu + self.cred[i, :] shorter = np.logical_and(newdists < self._d, _todo) # update distances self._d[shorter] = newdists[shorter] # update predecessors _pred[shorter] = i for j in np.nonzero(np.logical_and(self._d == mu, _todo))[0]: if self._y[j] == -1: return _pred, _ready, istar, j, mu _scan[j] = True _todo[j] = False