Skip to content

Commit 27a6b91

Browse files
jcf94Trevor Morris
authored andcommitted
[Support] Add parallel_for support to run a loop in parallel (apache#6275)
1 parent 3c1787a commit 27a6b91

3 files changed

Lines changed: 264 additions & 0 deletions

File tree

include/tvm/support/parallel_for.h

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
/*!
21+
* \file parallel_for.h
22+
* \brief An implementation to run loop in parallel.
23+
*/
24+
#ifndef TVM_SUPPORT_PARALLEL_FOR_H_
25+
#define TVM_SUPPORT_PARALLEL_FOR_H_
26+
27+
#include <tvm/runtime/c_runtime_api.h>
28+
29+
#include <functional>
30+
#include <vector>
31+
32+
namespace tvm {
33+
namespace support {
34+
35+
using PartitionerFuncType = std::function<std::vector<std::vector<int>>(int, int, int, int)>;
36+
37+
/*!
38+
* \brief A partitioner to split the task to each thread in Round-robin manner.
39+
* \param begin The start index of this parallel loop(inclusive).
40+
* \param end The end index of this parallel loop(exclusive).
41+
* \param step The traversal step to the index.
42+
* \param num_threads The number of threads(the number of tasks to be partitioned to).
43+
* \return A list with `num_threads` elements, and each is a list of integers indicating the loop
44+
* indexes for the corresponding thread to process.
45+
*/
46+
TVM_DLL std::vector<std::vector<int>> rr_partitioner(int begin, int end, int step, int num_threads);
47+
48+
/*!
49+
* \brief A runtime api provided to run the task function in parallel.
50+
* e.g. A for loop:
51+
* for (int i = 0; i < 10; i++) {
52+
* a[i] = i;
53+
* }
54+
* should work the same as:
55+
* parallel_for(0, 10, [&a](int index) {
56+
* a[i] = i;
57+
* });
58+
* \param begin The start index of this parallel loop(inclusive).
59+
* \param end The end index of this parallel loop(exclusive).
60+
* \param f The task function to be excuted. Assert to take an int index as input with no output.
61+
* \param step The traversal step to the index.
62+
* \param partitioner A partition function to split tasks to different threads. Use Round-robin
63+
* partitioner by default.
64+
* \note 1. Currently do not support nested parallel_for; 2. The order of execution in each thread
65+
* is not guaranteed, the for loop task should be thread independent and thread safe.
66+
*/
67+
TVM_DLL void parallel_for(int begin, int end, const std::function<void(int)>& f, int step = 1,
68+
const PartitionerFuncType partitioner = rr_partitioner);
69+
70+
} // namespace support
71+
} // namespace tvm
72+
73+
#endif // TVM_SUPPORT_PARALLEL_FOR_H_

src/support/parallel_for.cc

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
/*!
21+
* \file parallel_for.cc
22+
* \brief An implementation to run loop in parallel.
23+
*/
24+
#include <dmlc/logging.h>
25+
#include <tvm/support/parallel_for.h>
26+
27+
#include <future>
28+
#include <thread>
29+
#include <utility>
30+
#include <vector>
31+
32+
namespace tvm {
33+
namespace support {
34+
35+
std::vector<std::vector<int>> rr_partitioner(int begin, int end, int step, int num_threads) {
36+
int total_task_count = (end - begin) / step;
37+
CHECK_GT(total_task_count, 0) << "Infinite loop condition, check the input value of "
38+
<< "`begin`, `end`, `step`.";
39+
std::vector<std::vector<int>> ret;
40+
ret.reserve(num_threads);
41+
for (size_t thread = 0; begin < end; begin += step, thread = (thread + 1) % num_threads) {
42+
if (thread >= ret.size()) {
43+
ret.push_back(std::vector<int>());
44+
}
45+
ret[thread].push_back(begin);
46+
}
47+
return ret;
48+
}
49+
50+
void parallel_for(int begin, int end, const std::function<void(int)>& f, int step,
51+
const PartitionerFuncType partitioner) {
52+
int default_num_threads = std::thread::hardware_concurrency();
53+
const auto& run_partitions = partitioner(begin, end, step, default_num_threads);
54+
55+
std::vector<std::thread> threads;
56+
threads.reserve(run_partitions.size());
57+
std::vector<std::future<void>> res_vec;
58+
res_vec.reserve(run_partitions.size());
59+
for (const auto& run_partition : run_partitions) {
60+
std::packaged_task<void(const std::vector<int>&, const std::function<void(int)>&)> task(
61+
[](const std::vector<int>& run_pattition, const std::function<void(int)>& f) {
62+
for (const auto& i : run_pattition) {
63+
f(i);
64+
}
65+
});
66+
res_vec.emplace_back(task.get_future());
67+
threads.emplace_back(std::move(task), run_partition, f);
68+
}
69+
70+
for (auto&& thread : threads) {
71+
thread.join();
72+
}
73+
try {
74+
for (auto&& i : res_vec) {
75+
i.get();
76+
}
77+
} catch (const std::exception& e) {
78+
LOG(FATAL) << "Parallel_for error with " << e.what();
79+
}
80+
}
81+
82+
} // namespace support
83+
} // namespace tvm

tests/cpp/parallel_for_test.cc

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <dmlc/logging.h>
21+
#include <gtest/gtest.h>
22+
#include <tvm/support/parallel_for.h>
23+
24+
#include <vector>
25+
26+
TEST(ParallelFor, Basic) {
27+
using tvm::support::parallel_for;
28+
29+
int a[1000], b[1000];
30+
31+
// Check for a small size of parallel
32+
for (int i = 0; i < 10; i++) {
33+
a[i] = i;
34+
}
35+
parallel_for(0, 10, [&b](int i) { b[i] = i; });
36+
for (int i = 0; i < 10; i++) {
37+
CHECK_EQ(a[i], b[i]);
38+
}
39+
40+
// Check for a large size of parallel
41+
for (int i = 0; i < 1000; i++) {
42+
a[i] = i;
43+
}
44+
parallel_for(0, 1000, [&b](int i) { b[i] = i; });
45+
for (int i = 0; i < 1000; i++) {
46+
CHECK_EQ(a[i], b[i]);
47+
}
48+
49+
// Check for step != 1
50+
for (int i = 0; i < 1000; i += 2) {
51+
a[i] *= 2;
52+
}
53+
parallel_for(
54+
0, 1000, [&b](int i) { b[i] *= 2; }, 2);
55+
for (int i = 0; i < 1000; i++) {
56+
CHECK_EQ(a[i], b[i]);
57+
}
58+
}
59+
60+
TEST(ParallelFor, NestedWithNormalForLoop) {
61+
using tvm::support::parallel_for;
62+
63+
int a[500][500], b[500][500], c[500][500];
64+
65+
for (int i = 0; i < 500; i++) {
66+
for (int j = 0; j < 500; j++) {
67+
a[i][j] = i * j;
68+
}
69+
}
70+
71+
parallel_for(0, 500, [&b](int i) {
72+
for (int j = 0; j < 500; j++) {
73+
b[i][j] = i * j;
74+
}
75+
});
76+
for (int i = 0; i < 500; i++) {
77+
for (int j = 0; j < 500; j++) {
78+
CHECK_EQ(a[i][j], b[i][j]);
79+
}
80+
}
81+
82+
for (int i = 0; i < 500; i++) {
83+
parallel_for(0, 500, [&c, &i](int j) { c[i][j] = i * j; });
84+
}
85+
for (int i = 0; i < 500; i++) {
86+
for (int j = 0; j < 500; j++) {
87+
CHECK_EQ(a[i][j], c[i][j]);
88+
}
89+
}
90+
}
91+
92+
TEST(ParallelFor, Exception) {
93+
using tvm::support::parallel_for;
94+
95+
bool exception = false;
96+
try {
97+
parallel_for(0, 100, [](int i) { LOG(FATAL) << "error"; });
98+
} catch (const std::exception& e) {
99+
exception = true;
100+
}
101+
CHECK(exception);
102+
}
103+
104+
int main(int argc, char** argv) {
105+
testing::InitGoogleTest(&argc, argv);
106+
testing::FLAGS_gtest_death_test_style = "threadsafe";
107+
return RUN_ALL_TESTS();
108+
}

0 commit comments

Comments
 (0)