点对点通信是两个进程参与的通信,其中一个进程发送消息,另一个进程接收消息,而聚合通信是在一个进程组内发生的通信,进程组内所有进程都要参与。调用一个聚合通信函数时,通信器中的所有进程必须同时调用同一函数,共同参与操作。通信函数在各个进程中的调用方式完全相同,而不是像点对点通信那样分为发送函数和接收函数。
聚合通信函数主要为分为三个功能:
- 通信:完成组内进程间的数据传输。这部分的功能使用点对点通信也能完成,但聚合通信性能更好,表达更简洁。
- 同步:实现组内所有进程在特定点的执行速度保持一致
- 计算:对给定的数据在进程间完成一定的操作
广播指一个进程(称为根进程)同时发送同样的消息给通信器中的所有其它进程,MPI中的广播函数是MPI_Bcast。MPI_Bcast的函数原型为:
1
2
3
4
5
6
7
|
int MPI_Bcast(
void *buffer, // 通信消息缓冲区的起始位置
int count, // 广播 / 接收数据的个数
MPI_Datatype datatype, // 广播 / 接收数据的数据类型
int root, // 广播数据的根进程号
MPI_Comm comm // 通信器
);
|
其效果是将根进程中buffer数组中的数据拷贝到其它进程的buffer中去。对于广播调用,不论是发送消息的根进程,还是接收消息的其它进程,在调用形式上完全一致。
MPI_Bcast一般用于将某个进程的计算结果告知其它所有进程,然后其它进程利用结果进行后续计算。例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
#include <mpi.h>
#include <fstream>
#include <vector>
int main(int argc, char *argv[]) {
MPI_Init(&argc, &argv);
int id = 0;
int np = 0;
MPI_Comm_rank(MPI_COMM_WORLD, &id);
MPI_Comm_size(MPI_COMM_WORLD, &np);
std::vector<int> dim(2, 0);
if (id == 0) {
// 打开一个数据文件,从里面读出矩阵的数据
std::ifstream file{"mat.txt"};
// 行数和列数
file >> dim[0] >> dim[1];
// 其它数据...
}
MPI_Bcast(dim.data(), 2, MPI_INT, 0, MPI_COMM_WORLD);
int M = dim[0];
int N = dim[1];
// ...
}
|
其中mat.txt的内容大致为:
10 10
1 2 3 4 5 6 7 8 9 10
...
广播后每个进程中M和N的值都是10。
注意:mat.txt使用的编码不要带有头部信息,如UTF-8 with BOM、UTF-16、UTF-32,否则可能出错。
聚合通信函数一般可以由点对点通信函数实现,MPI_Bcast可以这样实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
void bcast(void *buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm) {
int id = 0;
int np = 0;
MPI_Comm_rank(MPI_COMM_WORLD, &id);
MPI_Comm_size(MPI_COMM_WORLD, &np);
if (id == root) {
// root进程向i号进程发送数据
for (int i = 0; i < np; ++i) {
if (i != root)
MPI_Send(buffer, count, datatype, i, 0, comm);
}
} else {
// 其它进程接收root进程发送过来的数据
MPI_Recv(buffer, count, datatype, root, 0, comm, MPI_STATUS_IGNORE);
}
}
|
这个实现中没有实现返回错误码,性能也不如MPI_Bcast,因为无法根据网络拓扑进行优化。
数据收集指一个进程(根进程),从指定通信器中的所有进程(包括根进程),收集数据。收集到的数据将按照进程的id顺序放入指定的接收缓冲区内。MPI的数据收集函数有:MPI_Gather, MPI_Gatherv, MPI_Allgather, 和MPI_Allgatherv。它们分别用于等长的数据块收集,不等长的数据块收集,等长的数据块收集加广播,和不等长的数据块收集加广播。下面是Gather的示意图。
MPI_Gather的函数声明为:
1
2
3
4
5
6
7
8
9
10
|
int MPI_Gather(
void *sendbuf, // 发送缓冲区的起始地址
int sendcount, // 发送数据的个数
MPI_Datatype sendtype, // 发送数据类型
void *recvbuf, // 接收缓冲区的起始地址
int recvcount, // 接收数据的个数
MPI_Datatype recvtype, // 接收数据的类型
int root, // 根进程的编号
MPI_Comm comm // 通信器
);
|
调用MPI_Gather,根进程将从通信器comm内的每个进程收集sendcount个类型为sendtype的数据,将其转换成recvcount个类型为recvtype的数据,按照进程编号顺序放入recvbuf中。参数recvbuf, recvcount, 和recvtype仅对root进程有意义。
需要注意的是,recvcount指的是从每个进程中接收的数据个数,而不是从所有进程接收的数据长度之和。当sendtype和recvtype相同的时候,sendcount和recvcount应当相等。
例1:各进程生成n个数,然后收集到0号进程上来。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
#include <iostream>
#include <vector>
#include <mpi.h>
int main(int argc, char *argv[]) {
MPI_Init(&argc, &argv);
const int n = 10;
std::vector<int> send_array(n); // 发送缓冲区
std::vector<int> recv_array; // 接收缓冲区,只有根进程用得上
int np = 0;
int id = 0;
MPI_Comm_rank(MPI_COMM_WORLD, &id);
MPI_Comm_size(MPI_COMM_WORLD, &np);
// 0号进程生成0,1,2,...,n-1,1号进程生成n,n+1,...
for (int i = 0; i < n; ++i)
send_array[i] = id * n + i;
if (id == 0)
recv_array.resize(n * np);
MPI_Gather(
send_array.data(), // 发送缓冲区
n, MPI_INT, // 每个进程发送n个int
recv_array.data(), // 接收缓冲区
n, MPI_INT, // 根进程从每个进程接收n个int
0, MPI_COMM_WORLD // 0号进程为根进程
);
if (id == 0) {
for(int i = 0; i < n * np; ++i)
std::cout << recv_array[i] << " ";
std::cout << std::endl;
}
MPI_Finalize();
return 0;
}
|
4个进程的执行结果:
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
MPI_Gather函数的最大限制在于从每个进程上收集的数据个数必须相同,而划分任务时,任务的规模不总是进程数量的倍数,此时应当使用MPI_Gatherv函数。MPI_Gatherv的声明为:
1
2
3
4
5
6
7
8
9
10
11
|
int MPI_Gatherv(
void *sendbuf, // 发送缓冲区的起始地址
int sendcount, // 发送数据的个数
MPI_Datatype sendtype, // 发送数据类型
void *recvbuf, // 接收缓冲区的起始地址
int *recvcounts, // 从每个进程接收的数据个数
int *displs, // 接收数据在消息缓冲区中的索引
MPI_Datatype recvtype, // 接收数据的类型
int root, // 根进程的编号
MPI_Comm comm // 通信器
);
|
MPI_Gatherv允许每个进程发送数量不同的数据,并且根进程可以通过displs数组来安排接收到的数据安放于何处。各个进程发送多少个数据通过参数sendcount指定,这个值在不同进程中可以不同。recvbuf, recvcounts, displs, recvtype仅对根进程有意义。数组recvcounts和displs的元素个数等于进程数,用于指定从每个进程接收的数据个数和它们在recvbuf中的起始位移,如recvcounts[i]表示从i号进程收集的数据个数。数组元素类型必须统一,因此recvtype是标量类型。
例2:生成n个随机数。
每个进程应当生成多少个呢?如果直接用n除以np向上取整,则可能造成负载不均衡,如4个进程生成17个随机数,每个进程分别生成5、5、5、2个。负载均衡的做法应当为5、4、4、4个,即使用n除以np向下取整,设余数为r,则前r个进程每个进程多算一个。生成一个随机数的代价不是很高,前一种做法最后一个进程最多比其它进程少算np-1个,实际差距比起数据规模来讲也许并不大,具体问题具体分析即可。这里采用后一种做法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
#include <iostream>
#include <vector>
#include <random>
#include <mpi.h>
int main(int argc, char *argv[]) {
MPI_Init(&argc, &argv);
int np = 0;
int id = 0;
MPI_Comm_rank(MPI_COMM_WORLD, &id);
MPI_Comm_size(MPI_COMM_WORLD, &np);
const int N = 17;
// 本进程需要生成多少个数据
const int count = N / np + (id < N % np ? 1 : 0);
std::vector<int> send_array(count); // 发送缓冲区
std::vector<int> recv_array; // 接收缓冲区,只有根进程用得上
// 生成随机数
std::random_device rd;
std::default_random_engine re{rd()};
std::uniform_int_distribution<int> dist{0, 100};
for (int i = 0; i < count; ++i)
send_array[i] = dist(re);
if (id == 0)
recv_array.resize(N);
// 将随机数收集到0号进程上来。以4个进程17个数为例,从各进程接收的数据量为:
// 5 4 4 4,位移量为:0 5 9 13
std::vector<int> recvcounts;
std::vector<int> displs;
if (id == 0) {
recvcounts.resize(np);
displs.resize(np);
recvcounts[0] = N / np + (N % np != 0 ? 1 : 0);
displs[0] = 0;
for (int i = 1; i < np; ++i) {
recvcounts[i] = N / np + (i < N % np ? 1 : 0);
displs[i] = displs[i - 1] + recvcounts[i - 1];
}
}
MPI_Gatherv(
send_array.data(), // 发送缓冲区
count, MPI_INT, // 每个进程发送count个int
recv_array.data(), // 接收缓冲区
recvcounts.data(), // 根进程从每个进程接收多少个数据
displs.data(), // 从每个进程接收的数据放于recv_array中的起始位置
MPI_INT, // 根进程从每个进程接收N个int
0, MPI_COMM_WORLD // 0号进程为根进程
);
// 0号进程打印
if (id == 0) {
for(int i = 0; i < N; ++i)
std::cout << recv_array[i] << " ";
std::cout << std::endl;
}
MPI_Finalize();
return 0;
}
|
从每个进程接收的数据个数也可以直接收集到0号进程上来:
1
|
MPI_Gather(&count, 1, MPI_INT, recvcounts.data(), 1, MPI_INT, 0, MPI_COMM_WORLD);
|
除了MPI_Gather和MPI_Gatherv以外,MPI还提供两个收集函数MPI_Allgather和MPI_Allgatherv,它们在MPI_Gather和MPI_Gatherv的基础上,不仅会收集数据,还会将数据广播到所有进程,。MPI_Allgather的声明为:
1
2
3
4
5
6
7
8
9
|
int MPI_Allgather(
void *sendbuf, // 发送缓冲区的起始地址
int sendcount, // 向每个进程发送的数据个数
MPI_Datatype sendtype, // 发送数据类型
void *recvbuf, // 接收缓冲区的起始地址
int recvcount, // 接收数据的个数
MPI_Datatype recvtype, // 接收数据的类型
MPI_Comm comm // 通信器
);
|
MPI_Allgather和MPI_Gather比起来只少了一个root参数,其它参数含义与MPI_Gather相同。MPI_Allgather相当于以任意一个进程为根进程调用一次MPI_Gather,然后再广播:
1
2
3
|
// 以0号进程为根进程
MPI_Gather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, 0, comm);
MPI_Bcast(recvbuf, np * recvcount, recvtype, 0, comm);
|
或者以任意进程为根进程各调用一次MPI_Gather:
1
2
|
for (int root = 0; root < np; ++root)
MPI_Gather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm);
|
与MPI_Allgather类似,MPI_Allgatherv用于收集不同长度的数据块并广播,其声明为:
1
2
3
4
5
6
7
8
9
10
|
int MPI_Allgatherv(
void *sendbuf, // 发送缓冲区的起始地址
int sendcount, // 发送数据的个数
MPI_Datatype sendtype, // 发送数据类型
void *recvbuf, // 接收缓冲区的起始地址
int *recvcounts, // 从每个进程接收的数据个数
int *displs, // 接收数据在消息缓冲区中的索引
MPI_Datatype recvtype, // 接收数据的类型
MPI_Comm comm // 通信器
);
|
需要注意MPI_Allgather和MPI_Allgatherv中的recvbuf、recvcounts、displs和recvtype需要在每个进程中都有正确的定义。
散发操作是数据收集操作的逆向操作,它将一个进程中的数据按块散发给通信器中的所有进程。与MPI_Gather和MPI_Gatherv类似,MPI提供两个散发函数MPI_Scatter和MPI_Scatterv,分别用于将数据等长分割和不等长分割。下面是Scatter的示意图。
MPI_Scatter的声明为:
1
2
3
4
5
6
7
8
9
10
|
int MPI_scatter(
void *sendbuf, // 发送缓冲区的起始地址
int sendcount, // 发送数据的个数
MPI_Datatype sendtype, // 发送数据类型
void *recvbuf, // 接收缓冲区的起始地址
int recvcount, // 接收数据的个数
MPI_Datatype recvtype, // 接收数据的类型
int root, // 根进程的编号
MPI_Comm comm // 通信器
);
|
MPI_Scatter用于散发相同长度数据块。根进程将自己的sendbuf中的np个连续存放的数据块按进程号的顺序依次分发到comm的各个进程 (包括根进程自己) 的recvbuf中,这里np代表comm中的进程数。sendcnt和sendtype给出sendbuf中每个数据块的大小和类型,recvcnt和recvtype给出recvbuf的大小和类型,其中参数sendbuf、sendcnt和sendtype仅对根进程有意义。
需要特别注意的是,在根进程中,参数sendcnt指分别发送给每个进程的数据长度,而不是发送给所有进程的数据长度之和。因此,当recvtype等于sendtype时,recvcnt应该等于sendcnt。
例2中生成n个随机数,其中n是编译时常量,所有进程运行时都知道,如果n在运行时输入,可能只有0号进程知道,如《广播》一节从文件中读取矩阵规模一样,0号进程要么通过广播告知其它进程,然后按例2计算,要么将每个进程需要处理的数据个数计算好,然后把结果告知其它进程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
std::vector<int> recvcounts;
std::vector<int> displs;
if (id == 0) {
recvcounts.resize(np);
displs.resize(np);
recvcounts[0] = N / np + (N % np != 0 ? 1 : 0);
displs[0] = 0;
for (int i = 1; i < np; ++i) {
recvcounts[i] = N / np + (i < N % np ? 1 : 0);
displs[i] = displs[i - 1] + recvcounts[i - 1];
}
}
// 本进程需要生成多少个数据
int count = 0;
MPI_Scatter(recvcounts.data(), 1, MPI_INT, &count, 1, MPI_INT, 0, MPI_COMM_WORLD);
std::vector<int> send_array(count); // 发送缓冲区
std::vector<int> recv_array; // 接收缓冲区,只有根进程用得上
|
相比于一个进程计算好,再发散到其它进程上,一般更倾向于所有进程各算一个,再收集到某个进程上。
和MPI_Gatherv类似,MPI_Scatterv用于散发长度不同的数据块。MPI_Scatterv的声明为:
1
2
3
4
5
6
7
8
9
10
11
|
int MPI_Scatterv(
void *sendbuf, // 发送缓冲区的起始地址
int *sendcounts, // 向每个进程发送的数据个数
int *displs, // 发送数据的偏移
MPI_Datatype sendtype, // 发送数据类型
void *recvbuf, // 接收缓冲区的起始地址
int recvcount, // 接收数据的个数
MPI_Datatype recvtype, // 接收数据的类型
int root, // 根进程的编号
MPI_Comm comm // 通信器
);
|
MPI_Scatterv允许sendbuf中每个数据块的长度不同并且可以按任意顺序存放。sendbuf、sendcounts、displs和sendtype只对根进程有意义。其使用方法和MPI_Gatherv类似,只是功能完全相反,此处不再赘述。
全互换操作是组内进程完全交换,每个进程都向其它所有进程发送消息,同时每个进程都从其它所有进程接收消息。全互换操作结束后,每个进程得到的结果是不一样的,它相当于将按行存在的数据转换成按列存放的数据。下面是全互换的示意图:
图中,$A_{ij}$表示第i个进程中发送缓冲区的第j个数据块。
全互换对应的函数为MPI_Alltoall,其声明为:
1
2
3
4
5
6
7
8
9
|
int MPI_Alltoall(
void *sendbuf, // 发送缓冲区的起始地址
int sendcount, // 每个进程发送的数据个数
MPI_Datatype sendtype, // 发送数据类型
void *recvbuf, // 接收缓冲区的起始地址
int recvcount, // 接收数据的个数
MPI_Datatype recvtype, // 接收数据的类型
MPI_Comm comm // 通信器
);
|
MPI_Alltoall要求每段数据的长度相同,为sendcount,共np块。第i个进程把sendbuf中的第j块数据发送到第j个进程的recvbuf的第i个位置。需要注意MPI_Alltoall是以数据块为单位进行置换,不能直接用于转置矩阵。
矩阵在存储上分为行主序存储方式和列主序存储方式,即连续存放的数据是属于同一行还是同一列,两种存储方式是转置关系。如果封装mat(i, j)来访问矩阵,它们的操作基本没有区别,在共享存储式并行框架下很容易写出矩阵与向量的并行乘法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
class Matrix {
public:
// 行主序
double &operator()(int i, int j) { return data[i * num_cols + j]; }
// 列主序
double &operator()(int i, int j) { return data[j * num_rows + i]; }
private:
std::vector<double> data;
int num_rows;
int num_cols;
};
Matrix mat; // 矩阵
std::vector<double> vec(mat.num_cols); // 向量
std::vector<double> result(mat.num_rows); // mat * vec
#pragma omp parallel for
for (int i = 0; i < mat.num_rows; ++i) {
result[i] = 0;
for (int j = 0; j < mat.num_cols; ++j)
result[i] += mat(i, j) * vec[j];
}
|
但是在非共享存储式并行计算中,矩阵分布式地存储于各个进程,行主序和列主序就有了区别。此处演示行主序存储方式下,矩阵与向量并行乘法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
#include <mpi.h>
#include <iostream>
#include <fstream>
#include <vector>
#include <random>
int main(int argc, char *argv[]) {
MPI_Init(&argc, &argv);
int id = 0;
int np = 0;
MPI_Comm_rank(MPI_COMM_WORLD, &id);
MPI_Comm_size(MPI_COMM_WORLD, &np);
std::vector<int> dim(2, 0);
std::vector<double> mat;
if (id == 0) {
// 打开一个数据文件,从里面读出矩阵的数据
std::ifstream mat_file{"mat.txt"};
// 行数和列数
mat_file >> dim[0] >> dim[1];
// 矩阵数据
mat.resize(dim[0] * dim[1]);
for (int i = 0; i < mat.size(); ++i)
mat_file >> mat[i];
}
MPI_Bcast(dim.data(), 2, MPI_INT, 0, MPI_COMM_WORLD);
const int M = dim[0]; // 行数
const int N = dim[1]; // 列数
const int rows = M / np + (id < M % np ? 1 : 0); // 本进程多少行
// 散发数据到各个进程
std::vector<int> counts;
std::vector<int> displs;
if (id == 0) {
counts.resize(np);
displs.resize(np);
counts[0] = (M / np + (M % np != 0 ? 1 : 0)) * N;
displs[0] = 0;
for (int i = 1; i < np; ++i) {
counts[i] = (M / np + (i < M % np ? 1 : 0)) * N;
displs[i] = displs[i - 1] + counts[i - 1];
}
}
std::vector<double> submat(rows * N);
MPI_Scatterv(
mat.data(), // 发送缓冲区
counts.data(), // 向每个进程发多少个数据
displs.data(), // 每个数据块的起始位移
MPI_DOUBLE,
submat.data(), // 接收缓冲区
rows * N, // 接收多少个数据,别忘了乘N
MPI_DOUBLE,
0, // 根进程
MPI_COMM_WORLD
);
// 从文件中读取向量数据
std::vector<double> vec(N);
if (0 == id) {
std::ifstream vec_file{"vec.txt"};
int size = 0;
vec_file >> size;
if (size != N) {
std::cerr << "The number of columns " << N << " must"
"be equal to the size of vector " << size << std::endl;
MPI_Abort(MPI_COMM_WORLD, -1);
}
for (int i = 0; i < size; ++i)
vec_file >> vec[i];
}
// 将向量广播到其它进程
MPI_Bcast(vec.data(), N, MPI_DOUBLE, 0, MPI_COMM_WORLD);
// 计算子矩阵与向量的积
std::vector<double> subvec(rows, 0);
for (int i = 0; i < rows; ++i) {
for (int j = 0; j < N; ++j)
subvec[i] += submat[i * N + j] * vec[j];
}
// 将结果收集到0号进程上,并输出。
// 如果将结果用于后续的矩阵乘向量,则应该使用MPI_Allgatherv。
// 如果将结果用于向量加减、数乘或者内积则不需要通信。
std::vector<double> result;
if (id == 0) {
result.resize(M);
counts[0] = M / np + (M % np != 0 ? 1 : 0);
displs[0] = 0;
for (int i = 1; i < np; ++i) {
counts[i] = M / np + (i < M % np ? 1 : 0);
displs[i] = displs[i - 1] + counts[i - 1];
}
}
MPI_Gatherv(
subvec.data(), // 发送缓冲区
rows, // 本进程发送多少个数据
MPI_DOUBLE,
result.data(), // 接收缓冲区
counts.data(), // 从各个进程接收多少个数据
displs.data(), // 从各个进程接收的数据放在何处
MPI_DOUBLE,
0, // 收集到0号进程上
MPI_COMM_WORLD
);
if (0 == id) {
std::cout << "calculate by parallel: " << std::endl;
for (int i = 0; i + 1 < M; ++i)
std::cout << result[i] << ", ";
std::cout << result.back() << std::endl;
}
// 串行计算对比
if (0 == id) {
std::cout << "calculate by sequence: " << std::endl;
for (int i = 0; i < M; ++i) {
result[i] = 0;
for (int j = 0; j < N; ++j)
result[i] += mat[i * N + j] * vec[j];
}
for (int i = 0; i + 1 < M; ++i)
std::cout << result[i] << ", ";
std::cout << result.back() << std::endl;
}
MPI_Finalize();
return 0;
}
|
其中mat.txt的内容为:
12 12
0 1 2 3 4 5 6 7 8 9 10 11
12 13 14 15 16 17 18 19 20 21 22 23
24 25 26 27 28 29 30 31 32 33 34 35
36 37 38 39 40 41 42 43 44 45 46 47
48 49 50 51 52 53 54 55 56 57 58 59
60 61 62 63 64 65 66 67 68 69 70 71
72 73 74 75 76 77 78 79 80 81 82 83
84 85 86 87 88 89 90 91 92 93 94 95
96 97 98 99 100 101 102 103 104 105 106 107
108 109 110 111 112 113 114 115 116 117 118 119
120 121 122 123 124 125 126 127 128 129 130 131
132 133 134 135 136 137 138 139 140 141 142 143
vec.txt的内容为:
12
1 2 3 4 5 6 7 8 9 10 11 12
程序计算结果为:
calculate by parallel:
572, 1508, 2444, 3380, 4316, 5252, 6188, 7124, 8060, 8996, 9932, 10868
calculate by sequence:
572, 1508, 2444, 3380, 4316, 5252, 6188, 7124, 8060, 8996, 9932, 10868
并行和串行一致。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
|
#include <mpi.h>
#include <iostream>
#include <functional>
#include <cmath>
#include <cstdlib>
struct Interval {
double a, b, fa, fb, fc;
Interval *last, *next;
};
double Integrate(const std::function<double(double)> &f, double a, double b,
double eps = 1e-15) {
// 散发求值任务,使多次求函数值的过程并行进行。F的功能可以看作请求求值,结果放入v中,当
// v为nullptr时,功能为等待所有已提交的求值请求完成。
int np = 0;
MPI_Comm_size(MPI_COMM_WORLD, &np);
int *counts = new int[np];
int *displs = new int[np];
double *x_save = nullptr; // 保存需要求函数值的x
double **f_save = nullptr; // 保存对应的函数值返回地址
int size = 0; // 已分配的单元个数,即有x_save有多少个可用单元
std::function<void(double, double *)> F = [&np, &counts, &displs, &x_save,
&f_save, &size](double x, double *v) -> void {
static int n = 0; // 目前有多少个未计算函数值的x
if (v == nullptr) {
counts[0] = 0; // 0号进程不发送数据
displs[0] = 0;
for (int i = 1; i < np; ++i) {
counts[i] = n / (np - 1) + (i <= n % (np - 1) ? 1 : 0);
displs[i] = displs[i - 1] + counts[i - 1];
}
int i = 0; // 各进程计算任务个数
// 告知各进程有多少个点需要计算函数值
MPI_Scatter(counts, 1, MPI_INT, &i, 1, MPI_INT, 0, MPI_COMM_WORLD);
// 散发需要计算函数值的点
MPI_Scatterv(x_save, counts, displs, MPI_DOUBLE,
nullptr, 0, MPI_DOUBLE, 0, MPI_COMM_WORLD);
// 收集计算好的函数值
MPI_Gatherv(nullptr, 0, MPI_DOUBLE,
x_save, counts, displs, MPI_DOUBLE, 0, MPI_COMM_WORLD);
for (int j = 0; j < n; ++j)
*(f_save[j]) = x_save[j];
n = 0;
return;
}
// 需要增加内存
if (n >= size) {
size += 128; // 增加128个单元的内存
x_save = (double *)realloc(x_save, size * sizeof(double));
f_save = (double **)realloc(f_save, size * sizeof(double *));
}
x_save[n] = x;
f_save[n] = v;
++n;
};
eps *= 3 * (b - a); // 精度计算中的公因子预先算上
double result = 0;
Interval *root = new Interval{a, b, f(a), f(b), 0, nullptr, nullptr};
while (root != nullptr) {
// 先计算各区间中点函数值
for (Interval *i = root; i != nullptr; i = i->next)
F(0.5 * (i->a + i->b), &i->fc);
// 等待所有函数值计算完成
F(0, nullptr);
// 计算各区间积分值,如果精度不够就继续细分
Interval *i = root;
while (i != nullptr) {
double h = i->b - i->a;
double xc = 0.5 * (i->a + i->b);
double v0 = 0.5 * h * (i->fa + i->fb);
double v1 = 0.5 * (v0 + i->fc * h);
// 误差满足要求或者区间长度太小则计算此区间内的积分并将此区间移出C
if (fabs(v1 - v0) < h * eps || xc == i->a || xc == i->b) {
result += v1;
if (i->next != nullptr)
i->next->last = i->last;
if (i->last != nullptr)
i->last->next = i->next;
Interval *temp = i;
i = i->next;
if (temp == root)
root = i;
delete temp;
continue;
}
// 将i拆分成两个区间,i保留左半区间,i后面插入右半区间
Interval *j = new Interval;
j->a = xc;
j->b = i->b;
j->fa = i->fc;
j->fb = i->fb;
i->b = xc;
i->fb = i->fc;
j->next = i->next;
if (i->next != nullptr)
i->next->last = j;
i->next = j;
j->last = i;
i = j->next; // 跳过刚插入的j
}
}
delete[] counts;
delete[] displs;
delete[] x_save;
delete[] f_save;
return result;
}
int main(int argc, char *argv[]) {
auto f = [](double x) { return 4 / (1 + x * x); }; // 被积函数
double a = 0, b = 1; // 积分区间
double eps = 1e-15; // 积分精度
int np = 0, id = 0;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &np);
MPI_Comm_rank(MPI_COMM_WORLD, &id);
// 主从模式至少需要两个进程
if (np < 2) {
std::cerr << "This program needs at least two processes!" << std::endl;
MPI_Abort(MPI_COMM_WORLD, -1);
}
// n为各进程收到的计算请求个数
int n = 0;
// 0号进程作为主进程
if (0 == id) {
double pi = Integrate(f, a, b, eps);
std::cout.precision(16);
std::cout << "pi is " << pi << ".\n";
// 通知其它进程退出
int *token = new int[np];
for (int i = 0; i < np; ++i)
token[i] = -1;
MPI_Scatter(token, 1, MPI_INT, &n, 1, MPI_INT, 0, MPI_COMM_WORLD);
delete[] token;
} else {
double *x_save = nullptr;
int size = 0;
while (true) {
// 先接收有多少个主进程给本进程分配了多少个计算任务
MPI_Scatter(nullptr, 0, MPI_INT, &n, 1, MPI_INT, 0, MPI_COMM_WORLD);
// n为-1时是退出信号,大于0时是F发来的计算任务个数
if (n < 0) {
delete[] x_save;
break;
}
// 空间不够,重新分配
if (n > size) {
delete[] x_save;
x_save = new double[n];
size = n;
}
// 接收需要计算函数值的点
MPI_Scatterv(nullptr, 0, 0, MPI_DOUBLE, x_save, n, MPI_DOUBLE, 0,
MPI_COMM_WORLD);
for (int i = 0; i < n; ++i)
x_save[i] = f(x_save[i]);
// 将计算好的函数值收集到0号进程
MPI_Gatherv(x_save, n, MPI_DOUBLE, nullptr, 0, 0, MPI_DOUBLE, 0,
MPI_COMM_WORLD);
}
}
MPI_Finalize();
return 0;
}
|
这段代码使用new和delete管理内存,有兴趣的读者可以改为使用std::vector实现。
进程同步函数MPI_Barrier用于一个通信器中所有进程的同步。调用该函数时,进程将处于等待状态,直到通信器中所有进程都调用了该函数后才继续执行。其声明为:
1
2
3
|
int MPI_Barrier(
MPI_Comm comm
);
|
在前面的例子中,输出一个向量我们总是先把它收集到0号进程上再输出,过程十分繁琐,使用MPI_Barrier可以做到有序输出:
1
2
3
4
5
6
7
|
for (int i = 0; i < np; ++i) {
MPI_Barrier(MPI_COMM_WORLD);
if (i == id) {
for (int j = 0; j < rows; ++j)
std::cout << result[i] << " ";
}
}
|
计算操作有归约和扫描两种操作,归约是对分布于各个进程上的数据进行求和,而扫描是对分布于各个进程上的数据求前缀和。归约操作的函数有MPI_Reduce、MPI_Reduce_scatter和MPI_Allreduce,而扫描操作的函数为MPI_Scan。
MPI_Reduce的声明为:
1
2
3
4
5
6
7
8
9
|
int MPI_Reduce(
void *sendbuf, // 发送缓冲区的起始地址
void *recvbuf, // 接收缓冲区的起始地址
int count, // 数据个数
MPI_Datatype datatype, // 数据类型
MPI_Op op, // 规约操作符
int root, // 根进程号
MPI_Comm comm // 通信器
);
|
MPI_Reduce的示意图如下:
MPI_Allreduce的声明为:
1
2
3
4
5
6
7
8
|
int MPI_Allreduce(
void *sendbuf, // 发送缓冲区的起始地址
void *recvbuf, // 接收缓冲区的起始地址
int count, // 数据个数
MPI_Datatype datatype, // 数据类型
MPI_Op op, // 规约操作符
MPI_Comm comm // 通信器
);
|
全归约的操作与普通归约MPI_Reduce类似,但所有进程都会获得归约运算的结果,因此它和MPI_Reduce相比,少了一个root参数,其余参数的含义与MPI_Reduce完全相同。其示意图为
MPI_Reduce_scatter的声明为:
1
2
3
4
5
6
7
8
|
int MPI_Reduce_scatter(
void *sendbuf, // 发送缓冲区的起始地址
void *recvbuf, // 接收缓冲区的起始地址
int *recvcounts, // 数据个数
MPI_Datatype datatype, // 数据类型
MPI_Op op, // 规约操作符
MPI_Comm comm // 通信器
);
|
MPI_Reduce_scatter将归约的结果散发到各个进程上,散发时第i个进程接收recvcounts[i]个数据。参数中没有指明sendbuf的长度,因为参与归约的数组和归约的结果数组长度是相同的,因此最后散发的数据总量就是sendbuf的长度。
MPI_Reduce_scatter的示意图为:
扫描操作就是求前缀和,将前i个进程的归约结果放在第i个进程的接收缓冲区内。其声明为:
1
2
3
4
5
6
7
8
|
int MPI_Scan(
void *sendbuf, // 发送缓冲区的起始地址
void *recvbuf, // 接收缓冲区的起始地址
int count, // 输入缓冲区中元素的个数
MPI_Datatype datatype, // 数据类型
MPI_Op op, // 规约操作符
MPI_Comm comm // 通信器
);
|
其示意图为:
MPI_Op |
描述 |
MPI_MAX |
两个操作数中较大的一个 |
MPI_MIN |
两个操作数中较小的一个 |
MPI_SUM |
两个操作数之和 |
MPI_PROD |
两个操作数之积 |
MPI_LAND |
两个操作数的逻辑与(logical and) |
MPI_BAND |
两个操作数的按位与(bitwise and) |
MPI_LOR |
两个操作数的逻辑或(logical or) |
MPI_BOR |
两个操作数的按位或(bitwise or) |
MPI_LXOR |
两个操作数的逻辑异或(logical xor) |
MPI_BXOR |
两个操作数的按位异或(bitwise xor) |
MPI_MAXLOC |
两对操作数的最大值和最大值的位置(u,i)op(v,j)=(max(u,v),k) |
MPI_MINLOC |
两对操作数的最小值和最小值的位置(u,i)op(v,j)=(min(u,v),k) |
其中最后两个运算的使用方法稍微复杂一些,示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
const int LEN = 1000;
float val[LEN]; // 用于保存数据,最大长度为LEN
int count; // 每个进程有count个数据
int id = 0;
MPI_Comm_rank(MPI_COMM_WORLD, &id);
int minrank = 0; // 最小值在哪个进程上
int minindex = 0; // 最小值在哪个下标上
float minval; // 最小值
struct {
float value;
int index;
} in, out;
// 每个进程进行局部归约
in.value = val[0];
in.index = 0;
for (int i = 1; i < count; i++) {
if (in.value > val[i]) {
in.value = val[i];
in.index = i;
}
}
in.index = id * LEN + in.index;
MPI_Reduce(&in, &out, 1, MPI_FLOAT_INT, MPI_MINLOC, 0, MPI_COMM_WORLD);
minval = out.value;
minrank = out.index / LEN; // 最小值在哪个进程上
minindex = out.index % LEN; // 最小值在val的哪个位置
|
其中自定义的结构体的类型在MPI中为:
名称 |
描述 |
MPI_FLOAT_INT |
浮点型和整形 |
MPI_DOUBLE_INT |
双精度和整形 |
MPI_LONG_INT |
长整形和整形 |
MPI_2INT |
整型值对 |
MPI_SHORT_INT |
短整形和整形 |
MPI_LONG_DOUBLE_INT |
长双精度浮点型和整型 |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
#include <mpi.h>
#include <iostream>
#include <fstream>
#include <vector>
#include <random>
int main(int argc, char *argv[]) {
MPI_Init(&argc, &argv);
int id = 0;
int np = 0;
MPI_Comm_rank(MPI_COMM_WORLD, &id);
MPI_Comm_size(MPI_COMM_WORLD, &np);
std::vector<int> dim(2, 0);
std::vector<double> mat;
if (id == 0) {
// 打开一个数据文件,从里面读出矩阵的数据
std::ifstream mat_file{"mat.txt"};
// 列数和行数
mat_file >> dim[1] >> dim[0];
// 矩阵数据
mat.resize(dim[0] * dim[1]);
for (int i = 0; i < mat.size(); ++i)
mat_file >> mat[i];
}
MPI_Bcast(dim.data(), 2, MPI_INT, 0, MPI_COMM_WORLD);
const int M = dim[0]; // 行数
const int N = dim[1]; // 列数
const int cols = N / np + (id < N % np ? 1 : 0); // 本进程多少列
// 散发数据到各个进程
std::vector<int> counts;
std::vector<int> displs;
if (id == 0) {
counts.resize(np);
displs.resize(np);
counts[0] = (N / np + (N % np != 0 ? 1 : 0)) * M;
displs[0] = 0;
for (int i = 1; i < np; ++i) {
counts[i] = (N / np + (i < N % np ? 1 : 0)) * M;
displs[i] = displs[i - 1] + counts[i - 1];
}
}
std::vector<double> submat(cols * M);
MPI_Scatterv(
mat.data(), // 发送缓冲区
counts.data(), // 向每个进程发多少个数据
displs.data(), // 每个数据块的起始位移
MPI_DOUBLE,
submat.data(), // 接收缓冲区
cols * M, // 接收多少个数据,别忘了乘M
MPI_DOUBLE,
0, // 根进程
MPI_COMM_WORLD
);
// 从文件中读取向量数据
std::vector<double> vec;
if (0 == id) {
std::ifstream vec_file{"vec.txt"};
int size = 0;
vec_file >> size;
if (size != N) {
std::cerr << "The number of columns " << N << " must"
"be equal to the size of vector " << size << std::endl;
MPI_Abort(MPI_COMM_WORLD, -1);
}
vec.resize(N);
for (int i = 0; i < N; ++i)
vec_file >> vec[i];
}
// 将向量散发到其它进程
std::vector<double> subvec(cols);
if (id == 0) {
counts[0] = N / np + (N % np != 0 ? 1 : 0);
displs[0] = 0;
for (int i = 1; i < np; ++i) {
counts[i] = N / np + (i < N % np ? 1 : 0);
displs[i] = displs[i - 1] + counts[i - 1];
}
}
MPI_Scatterv(
vec.data(), // 发送缓冲区
counts.data(), // 向每个进程发多少个数据
displs.data(), // 每个数据块的起始位移
MPI_DOUBLE,
subvec.data(), // 接收缓冲区
cols, // 接收多少个数据
MPI_DOUBLE,
0, // 根进程
MPI_COMM_WORLD
);
// 计算子矩阵与向量的积
std::vector<double> partial_vec(M, 0);
// for (int i = 0; i < M; ++i) {
// for (int j = 0; j < cols; ++j)
// partial_vec[i] += submat[j * M + i] * vec[j];
// }
for (int j = 0; j < cols; ++j) {
for (int i = 0; i < M; ++i)
partial_vec[i] += submat[j * M + i] * subvec[j];
}
// 将结果归约到0号进程上,并输出
std::vector<double> result;
if (0 == id)
result.resize(M);
MPI_Reduce(
partial_vec.data(),
result.data(),
M,
MPI_DOUBLE,
MPI_SUM,
0,
MPI_COMM_WORLD
);
if (0 == id) {
std::cout << "calculate by parallel: " << std::endl;
for (int i = 0; i + 1 < M; ++i)
std::cout << result[i] << ", ";
std::cout << result.back() << std::endl;
}
// 串行计算对比
if (0 == id) {
std::cout << "calculate by sequence: " << std::endl;
for (int i = 0; i < M; ++i)
result[i] = 0;
for (int j = 0; j < N; ++j) {
for (int i = 0; i < M; ++i)
result[i] += mat[i + j * M] * vec[j];
}
for (int i = 0; i + 1 < M; ++i)
std::cout << result[i] << ", ";
std::cout << result.back() << std::endl;
}
MPI_Finalize();
return 0;
}
|
运行结果:
calculate by parallel:
6864, 6942, 7020, 7098, 7176, 7254, 7332, 7410, 7488, 7566, 7644, 7722
calculate by sequence:
6864, 6942, 7020, 7098, 7176, 7254, 7332, 7410, 7488, 7566, 7644, 7722
文件mat.txt和vec.txt的内容和之前相同,但这次将mat.txt看作列主序存储,即首行两个数分别表示列数和行数,接下来每一行数据都表示矩阵一列。从计算结果来看,并行计算和串行计算结果相同。
[1] https://www.open-mpi.org/doc/current/.
[2] https://www.mpich.org/static/docs/latest/.
[3] https://www.mpi-forum.org/docs/mpi-3.1/mpi31-report.pdf.