C++并行排序:OpenMP并行归并排序和并行快速排序示例

本文给出了两个C++并行排序的程序实例,一个基于归并排序思路,另一个基于快速排序思路。

大概今年年初的适合有一段时间比较清闲,在一串并行的实验代码中有一个排序的逻辑,虽然并不是时间复杂度的瓶颈所在,但是一直希望能有机会把排序改成并行的版本。于是就有了下面的两段程序。本文给出了两个C++并行排序的程序实例,一个基于归并排序思路,另一个基于快速排序思路。


并行归并排序

并行归并排序的核心是std::inplace_merge函数。这个函数简单来说是将[first, middle)和[middle, last)的两段有序序列合并为一段[first, last)的有序序列。相对于传统意义上的归并而言,它的空间复杂度为O(1),也就是函数名的直译“就地合并”的意思。并行归并排序的思路也很简单,主要分为以下2个步骤:

  1. 预排序。使用std::sort,将待排序数组预排序为N个长度为k的子数组加上一个长度为b的尾巴子数组(0<b<k)。这里k可以取一个较大的粒度,如2048。预排序完成后,原数组从0开始每长为k的序列和小尾巴都是有序序列。因为不同子数组之间没有数据冲突,因此这个过程可以并行完成。
  2. 归并。将两两相邻的子数组使用std::inplace_merge归并。重复直到完成整个输入数组的归并。因为两两相邻的子数组之间也没有数据冲突,因此这个过程也可以并行完成。

这个算法的优点在于稳定性,它的核心都采用STL库完成,本身只起一个调度的作用,因此对于不同的输入数组都能有较好的表现。而缺点在于并行度上,由其特性可知,最后一次归并必然只有1个线程工作,倒数第二次归并必然只有2个线程工作,以此类推;而越到后面的归并长度又越长。因此,这个算法对多核的利用率一般。

算法的C++代码如下:

	template<class T>
	void Sort(std::vector<T> & data)
	{
		const intptr_t size = data.size();
		intptr_t stride = 2048;

		//从stride = 1开始归并排序非常缓慢
		//因此这里对data进行初排序
		//从一个较大的stride开始归并排序
		if (stride != 1) {
#pragma omp parallel for schedule(dynamic, 2048 / stride + 1)
			for (intptr_t i = 0; i < size; i += stride) {
				auto left = data.begin() + i;
				auto right = i + stride < size ? data.begin() + i + stride : data.end();
				std::sort(left, right);
			}
		}

		//并行归并排序
#pragma omp parallel
		{
			intptr_t _stride = stride;
			do
			{
				_stride *= 2;

#pragma omp for schedule(dynamic, 2048 / _stride + 1)
				for (intptr_t i = 0; i < size; i += _stride) {
					//对[i, i+_stride/2)和[i+_stride/2, i+_stride)进行归并
					auto left = data.begin() + i;
					auto mid = (i + i + _stride) / 2 < size ? data.begin() + (i + i + _stride) / 2 : data.end();
					auto right = i + _stride < size ? data.begin() + i + _stride : data.end();
					inplace_merge(left, mid, right);
				}
			} while (_stride < size);
		}
	}

并行快速排序

快速排序的核心无非两个步骤:选取锚点、移动元素。并行k快速排序也是如此。

一个简单的并行快速排序中:

  1. 假设某一次对数组A排序。
  2. 找出锚点p
  3. 并行地遍历A,然后把大于p和小于p的元素分别线程安全地插入到一个新数组的两边的,即整理A得到A^{-}pA^{+}

这种算法的优缺点分别和并行归并排序相反。由于依赖于锚点算法,因此稳定性不能保证;但整个排序过程中,每个线程基本上都不会浪费,因此并行度更佳。

