cart-elc

Source code for CART-ELC
git clone git://git.laack.co/cart-elc.git
Log | Files | Refs | README | LICENSE

cxx11_tensor_thread_local.cpp (4237B)


      1 // This file is part of Eigen, a lightweight C++ template library
      2 // for linear algebra.
      3 //
      4 // This Source Code Form is subject to the terms of the Mozilla
      5 // Public License v. 2.0. If a copy of the MPL was not distributed
      6 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
      7 
      8 #define EIGEN_USE_THREADS
      9 
     10 #include <iostream>
     11 #include <unordered_set>
     12 
     13 #include "main.h"
     14 #include <Eigen/CXX11/ThreadPool>
     15 
     16 struct Counter {
     17   Counter() = default;
     18 
     19   void inc() {
     20     // Check that mutation happens only in a thread that created this counter.
     21     VERIFY_IS_EQUAL(std::this_thread::get_id(), created_by);
     22     counter_value++;
     23   }
     24   int value() { return counter_value; }
     25 
     26   std::thread::id created_by;
     27   int counter_value = 0;
     28 };
     29 
     30 struct InitCounter {
     31   void operator()(Counter& counter) {
     32     counter.created_by = std::this_thread::get_id();
     33   }
     34 };
     35 
     36 void test_simple_thread_local() {
     37   int num_threads = internal::random<int>(4, 32);
     38   Eigen::ThreadPool thread_pool(num_threads);
     39   Eigen::ThreadLocal<Counter, InitCounter> counter(num_threads, InitCounter());
     40 
     41   int num_tasks = 3 * num_threads;
     42   Eigen::Barrier barrier(num_tasks);
     43 
     44   for (int i = 0; i < num_tasks; ++i) {
     45     thread_pool.Schedule([&counter, &barrier]() {
     46       Counter& local = counter.local();
     47       local.inc();
     48 
     49       std::this_thread::sleep_for(std::chrono::milliseconds(100));
     50       barrier.Notify();
     51     });
     52   }
     53 
     54   barrier.Wait();
     55 
     56   counter.ForEach(
     57       [](std::thread::id, Counter& cnt) { VERIFY_IS_EQUAL(cnt.value(), 3); });
     58 }
     59 
     60 void test_zero_sized_thread_local() {
     61   Eigen::ThreadLocal<Counter, InitCounter> counter(0, InitCounter());
     62 
     63   Counter& local = counter.local();
     64   local.inc();
     65 
     66   int total = 0;
     67   counter.ForEach([&total](std::thread::id, Counter& cnt) {
     68     total += cnt.value();
     69     VERIFY_IS_EQUAL(cnt.value(), 1);
     70   });
     71 
     72   VERIFY_IS_EQUAL(total, 1);
     73 }
     74 
     75 // All thread local values fits into the lock-free storage.
     76 void test_large_number_of_tasks_no_spill() {
     77   int num_threads = internal::random<int>(4, 32);
     78   Eigen::ThreadPool thread_pool(num_threads);
     79   Eigen::ThreadLocal<Counter, InitCounter> counter(num_threads, InitCounter());
     80 
     81   int num_tasks = 10000;
     82   Eigen::Barrier barrier(num_tasks);
     83 
     84   for (int i = 0; i < num_tasks; ++i) {
     85     thread_pool.Schedule([&counter, &barrier]() {
     86       Counter& local = counter.local();
     87       local.inc();
     88       barrier.Notify();
     89     });
     90   }
     91 
     92   barrier.Wait();
     93 
     94   int total = 0;
     95   std::unordered_set<std::thread::id> unique_threads;
     96 
     97   counter.ForEach([&](std::thread::id id, Counter& cnt) {
     98     total += cnt.value();
     99     unique_threads.insert(id);
    100   });
    101 
    102   VERIFY_IS_EQUAL(total, num_tasks);
    103   // Not all threads in a pool might be woken up to execute submitted tasks.
    104   // Also thread_pool.Schedule() might use current thread if queue is full.
    105   VERIFY_IS_EQUAL(
    106       unique_threads.size() <= (static_cast<size_t>(num_threads + 1)), true);
    107 }
    108 
    109 // Lock free thread local storage is too small to fit all the unique threads,
    110 // and it spills to a map guarded by a mutex.
    111 void test_large_number_of_tasks_with_spill() {
    112   int num_threads = internal::random<int>(4, 32);
    113   Eigen::ThreadPool thread_pool(num_threads);
    114   Eigen::ThreadLocal<Counter, InitCounter> counter(1, InitCounter());
    115 
    116   int num_tasks = 10000;
    117   Eigen::Barrier barrier(num_tasks);
    118 
    119   for (int i = 0; i < num_tasks; ++i) {
    120     thread_pool.Schedule([&counter, &barrier]() {
    121       Counter& local = counter.local();
    122       local.inc();
    123       barrier.Notify();
    124     });
    125   }
    126 
    127   barrier.Wait();
    128 
    129   int total = 0;
    130   std::unordered_set<std::thread::id> unique_threads;
    131 
    132   counter.ForEach([&](std::thread::id id, Counter& cnt) {
    133     total += cnt.value();
    134     unique_threads.insert(id);
    135   });
    136 
    137   VERIFY_IS_EQUAL(total, num_tasks);
    138   // Not all threads in a pool might be woken up to execute submitted tasks.
    139   // Also thread_pool.Schedule() might use current thread if queue is full.
    140   VERIFY_IS_EQUAL(
    141       unique_threads.size() <= (static_cast<size_t>(num_threads + 1)), true);
    142 }
    143 
    144 EIGEN_DECLARE_TEST(cxx11_tensor_thread_local) {
    145   CALL_SUBTEST(test_simple_thread_local());
    146   CALL_SUBTEST(test_zero_sized_thread_local());
    147   CALL_SUBTEST(test_large_number_of_tasks_no_spill());
    148   CALL_SUBTEST(test_large_number_of_tasks_with_spill());
    149 }