#include #include #include #include #include #define MAX_SIZE 1000 /* the ranks of the processes being used */ #define DOWNLOAD_PROCESS 0 #define DECOMPRESS_PROCESS 1 #define DISPLAY_PROCESS 2 /* the function called for the first process, downloading the video frames */ void download() { /* we keep two data buffers for the "downloaded" data */ char buffer1[MAX_SIZE]; char buffer2[MAX_SIZE]; /* a pointer to the current download buffer */ char* buffer = buffer1; /* the request, used to check for the previous send */ MPI_Request request; /* whether this is the first time through the loop */ int first = 1; /* a flag for stopping the process */ int quit = 0; while (!quit) { /* fill the buffer with data (really just a string from stdin) */ scanf("%s", buffer); /* if EOF, the message is "quit" */ if (feof(stdin)) { strcpy(buffer, "QUIT"); quit = 1; } /* wait for the previous send to finish, if there was one */ if (!first) { MPI_Wait(&request, MPI_STATUS_IGNORE); } else { first = 0; } /* start to send this buffer to the decompress process */ MPI_Isend(buffer, strlen(buffer), MPI_CHAR, DECOMPRESS_PROCESS, 0, MPI_COMM_WORLD, &request); /* swap the buffers so we don't overwrite the data */ buffer = (buffer == buffer1) ? buffer2 : buffer1; } /* wait for the last message to go out */ MPI_Wait(&request, MPI_STATUS_IGNORE); } /* the function called for the second process, decompressing the video */ void decompress() { /* buffers for incoming data, and for outgoing */ char incoming1[MAX_SIZE], incoming2[MAX_SIZE]; char outgoing1[MAX_SIZE], outgoing2[MAX_SIZE]; /* current buffers */ char* incoming = incoming1; char* outgoing = outgoing1; /* start reading the first message */ MPI_Request read_request, write_request; MPI_Irecv(incoming, MAX_SIZE, MPI_CHAR, DOWNLOAD_PROCESS, 0, MPI_COMM_WORLD, &read_request); /* wheter it's the first loop */ int first = 1; /* loop until we get a quit message */ int quit = 0; while (!quit) { /* wait for the message */ MPI_Status status; MPI_Wait(&read_request, &status); /* start a new receive into the other buffer */ MPI_Irecv((incoming == incoming1) ? incoming2 : incoming1, MAX_SIZE, MPI_CHAR, DOWNLOAD_PROCESS, 0, MPI_COMM_WORLD, &read_request); /* insert the null terminator into the one we have */ int size; MPI_Get_count(&status, MPI_CHAR, &size); incoming[size] = '\0'; /* check if it's QUIT */ if (strcmp(incoming, "QUIT") == 0) { quit = 1; } /* wait for the outgoing buffer to be free, if not the first time */ if (!first) { MPI_Wait(&write_request, MPI_STATUS_IGNORE); } else { first = 0; } /* move from download buffer to decompress */ strcpy(outgoing, incoming); /* capitalize it */ int i; for (i = 0; i < strlen(outgoing); i++) { outgoing[i] = toupper(outgoing[i]); } /* start to send the data to the display process */ MPI_Isend(outgoing, strlen(outgoing), MPI_CHAR, DISPLAY_PROCESS, 0, MPI_COMM_WORLD, &write_request); /* swap the buffers */ incoming = (incoming == incoming1) ? incoming2 : incoming1; outgoing = (outgoing == outgoing1) ? outgoing2 : outgoing1; } } /* the function called for the third process, displaying the video */ void display() { /* the buffers we receive into */ char buffer1[MAX_SIZE], buffer2[MAX_SIZE]; char* buffer = buffer1; int quit = 0; /* start receiving the first message */ MPI_Request request; MPI_Irecv(buffer, MAX_SIZE, MPI_CHAR, DECOMPRESS_PROCESS, 0, MPI_COMM_WORLD, &request); /* loop until we get quit */ while (!quit) { /* wait for the last request to finish */ MPI_Status status; MPI_Wait(&request, &status); /* start a new wait on the other buffer */ MPI_Irecv((buffer == buffer1) ? buffer2 : buffer1, MAX_SIZE, MPI_CHAR, DECOMPRESS_PROCESS, 0, MPI_COMM_WORLD, &request); /* insert the null terminator into the buffer we have */ int size; MPI_Get_count(&status, MPI_CHAR, &size); buffer[size] = '\0'; /* check for quit */ if (strcmp(buffer, "QUIT") == 0) { quit = 1; } else { /* display it */ printf("%s\n", buffer); } /* swap the buffers */ buffer = (buffer == buffer1) ? buffer2 : buffer1; } } int main (int argc, char** argv) { int rank, size; /* initialize MPI */ MPI_Init(&argc, &argv); /* get the rank (process id) and size (number of processes) */ MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &size); /* this example only works with three processes! */ if (size != 3) { if (rank == 0) { printf("Error, only works with 3 processes!\n"); } MPI_Finalize(); exit(0); } /* do a task depending on our rank */ if (rank == 0) { /* only process 0 can read from stdin! */ download(); } else if (rank == 1) { decompress(); } else { /* any process can write to stdout, but only one should! */ display(); } MPI_Finalize(); return 0; }