cxx11_non_blocking_thread_pool.cpp (5039B)
1 // This file is part of Eigen, a lightweight C++ template library 2 // for linear algebra. 3 // 4 // Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com> 5 // Copyright (C) 2016 Benoit Steiner <benoit.steiner.goog@gmail.com> 6 // 7 // This Source Code Form is subject to the terms of the Mozilla 8 // Public License v. 2.0. If a copy of the MPL was not distributed 9 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/. 10 11 #define EIGEN_USE_THREADS 12 #include "main.h" 13 #include "Eigen/CXX11/ThreadPool" 14 #include "Eigen/CXX11/Tensor" 15 16 static void test_create_destroy_empty_pool() 17 { 18 // Just create and destroy the pool. This will wind up and tear down worker 19 // threads. Ensure there are no issues in that logic. 20 for (int i = 0; i < 16; ++i) { 21 ThreadPool tp(i); 22 } 23 } 24 25 26 static void test_parallelism(bool allow_spinning) 27 { 28 // Test we never-ever fail to match available tasks with idle threads. 29 const int kThreads = 16; // code below expects that this is a multiple of 4 30 ThreadPool tp(kThreads, allow_spinning); 31 VERIFY_IS_EQUAL(tp.NumThreads(), kThreads); 32 VERIFY_IS_EQUAL(tp.CurrentThreadId(), -1); 33 for (int iter = 0; iter < 100; ++iter) { 34 std::atomic<int> running(0); 35 std::atomic<int> done(0); 36 std::atomic<int> phase(0); 37 // Schedule kThreads tasks and ensure that they all are running. 38 for (int i = 0; i < kThreads; ++i) { 39 tp.Schedule([&]() { 40 const int thread_id = tp.CurrentThreadId(); 41 VERIFY_GE(thread_id, 0); 42 VERIFY_LE(thread_id, kThreads - 1); 43 running++; 44 while (phase < 1) { 45 } 46 done++; 47 }); 48 } 49 while (running != kThreads) { 50 } 51 running = 0; 52 phase = 1; 53 // Now, while the previous tasks exit, schedule another kThreads tasks and 54 // ensure that they are running. 55 for (int i = 0; i < kThreads; ++i) { 56 tp.Schedule([&, i]() { 57 running++; 58 while (phase < 2) { 59 } 60 // When all tasks are running, half of tasks exit, quarter of tasks 61 // continue running and quarter of tasks schedule another 2 tasks each. 62 // Concurrently main thread schedules another quarter of tasks. 63 // This gives us another kThreads tasks and we ensure that they all 64 // are running. 65 if (i < kThreads / 2) { 66 } else if (i < 3 * kThreads / 4) { 67 running++; 68 while (phase < 3) { 69 } 70 done++; 71 } else { 72 for (int j = 0; j < 2; ++j) { 73 tp.Schedule([&]() { 74 running++; 75 while (phase < 3) { 76 } 77 done++; 78 }); 79 } 80 } 81 done++; 82 }); 83 } 84 while (running != kThreads) { 85 } 86 running = 0; 87 phase = 2; 88 for (int i = 0; i < kThreads / 4; ++i) { 89 tp.Schedule([&]() { 90 running++; 91 while (phase < 3) { 92 } 93 done++; 94 }); 95 } 96 while (running != kThreads) { 97 } 98 phase = 3; 99 while (done != 3 * kThreads) { 100 } 101 } 102 } 103 104 105 static void test_cancel() 106 { 107 ThreadPool tp(2); 108 109 // Schedule a large number of closure that each sleeps for one second. This 110 // will keep the thread pool busy for much longer than the default test timeout. 111 for (int i = 0; i < 1000; ++i) { 112 tp.Schedule([]() { 113 std::this_thread::sleep_for(std::chrono::milliseconds(2000)); 114 }); 115 } 116 117 // Cancel the processing of all the closures that are still pending. 118 tp.Cancel(); 119 } 120 121 static void test_pool_partitions() { 122 const int kThreads = 2; 123 ThreadPool tp(kThreads); 124 125 // Assign each thread to its own partition, so that stealing other work only 126 // occurs globally when a thread is idle. 127 std::vector<std::pair<unsigned, unsigned>> steal_partitions(kThreads); 128 for (int i = 0; i < kThreads; ++i) { 129 steal_partitions[i] = std::make_pair(i, i + 1); 130 } 131 tp.SetStealPartitions(steal_partitions); 132 133 std::atomic<int> running(0); 134 std::atomic<int> done(0); 135 std::atomic<int> phase(0); 136 137 // Schedule kThreads tasks and ensure that they all are running. 138 for (int i = 0; i < kThreads; ++i) { 139 tp.Schedule([&]() { 140 const int thread_id = tp.CurrentThreadId(); 141 VERIFY_GE(thread_id, 0); 142 VERIFY_LE(thread_id, kThreads - 1); 143 ++running; 144 while (phase < 1) { 145 } 146 ++done; 147 }); 148 } 149 while (running != kThreads) { 150 } 151 // Schedule each closure to only run on thread 'i' and verify that it does. 152 for (int i = 0; i < kThreads; ++i) { 153 tp.ScheduleWithHint( 154 [&, i]() { 155 ++running; 156 const int thread_id = tp.CurrentThreadId(); 157 VERIFY_IS_EQUAL(thread_id, i); 158 while (phase < 2) { 159 } 160 ++done; 161 }, 162 i, i + 1); 163 } 164 running = 0; 165 phase = 1; 166 while (running != kThreads) { 167 } 168 running = 0; 169 phase = 2; 170 } 171 172 173 EIGEN_DECLARE_TEST(cxx11_non_blocking_thread_pool) 174 { 175 CALL_SUBTEST(test_create_destroy_empty_pool()); 176 CALL_SUBTEST(test_parallelism(true)); 177 CALL_SUBTEST(test_parallelism(false)); 178 CALL_SUBTEST(test_cancel()); 179 CALL_SUBTEST(test_pool_partitions()); 180 }