上述的并行快速排序虽然逻辑简单,但是却将排序本身所需的额外空间从O(1)提升到了O(n)。这里介绍一种改进后的空间复杂度依然为O(1)的并行快速排序,是我某日头脑风暴得到的,不知道之前有没有人提出过。这种方法的思路类似于inplace_merge进行原地整理

  1. 假设某一次对数组A排序,我们维护i_r, j_r, i_w, j_w四个地址,并将i_r, i_w初始为A的首元素地址,j_r, j_w初始为A的末元素地址。语义上讲,i_rj_r分别为读入元素的下标i_wj_w分别为写回元素的下标。说到这里有的哥们应该就已经差不多get到要点了。
  2. 每个线程分配一个长度为k线程私有数组A^*,以及私有的空数组A_-^*A_+^*
  3. 找出锚点p
  4. 每个线程每次线程安全地Ai_r(或j_r)处复制k个元素到A^*,然后将i_r提升k(或将j_r降低k)。若i_r-i_w小于j_w-j_r,则优先从i_r复制;若大于,则优先从j_r复制;若等于,则随机。
  5. 每个线程整理A^*小于p的元素并入A_-^*,大于p的元素并入A_+^*
  6. 线程安全地A_-^*写回到i_w处,写回最大长度为i_r-i_w,然后用实际的写回长度提升i_w线程安全地A_+^*写回到j_w处,写回最大长度为j_w-j_r,然后用实际的写回长度降低j_w。写回的元素相应地从A_-^*A_+^*中移除。
  7. 重复2到5,直到i_r=j_r,且每个线程的A_-^*A_+^*为空。

由于每个线程所需的额外空间是与问题规模无关的变量,因此这个过程的空间复杂度为O(1)

