cart-elc

Source code for CART-ELC
git clone git://git.laack.co/cart-elc.git
Log | Files | Refs | README | LICENSE

TensorDeviceThreadPool.h (15203B)


      1 // This file is part of Eigen, a lightweight C++ template library
      2 // for linear algebra.
      3 //
      4 // Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
      5 //
      6 // This Source Code Form is subject to the terms of the Mozilla
      7 // Public License v. 2.0. If a copy of the MPL was not distributed
      8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
      9 
     10 #if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
     11 #define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H
     12 
     13 namespace Eigen {
     14 
     15 // Runs an arbitrary function and then calls Notify() on the passed in
     16 // Notification.
     17 template <typename Function, typename... Args> struct FunctionWrapperWithNotification
     18 {
     19   static void run(Notification* n, Function f, Args... args) {
     20     f(args...);
     21     if (n) {
     22       n->Notify();
     23     }
     24   }
     25 };
     26 
     27 template <typename Function, typename... Args> struct FunctionWrapperWithBarrier
     28 {
     29   static void run(Barrier* b, Function f, Args... args) {
     30     f(args...);
     31     if (b) {
     32       b->Notify();
     33     }
     34   }
     35 };
     36 
     37 template <typename SyncType>
     38 static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
     39   if (n) {
     40     n->Wait();
     41   }
     42 }
     43 
     44 // An abstract interface to a device specific memory allocator.
     45 class Allocator {
     46  public:
     47   virtual ~Allocator() {}
     48   virtual void* allocate(size_t num_bytes) const = 0;
     49   virtual void deallocate(void* buffer) const = 0;
     50 };
     51 
     52 // Build a thread pool device on top of an existing pool of threads.
     53 struct ThreadPoolDevice {
     54   // The ownership of the thread pool remains with the caller.
     55   ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
     56       : pool_(pool), num_threads_(num_cores), allocator_(allocator) { }
     57 
     58   EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
     59     return allocator_ ? allocator_->allocate(num_bytes)
     60         : internal::aligned_malloc(num_bytes);
     61   }
     62 
     63   EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
     64     if (allocator_) {
     65       allocator_->deallocate(buffer);
     66     } else {
     67       internal::aligned_free(buffer);
     68     }
     69   }
     70 
     71     EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const {
     72     return allocate(num_bytes);
     73   }
     74 
     75   EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const {
     76     deallocate(buffer);
     77   }
     78 
     79   template<typename Type>
     80   EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const {
     81     return data;
     82   }
     83 
     84   EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
     85 #ifdef __ANDROID__
     86     ::memcpy(dst, src, n);
     87 #else
     88     // TODO(rmlarsen): Align blocks on cache lines.
     89     // We have observed that going beyond 4 threads usually just wastes
     90     // CPU cycles due to the threads competing for memory bandwidth, so we
     91     // statically schedule at most 4 block copies here.
     92     const size_t kMinBlockSize = 32768;
     93     const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
     94     if (n <= kMinBlockSize || num_threads < 2) {
     95       ::memcpy(dst, src, n);
     96     } else {
     97       const char* src_ptr = static_cast<const char*>(src);
     98       char* dst_ptr = static_cast<char*>(dst);
     99       const size_t blocksize = (n + (num_threads - 1)) / num_threads;
    100       Barrier barrier(static_cast<int>(num_threads - 1));
    101       // Launch the last 3 blocks on worker threads.
    102       for (size_t i = 1; i < num_threads; ++i) {
    103         enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] {
    104           ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize,
    105                    numext::mini(blocksize, n - (i * blocksize)));
    106         });
    107       }
    108       // Launch the first block on the main thread.
    109       ::memcpy(dst_ptr, src_ptr, blocksize);
    110       barrier.Wait();
    111     }
    112 #endif
    113   }
    114   EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
    115     memcpy(dst, src, n);
    116   }
    117   EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
    118     memcpy(dst, src, n);
    119   }
    120 
    121   EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
    122     ::memset(buffer, c, n);
    123   }
    124 
    125   EIGEN_STRONG_INLINE int numThreads() const {
    126     return num_threads_;
    127   }
    128 
    129   // Number of theads available in the underlying thread pool. This number can
    130   // be different from the value returned by numThreads().
    131   EIGEN_STRONG_INLINE int numThreadsInPool() const {
    132     return pool_->NumThreads();
    133   }
    134 
    135   EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const {
    136     return l1CacheSize();
    137   }
    138 
    139   EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
    140     // The l3 cache size is shared between all the cores.
    141     return l3CacheSize() / num_threads_;
    142   }
    143 
    144   EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
    145     // Should return an enum that encodes the ISA supported by the CPU
    146     return 1;
    147   }
    148 
    149   template <class Function, class... Args>
    150   EIGEN_STRONG_INLINE Notification* enqueue(Function&& f,
    151                                             Args&&... args) const {
    152     Notification* n = new Notification();
    153     pool_->Schedule(
    154         std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n,
    155                   std::move(f), args...));
    156     return n;
    157   }
    158 
    159   template <class Function, class... Args>
    160   EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f,
    161                                                 Args&&... args) const {
    162     pool_->Schedule(
    163         std::bind(&FunctionWrapperWithBarrier<Function, Args...>::run, b,
    164                   std::move(f), args...));
    165   }
    166 
    167   template <class Function, class... Args>
    168   EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f,
    169                                                  Args&&... args) const {
    170     if (sizeof...(args) > 0) {
    171       pool_->Schedule(std::bind(std::move(f), args...));
    172     } else {
    173       pool_->Schedule(std::move(f));
    174     }
    175   }
    176 
    177   // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
    178   // called from one of the threads in pool_. Returns -1 otherwise.
    179   EIGEN_STRONG_INLINE int currentThreadId() const {
    180     return pool_->CurrentThreadId();
    181   }
    182 
    183   // WARNING: This function is synchronous and will block the calling thread.
    184   //
    185   // Synchronous parallelFor executes f with [0, n) arguments in parallel and
    186   // waits for completion. F accepts a half-open interval [first, last). Block
    187   // size is chosen based on the iteration cost and resulting parallel
    188   // efficiency. If block_align is not nullptr, it is called to round up the
    189   // block size.
    190   void parallelFor(Index n, const TensorOpCost& cost,
    191                    std::function<Index(Index)> block_align,
    192                    std::function<void(Index, Index)> f) const {
    193     if (EIGEN_PREDICT_FALSE(n <= 0)){
    194       return;
    195     // Compute small problems directly in the caller thread.
    196     } else if (n == 1 || numThreads() == 1 ||
    197                CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
    198       f(0, n);
    199       return;
    200     }
    201 
    202     // Compute block size and total count of blocks.
    203     ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
    204 
    205     // Recursively divide size into halves until we reach block_size.
    206     // Division code rounds mid to block_size, so we are guaranteed to get
    207     // block_count leaves that do actual computations.
    208     Barrier barrier(static_cast<unsigned int>(block.count));
    209     std::function<void(Index, Index)> handleRange;
    210     handleRange = [=, &handleRange, &barrier, &f](Index firstIdx,
    211                                                   Index lastIdx) {
    212       while (lastIdx - firstIdx > block.size) {
    213         // Split into halves and schedule the second half on a different thread.
    214         const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
    215         pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
    216         lastIdx = midIdx;
    217       }
    218       // Single block or less, execute directly.
    219       f(firstIdx, lastIdx);
    220       barrier.Notify();
    221     };
    222 
    223     if (block.count <= numThreads()) {
    224       // Avoid a thread hop by running the root of the tree and one block on the
    225       // main thread.
    226       handleRange(0, n);
    227     } else {
    228       // Execute the root in the thread pool to avoid running work on more than
    229       // numThreads() threads.
    230       pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
    231     }
    232 
    233     barrier.Wait();
    234   }
    235 
    236   // Convenience wrapper for parallelFor that does not align blocks.
    237   void parallelFor(Index n, const TensorOpCost& cost,
    238                    std::function<void(Index, Index)> f) const {
    239     parallelFor(n, cost, nullptr, std::move(f));
    240   }
    241 
    242   // WARNING: This function is asynchronous and will not block the calling thread.
    243   //
    244   // Asynchronous parallelFor executes f with [0, n) arguments in parallel
    245   // without waiting for completion. When the last block finished, it will call
    246   // 'done' callback. F accepts a half-open interval [first, last). Block size
    247   // is chosen based on the iteration cost and resulting parallel efficiency. If
    248   // block_align is not nullptr, it is called to round up the block size.
    249   void parallelForAsync(Index n, const TensorOpCost& cost,
    250                         std::function<Index(Index)> block_align,
    251                         std::function<void(Index, Index)> f,
    252                         std::function<void()> done) const {
    253     // Compute small problems directly in the caller thread.
    254     if (n <= 1 || numThreads() == 1 ||
    255         CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
    256       f(0, n);
    257       done();
    258       return;
    259     }
    260 
    261     // Compute block size and total count of blocks.
    262     ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);
    263 
    264     ParallelForAsyncContext* const ctx =
    265         new ParallelForAsyncContext(block.count, std::move(f), std::move(done));
    266 
    267     // Recursively divide size into halves until we reach block_size.
    268     // Division code rounds mid to block_size, so we are guaranteed to get
    269     // block_count leaves that do actual computations.
    270     ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
    271       while (lastIdx - firstIdx > block.size) {
    272         // Split into halves and schedule the second half on a different thread.
    273         const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
    274         pool_->Schedule(
    275             [ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
    276         lastIdx = midIdx;
    277       }
    278 
    279       // Single block or less, execute directly.
    280       ctx->f(firstIdx, lastIdx);
    281 
    282       // Delete async context if it was the last block.
    283       if (ctx->count.fetch_sub(1) == 1) delete ctx;
    284     };
    285 
    286     if (block.count <= numThreads()) {
    287       // Avoid a thread hop by running the root of the tree and one block on the
    288       // main thread.
    289       ctx->handle_range(0, n);
    290     } else {
    291       // Execute the root in the thread pool to avoid running work on more than
    292       // numThreads() threads.
    293       pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
    294     }
    295   }
    296 
    297   // Convenience wrapper for parallelForAsync that does not align blocks.
    298   void parallelForAsync(Index n, const TensorOpCost& cost,
    299                         std::function<void(Index, Index)> f,
    300                         std::function<void()> done) const {
    301     parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
    302   }
    303 
    304   // Thread pool accessor.
    305   ThreadPoolInterface* getPool() const { return pool_; }
    306 
    307   // Allocator accessor.
    308   Allocator* allocator() const { return allocator_; }
    309 
    310  private:
    311   typedef TensorCostModel<ThreadPoolDevice> CostModel;
    312 
    313   // For parallelForAsync we must keep passed in closures on the heap, and
    314   // delete them only after `done` callback finished.
    315   struct ParallelForAsyncContext {
    316     ParallelForAsyncContext(Index block_count,
    317                             std::function<void(Index, Index)> block_f,
    318                             std::function<void()> done_callback)
    319         : count(block_count),
    320           f(std::move(block_f)),
    321           done(std::move(done_callback)) {}
    322     ~ParallelForAsyncContext() { done(); }
    323 
    324     std::atomic<Index> count;
    325     std::function<void(Index, Index)> f;
    326     std::function<void()> done;
    327 
    328     std::function<void(Index, Index)> handle_range;
    329   };
    330 
    331   struct ParallelForBlock {
    332     Index size;   // block size
    333     Index count;  // number of blocks
    334   };
    335 
    336   // Calculates block size based on (1) the iteration cost and (2) parallel
    337   // efficiency. We want blocks to be not too small to mitigate parallelization
    338   // overheads; not too large to mitigate tail effect and potential load
    339   // imbalance and we also want number of blocks to be evenly dividable across
    340   // threads.
    341   ParallelForBlock CalculateParallelForBlock(
    342       const Index n, const TensorOpCost& cost,
    343       std::function<Index(Index)> block_align) const {
    344     const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    345     const Index max_oversharding_factor = 4;
    346     Index block_size = numext::mini(
    347         n, numext::maxi<Index>(
    348                divup<Index>(n, max_oversharding_factor * numThreads()),
    349                block_size_f));
    350     const Index max_block_size = numext::mini(n, 2 * block_size);
    351 
    352     if (block_align) {
    353       Index new_block_size = block_align(block_size);
    354       eigen_assert(new_block_size >= block_size);
    355       block_size = numext::mini(n, new_block_size);
    356     }
    357 
    358     Index block_count = divup(n, block_size);
    359 
    360     // Calculate parallel efficiency as fraction of total CPU time used for
    361     // computations:
    362     double max_efficiency =
    363         static_cast<double>(block_count) /
    364         (divup<int>(block_count, numThreads()) * numThreads());
    365 
    366     // Now try to increase block size up to max_block_size as long as it
    367     // doesn't decrease parallel efficiency.
    368     for (Index prev_block_count = block_count;
    369          max_efficiency < 1.0 && prev_block_count > 1;) {
    370       // This is the next block size that divides size into a smaller number
    371       // of blocks than the current block_size.
    372       Index coarser_block_size = divup(n, prev_block_count - 1);
    373       if (block_align) {
    374         Index new_block_size = block_align(coarser_block_size);
    375         eigen_assert(new_block_size >= coarser_block_size);
    376         coarser_block_size = numext::mini(n, new_block_size);
    377       }
    378       if (coarser_block_size > max_block_size) {
    379         break;  // Reached max block size. Stop.
    380       }
    381       // Recalculate parallel efficiency.
    382       const Index coarser_block_count = divup(n, coarser_block_size);
    383       eigen_assert(coarser_block_count < prev_block_count);
    384       prev_block_count = coarser_block_count;
    385       const double coarser_efficiency =
    386           static_cast<double>(coarser_block_count) /
    387           (divup<int>(coarser_block_count, numThreads()) * numThreads());
    388       if (coarser_efficiency + 0.01 >= max_efficiency) {
    389         // Taking it.
    390         block_size = coarser_block_size;
    391         block_count = coarser_block_count;
    392         if (max_efficiency < coarser_efficiency) {
    393           max_efficiency = coarser_efficiency;
    394         }
    395       }
    396     }
    397 
    398     return {block_size, block_count};
    399   }
    400 
    401   ThreadPoolInterface* pool_;
    402   int num_threads_;
    403   Allocator* allocator_;
    404 };
    405 
    406 
    407 }  // end namespace Eigen
    408 
    409 #endif // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H