MPI Collective Operations Examples
A collection of MPI collective-operation patterns and reference implementations for common HPC scenarios. The examples are plain C (one fragment uses C++ for std::vector); build them with an MPI compiler wrapper such as mpicc and launch with mpirun or mpiexec.
Basic Collective Operations
All-to-All Communication
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char** argv) {
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Prepare data: one float destined for each rank
    float* sendbuf = malloc(size * sizeof(float));
    float* recvbuf = malloc(size * sizeof(float));
    for(int i = 0; i < size; i++) {
        sendbuf[i] = rank * size + i;
    }

    // Perform all-to-all: rank r ends up with element r of every rank's sendbuf
    MPI_Alltoall(sendbuf, 1, MPI_FLOAT,
                 recvbuf, 1, MPI_FLOAT,
                 MPI_COMM_WORLD);

    free(sendbuf);
    free(recvbuf);
    MPI_Finalize();
    return 0;
}
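When the amount of data exchanged with each rank differs, the same pattern extends to the variable-count variant. A minimal sketch using MPI_Alltoallv, assuming the count and displacement arrays (illustrative names below) have already been filled by the application:

// sendcounts[i] / recvcounts[i]: number of floats exchanged with rank i
// sdispls[i] / rdispls[i]: element offsets into sendbuf / recvbuf for rank i
int *sendcounts = malloc(size * sizeof(int));
int *recvcounts = malloc(size * sizeof(int));
int *sdispls = malloc(size * sizeof(int));
int *rdispls = malloc(size * sizeof(int));
/* ... fill counts and displacements from the application's data layout ... */
MPI_Alltoallv(sendbuf, sendcounts, sdispls, MPI_FLOAT,
              recvbuf, recvcounts, rdispls, MPI_FLOAT,
              MPI_COMM_WORLD);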
Reduction Operations
// Custom reduction operation: element-wise maximum, carrying the owning rank
// (the struct layout matches the predefined MPI_DOUBLE_INT pair type)
void max_loc(void* in, void* inout, int* len, MPI_Datatype* type) {
    struct {
        double val;
        int rank;
    } *in_data = in, *inout_data = inout;

    for(int i = 0; i < *len; i++) {
        if(in_data[i].val > inout_data[i].val) {
            inout_data[i] = in_data[i];
        }
    }
}

// Register the operation (marked commutative), use it, then release it
MPI_Op op;
MPI_Op_create(max_loc, 1, &op);
MPI_Reduce(sendbuf, recvbuf, count, MPI_DOUBLE_INT, op, root, comm);
MPI_Op_free(&op);
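For this particular max-with-location pattern, MPI already predefines the MPI_MAXLOC operation together with the MPI_DOUBLE_INT pair type, so no custom operation is needed. A minimal sketch of the built-in route (my_value, root and comm are placeholders):

// Built-in alternative: MPI_MAXLOC with the predefined pair type
struct { double val; int rank; } local, global;
local.val = my_value;   /* the quantity to maximize */
local.rank = rank;      /* payload carried alongside the maximum */
MPI_Reduce(&local, &global, 1, MPI_DOUBLE_INT, MPI_MAXLOC, root, comm);
/* at root: global.val is the maximum, global.rank the rank that owns it */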
Advanced Patterns
Non-blocking Collectives
// Overlap computation and communication (C++ fragment; num_ops, offset[],
// count[] and compute_chunk() are application-specific)
std::vector<MPI_Request> requests;
requests.reserve(num_ops);

// Start the non-blocking all-reduces, overlapping each with a chunk of work
for(int i = 0; i < num_ops; i++) {
    MPI_Request req;
    MPI_Iallreduce(sendbuf + offset[i], recvbuf + offset[i],
                   count[i], MPI_DOUBLE, MPI_SUM,
                   MPI_COMM_WORLD, &req);
    requests.push_back(req);

    // Do computation while communication progresses; an occasional MPI_Testall
    // call can help implementations without asynchronous progress threads
    compute_chunk(i);
}

// Wait for all operations to complete
MPI_Waitall((int)requests.size(), requests.data(), MPI_STATUSES_IGNORE);
Neighborhood Collectives
// Create a 3D periodic Cartesian communicator (nx * ny * nz must equal size)
int dims[3] = {nx, ny, nz};
int periods[3] = {1, 1, 1};
MPI_Comm cart_comm;
MPI_Cart_create(MPI_COMM_WORLD, 3, dims, periods, 0, &cart_comm);

// Each process exchanges one block of `count` doubles with each of its
// 2 * ndims = 6 face neighbors (3D stencil)
int maxneigh = 6;
double *sendbuf = malloc(maxneigh * count * sizeof(double));
double *recvbuf = malloc(maxneigh * count * sizeof(double));

// Perform the neighborhood collective on the Cartesian topology
MPI_Neighbor_alltoall(sendbuf, count, MPI_DOUBLE,
                      recvbuf, count, MPI_DOUBLE,
                      cart_comm);
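The per-neighbor blocks in sendbuf and recvbuf are ordered by dimension, with the negative-direction neighbor before the positive-direction one in each dimension. A sketch that recovers the matching ranks with MPI_Cart_shift so the blocks can be filled in the right order:

// neighbors[2*d] is the negative-direction neighbor in dimension d,
// neighbors[2*d + 1] the positive-direction one, matching the buffer layout
int neighbors[6];
for(int d = 0; d < 3; d++) {
    MPI_Cart_shift(cart_comm, d, 1, &neighbors[2*d], &neighbors[2*d + 1]);
}
// Block i of sendbuf (count doubles starting at sendbuf + i * count)
// goes to neighbors[i]; block i of recvbuf arrives from neighbors[i]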
Performance Optimization
Vector Datatypes in Collectives
// Gather strided data by using a vector datatype on the send side
MPI_Datatype vector;
MPI_Type_vector(count, blocklength, stride, MPI_DOUBLE, &vector);
MPI_Type_commit(&vector);

// Each rank sends one vector; the receive side is described in plain MPI_DOUBLEs
MPI_Gatherv(sendbuf, 1, vector,
            recvbuf, recvcounts, displs, MPI_DOUBLE,
            root, MPI_COMM_WORLD);

MPI_Type_free(&vector);
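A detail that is easy to get wrong here: the receive side counts in MPI_DOUBLE elements, not in vector types, so each rank contributes count * blocklength doubles once the strided data is packed. A minimal sketch of the matching setup at the root (size is the communicator size; names are illustrative):

// Receive layout at the root: contiguous blocks of count * blocklength doubles
int elems_per_rank = count * blocklength;
int *recvcounts = malloc(size * sizeof(int));
int *displs = malloc(size * sizeof(int));
for(int i = 0; i < size; i++) {
    recvcounts[i] = elems_per_rank;
    displs[i] = i * elems_per_rank;
}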
Persistent Collectives
// Initialize a persistent collective once (requires MPI 4.0 or later)
MPI_Request req;
MPI_Allreduce_init(sendbuf, recvbuf, count,
                   MPI_DOUBLE, MPI_SUM,
                   MPI_COMM_WORLD, MPI_INFO_NULL, &req);

// Reuse the same request in every iteration of the computation loop
for(int iter = 0; iter < max_iters; iter++) {
    // Start the collective
    MPI_Start(&req);
    // Overlap with computation
    compute_iteration(iter);
    // Wait for completion before using recvbuf
    MPI_Wait(&req, MPI_STATUS_IGNORE);
}

// Clean up the persistent request
MPI_Request_free(&req);
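On MPI libraries older than 4.0, persistent collectives are not available. A hedged fallback is to issue a fresh non-blocking all-reduce each iteration, which keeps the overlap but repays the setup cost every time:

// Pre-MPI-4.0 fallback: non-blocking collective started inside the loop
for(int iter = 0; iter < max_iters; iter++) {
    MPI_Request r;
    MPI_Iallreduce(sendbuf, recvbuf, count, MPI_DOUBLE, MPI_SUM,
                   MPI_COMM_WORLD, &r);
    compute_iteration(iter);
    MPI_Wait(&r, MPI_STATUS_IGNORE);
}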
Common Patterns
Scatter-Compute-Gather
// Scatter `data` from `root`, apply transform() locally, gather results back.
// Assumes n is evenly divisible by the number of ranks (see the sketch below
// for the general case) and that `data` is only significant at the root.
void parallel_transform(double* data, int n, int root) {
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Calculate local block size
    int local_n = n / size;
    double* local_data = malloc(local_n * sizeof(double));

    // Scatter data
    MPI_Scatter(data, local_n, MPI_DOUBLE,
                local_data, local_n, MPI_DOUBLE,
                root, MPI_COMM_WORLD);

    // Local computation
    for(int i = 0; i < local_n; i++) {
        local_data[i] = transform(local_data[i]);
    }

    // Gather results back into `data` at the root
    MPI_Gather(local_data, local_n, MPI_DOUBLE,
               data, local_n, MPI_DOUBLE,
               root, MPI_COMM_WORLD);

    free(local_data);
}
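The function above assumes n is evenly divisible by the number of ranks. A sketch of the same pattern with MPI_Scatterv/MPI_Gatherv when it is not (rank, size, data, n and root as in the function above):

// Distribute the remainder: the first n % size ranks get one extra element
int *counts = malloc(size * sizeof(int));
int *displs = malloc(size * sizeof(int));
int offset = 0;
for(int i = 0; i < size; i++) {
    counts[i] = n / size + (i < n % size ? 1 : 0);
    displs[i] = offset;
    offset += counts[i];
}

double *local_data = malloc(counts[rank] * sizeof(double));
MPI_Scatterv(data, counts, displs, MPI_DOUBLE,
             local_data, counts[rank], MPI_DOUBLE,
             root, MPI_COMM_WORLD);
/* ... transform local_data as before ... */
MPI_Gatherv(local_data, counts[rank], MPI_DOUBLE,
            data, counts, displs, MPI_DOUBLE,
            root, MPI_COMM_WORLD);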
Reduction Tree
// Illustrative binomial-tree sum reduction built from point-to-point messages;
// the result ends up in global_result on rank 0. In production code, prefer
// MPI_Reduce, which implementations already tune for the network.
void tree_reduce(double* local_data, double* global_result,
                 int count, MPI_Comm comm) {
    int rank, size;
    MPI_Comm_rank(comm, &rank);
    MPI_Comm_size(comm, &size);

    // Accumulate into global_result so local_data is left untouched
    memcpy(global_result, local_data, count * sizeof(double));

    // Temporary buffer for partial sums received from partners
    double* temp = malloc(count * sizeof(double));

    // At each level, the lower rank of a pair receives and accumulates,
    // while the higher rank sends its partial sum and drops out
    for(int stride = 1; stride < size; stride *= 2) {
        if((rank % (2 * stride)) == 0) {
            int partner = rank + stride;
            if(partner < size) {
                MPI_Recv(temp, count, MPI_DOUBLE,
                         partner, 0, comm, MPI_STATUS_IGNORE);
                for(int i = 0; i < count; i++) {
                    global_result[i] += temp[i];
                }
            }
        } else {
            int partner = rank - stride;
            MPI_Send(global_result, count, MPI_DOUBLE,
                     partner, 0, comm);
            break;  // This rank's partial sum has been handed off
        }
    }

    free(temp);
}
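A short usage sketch that checks the hand-rolled tree against MPI_Reduce (rank obtained as in the earlier examples); in practice the library call is normally preferable, since implementations select tuned reduction algorithms automatically:

// Compare the manual tree reduction with the library reduction at rank 0
double local[4] = {1.0 * rank, 2.0, 3.0, 4.0};
double tree_result[4], mpi_result[4];
tree_reduce(local, tree_result, 4, MPI_COMM_WORLD);
MPI_Reduce(local, mpi_result, 4, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
if(rank == 0) {
    for(int i = 0; i < 4; i++) {
        printf("element %d: tree %.1f, library %.1f\n", i, tree_result[i], mpi_result[i]);
    }
}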