大概今年年初的适合有一段时间比较清闲,在一串并行的实验代码中有一个排序的逻辑,虽然并不是时间复杂度的瓶颈所在,但是一直希望能有机会把排序改成并行的版本。于是就有了下面的两段程序。本文给出了两个C++并行排序的程序实例,一个基于归并排序思路,另一个基于快速排序思路。
并行归并排序
并行归并排序的核心是std::inplace_merge函数。这个函数简单来说是将[first, middle)和[middle, last)的两段有序序列合并为一段[first, last)的有序序列。相对于传统意义上的归并而言,它的空间复杂度为O(1),也就是函数名的直译“就地合并”的意思。并行归并排序的思路也很简单,主要分为以下2个步骤:
- 预排序。使用std::sort,将待排序数组预排序为N个长度为k的子数组加上一个长度为b的尾巴子数组(0<b<k)。这里k可以取一个较大的粒度,如2048。预排序完成后,原数组从0开始每长为k的序列和小尾巴都是有序序列。因为不同子数组之间没有数据冲突,因此这个过程可以并行完成。
- 归并。将两两相邻的子数组使用std::inplace_merge归并。重复直到完成整个输入数组的归并。因为两两相邻的子数组之间也没有数据冲突,因此这个过程也可以并行完成。
这个算法的优点在于稳定性,它的核心都采用STL库完成,本身只起一个调度的作用,因此对于不同的输入数组都能有较好的表现。而缺点在于并行度上,由其特性可知,最后一次归并必然只有1个线程工作,倒数第二次归并必然只有2个线程工作,以此类推;而越到后面的归并长度又越长。因此,这个算法对多核的利用率一般。
算法的C++代码如下:
template<class T>
void Sort(std::vector<T> & data)
{
const intptr_t size = data.size();
intptr_t stride = 2048;
//从stride = 1开始归并排序非常缓慢
//因此这里对data进行初排序
//从一个较大的stride开始归并排序
if (stride != 1) {
#pragma omp parallel for schedule(dynamic, 2048 / stride + 1)
for (intptr_t i = 0; i < size; i += stride) {
auto left = data.begin() + i;
auto right = i + stride < size ? data.begin() + i + stride : data.end();
std::sort(left, right);
}
}
//并行归并排序
#pragma omp parallel
{
intptr_t _stride = stride;
do
{
_stride *= 2;
#pragma omp for schedule(dynamic, 2048 / _stride + 1)
for (intptr_t i = 0; i < size; i += _stride) {
//对[i, i+_stride/2)和[i+_stride/2, i+_stride)进行归并
auto left = data.begin() + i;
auto mid = (i + i + _stride) / 2 < size ? data.begin() + (i + i + _stride) / 2 : data.end();
auto right = i + _stride < size ? data.begin() + i + _stride : data.end();
inplace_merge(left, mid, right);
}
} while (_stride < size);
}
}
并行快速排序
快速排序的核心无非两个步骤:选取锚点、移动元素。并行k快速排序也是如此。
一个简单的并行快速排序中:
- 假设某一次对数组排序。
- 找出锚点。
- 并行地遍历,然后把大于和小于的元素分别线程安全地插入到一个新数组的两边的,即整理得到。
这种算法的优缺点分别和并行归并排序相反。由于依赖于锚点算法,因此稳定性不能保证;但整个排序过程中,每个线程基本上都不会浪费,因此并行度更佳。
上述的并行快速排序虽然逻辑简单,但是却将排序本身所需的额外空间从提升到了。这里介绍一种改进后的空间复杂度依然为的并行快速排序,是我某日头脑风暴得到的,不知道之前有没有人提出过。这种方法的思路类似于inplace_merge进行原地整理:
- 假设某一次对数组排序,我们维护四个地址,并将初始为的首元素地址,初始为的末元素地址。语义上讲,和分别为读入元素的下标,和分别为写回元素的下标。说到这里有的哥们应该就已经差不多get到要点了。
- 每个线程分配一个长度为的线程私有数组,以及私有的空数组和。
- 找出锚点。
- 每个线程每次线程安全地从的(或)处复制个元素到,然后将提升(或将降低)。若小于,则优先从复制;若大于,则优先从复制;若等于,则随机。
- 每个线程整理,小于的元素并入,大于的元素并入。
- 线程安全地将写回到处,写回最大长度为,然后用实际的写回长度提升;线程安全地将写回到处,写回最大长度为,然后用实际的写回长度降低。写回的元素相应地从和中移除。
- 重复2到5,直到,且每个线程的和为空。
由于每个线程所需的额外空间是与问题规模无关的变量,因此这个过程的空间复杂度为。
简单来说,这个改进算法的核心思想在于“读了后的空间即可用于写”。当然,相对于简单的并行快速排序,它的调度麻烦了许多。下面是这个算法的C++代码:
//快排获取中轴点
template<class T>
std::pair<intptr_t, intptr_t> QuickSortPartion(std::vector<T>& data, intptr_t left, intptr_t right) {
//从count个样本中预测中值x
intptr_t count = (right - left) * 0.001; if (count < 11) count = 11;
intptr_t stride = (right - left) / count;
std::vector<T> sample(count);
#pragma omp parallel for schedule(dynamic, 10000)
for (intptr_t i = 0; i < count; i++) {
sample[i] = data[left + stride * i];
}
Sort(sample);
const auto x = sample[count / 2];
//并行分界
//时间复杂度:O(n)
//空间复杂度:O(1)
const intptr_t mSize = 1000;
intptr_t iRead = left, jRead = right;
intptr_t iWrite = left, jWrite = right;
std::pair<intptr_t, intptr_t> retval;
#pragma omp parallel
{
//根据x分界
std::vector<T> _data;
std::vector<T> _dataSmaller;
std::vector<T> _dataLarger;
_data.reserve(mSize);
_dataSmaller.reserve(mSize * 1.5);
_dataLarger.reserve(mSize * 1.5);
for (;;)
{
//第一步
//线程安全地从data中取出一部分元素到_data中
_data.clear();
#pragma omp critical
{
#pragma omp flush(iRead)
#pragma omp flush(jRead)
#pragma omp flush(iWrite)
#pragma omp flush(jWrite)
//最多分配mSize个元素
const intptr_t size = jRead - iRead + 1 >= mSize ? mSize : jRead - iRead + 1;
_data.resize(size);
if (iRead - iWrite <= jWrite - jRead) {
//从左侧iRead处开始分配size个元素
std::copy(data.begin() + iRead, data.begin() + iRead + size, _data.begin());
iRead += size;
}
else {
//从右侧jRead处开始分配size个元素
std::copy(data.begin() + jRead - size + 1, data.begin() + jRead + 1, _data.begin());
jRead -= size;
}
}
//第二步
//在_data中与x比较,分为2组
for (auto & val : _data) {
if (val < x) {
_dataSmaller.push_back(val);
}
else if (val > x) {
_dataLarger.push_back(val);
}
}
//第三步
//线程安全地获取data中的写回下标,及各自的写回长度_iSize和_jSize
intptr_t _iWrite = -1;
intptr_t _jWrite = -1;
intptr_t _iSize = 0;
intptr_t _jSize = 0;
#pragma omp critical
{
#pragma omp flush(iRead)
#pragma omp flush(jRead)
#pragma omp flush(iWrite)
#pragma omp flush(jWrite)
bool isReadComplete = iRead > jRead;
if (_dataSmaller.size() != 0) {
_iWrite = iWrite;
//当isReadComplete为假时,iWrite保证不超过iRead
//当isReadComplete为真时,iWrite才可以超过iRead
if (iWrite + _dataSmaller.size() <= iRead || isReadComplete) {
//可以全部写入
_iSize = _dataSmaller.size();
}
else {
//只允许部分写入
_iSize = iRead - iWrite;
}
//更新iWrite
iWrite += _iSize;
}
if (_dataLarger.size() != 0) {
_jWrite = jWrite;
//当isReadComplete为假时,jWrite保证不超过jRead
//当isReadComplete为真时,jWrite才可以超过jRead
if (jWrite - _dataLarger.size() >= jRead || isReadComplete) {
//可以全部写入
_jSize = _dataLarger.size();
}
else {
//只允许部分写入
_jSize = jWrite - jRead;
}
//更新iWrite
jWrite -= _jSize;
}
}
//第四步
//写回分组后的数据到data中
if (_iWrite != -1) {
//从左侧_iWrite处写回_dataSmaller中末端_iSize个元素
std::copy(_dataSmaller.end() - _iSize, _dataSmaller.end(), data.begin() + _iWrite);
_dataSmaller.resize(_dataSmaller.size() - _iSize);
}
if (_jWrite != -1) {
//从右侧_jWrite处写回_dataLarger中末端_jSize个元素
std::copy(_dataLarger.end() - _jSize, _dataLarger.end(), data.begin() + _jWrite - _jSize + 1);
_dataLarger.resize(_dataLarger.size() - _jSize);
}
//最后
//判断退出
if (iRead > jRead) {
if (_dataSmaller.size() == 0 && _dataLarger.size() == 0) break;
}
}
//设置栅障
#pragma omp barrier
//写回相等值
#pragma omp flush(iWrite)
#pragma omp flush(jWrite)
#pragma omp for schedule(dynamic, 10000)
for (intptr_t i = iWrite; i <= jWrite; i++) {
data[i] = x;
}
#pragma omp single
{
retval.first = iWrite;
retval.second = jWrite;
}
}
return retval;
}
//并行快速排序
template<class T>
void QuickSort(std::vector<T>& data)
{
intptr_t threads = omp_get_max_threads();
std::list<std::pair<intptr_t, intptr_t>> partions;
partions.push_back(std::make_pair<intptr_t, intptr_t>(0, data.size() - 1));
//分区
for (int i = 0; i < threads * 4; i++) {
std::pair<intptr_t, intptr_t> * largest = &partions.front();
for (auto & p : partions) {
if (p.second - p.first > largest->second - largest->first) largest = &p;
}
if (largest->second - largest->first < 10000) break;
auto mid = QuickSortPartion(data, largest->first, largest->second);
partions.push_back(std::make_pair(largest->first, mid.first - 1));
partions.push_back(std::make_pair(mid.second + 1, largest->second));
partions.remove(*largest);
}
//排序
std::vector<std::pair<intptr_t, intptr_t>> partions2(partions.begin(), partions.end());
#pragma omp parallel for schedule(dynamic)
for (intptr_t i = 0; i < partions2.size(); i++) {
auto & p = partions2[i];
std::sort(data.begin() + p.first, data.begin() + p.second + 1);
}
}
在C++实现中,我们只会根据工作线程数量调用有限次的QuickSortPartion方法。每次调用都会使1个子序列分为2个,只要分成了足够多,那么就可以并行地对每个子序列使用std::sort来完成排序。
性能比较
在4C8T的i7 6700K上,我们来比较上述并行归并排序、并行快速排序和单线程的std::sort的性能区别。排序数组为随机int。
元素个数 | 并行快速排序 | 并行归并排序 | std::sort |
10000000 | 93ms | 188ms | 484ms |
100000000 | 1062ms | 2343ms | 4859ms |
可以看到,并行快速排序有着非常可观的加速比。