MPI Collective Operations Examples

A collection of MPI collective operation patterns and example implementations for common HPC scenarios.

Basic Collective Operations

All-to-All Communication

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char** argv) {
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Prepare data
    float* sendbuf = malloc(size * sizeof(float));
    float* recvbuf = malloc(size * sizeof(float));
    for(int i = 0; i < size; i++) {
        sendbuf[i] = rank * size + i;
    }

    // Perform all-to-all: each rank sends one float to every rank
    // and receives one float from every rank
    MPI_Alltoall(sendbuf, 1, MPI_FLOAT,
                 recvbuf, 1, MPI_FLOAT,
                 MPI_COMM_WORLD);

    free(sendbuf);
    free(recvbuf);
    MPI_Finalize();
    return 0;
}
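
After the call, recvbuf[i] on rank r holds the value that rank i placed in sendbuf[r] (here, i * size + r): the collective effectively transposes the per-rank blocks across the communicator.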

Reduction Operations

// Custom reduction operation: element-wise MAXLOC over (value, rank) pairs.
// The signature matches MPI_User_function and the struct layout matches MPI_DOUBLE_INT.
void max_loc(void* in, void* inout, int* len, MPI_Datatype* type) {
    struct {
        double val;
        int rank;
    } *in_data = in, *inout_data = inout;

    for(int i = 0; i < *len; i++) {
        if(in_data[i].val > inout_data[i].val) {
            inout_data[i] = in_data[i];
        }
    }
}

// Register the custom operation (the second argument, 1, marks it as commutative)
MPI_Op op;
MPI_Op_create(max_loc, 1, &op);
MPI_Reduce(sendbuf, recvbuf, count, MPI_DOUBLE_INT, op, root, comm);
MPI_Op_free(&op);
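
For this particular pattern the built-in MPI_MAXLOC operation provides the same semantics. A minimal sketch of how the (value, rank) pairs expected by MPI_DOUBLE_INT are laid out (val and comm are assumed to exist in the caller):

// Minimal sketch: packing a (value, rank) pair and reducing with MPI_MAXLOC
struct { double val; int rank; } in, out;
int rank;
MPI_Comm_rank(comm, &rank);
in.val  = val;   // local value to compare
in.rank = rank;  // where it lives
MPI_Reduce(&in, &out, 1, MPI_DOUBLE_INT, MPI_MAXLOC, 0, comm);
// On rank 0, out.val is the global maximum and out.rank the rank that owns it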

Advanced Patterns

Non-blocking Collectives

// Overlap computation and communication
std::vector<MPI_Request> requests;
requests.reserve(num_ops);

// Initialize non-blocking operations
for(int i = 0; i < num_ops; i++) {
    MPI_Request req;
    MPI_Iallreduce(sendbuf + offset[i], recvbuf + offset[i],
                   count[i], MPI_DOUBLE, MPI_SUM,
                   MPI_COMM_WORLD, &req);
    requests.push_back(req);

    // Do computation while communication progresses
    compute_chunk(i);
}

// Wait for all operations to complete
MPI_Waitall(requests.size(), requests.data(), MPI_STATUSES_IGNORE);
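
If each chunk of computation is long, some MPI implementations only progress the outstanding collectives from inside MPI calls, so it can help to poll the requests periodically instead of waiting once at the end. A minimal sketch, assuming a plain MPI_Request array requests_array of length num_ops and a hypothetical compute_next_piece() routine:

// Minimal sketch: interleave computation with progress polling
int done = 0;
while(!done) {
    compute_next_piece();  // hypothetical application work
    MPI_Testall(num_ops, requests_array, &done, MPI_STATUSES_IGNORE);
}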

Neighborhood Collectives

// Create cartesian communicator
int dims[3] = {nx, ny, nz};
int periods[3] = {1, 1, 1};
MPI_Comm cart_comm;
MPI_Cart_create(MPI_COMM_WORLD, 3, dims, periods, 0, &cart_comm);

// Allocate one block per neighbor: a 3D cartesian grid has 2 * ndims = 6 neighbors
int maxneigh = 6;  // 3D stencil
double *sendbuf = malloc(maxneigh * count * sizeof(double));
double *recvbuf = malloc(maxneigh * count * sizeof(double));

// Perform neighborhood collective
MPI_Neighbor_alltoall(sendbuf, count, MPI_DOUBLE,
                      recvbuf, count, MPI_DOUBLE,
                      cart_comm);
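
The blocks in sendbuf and recvbuf follow the neighbor order defined by the cartesian topology: for each dimension, first the negative-direction neighbor, then the positive-direction one, i.e. the ranks reported by MPI_Cart_shift. A minimal sketch of looking them up on the cart_comm created above:

// Minimal sketch: the 6 neighbors of this rank in the 3D torus.
// Block order in sendbuf/recvbuf is lo0, hi0, lo1, hi1, lo2, hi2.
int neighbors[6];
for(int d = 0; d < 3; d++) {
    int lo, hi;
    MPI_Cart_shift(cart_comm, d, 1, &lo, &hi);
    neighbors[2 * d]     = lo;   // -1 direction
    neighbors[2 * d + 1] = hi;   // +1 direction
}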

Performance Optimization

Vector Collective

// Gather with vector datatype
MPI_Datatype vector;
MPI_Type_vector(count, blocklength, stride, MPI_DOUBLE, &vector);
MPI_Type_commit(&vector);

// Use the vector as the send type; the receive side is in plain MPI_DOUBLEs,
// so recvcounts[i] must equal count * blocklength
MPI_Gatherv(sendbuf, 1, vector,
            recvbuf, recvcounts, displs, MPI_DOUBLE,
            root, MPI_COMM_WORLD);

MPI_Type_free(&vector);
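
Each rank contributes one vector, i.e. count * blocklength doubles (the stride gaps are not transmitted), and the root receives them as plain MPI_DOUBLEs. A minimal sketch of building recvcounts and displs on the root, assuming size ranks and contiguous packing:

// Minimal sketch: receive counts and displacements in units of MPI_DOUBLE
int elems = count * blocklength;
int* recvcounts = malloc(size * sizeof(int));
int* displs     = malloc(size * sizeof(int));
for(int i = 0; i < size; i++) {
    recvcounts[i] = elems;      // one unpacked vector per rank
    displs[i]     = i * elems;  // packed back to back on the root
}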

Persistent Collectives

// Initialize a persistent collective request (available since MPI 4.0)
MPI_Request req;
MPI_Allreduce_init(sendbuf, recvbuf, count,
                   MPI_DOUBLE, MPI_SUM,
                   MPI_COMM_WORLD, MPI_INFO_NULL, &req);

// Use in computation loop
for(int iter = 0; iter < max_iters; iter++) {
    // Start collective
    MPI_Start(&req);

    // Overlap with computation
    compute_iteration(iter);

    // Wait for completion
    MPI_Wait(&req, MPI_STATUS_IGNORE);
}

// Free the persistent request once it is inactive
MPI_Request_free(&req);
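
Persistent collectives were introduced in MPI 4.0, so guarding their use at compile time keeps the code portable to older libraries. A minimal sketch:

// Minimal sketch: fall back to a blocking allreduce on pre-4.0 libraries
#if MPI_VERSION >= 4
    MPI_Allreduce_init(sendbuf, recvbuf, count, MPI_DOUBLE, MPI_SUM,
                       MPI_COMM_WORLD, MPI_INFO_NULL, &req);
#else
    // No persistent collectives; call MPI_Allreduce inside the loop instead
#endif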

Common Patterns

Scatter-Compute-Gather

void parallel_transform(double* data, int n, int root) {
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Calculate local size (assumes n is evenly divisible by size;
    // see the MPI_Scatterv sketch after this example for the general case)
    int local_n = n / size;
    double* local_data = malloc(local_n * sizeof(double));

    // Scatter data
    MPI_Scatter(data, local_n, MPI_DOUBLE,
                local_data, local_n, MPI_DOUBLE,
                root, MPI_COMM_WORLD);

    // Local computation
    for(int i = 0; i < local_n; i++) {
        local_data[i] = transform(local_data[i]);
    }

    // Gather results
    MPI_Gather(local_data, local_n, MPI_DOUBLE,
               data, local_n, MPI_DOUBLE,
               root, MPI_COMM_WORLD);

    free(local_data);
}
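
When n is not evenly divisible by the number of ranks, MPI_Scatterv and MPI_Gatherv can spread the remainder across the first few ranks. A minimal sketch with a hypothetical block_partition helper (not part of the example above):

// Minimal sketch: per-rank counts/displacements when n % size != 0;
// the first (n % size) ranks get one extra element each
void block_partition(int n, int size, int* counts, int* displs) {
    int base = n / size, rem = n % size, offset = 0;
    for(int r = 0; r < size; r++) {
        counts[r] = base + (r < rem ? 1 : 0);
        displs[r] = offset;
        offset += counts[r];
    }
}

// Usage (counts and displs are significant at the root only):
// MPI_Scatterv(data, counts, displs, MPI_DOUBLE,
//              local_data, counts[rank], MPI_DOUBLE, root, MPI_COMM_WORLD);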

Reduction Tree

void tree_reduce(double* local_data, double* global_result,
                int count, MPI_Comm comm) {
    int rank, size;
    MPI_Comm_rank(comm, &rank);
    MPI_Comm_size(comm, &size);

    // Work on a copy so the caller's local_data is left untouched
    double* accum = malloc(count * sizeof(double));
    double* temp = malloc(count * sizeof(double));
    memcpy(accum, local_data, count * sizeof(double));

    // Binomial-tree reduction: at each stride the lower rank of a pair
    // receives and accumulates, the higher rank sends its partial sum and drops out
    for(int stride = 1; stride < size; stride *= 2) {
        if((rank % (2 * stride)) == 0) {
            int partner = rank + stride;
            if(partner < size) {
                MPI_Recv(temp, count, MPI_DOUBLE,
                        partner, 0, comm, MPI_STATUS_IGNORE);
                for(int i = 0; i < count; i++) {
                    accum[i] += temp[i];
                }
            }
        } else {
            int partner = rank - stride;
            MPI_Send(accum, count, MPI_DOUBLE,
                    partner, 0, comm);
            break;
        }
    }

    // The final sum ends up on rank 0
    if(rank == 0) {
        memcpy(global_result, accum, count * sizeof(double));
    }

    free(accum);
    free(temp);
}
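
In production code MPI_Reduce should normally be preferred, since library implementations already use tree-based and topology-aware algorithms; the hand-written version is mainly useful for studying the communication pattern or cross-checking results. A minimal sketch of such a check (root is rank 0 in both cases):

// Minimal sketch: compare the hand-written reduction against MPI_Reduce
double* mine = malloc(count * sizeof(double));
double* ref  = malloc(count * sizeof(double));
tree_reduce(local_data, mine, count, MPI_COMM_WORLD);
MPI_Reduce(local_data, ref, count, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
// On rank 0, mine[i] and ref[i] agree up to floating-point rounding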

System-Specific Examples

// Info hints let the library apply platform-specific tuning. Key names other
// than the standard ones are implementation-dependent; check your MPI
// library's documentation before relying on them. Each block below is an
// independent example; create, use, and free one info object at a time.
MPI_Info info;

// Optimized for AMD Instinct GPUs
MPI_Info_create(&info);
MPI_Info_set(info, "mpi_memory_alloc_kinds", "gpu:managed");

// Optimized for NVIDIA A100
MPI_Info_create(&info);
MPI_Info_set(info, "mpi_cuda_support", "true");

// Network topology aware
MPI_Info_create(&info);
MPI_Info_set(info, "mpi_topology_aware", "true");

References

  1. MPI Standard