
Distributed Sorting

Overview

Sorting in a distributed environment is complicated by the fact that we will not have all of the data available on one processor.

In some cases, the amount of data would not even fit into the memory of a single machine at once.

To handle this, we can sort our data according to the following rules:

  1. All of the data on each process is locally sorted.
  2. All of the data stored on process $i$ is less than all of the data stored on process $j$ if $i \lt j$.

The goal of our parallel sorting algorithm will be to make these two conditions true. This can be done without ever loading all of the data onto any one process.
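
As an illustration, the sketch below shows how each process could check that both rules hold. The function name is just a placeholder, and it assumes each process holds N values, as in the program later on this page:

/* sketch: check the two rules above, assuming each process holds N ints in data */
int is_globally_sorted(int* data, int rank, int size) {
  int i, ok = 1;

  /* rule 1: our own data must be in sorted order */
  for (i = 1; i < N; i++) {
    if (data[i - 1] > data[i]) {
      ok = 0;
    }
  }

  /* rule 2: our largest value must not exceed the smallest value on the next process */
  if (rank + 1 < size) {
    int next_min;
    MPI_Recv(&next_min, 1, MPI_INT, rank + 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    if (data[N - 1] > next_min) {
      ok = 0;
    }
  }
  if (rank > 0) {
    MPI_Send(&data[0], 1, MPI_INT, rank - 1, 0, MPI_COMM_WORLD);
  }

  /* every process must agree before we call the data globally sorted */
  int all_ok;
  MPI_Allreduce(&ok, &all_ok, 1, MPI_INT, MPI_LAND, MPI_COMM_WORLD);
  return all_ok;
}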


Odd-Even Transposition Sort

The basis for one method of distributed sorting is the "odd-even transposition sort". The serial version of this algorithm is given below:

  1. For each phase:
    1. If this is an even phase:
      1. Compare and swap each even/odd pair of elements.
    2. Else:
      1. Compare and swap each odd/even pair of elements.

For example, if our data is as follows:

6, 9, 3, 1

Then we would start with phase 0. This is an even phase, so we will compare/swap each even/odd pair. This means pairs where the first element is at an even index and the second is at an odd index:

(6, 9) (3, 1)
Becomes:
6, 9, 1, 3

Now we move to phase 1. This is an odd phase, so we will compare/swap the odd/even pairs:

6, (9, 1), 3
Becomes:
6, 1, 9, 3
Next, we have phase 2, which is an even phase:

(6, 1) (9, 3)
Becomes:
1, 6, 3, 9

Lastly, we have phase 3, which is an odd phase:

1, (6, 3), 9
Becomes:
1, 3, 6, 9

How many phases will we need in the worst case with N values?
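
Here is a sketch of the serial algorithm in C (the function name is just for illustration). It runs one phase per element, which is enough to guarantee a sorted result:

/* sketch of serial odd-even transposition sort on an array of n ints */
void odd_even_sort(int* data, int n) {
  int phase, i;

  /* n phases are enough to sort n values */
  for (phase = 0; phase < n; phase++) {
    /* even phases start at index 0, odd phases at index 1 */
    int start = (phase % 2 == 0) ? 0 : 1;

    /* compare and swap each neighboring pair */
    for (i = start; i + 1 < n; i += 2) {
      if (data[i] > data[i + 1]) {
        int temp = data[i];
        data[i] = data[i + 1];
        data[i + 1] = temp;
      }
    }
  }
}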


The Parallel Odd-Even Transposition

The odd-even transposition algorithm can be extended to work in a distributed system.

In this scenario, each process will store its own portion of the data. It will keep its own data sorted.

It will then share its data with either the process to its left or the process to its right, depending on which phase we are in.

This algorithm is given below:

  1. For each phase:
    1. Sort our local data.
    2. Find our partner based on the phase number and our rank.
    3. If we have a partner:
      1. Send our data to our partner.
      2. Receive our partner's data.
      3. If our rank is smaller, keep the smallest items from our data and our partner's.
      4. Else, our rank is larger, so keep the largest items from our data and our partner's.

This is an extension of the serial algorithm, where we are comparing and swapping a subset of the data rather than a single element.

The following program implements this idea. Some important points about it:

  1. It generates random data values in each process for testing. In reality, the data could come from a file, be distributed before the program begins, or be created in some other way.
  2. It uses the C standard library qsort function to locally sort the data. It could instead use the threaded merge sort above to sort the local data.

  3. The program avoids the deadlock situation that is lurking in the algorithm above. If both processes send their data before receiving the other's, the program could hang. To avoid this, it has even processes send first and odd ones receive first.

/* do the parallel odd/even sort */
void parallel_sort(int* data, int rank, int size) {
  int i;

  /* the array we use for reading from partner */
  int other[N];

  /* we need to apply P phases where P is the number of processes */
  for (i = 0; i < size; i++) {
    /* sort our local array */
    qsort(data, N, sizeof(int), &cmp);

    /* find our partner on this phase */
    int partner;

    /* if it's an even phase */
    if (i % 2 == 0) {
      /* if we are an even process */
      if (rank % 2 == 0) {
        partner = rank + 1;
      } else {
        partner = rank - 1;
      }
    } else {
      /* it's an odd phase - do the opposite */
      if (rank % 2 == 0) {
        partner = rank - 1;
      } else {
        partner = rank + 1;
      }
    }

    /* if the partner is invalid, we should simply move on to the next iteration */
    if (partner < 0 || partner >= size) {
      continue;
    }

    /* do the exchange - even processes send first and odd processes receive first
     * this avoids possible deadlock of two processes working together both sending */
    if (rank % 2 == 0) {
      MPI_Send(data, N, MPI_INT, partner, 0, MPI_COMM_WORLD);
      MPI_Recv(other, N, MPI_INT, partner, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    } else {
      MPI_Recv(other, N, MPI_INT, partner, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      MPI_Send(data, N, MPI_INT, partner, 0, MPI_COMM_WORLD);
    }

    /* now we need to merge data and other based on whether we want the smaller or larger values */
    if (rank < partner) {
      /* keep smaller keys */
      while (1) {
        /* find the smallest one in the other array */
        int mini = min_index(other);

        /* find the largest one in our array */
        int maxi = max_index(data);

        /* if the smallest one in the other array is less than the largest in ours, swap them */
        if (other[mini] < data[maxi]) {
          int temp = other[mini];
          other[mini] = data[maxi];
          data[maxi] = temp;
        } else {
          /* else stop because the smallest are now in data */
          break;
        }
      }
    } else {
      /* keep larger keys */
      while (1) {
        /* find the largest one in the other array */
        int maxi = max_index(other);

        /* find the smallest one in our array */
        int mini = min_index(data);

        /* if the largest one in the other array is bigger than the smallest in ours, swap them */
        if (other[maxi] > data[mini]) {
          int temp = other[maxi];
          other[maxi] = data[mini];
          data[mini] = temp;
        } else {
          /* else stop because the largest are now in data */
          break;
        }
      }
    }
  }

  /* sort one last time, since the final phase's swaps can leave our local data out of order */
  qsort(data, N, sizeof(int), &cmp);
}
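
The function above relies on a few definitions that are not shown here: the per-process array size N, the qsort comparison function cmp, and the min_index and max_index helpers, along with the stdlib.h and mpi.h headers. The exact versions used in the full program may differ, but a minimal sketch of them might look like this (in the actual file, these would appear before parallel_sort):

/* sketch of the supporting definitions assumed by parallel_sort above */
#include <stdlib.h>  /* for qsort */
#include <mpi.h>     /* for the MPI calls */

/* number of values stored on each process */
#define N 1000

/* comparison function for qsort: orders ints from smallest to largest */
int cmp(const void* a, const void* b) {
  int x = *(const int*) a;
  int y = *(const int*) b;
  return (x > y) - (x < y);
}

/* return the index of the smallest value in an array of N ints */
int min_index(int* data) {
  int i, best = 0;
  for (i = 1; i < N; i++) {
    if (data[i] < data[best]) {
      best = i;
    }
  }
  return best;
}

/* return the index of the largest value in an array of N ints */
int max_index(int* data) {
  int i, best = 0;
  for (i = 1; i < N; i++) {
    if (data[i] > data[best]) {
      best = i;
    }
  }
  return best;
}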

Copyright © 2018 Ian Finlayson | Licensed under a Creative Commons Attribution 4.0 International License.