Source code for nanshe.box.spams_sandbox

"""
The ``spams_sandbox`` module provides mechanisms for sandboxing SPAMS.

===============================================================================
Overview
===============================================================================
SPAMS sometimes seems to step on the interpreter. As a result, we provide a
number of strategies to address this, by launching it in a separate process so
that it hopefully does not mess up the main interpreter. We also try to keep
the module space clean. This seems to help.

===============================================================================
API
===============================================================================
"""


__author__ = "John Kirkham <kirkhamj@janelia.hhmi.org>"
__date__ = "$Jun 20, 2014 12:07:48 EDT$"


import npctypes
import npctypes.shared


class SPAMSException(Exception):
    """Signals that a sandboxed SPAMS process exited abnormally."""
def run_multiprocessing_queue_spams_trainDL(out_queue, *args, **kwargs):
    """
    Worker entry point: call spams.trainDL and put its result on a queue.

    Meant to be used as the target of a separate process. SPAMS is
    suspected of interfering with the interpreter (segmentation faults
    have been seen later in Python code depending on whether SPAMS ran),
    so it is sandboxed in another interpreter and only its result is
    handed back.

    Args:
        out_queue(multiprocessing.Queue):    receives the dictionary
                                             returned by spams.trainDL.

        *args(list):                         positional arguments passed
                                             straight to spams.trainDL.

        **kwargs(dict):                      keyword arguments passed
                                             straight to spams.trainDL.
    """

    # Imported lazily: only this call needs it, and loading it is slow.
    import spams

    out_queue.put(spams.trainDL(*args, **kwargs))
def call_multiprocessing_queue_spams_trainDL(*args, **kwargs):
    """
    Run spams.trainDL in a separate process and return a copy of its result.

    SPAMS is suspected of interfering with the interpreter (segmentation
    faults have been seen later in Python code depending on whether SPAMS
    ran), so it is sandboxed in a child process. This version uses a
    multiprocessing.Queue to carry the resulting dictionary back.

    Args:
        *args(list):        positional arguments to pass to spams.trainDL.

        **kwargs(dict):     keyword arguments to pass to spams.trainDL.

    Returns:
        result(numpy.matrix): the dictionary found

    Raises:
        SPAMSException:     if the child process exits with a non-zero
                            exit code or ends without delivering a result.
    """

    # Only necessary for dealing with SPAMS
    import multiprocessing

    # The "queue is empty" exception lives in a differently named module on
    # Python 2 and Python 3.
    try:
        from queue import Empty
    except ImportError:
        from Queue import Empty

    out_queue = multiprocessing.Queue()

    p = multiprocessing.Process(
        target=run_multiprocessing_queue_spams_trainDL,
        args=(out_queue,) + args,
        kwargs=kwargs
    )
    p.start()

    # Read from the queue *before* joining (the child cannot finish until
    # its queued data has been consumed), but never block indefinitely: if
    # the child crashes or raises before putting anything, an unbounded
    # get() would hang forever and the exit code would never be checked.
    result = None
    have_result = False
    while not have_result:
        # Sample liveness first, so that a child which exits right after
        # putting its result still gets one more read attempt.
        child_was_alive = p.is_alive()
        try:
            result = out_queue.get(timeout=0.1)
            have_result = True
        except Empty:
            if not child_was_alive:
                break

    if have_result:
        result = result.copy()

    p.join()

    if (p.exitcode != 0) or (not have_result):
        raise SPAMSException(
            "SPAMS has terminated with exitcode \"" + repr(p.exitcode) + "\"."
        )

    return result
def run_multiprocessing_array_spams_trainDL(result_array_type,
                                            result_array,
                                            X_array_type,
                                            X_array,
                                            D_is_arg=False,
                                            D_array_type=None,
                                            D_array=None,
                                            *args,
                                            **kwargs):
    """
    Worker entry point: call spams.trainDL on shared arrays.

    Meant to be used as the target of a separate process. SPAMS is
    suspected of interfering with the interpreter (segmentation faults
    have been seen later in Python code depending on whether SPAMS ran),
    so it is sandboxed in another interpreter. This version writes the
    resulting dictionary into a shared array instead of returning it.

    Args:
        result_array_type:      Unused; will be dropped.

        result_array:           shared array (viewed through
                                npctypes.shared.as_ndarray) that the
                                result of spams.trainDL is written into.

        X_array_type:           Unused; will be dropped.

        X_array:                shared array (viewed through
                                npctypes.shared.as_ndarray) holding the
                                input data for spams.trainDL.

        D_is_arg(bool):         whether the initial dictionary should be
                                passed positionally rather than as the
                                ``D`` keyword.

        D_array_type:           Unused; will be dropped.

        D_array:                shared array holding the initial
                                dictionary, or None if there is none.

        *args(list):            positional arguments (after X) to pass to
                                spams.trainDL.

        **kwargs(dict):         keyword arguments to pass to
                                spams.trainDL.

    Note:
        This is somewhat faster than using multiprocessing.Queue.
    """

    # Just to make sure this exists in the new process. Shouldn't be
    # necessary.
    import numpy
    # Not needed outside of this function.
    import spams

    # ``*args`` arrives as an immutable tuple; a list is required so that
    # the dictionary can be placed back into its positional slot.
    args = list(args)

    with npctypes.shared.as_ndarray(X_array) as X:
        with npctypes.shared.as_ndarray(result_array) as result:
            if D_array is None:
                result[:] = spams.trainDL(X, *args, **kwargs)
            else:
                with npctypes.shared.as_ndarray(D_array) as D:
                    # NOTE(review): slot 3 mirrors the index used by
                    # call_multiprocessing_array_spams_trainDL; confirm it
                    # matches D's position in spams.trainDL once X has
                    # been split off.
                    if D_is_arg and (len(args) > 3):
                        args[3] = D
                    else:
                        # No positional slot available; use the keyword.
                        kwargs["D"] = D

                    # Must run while the view of D is still open.
                    result[:] = spams.trainDL(X, *args, **kwargs)
