5. Collective Communications in MPI
Main Features:
- Communication is coordinated among a group of processes.
- Groups can be constructed "by hand" with MPI group-manipulation routines or by using MPI topology-definition routines.
- Message tags are not used. Different communicators are used instead.
- No non-blocking collective operations.
Three classes of collective operations:
- synchronization
- data movement
- collective computation
Synchronization:
MPI_Barrier(MPI_Comm comm)
The function blocks until all processes (in comm) have reached this routine (i.e., have called it).
Data movement:
[Figure: Schematic representation of collective data movement in MPI]
MPI Collective Routines (data movement):
MPI_Allgather(void *sndbuf, int scount, MPI_Datatype sdatatype, void *recvbuf, int rcount, MPI_Datatype rdatatype, MPI_Comm comm);
MPI_Allgatherv(void *sndbuf, int scount, MPI_Datatype sdatatype, void *recvbuf, int *rcounts, int *displs, MPI_Datatype rdatatype, MPI_Comm comm);
MPI_Alltoall(void *sndbuf, int scount, MPI_Datatype sdatatype, void *recvbuf, int rcount, MPI_Datatype rdatatype, MPI_Comm comm);
MPI_Alltoallv(void *sndbuf, int *scounts, int *sdispls, MPI_Datatype sdatatype, void *recvbuf, int *rcounts, int *rdispls, MPI_Datatype rdatatype, MPI_Comm comm);
MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm);
MPI_Gather(void *sndbuf, int scount, MPI_Datatype datatype, void *recvbuf, int rcount, MPI_Datatype rdatatype, int root, MPI_Comm comm);
MPI_Gatherv(void *sndbuf, int scount, MPI_Datatype datatype, void *recvbuf, int *rcounts, int *displs, MPI_Datatype rdatatype, int root, MPI_Comm comm);
MPI_Scatter(void *sndbuf, int scount, MPI_Datatype datatype, void *recvbuf, int rcount, MPI_Datatype rdatatype, int root, MPI_Comm comm);
MPI_Scatterv(void *sndbuf, int *scounts, int *displs, MPI_Datatype datatype, void *recvbuf, int rcount, MPI_Datatype rdatatype, int root, MPI_Comm comm);
Collective computations:
[Figure: Schematic representation of collective data movement in MPI]
MPI Collective Routines (computation):
MPI_Reduce(void *sndbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm);
MPI_Allreduce(void *sndbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
MPI_Reduce_scatter(void *sndbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
MPI_Scan(void *sndbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
Collective Communication

The basic foundation of message-passing is point-to-point communication.
Problems: point-to-point routines are often tedious to use and less efficient than a communications primitive that involves all nodes in a group.
Alternative tool: the collective operation, a communications primitive that involves all nodes in a group. The collective operations are: barrier, broadcast, gather, gatherv, allgather, allgatherv, scatter, scatterv, scatter/gather, scatterv/gatherv, reduce, allreduce, reduce-scatter, and scan (prefix).
General features of MPI collective communication routines:
- A collective operation is executed by having all processes in the group call the communication routine, with matching arguments.
- The type-matching conditions for the collective operations are stricter than the corresponding conditions between sender and receiver in point-to-point operations. Namely, for collective operations, the amount of data sent must exactly match the amount of data specified by the receiver. MPI still allows distinct type maps (the layout in memory) between sender and receiver.
- Collective routine calls can return as soon as their participation in the collective communication is complete. The completion of a call indicates that the caller is now free to access locations in the communication buffer. It does not indicate that other processes in the group have completed or even started their operations (unless otherwise indicated in the description of the operation). Thus, a collective communication call may, or may not, have the effect of synchronizing all calling processes (except for a barrier call).
- MPI guarantees that a message generated by collective communication calls will not be confused with a message generated by point-to-point communication.
- The key concept of the collective functions is to have a "group" of participating processes. The routines do not have a group identifier as an explicit argument. Instead, there is a communicator that can be thought of as a group identifier linked with a context.
Description of MPI Collective Communication Routines
1. Barrier:
A barrier is simply a synchronization primitive. A node calling it will block until all the nodes within the group have called it. The syntax is given by
MPI_Barrier(MPI_Comm comm);
where comm is the communicator for the group of processes. A barrier is particularly useful in synchronizing nodes before they enter a critical section of code.
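For example, a minimal sketch (assuming MPI_Init has already been called and comm is a valid communicator) of using a barrier to separate two phases of work:

int rank;

MPI_Comm_rank(comm, &rank);

/* ... phase 1 work ... */

/* no node proceeds past this point until every node in comm has reached it */
MPI_Barrier(comm);

/* ... phase 2 work, which may rely on all nodes having finished phase 1 ... */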
2. Broadcast:
Often one node has data that is needed by a collection of other nodes. MPI provides the broadcast primitive to accomplish this task:
MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm);
where root is the rank of the node originating the broadcast. MPI_Bcast must be called by each node in the group with the same comm and root. The following example illustrates how to use an MPI_Bcast call:
float x[100];
int   rank;

MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if (rank == 0)
    load(x);                     /* the root fills the vector x */
MPI_Bcast(x, 100, MPI_FLOAT, 0, MPI_COMM_WORLD);

Sample C code for MPI_Bcast

In this example, the root node, node 0, loads data into the vector x, and the broadcast sends the 100 real numbers to all the other nodes. All nodes in MPI_COMM_WORLD call the MPI_Bcast routine.
3. Gather and Scatter:
Gather and scatter are inverse operations.
- Gather collects data from every member of the group (including the root) on the root node in linear order by the rank of the node (that is, the data are concatenated together node-wise on the root).
- Scatter parcels out data from the root to every member of the group in linear order by node.
MPI provides two variants of the gather/scatter operations:
- one in which the number of data items collected from/sent to each node can be different;
- a more efficient one in the special case where the number per node is uniform.
You call the uniform case versions of gather and scatter like this:
MPI_Gather(void *sndbuf, int scount, MPI_Datatype datatype, void *recvbuf, int rcount, MPI_Datatype rdatatype, int root, MPI_Comm comm);
MPI_Scatter(void *sndbuf, int scount, MPI_Datatype datatype, void *recvbuf, int rcount, MPI_Datatype rdatatype, int root, MPI_Comm comm);
where:
- sndbuf is the buffer with scount items of type datatype to send data from;
- recvbuf is the buffer with rcount items of type rdatatype to receive data into;
- root is the root node for the operation; and
- comm is the communicator for the group.
In the gather operation, each node has its sndbuf collected on root, and only the root node's recvbuf is used. For scatter, the opposite holds: the root node's sndbuf is distributed to the other nodes' recvbuf (the other nodes' sndbuf are irrelevant).
Sample C code for MPI_Gather
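A minimal sketch of a gather, assuming each node contributes one int (its rank), the root is node 0, MPI_Init has already been called, and <stdlib.h> is included for malloc:

int rank, size, myvalue;
int *all = NULL;

MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);

myvalue = rank;                          /* each node's contribution         */
if (rank == 0)
    all = malloc(size * sizeof(int));    /* receive buffer used only on root */

/* after the call, node 0 holds all[j] == j for j = 0, ..., size-1 */
MPI_Gather(&myvalue, 1, MPI_INT, all, 1, MPI_INT, 0, MPI_COMM_WORLD);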
Sample C code for MPI_Scatter
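A corresponding sketch for scatter, under the same assumptions, where node 0 hands one int to each node:

int rank, size, j, myvalue;
int *all = NULL;

MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);

if (rank == 0) {
    all = malloc(size * sizeof(int));    /* send buffer used only on root */
    for (j = 0; j < size; j++)
        all[j] = 10 * j;                 /* value destined for node j     */
}

/* after the call, node j holds myvalue == 10*j */
MPI_Scatter(all, 1, MPI_INT, &myvalue, 1, MPI_INT, 0, MPI_COMM_WORLD);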
MPI has another primitive, MPI_Gatherv, that extends the functionality of MPI_Gather to a varying count of data from each process:
MPI_Gatherv(void *sndbuf, int scount, MPI_Datatype datatype, void *recvbuf, int *rcounts, int *displs, MPI_Datatype rdatatype, int root, MPI_Comm comm);
where
rcounts is a vector of receive counts, and you add a vector of displacements, displs, to specify offsets into the root's receive buffer.
The new argument displs allows more flexibility as to where the data is placed on the root.
The outcome is as if each node, including the root node, sends a message to the root using MPI_Send, and the root executes n receives using MPI_Recv. Messages are placed in the receive buffer of the root node in rank order, that is, the data sent from node j is placed in the jth portion of the receive buffer, recvbuf, on the root node. The jth portion of recvbuf begins at offset displs[j] elements (in terms of rdatatype) into recvbuf.
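A sketch of such a varying-count gather (illustrative values only; it assumes node i contributes i+1 integers, at most 16 processes so the fixed send buffer suffices, the root is node 0, MPI_Init has already been called, and <stdlib.h> is included):

int rank, size, i, sendcount;
int sbuf[16];                            /* node i sends i+1 values */
int *rcounts = NULL, *displs = NULL, *rbuf = NULL;

MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);

sendcount = rank + 1;
for (i = 0; i < sendcount; i++)
    sbuf[i] = rank;

if (rank == 0) {
    rcounts = malloc(size * sizeof(int));
    displs  = malloc(size * sizeof(int));
    for (i = 0; i < size; i++) {
        rcounts[i] = i + 1;                                      /* count from node i  */
        displs[i]  = (i == 0) ? 0 : displs[i-1] + rcounts[i-1];  /* offset for node i  */
    }
    rbuf = malloc((displs[size-1] + rcounts[size-1]) * sizeof(int));
}

MPI_Gatherv(sbuf, sendcount, MPI_INT, rbuf, rcounts, displs, MPI_INT, 0, MPI_COMM_WORLD);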
The inverse operation to MPI_Gatherv is MPI_Scatterv:
MPI_Scatterv(void *sndbuf, int *scounts, int *displs, MPI_Datatype datatype, void *recvbuf, int rcount, MPI_Datatype rdatatype, int root, MPI_Comm comm);
Suppose that we have two vectors on node 0, and we want to form their element-wise product in the first vector through a parallel operation. The following example shows how you could do this using scatter and gather. Note that broadcast sends a copy of the entire vector to each node whereas scatter only sends a part of the vector to each node.
float *x, *y, x0[100], y0[100];
int   i, rank, size;
. . .
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &size);
if (rank == 0) {
    /* .... allocate memory for x, y (size*100 floats each) .... */
    /* .... load x, y .... */
}
MPI_Scatter(x, 100, MPI_FLOAT, x0, 100, MPI_FLOAT, 0, comm);
MPI_Scatter(y, 100, MPI_FLOAT, y0, 100, MPI_FLOAT, 0, comm);
for (i = 0; i < 100; i++)
    x0[i] = x0[i] * y0[i];
. . .
MPI_Gather(x0, 100, MPI_FLOAT, x, 100, MPI_FLOAT, 0, comm);
An allgather operation provides a more efficient way to do a gather followed by a broadcast: all members of the group receive the collected data.
MPI_Allgather(void *sndbuf, int scount, MPI_Datatype sdatatype, void *recvbuf, int rcount, MPI_Datatype rdatatype, MPI_Comm comm);
The type signatures associated with scount and sdatatype on a node must be equal to the type signatures associated with rcount and rdatatype on any other node. The outcome of a call to MPI_Allgather is as if all nodes execute n calls to MPI_Gather for root = 0, 1, ..., n-1.
Sample C code for MPI_Allgather
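A minimal sketch, assuming each node contributes its rank, at most 64 processes (so a fixed receive buffer suffices), and that MPI_Init has already been called:

int rank, size, myvalue;
int all[64];                             /* assumes size <= 64 in this sketch */

MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);

myvalue = rank;
/* after the call, every node holds all[j] == j for j = 0, ..., size-1 */
MPI_Allgather(&myvalue, 1, MPI_INT, all, 1, MPI_INT, MPI_COMM_WORLD);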
Similarly, an allgatherv operation provides a more efficient way to do a gatherv followed by a broadcast: all members of the group receive the collected data.
MPI_Allgatherv(void *sndbuf, int scount, MPI_Datatype sdatatype, void *recvbuf, int *rcounts, int *displs, MPI_Datatype rdatatype, MPI_Comm comm);
An all to all, or complete, exchange provides a way of effecting a data redistribution without having to gather all the data onto one node and then to scatter them back out again.
MPI_Alltoall(void *sndbuf, int scount, MPI_Datatype sdatatype, void *recvbuf, int rcount, MPI_Datatype rdatatype, MPI_Comm comm);
You can look at MPI_Alltoall as an extension of MPI_Allgather to the case where each node sends distinct data to each of the receivers. The jth block sent from node i is received by node j and is placed in the ith block of recvbuf. This is typically useful for implementing the Fast Fourier Transform.
Note that these are typically very expensive operations since they involve communicating a lot of data. You can find the details of their calling sequences in the MPI reference.
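As an illustrative sketch of the all-to-all exchange (one int per destination, at most 64 processes so fixed-size buffers suffice, MPI_Init already called):

int rank, size, j;
int sbuf[64], rbuf[64];                  /* assumes size <= 64 in this sketch */

MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);

for (j = 0; j < size; j++)
    sbuf[j] = 100 * rank + j;            /* block j is destined for node j */

/* after the call, rbuf[i] on node j holds 100*i + j, the block sent by node i */
MPI_Alltoall(sbuf, 1, MPI_INT, rbuf, 1, MPI_INT, MPI_COMM_WORLD);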
4. Reduce:
Some of the most-used collective operations are global reductions or combine operations. A global reduction combines partial results from each node in the group using a basic function, such as sum, max, or min, and returns the answer to the root node:
MPI_Reduce(void *sndbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm);
where sndbuf specifies a buffer containing count items of data of type datatype to be combined using op (an MPI reduction operator). For MPI_Reduce, the result is placed in recvbuf on the root node only; the MPI_Allreduce variant described below places the result in recvbuf on every node.
Sample C code for MPI_Reduce
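A minimal sketch, assuming each node contributes a single double and the sum is needed only on node 0 (MPI_Init already called):

int    rank;
double partial, total;

MPI_Comm_rank(MPI_COMM_WORLD, &rank);
partial = (double)rank;                  /* this node's partial result */

/* on node 0, total holds the sum of partial over all nodes;
   on the other nodes the contents of total are not defined by this call */
MPI_Reduce(&partial, &total, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);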
Predefined reduce operations in MPI:
MPI Operator Operation
----------------------------------------------
MPI_MAX maximum
MPI_MIN minimum
MPI_SUM sum
MPI_PROD product
MPI_LAND logical and
MPI_BAND bitwise and
MPI_LOR logical or
MPI_BOR bitwise or
MPI_LXOR logical exclusive or
MPI_BXOR bitwise exclusive or
MPI_MAXLOC max value and location
MPI_MINLOC min value and location
MPI allows certain combinations of operations and datatype arguments. The C binding of MPI basic datatypes is the following:
Integer:          MPI_INT, MPI_LONG, MPI_SHORT,
                  MPI_UNSIGNED_SHORT, MPI_UNSIGNED, MPI_UNSIGNED_LONG
Floating point:   MPI_FLOAT, MPI_DOUBLE
Byte:             MPI_BYTE
Note that the datatype MPI_BYTE does not correspond to a C datatype. A value of type MPI_BYTE consists of a byte (8 binary digits).
Combining the two charts above, the valid datatypes for each operation are specified below.
MPI Operator                    Allowed Types
--------------------------------------------------
MPI_MAX, MPI_MIN                Integer, Floating point
MPI_SUM, MPI_PROD               Integer, Floating point
MPI_LAND, MPI_LOR, MPI_LXOR     Integer
MPI_BAND, MPI_BOR, MPI_BXOR     Integer, Byte
The operators MPI_MINLOC and MPI_MAXLOC compute a global minimum or maximum and also attach an index to the minimum or maximum value. One application of these is to compute a global minimum or maximum and the rank of the node containing the value.
In order to use MPI_MINLOC and MPI_MAXLOC in a reduce operation, one must provide a datatype argument that represents a pair (value and index). MPI provides seven such predefined datatypes. You
can use the operations MPI_MINLOC and MPI_MAXLOC with each of the following datatypes.
In C binding:
MPI Datatype Description
--------------------------------------
MPI_FLOAT_INT float and int
MPI_DOUBLE_INT double and int
MPI_LONG_INT long and int
MPI_2INT pair of int
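For example, a sketch of finding a global maximum of one double per node together with the rank that owns it, using MPI_MAXLOC with MPI_DOUBLE_INT (compute_local_value is a hypothetical placeholder for the local computation; MPI_Init is assumed to have been called):

int rank;
struct { double val; int rank; } in, out;   /* layout matching MPI_DOUBLE_INT */

MPI_Comm_rank(MPI_COMM_WORLD, &rank);
in.val  = compute_local_value();            /* hypothetical local result */
in.rank = rank;

/* on node 0: out.val is the global maximum, out.rank the node that owns it */
MPI_Reduce(&in, &out, 1, MPI_DOUBLE_INT, MPI_MAXLOC, 0, MPI_COMM_WORLD);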
MPI includes variants of each of the reduce operations where the result is returned to all processes in the group. This is the MPI_Allreduce call, which requires that all processes participate in the operation.
MPI_Allreduce(void *sndbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
Sample C code for MPI_Allreduce
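A minimal sketch of a global sum whose result is needed on every node (MPI_Init already called):

int    rank;
double partial, total;

MPI_Comm_rank(MPI_COMM_WORLD, &rank);
partial = (double)rank;                  /* this node's partial result */

/* every node receives the same sum in total */
MPI_Allreduce(&partial, &total, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);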
5. Reduce-Scatter:
MPI includes a reduce-scatter operation that has the following syntax in C:
MPI_Reduce_scatter(void *sndbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
where the arguments sndbuf, recvbuf, datatype, op, and comm are as before, but recvcounts is an integer array specifying the number of result elements distributed to each node. The recvcounts array must be identical on all calling nodes.
The MPI_Reduce_scatter routine is functionally equivalent to an MPI_Reduce operation with count equal to the sum of the recvcounts[i], followed by an MPI_Scatterv with sendcounts equal to recvcounts. This primitive is included as a separate routine to allow for a more efficient implementation. You can find the details in the MPI document.
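As a sketch under simple assumptions (each node contributes a vector with one element per node, the vectors are summed element-wise, node i keeps element i of the sum, at most 64 processes, MPI_Init already called):

int rank, size, i, result;
int sbuf[64], recvcounts[64];            /* assumes size <= 64 in this sketch */

MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);

for (i = 0; i < size; i++) {
    sbuf[i]       = rank + i;            /* this node's contribution to element i */
    recvcounts[i] = 1;                   /* every node receives one element       */
}

/* node i receives in result the sum over all nodes of their sbuf[i] */
MPI_Reduce_scatter(sbuf, &result, recvcounts, MPI_INT, MPI_SUM, MPI_COMM_WORLD);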
6. Scan:
A scan, or prefix-reduction, operation performs partial reductions on distributed data. Specifically, the scan operation returns, in the receive buffer of the process with rank i, the reduction of the values in the send buffers of the processes with ranks 0, 1, ..., i. The syntax is:
MPI_Scan(void *sndbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm);
where the arguments are as in the MPI_Reduce operation.
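A minimal sketch of an inclusive prefix sum of the ranks (MPI_Init already called):

int rank, prefix;

MPI_Comm_rank(MPI_COMM_WORLD, &rank);

/* the node with rank i receives prefix == 0 + 1 + ... + i */
MPI_Scan(&rank, &prefix, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);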
Note that collective operations must still be performed in the same order on each node to ensure correctness and to avoid deadlocks.