Welcome to mpi_map’s documentation!

Contents:

mpi_map.mpi_map_code(f, x, params, procs, obj_dill=None)[source]

This function applies the function f to the x inputs on procs processes.

Parameters:
  • f (function) – function to be applied
  • x (list or ndarray) – input data
  • params (tuple) – parameters to be passed to the function (picklable)
  • procs (int) – number of processes to be used
  • obj_dill (object) – object where f is defined
Returns:

(list [nprocs]) – (ordered) outputs from all the processes
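
A minimal usage sketch is given below. The way in which params is forwarded to f (here assumed to be passed as a single tuple argument after the input chunk) is an assumption for illustration, not part of the documented signature.

import numpy as np
import mpi_map

def scale(x, params):
    # assumption: each process receives its chunk of x and the params tuple
    (coeff,) = params
    return coeff * x

x = np.random.randn(100, 3)
res_list = mpi_map.mpi_map_code(scale, x, (2.0,), 4)  # run on 4 processes
res = np.concatenate(res_list, axis=0)                # reassemble the ordered outputs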

mpi_map.eval_method(fname, x, params, obj, nprocs=None, reduce_obj=None, reduce_args=None, import_set=set(), splitted=False)[source]

This function applies the method with name fname of object obj to the x inputs on nprocs processes.

Parameters:
  • fname (str) – name of the method defined in obj
  • x (list or ndarray) – input data
  • params (tuple) – parameters to be passed to the function (picklable)
  • obj (object) – object where the method fname is defined
  • nprocs (int) – number of processes. If None, MPI.COMM_WORLD.Get_size() processes will be started
  • reduce_obj (object) – ReduceObject instance defining the reduce method to be applied (if any)
  • reduce_args (object) – arguments to be provided to reduce_obj
  • import_set (set) – set of pairs (module_name, as_field) to be imported as import module_name as as_field
  • splitted (bool) – whether the input is already split
Returns:

(list [nprocs]) – (ordered) outputs from all the processes
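
A hedged sketch of a possible eval_method call, reusing the Operator class from the MPI_Pool usage example below; the exact way the params tuple is forwarded to the method is an assumption for illustration.

import numpy as np
import numpy.random as npr
import mpi_map

class Operator(object):
    def dot(self, x, A):
        return np.dot(A, x.T).T

op = Operator()
A = npr.randn(5, 5)
x = npr.randn(100, 5)

# assumption: each process calls op.dot on its chunk of x, with A taken from params
res_list = mpi_map.eval_method("dot", x, (A,), op, nprocs=2,
                               import_set=set([("numpy", "np")]))
res = np.concatenate(res_list, axis=0)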

mpi_map.barrier(comm, tag=0, sleep=0.01)[source]

Function used to avoid busy-waiting.

As suggested by Lisandro Dalcin at:
  • http://code.google.com/p/mpi4py/issues/detail?id=4
  • https://groups.google.com/forum/?fromgroups=#!topic/mpi4py/nArVuMXyyZI
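
For illustration, this sleep-based barrier can be used in place of comm.Barrier() wherever idle ranks should not spin a CPU core while waiting:

from mpi4py import MPI
import mpi_map

comm = MPI.COMM_WORLD
# ... each rank performs an uneven amount of work here ...
mpi_map.barrier(comm, sleep=0.01)  # poll with short sleeps instead of busy-waiting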

class mpi_map.MPI_Pool[source]

Returns (but does not start) a pool of nprocs processes

Usage example:

import numpy as np
import numpy.random as npr
import mpi_map

class Operator(object):
    def dot(self, x, A):
        return np.dot(A,x.T).T

def set_A(A):
    return (None,A)

nprocs = 2
op = Operator()
A = npr.randn(5*5).reshape(5,5)

import_set = set([("numpy","np")])
pool = mpi_map.MPI_Pool()
pool.start(nprocs, import_set)
try:
    # Set params on nodes' memory
    params = {'A': A}
    pool.eval(set_A, obj_bcast=params,
              dmem_key_out_list=['A'])

    # Evaluate on first input
    x = npr.randn(100,5)
    split = pool.split_data([x],['x'])
    xdot_list = pool.eval("dot", obj_scatter=split,
                          dmem_key_in_list=['A'], dmem_arg_in_list=['A'],
                          obj=op)
    xdot = np.concatenate(xdot_list, axis=0)

    # Evaluate on second input
    y = npr.randn(100,5)
    split = pool.split_data([y],['x'])
    ydot_list = pool.eval("dot", obj_scatter=split,
                          dmem_key_in_list=['A'], dmem_arg_in_list=['A'],
                          obj=op)
    ydot = np.concatenate(ydot_list, axis=0)
finally:
    pool.stop()
eval(f, obj_scatter=None, obj_bcast=None, dmem_key_in_list=None, dmem_arg_in_list=None, dmem_key_out_list=None, obj=None, reduce_obj=None, reduce_args=None, import_set=None)[source]

Submit a job to the pool.

Execute function f with scattered input obj_scatter and broadcasted input obj_bcast. f should have the following format:

obj_gather = f(**obj_scatter, **obj_bcast, **distr_mem[dmem_key_in_list])

or

(obj_gather, **distr_mem[dmem_key_out_list]) = f(**obj_scatter, **obj_bcast, **distr_mem[dmem_key_in_list])

where distr_mem is a data structure containing the data stored on the nodes, obj_scatter is a dictionary with the input data that is scattered to the nodes, obj_bcast is a dictionary with the input data that is broadcast to the nodes, and obj_gather is the data structure gathered from the nodes.

If obj is not None, f must be a string identifying the method obj.f.
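
Concretely, the two callables in the MPI_Pool usage example above illustrate both formats: set_A follows the second one (its first returned element is gathered, the second is stored in the distributed memory under dmem_key_out_list), while Operator.dot follows the first one (x comes from obj_scatter and A from the distributed memory).

import numpy as np

def set_A(A):
    # second format: (obj_gather, value stored under dmem_key_out_list[0])
    return (None, A)

class Operator(object):
    def dot(self, x, A):
        # first format: x comes from obj_scatter, A from the distributed memory
        return np.dot(A, x.T).T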

Parameters:
  • f (object or str) – function or string identifying the function in object obj
  • obj_scatter (list of dict) – input already split by MPI_Pool.split_data()
  • obj_bcast (dict) – dictionary whose keys are input names of f and whose values are the corresponding inputs.
  • dmem_key_in_list (list) – list of strings containing the keys to be fetched (or created with default None if missing) from the distributed memory and provided as input to f.
  • dmem_arg_in_list (list) – list of strings containing the argument keywords used to map elements fetched from the distributed memory to the actual arguments of f. Must have the same length as dmem_key_in_list.
  • dmem_key_out_list (list) – list of keys to be assigned to the outputs besides the first one
  • obj (object) – object where the method f is defined.
  • reduce_obj (object) – ReduceObject instance defining the reduce method to be applied (if any)
  • reduce_args (object) – arguments to be provided to reduce_obj, already split using MPI_Pool.split_data().
  • import_set (set) – set of pairs (module_name, as_field) to be imported as import module_name as as_field
Returns:

(list [nprocs]) – (ordered) outputs from all the processes

split_data(x_list, kw_list)[source]

Split the list of arguments in x_list into nprocs chunks and identify them by the keywords in kw_list.

Parameters:
  • x_list (list) – list of m arguments splittable in nprocs chunks
  • kw_list (list) – list of m strings used to identify the arguments
Returns:

(list [nprocs]) – list of dictionaries containing the chunks
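
A short sketch of split_data usage; the equal-size chunking shown in the comments illustrates the expected layout rather than the exact split sizes.

import numpy.random as npr
import mpi_map

nprocs = 2
pool = mpi_map.MPI_Pool()
pool.start(nprocs)

x = npr.randn(100, 5)
w = npr.randn(100)
split = pool.split_data([x, w], ['x', 'w'])
# split is a list of nprocs dictionaries, roughly of the form
# [{'x': <first chunk of x>, 'w': <first chunk of w>},
#  {'x': <second chunk of x>, 'w': <second chunk of w>}]

pool.stop()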

start(nprocs=None, import_set=set())[source]

Start the pool of processes

Parameters:
  • nprocs (int) – number of processes. If None then MPI.COMM_WORLD.Get_size() processes will be started
  • import_set (set) – set of pairs (module_name, as_field) to be imported as import module_name as as_field
stop()[source]

Stop the pool of processes

class mpi_map.ReduceObject[source]

[Abstract] Generic object to be passed to MPI methods in order to define reduce operations.

inner_reduce(x, *args, **kwargs)[source]

[Abstract] Reduce function called internally by every process

outer_reduce(x, *args, **kwargs)[source]

[Abstract] Reduce function called by the root process
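
A hypothetical subclass sketching a sum reduction. The assumption that inner_reduce receives each process's local output and outer_reduce receives the list of per-process results gathered on the root is an illustration, not part of the documented interface.

import numpy as np
import mpi_map

class SumReduce(mpi_map.ReduceObject):
    def inner_reduce(self, x, *args, **kwargs):
        # assumed: x is this process's local output; return its partial sum
        return np.sum(x, axis=0)

    def outer_reduce(self, x, *args, **kwargs):
        # assumed: x is the list of per-process partial sums gathered by the root
        return np.sum(x, axis=0)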