def call_multiprocessing_array_spams_trainDL(X, *args, **kwargs):
    """
    Run spams.trainDL in a separate process using shared arrays.

    SPAMS is suspected of interfering with the interpreter (segmentation
    faults have been seen later in Python code depending on whether SPAMS
    ran), so it is sandboxed in a child process. This version copies the
    inputs into shared arrays and reads the resulting dictionary back
    from a shared array.

    Args:
        X(numpy.matrix):    a Fortran order NumPy Matrix with the same
                            name as used by spams.trainDL (so if someone
                            tries to use it as a keyword argument...).

        *args(list):        positional arguments (after X) to pass to
                            spams.trainDL.

        **kwargs(dict):     keyword arguments to pass to spams.trainDL.
                            Either ``D`` (initial dictionary) or ``K``
                            (number of dictionary columns) is needed to
                            size the result.

    Returns:
        result(numpy.ndarray): a copy of the dictionary found.

    Raises:
        ValueError:         if the size of the result cannot be
                            determined (no initial dictionary and no
                            ``K`` keyword).

        SPAMSException:     if the child process exits with a non-zero
                            exit code.

    Note:
        This is somewhat faster than using multiprocessing.Queue.
    """

    # Only necessary for dealing with SPAMS
    import multiprocessing

    # Just to make sure this exists in the new process. Shouldn't be
    # necessary.
    import numpy

    # ``*args`` is an immutable tuple; a list is required to blank out the
    # positional dictionary slot below.
    args = list(args)

    D_is_arg = False
    D = None
    # NOTE(review): slot 3 is kept from the original code; confirm it
    # matches D's position in spams.trainDL once X has been split off.
    if len(args) >= 4:
        D_is_arg = True
        D = args[3]
        # The worker re-inserts the shared copy of D here.
        args[3] = None
    else:
        D = kwargs.pop("D", None)

    # Create a shared array to contain X
    X_array = npctypes.shared.ndarray(X.shape, X.dtype, "F")
    # Copy over the contents of X.
    with npctypes.shared.as_ndarray(X_array) as X_array_numpy:
        X_array_numpy[...] = X
    del X_array_numpy

    len_D = kwargs.get("K", None)
    D_array = None
    if D is not None:
        # Create a shared array to contain D
        D_array = npctypes.shared.ndarray(D.shape, D.dtype, "F")
        # Copy over the contents of D.
        with npctypes.shared.as_ndarray(D_array) as D_array_numpy:
            D_array_numpy[...] = D
        del D_array_numpy
        # The initial dictionary fixes the number of columns.
        len_D = D.shape[-1]

    if len_D is None:
        raise ValueError(
            "Unable to determine the size of the result. Provide an initial"
            " dictionary `D` or the keyword argument `K`."
        )

    # Create a shared array to contain the result
    result_array = npctypes.shared.ndarray((X.shape[0], len_D), X.dtype, "F")

    # Every leading parameter of the worker is filled explicitly so that the
    # caller's remaining positional arguments land in the worker's ``*args``
    # instead of being dropped.
    new_args = (
        type(result_array),
        result_array,
        type(X_array),
        X_array,
        D_is_arg,
        None if D_array is None else type(D_array),
        D_array,
    ) + tuple(args)

    p = multiprocessing.Process(
        target=run_multiprocessing_array_spams_trainDL,
        args=new_args,
        kwargs=kwargs
    )
    p.start()
    p.join()

    if p.exitcode != 0:
        raise SPAMSException(
            "SPAMS has terminated with exitcode \"" + repr(p.exitcode) + "\"."
        )

    # Reconstruct the result from the output array
    with npctypes.shared.as_ndarray(result_array) as result_view:
        result = result_view.copy()

    return result
def call_spams_trainDL(*args, **kwargs):
    """
    Call spams.trainDL in this process and return a copy of its result.

    Mirrors the interface of the multiprocessing variants, but runs
    SPAMS directly. The result is copied just in case.

    Args:
        *args(list):        positional arguments to pass to spams.trainDL.

        **kwargs(dict):     keyword arguments to pass to spams.trainDL.

    Note:
        For legacy.
    """

    # Imported lazily: only this call needs it, and loading it is slow.
    import spams

    return spams.trainDL(*args, **kwargs).copy()