Java自学者论坛

 找回密码
 立即注册

手机号码,快捷登录

恭喜Java自学者论坛(https://www.javazxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,会员资料板块,购买链接:点击进入购买VIP会员

JAVA高级面试进阶训练营视频教程

Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程Go语言视频零基础入门到精通Java架构师3期(课件+源码)
Java开发全终端实战租房项目视频教程SpringBoot2.X入门到高级使用教程大数据培训第六期全套视频教程深度学习(CNN RNN GAN)算法原理Java亿级流量电商系统视频教程
互联网架构师视频教程年薪50万Spark2.0从入门到精通年薪50万!人工智能学习路线教程年薪50万大数据入门到精通学习路线年薪50万机器学习入门到精通教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程MySQL入门到精通教程
查看: 5653|回复: 0

多线程外排序解决大数据排序问题1(并行快排和并行归并)【转】

[复制链接]
  • TA的每日心情
    奋斗
    2024-4-6 11:05
  • 签到天数: 748 天

    [LV.9]以坛为家II

    2034

    主题

    2092

    帖子

    70万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    705612
    发表于 2021-4-19 12:04:33 | 显示全部楼层 |阅读模式

    转自AIfred

    问题:

    对一个 10GB 的数据文件排序,而计算机内存仅有 4GB 

    思路:

    将整个文件读入内存排序显然不行。可以将这个 10GB 的大文件分区为 100 个 100MB 的小文件,把这些小文件的数据依次读入内存、排序、再输出,于是我们便得到了 100 个各自有序的小文件。接下来再将这 100 个小文件两两归并,便得到了一个有序的大文件,完成了排序操作。在实际中,如果仅仅使用普通串行算法实现,整个程序的效率非常低。尽可能多的并行化会大幅提升效率。

    用到知识点:

    1. 多线程读写二进制文件

    2. 并行归并排序(sort部分并行,merge部分并行)

    3. 解决线程冲突(C++11的原子操作)

    分区排序

    对待排序的大文件,首先需要定义每个分区最多的元素数量partition_size。例如待排序文件中有5500个元素,设partition_size = 1000, 需要创建6个分区,数量分别为5*1000,1*500。然后分别对这个6个分区单独内排序即可。

    4-11:对第i个分区,首先将待排序文件中对应该分区部分的元素并行读入内存

    26-31:对第i个分区,将排序好的对应分区的元素并行写入文件

    由于不能将整个文件全部读入内存,只能通过fsetpos和 fread函数来读取该位置的元素的值。

     1 void partition_and_sort(string in_file, long long n) {
     2     int* arr = new int[PARTITION_SIZE]; // 一个分区大小的临时数组
     3     // 分区循环
     4     for (long long i = 0; i < (n - 1) / PARTITION_SIZE + 1; i++) { 
     5         // each_get: 分区内一个线程处理大小, all_get: 分区大小
     6         int each_get[4] = {}, all_get = 0;
     7         clock_t start_time = clock();
     8         // 多线程读取一个分区
     9         parallel_for(0, MAX_THREADS, [&](long long x) {
    10             FILE* fin = fopen(in_file.c_str(), "rb");
    11             // 在文件中定位到该线程读取的位置
    12             fpos_t pos = (PARTITION_SIZE * i + EACH_NUM * x) * sizeof(int);
    13             if (fsetpos(fin, &pos) == 0)
    14                 // 读取 EACH_NUM个值,返回个数
    15                 each_get[x] = fread(arr + EACH_NUM * x, sizeof(int), EACH_NUM, fin);
    16             fclose(fin);
    17         });
    18         clock_t end_time = clock();
    19         for (int i = 0; i < MAX_THREADS; i++) all_get += each_get;
    20         cout << "\nRead " << all_get << " numbers from \"" << in_file << "\". "
    21             << "Time usage = " << end_time - start_time << "ms.\n";
    22         start_time = clock();
    23         parallel_qsort(arr, arr + all_get); // 并行快速排序
    24         end_time = clock();
    25         cout << "Sorting finished. Time usage = " << end_time - start_time << "ms.\n";
    26         string tmp_file = "temp\\part" + to_string(parts++) + ".dat";
    27         FILE *fout = fopen(tmp_file.c_str(), "wb");
    28         chsize(fileno(fout), all_get * sizeof(int));
    29         fclose(fout);
    30         start_time = clock();
    31         // 对一个分区并行写入文件
    32         parallel_for(0, MAX_THREADS, [&](long long x) {
    33             FILE* fout = fopen(tmp_file.c_str(), "rb+");
    34             fpos_t pos = EACH_NUM * x * sizeof(int);
    35             if (fsetpos(fout, &pos) == 0)
    36                 fwrite(arr + EACH_NUM * x, sizeof(int), each_get[x], fout);
    37             fclose(fout);
    38         });
    39         end_time = clock();
    40         cout << "Part " << parts - 1 << " saved to file \"" << tmp_file << "\". "
    41             << "Time usage = " << end_time - start_time << "ms.\n";
    42     }
    43     delete[] arr;
    44 }

    并行快排:算法导论版快排,一个for循环维护 i , j 区间,[0, i]其值<=key,(i, j)其值>key。

     1 void parallel_qsort(int *begin, int *end) {
     2     if (begin >= end - 1) return;
     3     int *key = rand() % (end - begin) + begin; // 选 pivot
     4     swap(*key, *begin);
     5     int *i = begin, *j = begin;
     6     for (key = begin; j < end; j++) {
     7         if (*j < *key) {
     8             i++;
     9             swap(*i, *j);
    10         }
    11     }
    12     swap(*begin, *i);
    13     if (i - begin > dx && end - i > dx) { // 如果两个区间长度均大于dx,并行
    14         parallel_for(0, 2, [&](int x) {
    15             if (x) parallel_qsort(begin, i);
    16             else parallel_qsort(i + 1, end);
    17         });
    18     } else { // 区间长度较小则不并行
    19         parallel_qsort(begin, i);
    20         parallel_qsort(i + 1, end);
    21     }
    22 }

    归并:

    是外排序核心,整个程序的效率高低取决于该阶段的算法。由于待归并的数据规模较大,使用并行归并算法可以显著提升效率。

    并行一般需要分治的思想,需要先找到各对枢轴,将原数据划分成均匀的几部分,再启动多个线程进行归并。

    段组分解的基本思想

    下面就来讲解一种最简单的基于二分查找的段组分解算法的基本思想。
    先看一个实例,有以下两组有序数据,如何将它们进行并行归并呢?

    • 第一组:15 25 33 47 58 59 62 64
    • 第二组:12 18 27 31 36 38 42 80

    考虑将第1组数据以中间位置数据47作为枢轴数据平分成两段,如下所示:

    • 第一组:(15 25 33 47)(58 59 62 64)

    然后再在第2组数据中使用对半查找方法查找一个刚好小于等于枢轴数据47的数据42,以42为枢轴将第2组数据分成两段如下:

    • 第二组:(12 18 27 31 36 38 42)(80)

    此时可以发现,第一组的第1段数据和第二组的第1段数据均小于第1组的第2段或第二组的第2段数据。
    将第一组的第1段数据和第二组的第1段数据进行归并,得到以下数据序列:

    • 第一段归并:(12 15 18 25 27 31 33 36 38 42 47)

    将第一组的第2段数据和第二组的第2段数据进行归并,

    • 第二段归并:(58 59 62 64 80)

    很容易发现,上面两段归并后的数据连起来后,自然就形成了一个有序系列。由于上面两次归并操作都是独立的,因此它可以并行地进行。这种可以归并的两个成对数据段就是前面说过的段组。

    47在第一组数据中的位置是342在第二组数据中的位置是6。位置对(3,6)将上面两组数据分成了两个可以并行归并的段组。

    从上面的例子可以看出,在第一组数据的枢轴数据47确定以后,在第二组数据中使用折半查找方法找到刚好小于等于枢轴数据47的数据42,时间复杂度为O(logn)O(logn)。为了让效率最优化,每个线程的负载应该尽可能均衡,这就要求选取的位置对能够将原数据分成元素数量相对均衡的段组。于是我们也应使用折半查找确定第一组数据的枢轴的合适位置。

    嵌套二分求枢轴:时间复杂度O((logn)^2)

    下面程序假设为4个线程,找3个枢轴将两个要合并的数组分别分成4段,每一对片段合并后的长度一致,使负载尽可能均衡。

    由于不能一次全部将整个文件装进内存,此处涉及磁盘操作,但由于折半查找的效率较高,访问磁盘的次数并不多,开销相对较小。

     1 void get_fpos() {
     2     long long seek_pos[MAX_THREADS + 1][3] = {}, n1 = size1 / 4, n2 = size2 / 4;
     3     for (long long i = 1; i < MAX_THREADS; i++) {//找第i个枢轴
     4         long long pos1, pos2, fpos, l1 = 0, r1 = n1 - 1;
     5         int *get1 = new int;
     6         while (r1 - l1 > 1) {
     7             pos1 = (l1 + r1) / 2;//二分查找,先假定file1的枢轴
     8             fpos = pos1 * sizeof(int);
     9             fsetpos(fin1, &fpos);
    10             fread(get1, sizeof(int), 1, fin1);
    11             long long l2 = 0, r2 = n2;
    12             int* get2 = new int;
    13             while (r2 - l2 > 0) {//再用二分查找确定file2的枢轴
    14                 pos2 = (l2 + r2) / 2;
    15                 fpos = pos2 * sizeof(int);
    16                 fsetpos(fin2, &fpos);
    17                 fread(get2, sizeof(int), 1, fin2);
    18                 if (*get1 <= *get2)
    19                     r2 = pos2;
    20                 else
    21                     l2 = pos2 + 1;
    22             }
    23             delete get2; get2 = NULL;
    24             pos2 = r2;
    25             //如果这两个枢轴将file1和file2划分的不够均匀,则对pos1进行调整
    26             if ((pos1 + pos2) * long long(MAX_THREADS) < (n1 + n2) * i)
    27                 l1 = pos1 + 1;
    28             else
    29                 r1 = pos1 - 1;
    30         }
    31         delete get1; get1 = NULL;
    32         seek_pos[1] = pos1 * sizeof(int);//记录file1枢轴位置
    33         seek_pos[2] = pos2 * sizeof(int);//记录file2枢轴位置
    34         seek_pos[0] = seek_pos[1] + seek_pos[2];//输出文件枢轴位置
    35     }
    36     fclose(fin1); fclose(fin2);
    37     //边界细节
    38     seek_pos[0][0] = seek_pos[0][1] = seek_pos[0][2] = 0;
    39     seek_pos[MAX_THREADS][1] = n1 * sizeof(int);
    40     seek_pos[MAX_THREADS][2] = n2 * sizeof(int);
    41     seek_pos[MAX_THREADS][0] = seek_pos[MAX_THREADS][1] + seek_pos[MAX_THREADS][2];
    42 }

    完成归并操作的merge_file函数需要传入两个string类型的参数,表示需要归并的两组数据的文件名,返回归并后的数据的文件名:

     string merge_file(string in_file1, string in_file2) {}

    在这个函数中,我们首先需要获取到两个输入文件的大小(从而得到元素数量):

    1 void get_size() {
    2     FILE* fin1 = fopen(in_file1.c_str(), "rb");
    3     FILE* fin2 = fopen(in_file2.c_str(), "rb");
    4     fseek(fin1, 0, SEEK_END); // 将文件指针设为文末
    5     fseek(fin2, 0, SEEK_END);
    6     fpos_t size1 = 0, size2 = 0;
    7     fgetpos(fin1, &size1); // 获得文件大小
    8     fgetpos(fin2, &size2);
    9 }

    于是size1 / sizeof(int)便是第一个文件中的元素数量,size2 / sizeof(int)便是第二个文件中的元素数量。

    下面创建输出文件,并预先调整其大小(为了并行写入):

    1 void create_output_file() {
    2     string out_file = "temp\\part" + to_string(parts++) + ".dat";
    3     FILE *fout = fopen(out_file.c_str(), "wb");
    4     _chsize_s(fileno(fout), (n1 + n2) * sizeof(int));
    5 }

    parts是一个全局变量,用来统计已经创建的文件数量并作为新文件的文件名。(比如前一阶段 partition操作将原数据分割为5part,分别命名为part0.datpart1.dat… part4.dat, 那么在归并阶段形成的新文件将继续命名为 part5.datpart6.dat … 比如 part0.datpart1.dat归并得到part5.dat, part3.dat和 part4.dat归并得到part6.dat…)由于此时可能有多个线程在执行merge_file函数,同时访问全局变量可能会发生访问冲突,故parts必须定义为atomic_int类型,实现原子操作。

    归并操作:

     1 //多线程归并操作,每个线程只负责归并对应的段组
     2 parallel_for(0, MAX_THREADS, [&](int x) {
     3     FILE* fin1 = fopen(in_file1.c_str(), "rb");
     4     FILE* fin2 = fopen(in_file2.c_str(), "rb");
     5     FILE* fout = fopen(out_file.c_str(), "rb+");
     6     //根据枢轴位置确定读入起点
     7     fsetpos(fin1, &seek_pos[x][1]);
     8     fsetpos(fin2, &seek_pos[x][2]);
     9     fsetpos(fout, &seek_pos[x][0]);
    10     //输入输出缓冲,优化读写性能
    11     int *buf0 = new int[BUFFER_SIZE];//输出文件缓冲
    12     int *buf1 = new int[BUFFER_SIZE];//fin1缓冲
    13     int *buf2 = new int[BUFFER_SIZE];//fin2缓冲
    14     int i = 0, j = 0, k = 0;//i, j, k分别是buf1,buf2,buf0的数组下标
    15     //该线程应该从file1中读取all1个数据,从file2中读取all2个数据
    16     long long all1 = (seek_pos[x + 1][1] - seek_pos[x][1]) / sizeof(int);
    17     long long all2 = (seek_pos[x + 1][2] - seek_pos[x][2]) / sizeof(int);
    18     //先读取到缓冲区
    19     fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
    20     fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
    21     while (all1 > 0 && all2 > 0) {//归并排序
    22         if (buf1 < buf2[j]) {
    23             buf0[k++] = buf1[i++]; all1--;
    24             if (i == BUFFER_SIZE) {//如果缓冲区读完了,就更新缓冲区,重置i
    25                 fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
    26                 i = 0;
    27             }
    28         } else {
    29             buf0[k++] = buf2[j++]; all2--;
    30             if (j == BUFFER_SIZE) {
    31                 fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
    32                 j = 0;
    33             }
    34         }
    35         if (k == BUFFER_SIZE) {//如果缓冲区写满了,就全部写入文件,重置k
    36             fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
    37             k = 0;          
    38         }
    39     }
    40     while (all1 > 0) {//归并流程-如果file1中还有剩余数据,直接追加输出
    41         buf0[k++] = buf1[i++]; all1--;
    42         if (i == BUFFER_SIZE) {
    43             fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
    44             i = 0;
    45         }
    46         if (k == BUFFER_SIZE) {
    47             fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
    48             k = 0;
    49         }
    50     }
    51     while (all2 > 0) {//归并流程-如果file2中还有剩余数据,直接追加输出
    52         buf0[k++] = buf2[j++]; all2--;
    53         if (j == BUFFER_SIZE) {
    54             fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
    55             j = 0;
    56         }
    57         if (k == BUFFER_SIZE) {
    58             fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
    59             k = 0;
    60         }
    61     }
    62     fwrite(buf0, sizeof(int), k, fout);//写入输出文件
    63     fclose(fin1);
    64     fclose(fin2);
    65     fclose(fout);
    66     delete[] buf0;
    67     delete[] buf1;
    68     delete[] buf2;
    69 });

    在归并操作中,一个非常重要的优化便是输入输出缓冲区。输入缓冲即预先从文件一次性读入BUFFER_SIZE个数据,之后频繁的读取操作便从缓冲区获取,当缓冲区读完时(即ij指向了 buf1buf2的尾端),再一次性从文件中读取 BUFFER_SIZE个数据,重置ij ;输出缓冲即将数据先写入缓冲区,待缓冲区填满后(即k指向buf0的尾端)再一次性写入文件。缓冲区的设立,避免了频繁的IO操作,配合多线程可以使读写磁盘速率达到最大,性能大幅提升。

    merge_file函数编写完毕,我们需要递归调用它,直到所有的小分区都两两归并完成。递归的过程可以并行处理(不过由于merge_file函数本身已做并行化处理,此处串行递归亦可,效率不会太差):

     1 string merge(int l, int r) {
     2     if (l == r) return "temp\\part" + to_string(l) + ".dat";
     3     int mid = (l + r) / 2;
     4     string file1, file2;
     5     parallel_for(0, 2, [&](int x) {//此处使用串行递归亦可,不过效率略低一点
     6         if (x == 0) file1 = merge(l, mid);
     7         if (x == 1) file2 = merge(mid + 1, r);
     8     });
     9     return merge_file(file1, file2);
    10 }

    整合:

      1 #include <iostream>
      2 #include <stdio.h>
      3 #include <cstring>
      4 #include <string>
      5 #include <atomic>
      6 #include <Windows.h>
      7 #include <ppl.h>
      8 #include <io.h>
      9 #include <time.h>
     10 #define MAX_THREADS 4//最多线程数量
     11 using namespace std;
     12 using namespace concurrency;
     13 const int dx = 20;//并行快速排序的dx优化
     14 const long long PARTITION_SIZE = 100000000;//分区大小
     15 const long long BUFFER_SIZE = 10000000;//输入输出缓冲区大小
     16 const long long EACH_NUM = (PARTITION_SIZE / MAX_THREADS);
     17 atomic_int parts;
     18 void parallel_qsort(int *begin, int *end) {//并行快速排序
     19     if (begin >= end - 1) return;
     20     int *key = rand() % (end - begin) + begin;
     21     swap(*key, *begin);
     22     int *i = begin, *j = begin;
     23     for (key = begin; j < end; j++) {
     24         if (*j < *key) {
     25             i++;
     26             swap(*i, *j);
     27         }
     28     }
     29     swap(*begin, *i);
     30     if (i - begin > dx && end - i > dx) {//dx优化
     31         parallel_for(0, 2, [&](int x) {
     32             if (x) parallel_qsort(begin, i);
     33             else parallel_qsort(i + 1, end);
     34         });
     35     } else {
     36         parallel_qsort(begin, i);
     37         parallel_qsort(i + 1, end);
     38     }
     39 }
     40 void partition_and_sort(string in_file, long long n) {
     41     int* arr = new int[PARTITION_SIZE];
     42     for (long long i = 0; i < (n - 1) / PARTITION_SIZE + 1; i++) {//准备建立第i个分区
     43         int each_get[4] = {}, all_get = 0;
     44         //并行读取int数据至arr[]中,每个线程计划读取EACH_NUM个,一共计划读取PARTITION_SIZE个
     45         //每个线程实际读取each_get[x]个,实际一共读取all_get个
     46         clock_t start_time = clock();
     47         parallel_for(0, MAX_THREADS, [&](long long x) {
     48             FILE* fin = fopen(in_file.c_str(), "rb");
     49             fpos_t pos = (PARTITION_SIZE * i + EACH_NUM * x) * sizeof(int);
     50             if (fsetpos(fin, &pos) == 0)
     51                 each_get[x] = fread(arr + EACH_NUM * x, sizeof(int), EACH_NUM, fin);
     52             fclose(fin);
     53         });
     54         clock_t end_time = clock();
     55         for (int i = 0; i < MAX_THREADS; i++) all_get += each_get;
     56         cout << "\nRead " << all_get << " numbers from \"" << in_file << "\". "
     57             << "Time usage = " << end_time - start_time << "ms.\n";
     58         //对arr进行并行快速排序
     59         start_time = clock();
     60         parallel_qsort(arr, arr + all_get);
     61         end_time = clock();
     62         cout << "Sorting finished. Time usage = " << end_time - start_time << "ms.\n";
     63         //创建分区文件并调整大小
     64         string tmp_file = "temp\\part" + to_string(parts++) + ".dat";
     65         FILE *fout = fopen(tmp_file.c_str(), "wb");
     66         chsize(fileno(fout), all_get * sizeof(int));
     67         fclose(fout);
     68         //并行将arr全部写入分区文件
     69         start_time = clock();
     70         parallel_for(0, MAX_THREADS, [&](long long x) {
     71             FILE* fout = fopen(tmp_file.c_str(), "rb+");
     72             fpos_t pos = EACH_NUM * x * sizeof(int);
     73             if (fsetpos(fout, &pos) == 0)
     74                 fwrite(arr + EACH_NUM * x, sizeof(int), each_get[x], fout);
     75             fclose(fout);
     76         });
     77         end_time = clock();
     78         cout << "Part " << parts - 1 << " saved to file \"" << tmp_file << "\". "
     79             << "Time usage = " << end_time - start_time << "ms.\n";
     80     }
     81     delete[] arr;
     82 }
     83 string merge_file(string in_file1, string in_file2) {//将两个文件归并
     84     string out_file = "temp\\part" + to_string(parts++) + ".dat";
     85     //首先获取两个文件的长度
     86     FILE* fin1 = fopen(in_file1.c_str(), "rb");
     87     FILE* fin2 = fopen(in_file2.c_str(), "rb");
     88     clock_t seekpos_start_time = clock();
     89     fseek(fin1, 0, SEEK_END);
     90     fseek(fin2, 0, SEEK_END);
     91     fpos_t size1 = 0, size2 = 0;
     92     fgetpos(fin1, &size1);
     93     fgetpos(fin2, &size2);
     94     long long seek_pos[MAX_THREADS + 1][3] = {}, n1 = size1 / 4, n2 = size2 / 4;
     95     for (long long i = 1; i < MAX_THREADS; i++) {//找第i个枢轴
     96         long long pos1, pos2, fpos, l1 = 0, r1 = n1 - 1;
     97         int *get1 = new int;
     98         while (r1 - l1 > 1) {
     99             pos1 = (l1 + r1) / 2;//二分查找,先假定file1的枢轴
    100             fpos = pos1 * sizeof(int);
    101             fsetpos(fin1, &fpos);
    102             fread(get1, sizeof(int), 1, fin1);
    103             long long l2 = 0, r2 = n2;
    104             int* get2 = new int;
    105             while (r2 - l2 > 0) {//再用二分查找确定file2的枢轴
    106                 pos2 = (l2 + r2) / 2;
    107                 fpos = pos2 * sizeof(int);
    108                 fsetpos(fin2, &fpos);
    109                 fread(get2, sizeof(int), 1, fin2);
    110                 if (*get1 <= *get2)
    111                     r2 = pos2;
    112                 else
    113                     l2 = pos2 + 1;
    114             }
    115             delete get2; get2 = NULL;
    116             pos2 = r2;
    117             //如果这两个枢轴将file1和file2划分的不够均匀,则对pos1进行调整
    118             if ((pos1 + pos2) * long long(MAX_THREADS) < (n1 + n2) * i)
    119                 l1 = pos1 + 1;
    120             else
    121                 r1 = pos1 - 1;
    122         }
    123         delete get1; get1 = NULL;
    124         seek_pos[1] = pos1 * sizeof(int);//记录file1枢轴位置
    125         seek_pos[2] = pos2 * sizeof(int);//记录file2枢轴位置
    126         seek_pos[0] = seek_pos[1] + seek_pos[2];//输出文件枢轴位置
    127     }
    128     fclose(fin1); fclose(fin2);
    129     //边界细节
    130     seek_pos[0][0] = seek_pos[0][1] = seek_pos[0][2] = 0;
    131     seek_pos[MAX_THREADS][1] = n1 * sizeof(int);
    132     seek_pos[MAX_THREADS][2] = n2 * sizeof(int);
    133     seek_pos[MAX_THREADS][0] = seek_pos[MAX_THREADS][1] + seek_pos[MAX_THREADS][2];
    134     clock_t seekpos_end_time = clock();
    135     //调整输出文件大小
    136     clock_t chsize_start_time = clock();
    137     FILE *fout = fopen(out_file.c_str(), "wb");
    138     _chsize_s(fileno(fout), (n1 + n2) * sizeof(int));
    139     fclose(fout);
    140     clock_t chsize_end_time = clock();
    141     //多线程归并操作,每个线程只负责归并对应的段组
    142     clock_t merge_start_time = clock();
    143     parallel_for(0, MAX_THREADS, [&](int x) {
    144         FILE* fin1 = fopen(in_file1.c_str(), "rb");
    145         FILE* fin2 = fopen(in_file2.c_str(), "rb");
    146         FILE* fout = fopen(out_file.c_str(), "rb+");
    147         //根据枢轴位置确定读入起点
    148         fsetpos(fin1, &seek_pos[x][1]);
    149         fsetpos(fin2, &seek_pos[x][2]);
    150         fsetpos(fout, &seek_pos[x][0]);
    151         //输入输出缓冲,优化读写性能
    152         int *buf0 = new int[BUFFER_SIZE];
    153         int *buf1 = new int[BUFFER_SIZE];
    154         int *buf2 = new int[BUFFER_SIZE];
    155         int i = 0, j = 0, k = 0;//i, j, k分别是buf1,buf2,buf0的数组下标
    156         //该线程应该从file1中读取all1个数据,从file2中读取all2个数据
    157         long long all1 = (seek_pos[x + 1][1] - seek_pos[x][1]) / sizeof(int);
    158         long long all2 = (seek_pos[x + 1][2] - seek_pos[x][2]) / sizeof(int);
    159         //先读取到缓冲区
    160         fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
    161         fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
    162         while (all1 > 0 && all2 > 0) {//归并排序
    163             if (buf1 < buf2[j]) {
    164                 buf0[k++] = buf1[i++]; all1--;
    165                 if (i == BUFFER_SIZE) {//如果缓冲区读完了,就更新缓冲区,重置i
    166                     fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
    167                     i = 0;
    168                 }
    169             } else {
    170                 buf0[k++] = buf2[j++]; all2--;
    171                 if (j == BUFFER_SIZE) {
    172                     fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
    173                     j = 0;
    174                 }
    175             }
    176             if (k == BUFFER_SIZE) {//如果缓冲区写满了,就全部写入文件,重置k
    177                 fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
    178                 k = 0;
    179             }
    180         }
    181         while (all1 > 0) {//归并流程-如果file1中还有剩余数据,直接追加输出
    182             buf0[k++] = buf1[i++]; all1--;
    183             if (i == BUFFER_SIZE) {
    184                 fread(buf1, sizeof(int), BUFFER_SIZE, fin1);
    185                 i = 0;
    186             }
    187             if (k == BUFFER_SIZE) {
    188                 fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
    189                 k = 0;
    190             }
    191         }
    192         while (all2 > 0) {//归并流程-如果file2中还有剩余数据,直接追加输出
    193             buf0[k++] = buf2[j++]; all2--;
    194             if (j == BUFFER_SIZE) {
    195                 fread(buf2, sizeof(int), BUFFER_SIZE, fin2);
    196                 j = 0;
    197             }
    198             if (k == BUFFER_SIZE) {
    199                 fwrite(buf0, sizeof(int), BUFFER_SIZE, fout);
    200                 k = 0;
    201             }
    202         }
    203         fwrite(buf0, sizeof(int), k, fout);//写入输出文件
    204         fclose(fin1);
    205         fclose(fin2);
    206         fclose(fout);
    207         delete[] buf0;
    208         delete[] buf1;
    209         delete[] buf2;
    210     });
    211     clock_t merge_end_time = clock();
    212     Sleep(100);
    213     //输入文件的数据已归并至新文件中,不再需要,删除。
    214     system(("del " + in_file1).c_str());
    215     system(("del " + in_file2).c_str());
    216     cout << "\nPart \"" << in_file1 << "\" and \"" << in_file2 << "\" merged, "
    217         << "result saved to \"" << out_file << "\".\n"
    218         << "Time usage: seek_pos: " << seekpos_end_time - seekpos_start_time << "ms, "
    219         << "chsize: " << chsize_end_time - chsize_start_time << "ms, "
    220         << "parallel_merge: " << merge_end_time - merge_start_time << "ms.\n";
    221     return out_file;
    222 }
    223 string merge(int l, int r) {//递归归并操作
    224     if (l == r) return "temp\\part" + to_string(l) + ".dat";
    225     int mid = (l + r) / 2;
    226     string file1, file2;
    227     parallel_for(0, 2, [&](int x) {//此处使用串行递归亦可,不过效率略低一点
    228         if (x == 0) file1 = merge(l, mid);
    229         if (x == 1) file2 = merge(mid + 1, r);
    230     });
    231     return merge_file(file1, file2);
    232 }
    233 int main() {
    234     string in_file, out_file;
    235     cout << "Enter data file name: ";
    236     cin >> in_file;
    237     FILE* fin = fopen(in_file.c_str(), "rb");
    238     if (fin == NULL) {
    239         cout << "Could not open that file.\n";
    240         main();
    241     }
    242     clock_t start_time = clock();
    243     //获取输入文件的大小
    244     fseek(fin, 0, SEEK_END);
    245     fpos_t pos = 0;
    246     fgetpos(fin, &pos);
    247     fclose(fin);
    248     //创建临时文件目录
    249     system("mkdir temp");
    250     //将待排序大文件分区,并对各个小分区进行快速排序
    251     partition_and_sort(in_file, pos / 4); // pos是字节数,所以要/4统计int个数
    252     cout << "\nStart merging...\n";
    253     //将各个小分区归并
    254     out_file = merge(0, parts - 1);
    255     system(("move " + out_file + " ans.dat").c_str());
    256     system("rd temp");
    257     clock_t end_time = clock();
    258     cout << "External sorting complete, result saved to \"ans.dat\".\n"
    259         << "Time usage = " << end_time - start_time << "ms.\n";
    260     system("pause");
    261     return 0;
    262 }

    windows下排序 1e10 个int数据( 37.2GB ),耗时约 30min,经过验证,结果正确。

    总结

    外排序的算法不难,单线程实现很简单:分区阶段便是读取文本文件、排序、再写到文本文件中;归并阶段也是简单地用单线程读写文本文件,效率极低。后来,多线程读写二进制文件,快速排序和归并排序也实现了高效的并行,整个程序几乎全程并行处理,效率翻了几番。最后便是一些细节的优化,比如原子操作,读写缓冲区等等。看似简单的一个排序算法,其中包涵的知识竟是如此丰富。

    哎...今天够累的,签到来了1...
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|小黑屋|Java自学者论坛 ( 声明:本站文章及资料整理自互联网,用于Java自学者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-5-18 05:47 , Processed in 0.076617 second(s), 29 queries .

    Powered by Discuz! X3.4

    Copyright © 2001-2021, Tencent Cloud.

    快速回复 返回顶部 返回列表