简单来说,这个改进算法的核心思想在于“读了后的空间即可用于写”。当然,相对于简单的并行快速排序,它的调度麻烦了许多。下面是这个算法的C++代码:

	//快排获取中轴点
	template<class T>
	std::pair<intptr_t, intptr_t> QuickSortPartion(std::vector<T>& data, intptr_t left, intptr_t right) {
		//从count个样本中预测中值x
		intptr_t count = (right - left) * 0.001; if (count < 11) count = 11;
		intptr_t stride = (right - left) / count;
		std::vector<T> sample(count);
#pragma omp parallel for schedule(dynamic, 10000)
		for (intptr_t i = 0; i < count; i++) {
			sample[i] = data[left + stride * i];
		}
		Sort(sample);
		const auto x = sample[count / 2];

		//并行分界
		//时间复杂度:O(n)
		//空间复杂度:O(1)
		const intptr_t mSize = 1000;
		intptr_t iRead = left, jRead = right;
		intptr_t iWrite = left, jWrite = right;
		std::pair<intptr_t, intptr_t> retval;
#pragma omp parallel
		{
			//根据x分界
			std::vector<T> _data;
			std::vector<T> _dataSmaller;
			std::vector<T> _dataLarger;
			_data.reserve(mSize);
			_dataSmaller.reserve(mSize * 1.5);
			_dataLarger.reserve(mSize * 1.5);
			for (;;)
			{
				//第一步
				//线程安全地从data中取出一部分元素到_data中
				_data.clear();
#pragma omp critical
				{
#pragma omp flush(iRead)
#pragma omp flush(jRead)
#pragma omp flush(iWrite)
#pragma omp flush(jWrite)
					//最多分配mSize个元素
					const intptr_t size = jRead - iRead + 1 >= mSize ? mSize : jRead - iRead + 1;
					_data.resize(size);

					if (iRead - iWrite <= jWrite - jRead) {
						//从左侧iRead处开始分配size个元素
						std::copy(data.begin() + iRead, data.begin() + iRead + size, _data.begin());
						iRead += size;
					}
					else {
						//从右侧jRead处开始分配size个元素
						std::copy(data.begin() + jRead - size + 1, data.begin() + jRead + 1, _data.begin());
						jRead -= size;
					}
				}

				//第二步
				//在_data中与x比较,分为2组
				for (auto & val : _data) {
					if (val < x) {
						_dataSmaller.push_back(val);
					}
					else if (val > x) {
						_dataLarger.push_back(val);
					}
				}

				//第三步
				//线程安全地获取data中的写回下标,及各自的写回长度_iSize和_jSize
				intptr_t _iWrite = -1;
				intptr_t _jWrite = -1;
				intptr_t _iSize = 0;
				intptr_t _jSize = 0;
#pragma omp critical
				{
#pragma omp flush(iRead)
#pragma omp flush(jRead)
#pragma omp flush(iWrite)
#pragma omp flush(jWrite)
					bool isReadComplete = iRead > jRead;
					if (_dataSmaller.size() != 0) {
						_iWrite = iWrite;

						//当isReadComplete为假时,iWrite保证不超过iRead
						//当isReadComplete为真时,iWrite才可以超过iRead
						if (iWrite + _dataSmaller.size() <= iRead || isReadComplete) {
							//可以全部写入
							_iSize = _dataSmaller.size();
						}
						else {
							//只允许部分写入
							_iSize = iRead - iWrite;
						}

						//更新iWrite
						iWrite += _iSize;
					}
					if (_dataLarger.size() != 0) {
						_jWrite = jWrite;

						//当isReadComplete为假时,jWrite保证不超过jRead
						//当isReadComplete为真时,jWrite才可以超过jRead
						if (jWrite - _dataLarger.size() >= jRead || isReadComplete) {
							//可以全部写入
							_jSize = _dataLarger.size();
						}
						else {
							//只允许部分写入
							_jSize = jWrite - jRead;
						}

						//更新iWrite
						jWrite -= _jSize;
					}
				}

				//第四步
				//写回分组后的数据到data中
				if (_iWrite != -1) {
					//从左侧_iWrite处写回_dataSmaller中末端_iSize个元素
					std::copy(_dataSmaller.end() - _iSize, _dataSmaller.end(), data.begin() + _iWrite);
					_dataSmaller.resize(_dataSmaller.size() - _iSize);
				}
				if (_jWrite != -1) {
					//从右侧_jWrite处写回_dataLarger中末端_jSize个元素
					std::copy(_dataLarger.end() - _jSize, _dataLarger.end(), data.begin() + _jWrite - _jSize + 1);
					_dataLarger.resize(_dataLarger.size() - _jSize);
				}

				//最后
				//判断退出
				if (iRead > jRead) {
					if (_dataSmaller.size() == 0 && _dataLarger.size() == 0) break;
				}
			}

			//设置栅障
#pragma omp barrier

			//写回相等值
#pragma omp flush(iWrite)
#pragma omp flush(jWrite)
#pragma omp for schedule(dynamic, 10000)
			for (intptr_t i = iWrite; i <= jWrite; i++) {
				data[i] = x;
			}
#pragma omp single
			{
				retval.first = iWrite;
				retval.second = jWrite;
			}
		}

		return retval;
	}

	//并行快速排序
	template<class T>
	void QuickSort(std::vector<T>& data)
	{
		intptr_t threads = omp_get_max_threads();
		std::list<std::pair<intptr_t, intptr_t>> partions;
		partions.push_back(std::make_pair<intptr_t, intptr_t>(0, data.size() - 1));

		//分区
		for (int i = 0; i < threads * 4; i++) {
			std::pair<intptr_t, intptr_t> * largest = &partions.front();
			for (auto & p : partions) {
				if (p.second - p.first > largest->second - largest->first) largest = &p;
			}
			if (largest->second - largest->first < 10000) break;

			auto mid = QuickSortPartion(data, largest->first, largest->second);
			partions.push_back(std::make_pair(largest->first, mid.first - 1));
			partions.push_back(std::make_pair(mid.second + 1, largest->second));
			partions.remove(*largest);
		}

		//排序
		std::vector<std::pair<intptr_t, intptr_t>> partions2(partions.begin(), partions.end());
#pragma omp parallel for schedule(dynamic)
		for (intptr_t i = 0; i < partions2.size(); i++) {
			auto & p = partions2[i];
			std::sort(data.begin() + p.first, data.begin() + p.second + 1);
		}
	}

在C++实现中,我们只会根据工作线程数量调用有限次的QuickSortPartion方法。每次调用都会使1个子序列分为2个,只要分成了足够多,那么就可以并行地对每个子序列使用std::sort来完成排序。


性能比较

在4C8T的i7 6700K上,我们来比较上述并行归并排序、并行快速排序和单线程的std::sort的性能区别。排序数组为随机int。

元素个数 并行快速排序 并行归并排序 std::sort
10000000 93ms 188ms 484ms
100000000 1062ms 2343ms 4859ms

可以看到,并行快速排序有着非常可观的加速比。

 

称谓(*)
邮箱
留言(*)