cart-elc

Source code for CART-ELC
git clone git://git.laack.co/cart-elc.git
Log | Files | Refs | README | LICENSE

cxx11_non_blocking_thread_pool.cpp (5039B)


      1 // This file is part of Eigen, a lightweight C++ template library
      2 // for linear algebra.
      3 //
      4 // Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
      5 // Copyright (C) 2016 Benoit Steiner <benoit.steiner.goog@gmail.com>
      6 //
      7 // This Source Code Form is subject to the terms of the Mozilla
      8 // Public License v. 2.0. If a copy of the MPL was not distributed
      9 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
     10 
     11 #define EIGEN_USE_THREADS
     12 #include "main.h"
     13 #include "Eigen/CXX11/ThreadPool"
     14 #include "Eigen/CXX11/Tensor"
     15 
     16 static void test_create_destroy_empty_pool()
     17 {
     18   // Just create and destroy the pool. This will wind up and tear down worker
     19   // threads. Ensure there are no issues in that logic.
     20   for (int i = 0; i < 16; ++i) {
     21     ThreadPool tp(i);
     22   }
     23 }
     24 
     25 
     26 static void test_parallelism(bool allow_spinning)
     27 {
     28   // Test we never-ever fail to match available tasks with idle threads.
     29   const int kThreads = 16;  // code below expects that this is a multiple of 4
     30   ThreadPool tp(kThreads, allow_spinning);
     31   VERIFY_IS_EQUAL(tp.NumThreads(), kThreads);
     32   VERIFY_IS_EQUAL(tp.CurrentThreadId(), -1);
     33   for (int iter = 0; iter < 100; ++iter) {
     34     std::atomic<int> running(0);
     35     std::atomic<int> done(0);
     36     std::atomic<int> phase(0);
     37     // Schedule kThreads tasks and ensure that they all are running.
     38     for (int i = 0; i < kThreads; ++i) {
     39       tp.Schedule([&]() {
     40         const int thread_id = tp.CurrentThreadId();
     41         VERIFY_GE(thread_id, 0);
     42         VERIFY_LE(thread_id, kThreads - 1);
     43         running++;
     44         while (phase < 1) {
     45         }
     46         done++;
     47       });
     48     }
     49     while (running != kThreads) {
     50     }
     51     running = 0;
     52     phase = 1;
     53     // Now, while the previous tasks exit, schedule another kThreads tasks and
     54     // ensure that they are running.
     55     for (int i = 0; i < kThreads; ++i) {
     56       tp.Schedule([&, i]() {
     57         running++;
     58         while (phase < 2) {
     59         }
     60         // When all tasks are running, half of tasks exit, quarter of tasks
     61         // continue running and quarter of tasks schedule another 2 tasks each.
     62         // Concurrently main thread schedules another quarter of tasks.
     63         // This gives us another kThreads tasks and we ensure that they all
     64         // are running.
     65         if (i < kThreads / 2) {
     66         } else if (i < 3 * kThreads / 4) {
     67           running++;
     68           while (phase < 3) {
     69           }
     70           done++;
     71         } else {
     72           for (int j = 0; j < 2; ++j) {
     73             tp.Schedule([&]() {
     74               running++;
     75               while (phase < 3) {
     76               }
     77               done++;
     78             });
     79           }
     80         }
     81         done++;
     82       });
     83     }
     84     while (running != kThreads) {
     85     }
     86     running = 0;
     87     phase = 2;
     88     for (int i = 0; i < kThreads / 4; ++i) {
     89       tp.Schedule([&]() {
     90         running++;
     91         while (phase < 3) {
     92         }
     93         done++;
     94       });
     95     }
     96     while (running != kThreads) {
     97     }
     98     phase = 3;
     99     while (done != 3 * kThreads) {
    100     }
    101   }
    102 }
    103 
    104 
    105 static void test_cancel()
    106 {
    107   ThreadPool tp(2);
    108 
    109   // Schedule a large number of closure that each sleeps for one second. This
    110   // will keep the thread pool busy for much longer than the default test timeout.
    111   for (int i = 0; i < 1000; ++i) {
    112     tp.Schedule([]() {
    113       std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    114     });
    115   }
    116 
    117   // Cancel the processing of all the closures that are still pending.
    118   tp.Cancel();
    119 }
    120 
    121 static void test_pool_partitions() {
    122   const int kThreads = 2;
    123   ThreadPool tp(kThreads);
    124 
    125   // Assign each thread to its own partition, so that stealing other work only
    126   // occurs globally when a thread is idle.
    127   std::vector<std::pair<unsigned, unsigned>> steal_partitions(kThreads);
    128   for (int i = 0; i < kThreads; ++i) {
    129     steal_partitions[i] = std::make_pair(i, i + 1);
    130   }
    131   tp.SetStealPartitions(steal_partitions);
    132 
    133   std::atomic<int> running(0);
    134   std::atomic<int> done(0);
    135   std::atomic<int> phase(0);
    136 
    137   // Schedule kThreads tasks and ensure that they all are running.
    138   for (int i = 0; i < kThreads; ++i) {
    139     tp.Schedule([&]() {
    140       const int thread_id = tp.CurrentThreadId();
    141       VERIFY_GE(thread_id, 0);
    142       VERIFY_LE(thread_id, kThreads - 1);
    143       ++running;
    144       while (phase < 1) {
    145       }
    146       ++done;
    147     });
    148   }
    149   while (running != kThreads) {
    150   }
    151   // Schedule each closure to only run on thread 'i' and verify that it does.
    152   for (int i = 0; i < kThreads; ++i) {
    153     tp.ScheduleWithHint(
    154         [&, i]() {
    155           ++running;
    156           const int thread_id = tp.CurrentThreadId();
    157           VERIFY_IS_EQUAL(thread_id, i);
    158           while (phase < 2) {
    159           }
    160           ++done;
    161         },
    162         i, i + 1);
    163   }
    164   running = 0;
    165   phase = 1;
    166   while (running != kThreads) {
    167   }
    168   running = 0;
    169   phase = 2;
    170 }
    171 
    172 
    173 EIGEN_DECLARE_TEST(cxx11_non_blocking_thread_pool)
    174 {
    175   CALL_SUBTEST(test_create_destroy_empty_pool());
    176   CALL_SUBTEST(test_parallelism(true));
    177   CALL_SUBTEST(test_parallelism(false));
    178   CALL_SUBTEST(test_cancel());
    179   CALL_SUBTEST(test_pool_partitions());
    180 }