Implicit Parallelism: Parallelizing Compilers
Explicit Parallelism: Data Parallel | Task parallel | Message Passing (MPI)
Data Parallel - same operation applied to different data (SIMD)
Task Parallel - different instructions on different data (MIMD)
SPMD (single program, multiple data) - not synchronized at the individual operation level
Consist of separate processes, each with its own address space
Data is sent explicitly between processes
Multi-process communication (Collective operations)
A process is a program counter and an address space
Processes may have multiple threads (program counters and associated stacks) sharing a single address space.
MPI is for communication among processes, with separate address spaces
1. Introduction to MPI - Library and implementation characteristics - MPI for Python - Error handling
2. Blocking point-to-point communication - Sending and receiving with MPI_Send and MPI_Recv - Dynamic receiving with MPI_Probe and MPI_Status - Groups and communicators
3. Non-blocking communication
4. Collective communication - Synchronization (Barrier) - Data movement (Broadcast, Gather, Scatter, Allgather, Alltoall) - Collective computation (Reduce, Allreduce, Scan, Exscan, Reduce_scatter) - Define your own collective routine - Non-blocking collective operations
5. Topology mapping and neighborhood collectives
6. Profiling MPI applications
MPICH / OpenMPI
mpi4py serializes generic Python objects with pickle under the hood.
Launching MPI jobs:
mpiexec -np ${RANKS} python ${PATH_TO_SCRIPT}
from mpi4py import MPI
print("Hello world")
mpiexec -n 4 python script.py
Hello world
Hello world
Hello world
Hello world
MPI_Init / MPI_Finalize
MPI_Comm_size reports the number of processes
MPI_Comm_rank reports the rank, a number between 0 and size-1, identifying the calling process
#include "mpi.h"
int main(int argc, char *argv[])
{
    int rank, size, i, provided;
    float A[10];
    MPI_Init_thread(&argc, &argv, MPI_THREAD_SINGLE, &provided);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    for (i = 0; i < 10; i++)
        A[i] = i * rank;
    printf("My rank %d of %d\n", rank, size);
    printf("Here are my values for A\n");
    for (i = 0; i < 10; i++) printf("%f ", A[i]);
    printf("\n");
    MPI_Finalize();
}
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
A = [i * rank for i in range(5)]
print("My rank %d of %d" %(rank, size));
print("My values for A: %s" %(A));
My rank 0 of 4
My rank 1 of 4
My rank 2 of 4
My values for A: [0, 0, 0, 0, 0]
My values for A: [0, 2, 4, 6, 8]
My values for A: [0, 1, 2, 3, 4]
My rank 3 of 4
My values for A: [0, 3, 6, 9, 12]
All MPI programs begin with MPI_Init_thread and end with MPI_Finalize
MPI_COMM_WORLD
is defined by mpi.h (in C) or the MPI module
Programmer can request the level of thread safety required for the program
MPI_THREAD_SINGLE - only one thread; gives the same behavior as MPI_Init
MPI_THREAD_FUNNELED - the process may be multi-threaded, but only the main thread makes MPI calls
MPI_THREAD_SERIALIZED - the process may be multi-threaded, and multiple threads may make MPI calls, but only one at a time
MPI_THREAD_MULTIPLE - multiple threads may call MPI, with no restrictions
MPI_Init_thread is available since MPI-2
At import time (from mpi4py import MPI), mpi4py calls MPI_Init_thread(), and MPI_Finalize() is called automatically just before the Python process terminates.
You can set runtime configuration options (e.g. automatic initialization, thread_level) via mpi4py.rc(), for example to defer MPI_Init() / MPI_Init_thread().
Use Is_initialized() to test for initialization and Is_finalized() to test for finalization.
By default, an error causes all processes to abort.
MPI has error codes and classes.
MPI_SUCCESS means no error; error strings can be obtained with MPI_Error_string.
An MPI implementation can use error codes to return instance-specific information on the error.
MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
error_code = MPI_Send(send_buffer, strlen(send_buffer) + 1,
                      MPI_CHAR, addressee, tag, MPI_COMM_WORLD);
if (error_code != MPI_SUCCESS) {
    char error_string[BUFSIZ];
    int length_of_error_string;
    MPI_Error_string(error_code, error_string, &length_of_error_string);
    fprintf(stderr, "%3d: %s\n", my_rank, error_string);
    send_error = TRUE;
}
mpi4py overrides the default MPI.ERRORS_ARE_FATAL error handler in favor of MPI.ERRORS_RETURN.
MPI errors surface as Python exceptions and can be dealt with using the common try…except…finally clauses.
from mpi4py import MPI
assert MPI.COMM_WORLD.Get_size() > 1
rank = MPI.COMM_WORLD.Get_rank()
if rank == 0:
    1/0
    MPI.COMM_WORLD.send(None, dest=1, tag=42)
elif rank == 1:
    MPI.COMM_WORLD.recv(source=0, tag=42)
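MPI-level errors can also be caught explicitly. A minimal sketch (the out-of-range destination rank is chosen only to force an MPI error; the handling pattern is the point):

from mpi4py import MPI
comm = MPI.COMM_WORLD
try:
    # comm.Get_size() is not a valid rank, so the send fails; with the
    # default MPI.ERRORS_RETURN handler mpi4py raises MPI.Exception
    comm.send(None, dest=comm.Get_size(), tag=0)
except MPI.Exception as e:
    print("MPI error %d: %s" % (e.Get_error_code(), e.Get_error_string()))
finally:
    pass  # cleanup (e.g. freeing communicators) would go here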
Or: abort all processes by calling MPI_Abort() on the MPI_COMM_WORLD communicator.
Running the script through the mpi4py runner makes unhandled exceptions trigger MPI_Abort():
mpiexec -n ${NUM_PROCS} python -m mpi4py ${PYTHON_FILE} [${ARG}]
A group and context together form a communicator (*)
(*) the context (or ID) that differentiates one communicator from another and the group of processes contained by the communicator
MPI_COMM_WORLD
is enough for most applications
MPI_Comm_split(
MPI_Comm comm,
int color, // for split
int key, // for ordering
MPI_Comm* newcomm)
# Get the rank and size in the original communicator
from mpi4py import MPI
world_rank = MPI.COMM_WORLD.Get_rank()
world_size = MPI.COMM_WORLD.Get_size()
# Determine color based on even/odd ranks
color = world_rank % 2;
# Split the communicator based on the color and use the
# original rank for ordering
new_comm = MPI.COMM_WORLD.Split(color, world_rank)
new_rank = new_comm.Get_rank()
new_size = new_comm.Get_size()
print("World rank/size: %d/%d -- New rank/size: %d/%d" %(
world_rank, world_size, new_rank, new_size));
new_comm.Free()
World rank/size: 0/4 -- New rank/size: 0/2
World rank/size: 1/4 -- New rank/size: 0/2
World rank/size: 2/4 -- New rank/size: 1/2
World rank/size: 3/4 -- New rank/size: 1/2
An MPI message buffer is described by (start, count, datatype).
Datatype can be a predefined MPI datatype (e.g. MPI_INT, MPI_DOUBLE) or a user-defined (derived) datatype.
There are MPI functions to construct custom datatypes.
mpi4py relies on the pickle protocol (lowercase methods) or the buffer protocol (uppercase methods) for packing and unpacking Python objects
int MPI_Type_create_struct(
int count,
int array_of_blocklengths[],
MPI_Aint array_of_displacements[],
MPI_Datatype array_of_types[],
MPI_Datatype *newtype
);
def definetype():
    dtypes = [MPI.INT, MPI.DOUBLE]
    dt_size = MPI.INT.size + MPI.DOUBLE.size
    displ = [0, MPI.INT.size]
    new_dtype = MPI.Datatype.Create_struct([1, 1], displ, dtypes)
    new_dtype = new_dtype.Create_resized(0, dt_size)
    new_dtype.Commit()
    return new_dtype
Messages are sent with an accompanying user-defined integer tag to help the receiving process identify the message.
Messages can be screened at the receiving end by specifying a tag, or left unscreened by using MPI_ANY_TAG as the tag in a receive.
Some non-MPI message-passing systems have called tags "message types".
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
    sdata = {'a': 7, 'b': 3.14}
    comm.send(sdata, dest=1, tag=11)
elif rank == 1:
    rdata = comm.recv(source=0, tag=11)
    print(rdata)
{'a': 7, 'b': 3.14}
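The outline also lists dynamic receiving with MPI_Probe and MPI_Status; a minimal sketch (buffer size and tag are illustrative) in which the receiver sizes its buffer from the probed status:

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
    nelem = 5  # the receiver does not know this size in advance
    comm.Send([np.arange(nelem, dtype='i'), MPI.INT], dest=1, tag=7)
elif rank == 1:
    status = MPI.Status()
    comm.Probe(source=0, tag=7, status=status)  # block until a message is pending
    count = status.Get_count(MPI.INT)           # number of MPI_INTs in the message
    buf = np.empty(count, dtype='i')
    comm.Recv([buf, MPI.INT], source=status.Get_source(), tag=status.Get_tag())
    print("received %d elements: %s" % (count, buf))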
Blocking / Non-blocking
One sided communication
MPI_SEND (start, count, datatype, dest, tag, comm)
MPI_RECV(start, count, datatype, source, tag, comm, status)
...
if (rank == 0) {
    // generate data
    MPI_Send(&sdata, 1, MPI_INT, 1, 100, MPI_COMM_WORLD);
}
if (rank == 1) {
    MPI_Recv(&rdata, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    // do something with data
}
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

def definetype():
    dtypes = [MPI.INT, MPI.DOUBLE]
    dt_size = MPI.INT.size + MPI.DOUBLE.size
    displ = [0, MPI.INT.size]
    new_dtype = MPI.Datatype.Create_struct([1, 1], displ, dtypes)
    new_dtype = new_dtype.Create_resized(0, dt_size)
    new_dtype.Commit()
    return new_dtype

def fill_data(data, num):
    for i in range(num):
        data[i][0] = i + 1
        data[i][1] = 3.14 * (i + 1)

mytype = definetype()
data = np.zeros(2, dtype="int32, double")
if rank == 0:
    comm.Recv([data, mytype], source=1, tag=0)
    print(data)
elif rank == 1:
    fill_data(data, 2)
    comm.Send([data, mytype], dest=0, tag=0)
[(1, 3.14) (2, 6.28)]
Overlapping communication and computation
Avoiding deadlocks
Example: send a large message from process 0 to process 1 while process 1 also sends a large message to process 0; if both processes call a blocking send first, what happens with this code?
Possible fixes: combine the send and receive (MPI_Sendrecv), use buffered sends (MPI_Bsend), or use non-blocking operations.
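A sketch of the risky pattern and one possible fix, assuming exactly two processes and an illustrative one-million-element message:

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
other = 1 - rank                       # assumes exactly 2 processes
sbuf = np.zeros(1000000, dtype='i')
rbuf = np.empty_like(sbuf)
# Risky: both ranks send first; for large messages each Send may block
# waiting for a matching Recv that is never posted -> deadlock
#   comm.Send([sbuf, MPI.INT], dest=other)
#   comm.Recv([rbuf, MPI.INT], source=other)
# Fix: post the receive as a non-blocking operation before sending
req = comm.Irecv([rbuf, MPI.INT], source=other)
comm.Send([sbuf, MPI.INT], dest=other)
req.Wait()

Combining the two operations with MPI_Sendrecv (covered below) achieves the same effect in a single call.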
MPI_Request request;
MPI_Isend(start, count, datatype, dest, tag, comm, &request);
MPI_Irecv(start, count, datatype, dest, tag, comm, &request);
MPI_Wait(&request, &status);
MPI_Test(&request, &flag, &status);
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
assert comm.size == 2
sdata = numpy.array([(rank+1) * i for i in range(10)], dtype='i')
rdata = numpy.zeros(10, dtype='i')
if comm.rank == 0:
    target = 1
else:
    target = 0
request = comm.Isend([sdata, MPI.INT], dest=target, tag=10)
comm.Recv([rdata, MPI.INT], source=target, tag=MPI.ANY_TAG)
request.Wait()
print("rank %d: received data %s" %(rank, rdata))
rank 1: received data [0 1 2 3 4 5 6 7 8 9]
rank 0: received data [0 2 4 6 8 10 12 14 16 18]
MPI_Waitall(count, array_of_requests, array_of_statuses);
MPI_Waitany(count, array_of_requests, &index, &status);
MPI_Waitsome(incount, array_of_requests, &outcount, array_of_indices, array_of_statuses);
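A short mpi4py sketch of completing several requests at once with Request.Waitall; the ring exchange is only for illustration:

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
sbuf = np.full(4, rank, dtype='i')
rbuf = np.empty(4, dtype='i')
requests = [
    comm.Isend([sbuf, MPI.INT], dest=(rank + 1) % size, tag=0),
    comm.Irecv([rbuf, MPI.INT], source=(rank - 1) % size, tag=0),
]
MPI.Request.Waitall(requests)  # completes both the send and the receive
print("rank %d received %s" % (rank, rbuf))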
Synchronous mode (MPI_Ssend)
Buffered mode (MPI_Bsend)
Ready mode (MPI_Rsend)
Non-blocking versions for all operations (MPI_Issend, etc.)
MPI_Recv receives messages sent in any mode.
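A minimal sketch of the synchronous mode in mpi4py: Ssend does not complete until the matching receive has started, which is useful for flushing out unsafe orderings while debugging:

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
data = np.arange(4, dtype='i')
if rank == 0:
    comm.Ssend([data, MPI.INT], dest=1, tag=0)  # returns only after rank 1 starts receiving
elif rank == 1:
    comm.Recv([data, MPI.INT], source=0, tag=0)
    print("rank 1 got %s" % data)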
Simultaneous send and receive
Intended for sending data in a "chain"
int MPI_Sendrecv(
void *sendbuf,
int sendcount,
MPI_Datatype sendtype,
int dest,
int sendtag,
void *recvbuf,
int recvcount,
MPI_Datatype recvtype,
int source,
int recvtag,
MPI_Comm comm,
MPI_Status *status
);
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
to = (rank + 1) % size
fr = (rank - 1) % size
sdata = rank
rdata = comm.sendrecv(sdata, dest=to, source=fr)
print("rank %d got data from %d" %(rank, rdata))
rank 0 got data from 3
rank 1 got data from 0
rank 2 got data from 1
rank 3 got data from 2
MPI_Barrier( comm )
from mpi4py import MPI
rank = MPI.COMM_WORLD.Get_rank()
print("rank %d: before barrier" %(rank))
MPI.COMM_WORLD.barrier()
print("rank %d: after barrier" %(rank))
rank 0: before barrier
rank 1: before barrier
rank 2: before barrier
rank 3: before barrier
rank 3: after barrier
rank 1: after barrier
rank 2: after barrier
rank 0: after barrier
Collective routines on the same communicator must be called in the same order on all participating processes.
For multi-threaded processes (MPI_THREAD_MULTIPLE), the user must ensure that two threads do not issue collective calls on the same communicator concurrently.
Message tags are not used by collective operations.
MPI.COMM_WORLD.bcast(data, root=proc_id)
MPI.COMM_WORLD.gather(data, root=proc_id)
MPI.COMM_WORLD.scatter(data, root=proc_id)
# for numpy arrays
MPI.COMM_WORLD.Scatter(sendbuf, recvbuf, root=0)
# sendobj is iterable
MPI.COMM_WORLD.allgather(sendobj)
MPI.COMM_WORLD.alltoall(sendobj)
# for numpy arrays
MPI.COMM_WORLD.Allgather([x, MPI.DOUBLE], [xg, MPI.DOUBLE])
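A small runnable sketch (the block size n is illustrative) combining the pickle-based bcast with the buffer-based Scatter and Gather on numpy arrays:

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
# broadcast a small Python object from rank 0
params = comm.bcast({'n': 4} if rank == 0 else None, root=0)
n = params['n']
# scatter one block of n integers to every rank
sendbuf = np.arange(size * n, dtype='i') if rank == 0 else None
block = np.empty(n, dtype='i')
comm.Scatter(sendbuf, block, root=0)
# each rank doubles its block, then the blocks are gathered back on rank 0
block *= 2
result = np.empty(size * n, dtype='i') if rank == 0 else None
comm.Gather(block, result, root=0)
if rank == 0:
    print(result)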
MPI_Allgather is equivalent to MPI_Gather followed by MPI_Bcast, but implementations of MPI_Allgather are usually faster (try it on ACCRE).
MPI_Alltoall: every process sends a distinct block of data to every other process.
MPI_Scatter(sbuf, 100, MPI_INT, rbuf, 100, MPI_INT, root, comm);
MPI_Scatterv allows a varying count of data per process, plus displacements into the send buffer (see the sketch below).
Variant alltoallw: the most general form of all-to-all, with per-process counts, displacements, and datatypes.
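A hedged mpi4py sketch of Scatterv with made-up uneven counts (rank i receives i+1 integers):

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
counts = [i + 1 for i in range(size)]            # rank i gets i+1 elements
displs = [sum(counts[:i]) for i in range(size)]  # offsets into the send buffer
if rank == 0:
    sendbuf = [np.arange(sum(counts), dtype='i'), counts, displs, MPI.INT]
else:
    sendbuf = None  # only significant at the root
recvbuf = np.empty(counts[rank], dtype='i')
comm.Scatterv(sendbuf, recvbuf, root=0)
print("rank %d received %s" % (rank, recvbuf))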
MPI reduction operations include:
MPI.MAX | MPI.MIN
MPI.SUM | MPI.PROD
MPI.LAND | MPI.BAND
MPI.LOR | MPI.BOR
MPI.LXOR | MPI.BXOR
MPI.MAXLOC | MPI.MINLOC
MPI.COMM_WORLD.reduce(sendobj, op=MPI.SUM, root=0)
MPI.COMM_WORLD.allreduce(sendobj, op=MPI.SUM)
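A small sketch contrasting reduce (result only on the root) with allreduce (result on every rank):

from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
local = rank + 1
total_on_root = comm.reduce(local, op=MPI.SUM, root=0)  # None on non-root ranks
total_everywhere = comm.allreduce(local, op=MPI.SUM)    # same value on every rank
print("rank %d: reduce=%s allreduce=%s" % (rank, total_on_root, total_everywhere))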
MPI_Op_create(user_fcn, commutes, &op);
MPI_Op_free(&op);
user_fcn(invec, inoutvec, len, datatype);
The commute flag tells MPI whether the operation is commutative; depending on its value, MPI may reorder the operands.
The user function should perform:
for(i=0; i<len; i++)
inoutvec[i] = invec[i] op inoutvec[i];
The operation op is always assumed to be associative.
The user function can be non-commutative. Example?
Product of complex numbers
In C: user_fcn(invec, inoutvec, len, datatype);
In Python: user_fcn(inelem1, inelem2, outelem)
from Complex import Complex
# Complex class defined in a separate file
def myprod(a, b, c):
    c = Complex()
    c.real = a.real * b.real - a.img * b.img
    c.img = a.img * b.real + a.real * b.img
    return c
Product of complex numbers
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
a = Complex(rank + 1, rank)
print("Rank %d: Data %s" %(rank, a))
myop = MPI.Op.Create(myprod, True) # commute is True
b = comm.reduce(a, op=myop, root=0)
if rank == 0:
    print("Product: %s" %(b))
Rank 0: Data (1.0 0.0)
Rank 1: Data (2.0 1.0)
Rank 3: Data (4.0 3.0)
Rank 2: Data (3.0 2.0)
Product: (-5.0 40.0)
For C version https://www.mpi-forum.org/docs/mpi-1.1/mpi-11-html/node80.html
# continues from the previous example: comm, rank, and numpy as np are already defined
def unpack_array(array_mem, dt):
    # interpret the raw bytes of the buffer as an array of 64-bit integers
    a = array_mem.tobytes()
    itemsize = dt.Get_size()
    values = []
    for i in range(len(a) // itemsize):
        values.append(int(np.frombuffer(a[itemsize*i:itemsize*(i+1)],
                                        dtype=np.int64)))
    return np.array(values)

def myprod(xmem, ymem, dt):
    # user-defined op: element-wise "contains an odd value" flag
    a = unpack_array(xmem, dt)
    b = unpack_array(ymem, dt)
    c = np.array([0] * len(a))
    for i in range(len(a)):
        c[i] = a[i] % 2 | b[i] % 2
    ymem[:] = memoryview(bytearray(c))[:]

a = np.array([rank * (i + 1) for i in range(3)])
b = np.array([0] * 3)
print("Rank %d: Data %s" %(rank, a))
myop = MPI.Op.Create(myprod, True) # commute is True
comm.Reduce([a, MPI.INT64_T], [b, MPI.INT64_T], op=myop, root=0)
if rank == 0:
    print("Contains odd numbers: %s" %(b))
Rank 0: Data [0 0 0]
Rank 2: Data [2 4 6]
Rank 1: Data [1 2 3]
Rank 3: Data [3 6 9]
Contains odd numbers: [1 0 1]
MPI-3 introduced nonblocking versions of collective operations.
They return an MPI_Request; use the usual MPI_Wait, MPI_Test, etc. to complete them.
Example: MPI_Ibarrier.
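A minimal mpi4py sketch of a nonblocking broadcast: Ibcast returns a request, independent work can proceed, and Wait completes the collective:

from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
data = np.arange(8, dtype='i') if rank == 0 else np.empty(8, dtype='i')
req = comm.Ibcast([data, MPI.INT], root=0)  # nonblocking collective
# ... computation that does not touch `data` can run here ...
req.Wait()                                   # data is valid only after completion
print("rank %d has %s" % (rank, data))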
A virtual topology represents the way that MPI processes communicate.
A physical topology represents the connections between the cores, chips, and nodes in the hardware.
MPI provides routines to create new communicators that order the process ranks
Two types of virtual topology are supported: Cartesian (mesh) and graph.
Additional routines provide access to the defined virtual topology
MPI_Cart_create(MPI_Comm oldcomm, int ndim, int dims[],
int qperiodic[], int qreorder, MPI_Comm *newcomm)
The mesh is periodic in coordinate direction i if qperiodic[i] is true (torus).
The ranks in the new communicator may be reordered (if qreorder is true) to better match the physical topology.
MPI_Dims_create(int nnodes, int ndim, int dims[]) chooses a balanced decomposition: the product of dims[i] for i=0 to ndim-1 equals nnodes.
Any value of dims[i] that is 0 on input will be replaced; this makes life easier.
3D torus
int p;
int dims[3] = {0, 0, 0};
MPI_Comm_size(MPI_COMM_WORLD, &p);
MPI_Dims_create(p, 3, dims);
int periods[3] = {1, 1, 1};
MPI_Comm cart_comm;
MPI_Cart_create(MPI_COMM_WORLD, 3, dims, periods, 0, &cart_comm);
from mpi4py import MPI
comm = MPI.COMM_WORLD
world_rank = comm.Get_rank()
size = comm.Get_size()
dims = MPI.Compute_dims(size, [0]*2)
cart_comm = comm.Create_cart(
dims, periods=[True,True], reorder=True)
new_rank = cart_comm.Get_rank()
print("World rank: %d -- Cart rank: %d" %(
world_rank, new_rank));
cart_comm.Free()
World rank: 2 -- Cart rank: 2
World rank: 1 -- Cart rank: 1
World rank: 3 -- Cart rank: 3
World rank: 0 -- Cart rank: 0
MPI_Cartdim_get() - returns the number of dimensions
MPI_Cart_get() - returns the dims, periods, and the coordinates of the calling process
MPI_Cart_rank() - converts coordinates to a rank
MPI_Cart_coords() - converts a rank to coordinates
MPI_Cart_shift(MPI_Comm comm, int direction, int disp,
int *rank_source, int *rank_dest)
ndim = 2
dims = MPI.Compute_dims(size, [0]*ndim)
cart_comm = comm.Create_cart(dims, periods=[True,True], reorder=True)
new_rank = cart_comm.Get_rank()
if new_rank == 0:
    print("Cartesian dim: %s" %(dims))
for i in range(ndim):
    for d in (-1, +1):
        source, dest = cart_comm.Shift(i, d)
        if new_rank == 0:
            print("Dir %d, disp %d - Src %d - Dest %d" %(i, d, source, dest))
Cartesian dim: [4, 2]
Dir 0, disp -1 - Src 2 - Dest 6
Dir 0, disp 1 - Src 6 - Dest 2
Dir 1, disp -1 - Src 1 - Dest 1
Dir 1, disp 1 - Src 1 - Dest 1
Neighborhood collectives (e.g. MPI_Neighbor_allgather) communicate only with the neighbors defined by the virtual topology.
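A hedged sketch of a neighborhood collective on a periodic 2D Cartesian communicator: each process contributes its rank and receives its neighbors' contributions via the pickle-based neighbor_allgather:

from mpi4py import MPI
comm = MPI.COMM_WORLD
size = comm.Get_size()
dims = MPI.Compute_dims(size, [0, 0])
cart = comm.Create_cart(dims, periods=[True, True], reorder=False)
# one contribution per process; the result lists the 2*ndim neighbors'
# values in Cartesian neighbor order
neighbor_ranks = cart.neighbor_allgather(cart.Get_rank())
print("rank %d -> neighbors %s" % (cart.Get_rank(), neighbor_ranks))
cart.Free()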
Profiling interface: PMPI routines
In exact arithmetic, you always get the same results.
Message-passing programming models are by default nondeterministic: the arrival order of messages from different senders is not defined.
How can we "ensure" determinism? Avoid wildcard receives (MPI_ANY_SOURCE or MPI_ANY_TAG).
Depending on the process-to-hardware mapping, reduction results might be different for non-associative operations (e.g. floating-point addition).
Allreduce does guarantee that the same value is received by all processes for each call
Why didn’t MPI mandate determinism?
MPI_Wtime:
double t1, t2;
t1 = MPI_Wtime();
...
t2 = MPI_Wtime();
printf("time is %f\n", t2 - t1);
MPI_Wtick returns the resolution of MPI_Wtime.
MPI_WTIME_IS_GLOBAL indicates whether the clocks are synchronized across processes.
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
data = np.array([rank * i for i in range(10)])
local_wt = MPI.Wtime();
comm.Bcast(data, root=0);
local_wt = MPI.Wtime() - local_wt;
print("Bcast time %f" %(local_wt))
Bcast time 0.003616
Bcast tiBcast time 0.005131
me 0.0020Bcast time 0.004574
81
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
data = np.array([rank * i for i in range(10)])
comm.barrier();
local_wt = MPI.Wtime();
comm.Bcast(data, root=0);
local_wt = MPI.Wtime() - local_wt;
wt = comm.reduce(local_wt, op=MPI.MAX, root=0);
if rank == 0:
    print("Bcast time %f" %(wt))
Bcast time 0.004041
How accurate is this measurement?
Not all MPI functions need to be instrumented.
Example: count the calls to the Send routine for each rank.
#include <stdio.h>
#include "mpi.h"
static int numsend = 0;
int MPI_Send(void *buf, int count, MPI_Datatype type,
             int dest, int tag, MPI_Comm comm) {
    numsend++;
    return PMPI_Send(buf, count, type, dest, tag, comm);
}
int MPI_Finalize() {
    int me;
    PMPI_Comm_rank(MPI_COMM_WORLD, &me);
    printf("%d sent %d messages.\n", me, numsend);
    return PMPI_Finalize();
}
Use the rank in MPI_COMM_WORLD as the process identifier.
To count the volume of data sent, multiply #elements by the type size obtained from PMPI_Type_size(type, &size).
Intel Trace Analyzer: https://researchcomputing.princeton.edu/faq/using-intel-trace-analyze
VAMPIR: https://vampir.eu/
ParaProf: http://www.cs.uoregon.edu/research/tau/vihps14/ParaProf.pdf
cProfile module in Python
You are encouraged to run the examples on ACCRE
Image downloaded from: https://commons.wikimedia.org/wiki/File:Pi_30K.gif
Calculating pi via numerical integration
Divide the interval into n slices:
w = 1.0 / n
mypi = 0.0
for i in range(rank + 1, n + 1, size):
    mypi += w * np.sqrt(1 - np.power(i / n, 2))
pi = comm.reduce(mypi, op=MPI.SUM, root=0)
if rank == 0:
    print("PI: %f" %(4 * pi))
ana@webserver:~/mpi_lecture$ mpiexec -n 2 python3 compute_pi.py 10
PI: 2.904518
ana@webserver:~/mpi_lecture$ mpiexec -n 2 python3 compute_pi.py 100
PI: 3.120417
ana@webserver:~/mpi_lecture$ mpiexec -n 2 python3 compute_pi.py 1000
PI: 3.139555
ana@webserver:~/mpi_lecture$ mpiexec -n 2 python3 compute_pi.py 10000
PI: 3.141391
Example: 6.4 million slices (2 to 64 processes)
Example: keep 100k slices per process
Figure shows 4 processes and a list containing 8 integers
global_array_size = get_input_data_size()
# calculate the total height of the merge tree
max_height = int(np.log2(size))
if rank == 0:
    # create input array
    global_data = get_input_data()
    print("Global data %s" %(global_data))
else:
    global_data = None
local_array_size = global_array_size // size
local_data = np.array([0] * local_array_size)
comm.Scatter([global_data, MPI.INT], [local_data, MPI.INT], root=0)
print("Rank %d: data %s" %(rank, local_data))
Global data [1 3 15 12 14 5 7 0 8 9 4 6 13 11 2 10]
Rank 0: data [ 1 3 15 12]
Rank 1: data [14 5 7 0]
Rank 3: data [13 11 2 10]
Rank 2: data [ 8 9 4 6]
local_data.sort()
print("Rank %d: local data sorted %s" %(rank, local_data))
Global data [1 3 15 12 14 5 7 0 8 9 4 6 13 11 2 10]
Rank 0: local data sorted [1 3 12 15]
Rank 1: local data sorted [0 5 7 14]
Rank 3: local data sorted [2 10 11 13]
Rank 2: local data sorted [4 6 8 9]
for height in range(max_height):
    parent = rank & (~(1 << height))
    if rank == parent:
        child = rank | (1 << height)
        print("At height %d, rank %d receive from %d" %(height, rank, child))
    else:
        print("At height %d, rank %d sends to %d" %(height, rank, parent))
        break
At height 0, rank 0 receive from 1
At height 0, rank 1 sends to 0
At height 0, rank 2 receive from 3
At height 0, rank 3 sends to 2
At height 0, rank 4 receive from 5
At height 0, rank 5 sends to 4
At height 0, rank 6 receive from 7
At height 0, rank 7 sends to 6
At height 1, rank 0 receive from 2
At height 1, rank 2 sends to 0
At height 1, rank 4 receive from 6
At height 1, rank 6 sends to 4
At height 2, rank 0 receive from 4
At height 2, rank 4 sends to 0
from heapq import merge

for height in range(max_height):
    parent = rank & (~(1 << height))
    if rank == parent:
        child = rank | (1 << height)
        # receive data from the right child
        data = np.array([0] * len(local_array))
        comm.Recv([data, MPI.INT], source=child, tag=height)
        # merge the received array with your own data
        local_array = np.array(list(merge(local_array, data)))
    else:
        # send your data to the parent
        comm.Send([local_array, MPI.INT], dest=parent, tag=height)
        break
mpiexec -n 4 python3 merge_sort.py
Global data [ 1 7 12 2 13 15 14 11 6 5 10 8 3 9 0 4]
Sorted array [ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15]
mpiexec -n 8 python3 merge_sort.py
Global data [ 5 0 17 2 10 15 13 30 20 16 19 29 28 23 31 3 25 11 4 9 24 12 14 7
26 6 18 8 27 1 21 22]
Sorted array [ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
24 25 26 27 28 29 30 31]
More formally, use an assert to check correctness:
assert all(res[i] <= res[i+1] for i in range(len(res)-1))
Dot product between rows and vector: scatter the rows of the A matrix, broadcast the vect vector, and gather the results into the res vector.
my_row_count = number_rows // size
local_A = np.zeros(shape=(my_row_count, number_rows), dtype='i')
if rank == 0:
    A = read_matrix()
    vect = read_vector()
    print("Multiply matrix %s with vector %s" %(A, vect))
else:
    A = None
    vect = np.empty(number_rows, dtype='i')
comm.Scatter([A, MPI.INT], [local_A, MPI.INT], root=0)
comm.Bcast([vect, MPI.INT], root=0)
print("rank %d: matrix %s" %(rank, local_A))
Multiply matrix
[[0 1 2 3]
[2 3 4 5]
[4 5 6 7]
[6 7 8 9]] with vector [0 1 0 1]
rank 0: matrix
[[0 1 2 3]
[2 3 4 5]]
rank 1: matrix
[[4 5 6 7]
[6 7 8 9]]
# local computation
local_res = np.zeros(my_row_count, dtype='i')
for i in range(my_row_count):
    local_res[i] = np.dot(local_A[i], vect)
print("rank %d: local result %s" %(rank, local_res))
Multiply matrix
[[0 1 2 3]
[2 3 4 5]
[4 5 6 7]
[6 7 8 9]] with vector [0 1 0 1]
rank 0: matrix
[[0 1 2 3]
[2 3 4 5]]
rank 1: matrix
[[4 5 6 7]
[6 7 8 9]]
rank 0: local result [4 8]
rank 1: local result [12 16]
res = None
if rank == 0:
    res = np.empty(size * my_row_count, dtype='i')
comm.Gather([local_res, MPI.INT], [res, MPI.INT], root=0)
if rank == 0:
    print("Multiplication result %s" %(res))
Multiply matrix
[[0 1 2 3]
[2 3 4 5]
[4 5 6 7]
[6 7 8 9]] with vector [0 1 0 1]
Multiplication result [ 4 8 12 16]
What are the limitations?
Block decomposition: each process receives a chunk of one row of the A matrix and the corresponding chunk of the vect vector.
The partial results for each element of res are combined with an ADD reduce op on all processes responsible for that element.
if rank == 0:
    # chunk_per_row, chunk_size, local_A and local_vect are set up earlier (not shown)
    A, vect = read_input_data()
    for i in range(size):
        offset = (i % chunk_per_row) * chunk_size
        row_offset = i // chunk_per_row
        comm.Send([vect[offset : offset + chunk_size], MPI.INT], dest=i, tag=1)
        comm.Send([A[row_offset, offset : offset + chunk_size], MPI.INT], dest=i, tag=2)
comm.Recv([local_vect, MPI.INT], source=0, tag=1)
comm.Recv([local_A, MPI.INT], source=0, tag=2)
Multiply matrix
[[0 1 2 3]
[2 3 4 5]
[4 5 6 7]
[6 7 8 9]] with vector [0 1 0 1]
rank 0: matrix [0 1], vect [0 1]
rank 2: matrix [2 3], vect [0 1]
rank 4: matrix [4 5], vect [0 1]
rank 3: matrix [4 5], vect [0 1]
rank 5: matrix [6 7], vect [0 1]
rank 7: matrix [8 9], vect [0 1]
rank 1: matrix [2 3], vect [0 1]
rank 6: matrix [6 7], vect [0 1]
local_res = np.dot(local_A, local_vect)
print("rank %d: local result %s" %(rank, local_res))
Multiply matrix
[[0 1 2 3]
[2 3 4 5]
[4 5 6 7]
[6 7 8 9]] with vector [0 1 0 1]
rank 0: matrix [0 1], vect [0 1]
rank 2: matrix [2 3], vect [0 1]
rank 4: matrix [4 5], vect [0 1]
rank 3: matrix [4 5], vect [0 1]
rank 5: matrix [6 7], vect [0 1]
rank 7: matrix [8 9], vect [0 1]
rank 1: matrix [2 3], vect [0 1]
rank 6: matrix [6 7], vect [0 1]
rank 6: local result 7
rank 4: local result 5
rank 2: local result 3
rank 3: local result 5
rank 5: local result 7
rank 0: local result 1
rank 7: local result 9
rank 1: local result 3
color = rank // chunk_per_row
new_comm = MPI.COMM_WORLD.Split(color, rank)
local_res = new_comm.reduce(local_res, root=0)
if new_comm.Get_rank() == 0:
    print("rank %d: row %d result %d" %(rank, color, local_res))
Multiply matrix
[[0 1 2 3]
[2 3 4 5]
[4 5 6 7]
[6 7 8 9]] with vector [0 1 0 1]
rank 6: local result 7
rank 4: local result 5
rank 2: local result 3
rank 3: local result 5
rank 5: local result 7
rank 0: local result 1
rank 7: local result 9
rank 1: local result 3
rank 4: row 2 result 12
rank 2: row 1 result 8
rank 0: row 0 result 4
rank 6: row 3 result 16
if new_comm.Get_rank() == 0:
    comm.send(local_res, dest=0, tag=3)
res = np.zeros(number_rows, dtype='i')
if rank == 0:
    for i in range(number_rows):
        status = MPI.Status()
        ret = comm.recv(source=MPI.ANY_SOURCE, tag=3, status=status)
        rnk = status.Get_source()
        res[rnk // chunk_per_row] = ret
    print("Multiplication result %s" %(res))
mpiexec -n 8 python3 matrix_vector_mult_block.py 4
Multiply matrix
[[0 1 2 3]
[2 3 4 5]
[4 5 6 7]
[6 7 8 9]] with vector [0 1 0 1]
Multiplication result [ 4 8 12 16]
How can we do it better?