TensorDeviceThreadPool.h (15203B)
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

namespace Eigen {

// Runs an arbitrary function and then calls Notify() on the passed in
// Notification, unless the Notification pointer is null.
template <typename Function, typename... Args> struct FunctionWrapperWithNotification
{
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    if (n) {
      n->Notify();
    }
  }
};

// Runs an arbitrary function and then calls Notify() on the passed in
// Barrier, unless the Barrier pointer is null.
template <typename Function, typename... Args> struct FunctionWrapperWithBarrier
{
  static void run(Barrier* b, Function f, Args... args) {
    f(args...);
    if (b) {
      b->Notify();
    }
  }
};

// Calls Wait() on the given synchronization object; a null pointer is a no-op.
template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
  if (n) {
    n->Wait();
  }
}

// An abstract interface to a device specific memory allocator.
class Allocator {
 public:
  virtual ~Allocator() {}
  virtual void* allocate(size_t num_bytes) const = 0;
  virtual void deallocate(void* buffer) const = 0;
};

// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller. The optional
  // allocator is not owned either; when it is null, aligned_malloc/aligned_free
  // are used instead.
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores, Allocator* allocator = nullptr)
      : pool_(pool), num_threads_(num_cores), allocator_(allocator) { }

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return allocator_ ? allocator_->allocate(num_bytes)
        : internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    if (allocator_) {
      allocator_->deallocate(buffer);
    } else {
      internal::aligned_free(buffer);
    }
  }

  EIGEN_STRONG_INLINE void* allocate_temp(size_t num_bytes) const {
    return allocate(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate_temp(void* buffer) const {
    deallocate(buffer);
  }

  template<typename Type>
  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE Type get(Type data) const {
    return data;
  }

  // Copies n bytes from src to dst. Large copies are split into at most 4
  // blocks; block 0 is copied on the calling thread and the others on pool
  // threads. The call returns only after every block has been copied.
  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
#ifdef __ANDROID__
    ::memcpy(dst, src, n);
#else
    // TODO(rmlarsen): Align blocks on cache lines.
    // We have observed that going beyond 4 threads usually just wastes
    // CPU cycles due to the threads competing for memory bandwidth, so we
    // statically schedule at most 4 block copies here.
    const size_t kMinBlockSize = 32768;
    const size_t num_threads = CostModel::numThreads(n, TensorOpCost(1.0, 1.0, 0), 4);
    if (n <= kMinBlockSize || num_threads < 2) {
      ::memcpy(dst, src, n);
    } else {
      const char* src_ptr = static_cast<const char*>(src);
      char* dst_ptr = static_cast<char*>(dst);
      const size_t blocksize = (n + (num_threads - 1)) / num_threads;
      Barrier barrier(static_cast<unsigned int>(num_threads - 1));
      // Launch blocks 1 .. num_threads-1 on worker threads. Because
      // n > kMinBlockSize and num_threads <= 4, i * blocksize < n here, so the
      // size computation below cannot underflow.
      for (size_t i = 1; i < num_threads; ++i) {
        enqueue_with_barrier(&barrier, [n, i, src_ptr, dst_ptr, blocksize] {
          ::memcpy(dst_ptr + i * blocksize, src_ptr + i * blocksize,
                   numext::mini(blocksize, n - (i * blocksize)));
        });
      }
      // Copy the first block on the calling thread.
      ::memcpy(dst_ptr, src_ptr, blocksize);
      barrier.Wait();
    }
#endif
  }
  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }

  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
    ::memset(buffer, c, n);
  }

  // Number of threads this device was configured with (num_cores).
  EIGEN_STRONG_INLINE int numThreads() const {
    return num_threads_;
  }

  // Number of threads available in the underlying thread pool. This number can
  // be different from the value returned by numThreads().
  EIGEN_STRONG_INLINE int numThreadsInPool() const {
    return pool_->NumThreads();
  }

  EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const {
    return l1CacheSize();
  }

  EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
    // The l3 cache size is shared between all the cores.
    return l3CacheSize() / num_threads_;
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
    // Should return an enum that encodes the ISA supported by the CPU
    return 1;
  }

  // Schedules f(args...) on the pool and returns a heap-allocated Notification
  // that is notified once the call has finished. The caller owns (and must
  // delete) the returned Notification. The callable is forwarded, so lvalue
  // callables are copied rather than moved from.
  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f,
                                            Args&&... args) const {
    Notification* n = new Notification();
    pool_->Schedule(
        std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n,
                  std::forward<Function>(f), std::forward<Args>(args)...));
    return n;
  }

  // Schedules f(args...) on the pool and calls b->Notify() (if b is not null)
  // once the call has finished.
  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b, Function&& f,
                                                Args&&... args) const {
    pool_->Schedule(
        std::bind(&FunctionWrapperWithBarrier<Function, Args...>::run, b,
                  std::forward<Function>(f), std::forward<Args>(args)...));
  }

  // Schedules f() on the pool without any completion signal.
  template <class Function>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f) const {
    pool_->Schedule(std::forward<Function>(f));
  }

  // Schedules f(arg, args...) on the pool without any completion signal.
  // This is a separate overload so that a callable which requires arguments
  // is never converted directly to the pool's nullary task type.
  template <class Function, class Arg, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Arg&& arg,
                                                 Args&&... args) const {
    pool_->Schedule(std::bind(std::forward<Function>(f), std::forward<Arg>(arg),
                              std::forward<Args>(args)...));
  }

  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
  // called from one of the threads in pool_. Returns -1 otherwise.
  EIGEN_STRONG_INLINE int currentThreadId() const {
    return pool_->CurrentThreadId();
  }

  // WARNING: This function is synchronous and will block the calling thread.
  //
  // Synchronous parallelFor executes f with [0, n) arguments in parallel and
  // waits for completion. F accepts a half-open interval [first, last). Block
  // size is chosen based on the iteration cost and resulting parallel
  // efficiency. If block_align is not nullptr, it is called to round up the
  // block size. For n <= 0, f is never called.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    if (EIGEN_PREDICT_FALSE(n <= 0)) {
      return;
    }
    // Compute small problems directly in the caller thread.
    if (n == 1 || numThreads() == 1 ||
        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    // Recursively divide size into halves until we reach block_size.
    // Division code rounds mid to block_size, so we are guaranteed to get
    // block_count leaves that do actual computations.
    Barrier barrier(static_cast<unsigned int>(block.count));
    std::function<void(Index, Index)> handleRange;
    handleRange = [=, &handleRange, &barrier, &f](Index firstIdx,
                                                   Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on a different thread.
        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule([=, &handleRange]() { handleRange(midIdx, lastIdx); });
        lastIdx = midIdx;
      }
      // Single block or less, execute directly.
      f(firstIdx, lastIdx);
      barrier.Notify();
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on the
      // main thread.
      handleRange(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more than
      // numThreads() threads.
      pool_->Schedule([=, &handleRange]() { handleRange(0, n); });
    }

    barrier.Wait();
  }

  // Convenience wrapper for parallelFor that does not align blocks.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }

  // WARNING: This function is asynchronous and will not block the calling thread.
  //
  // Asynchronous parallelFor executes f with [0, n) arguments in parallel
  // without waiting for completion. When the last block has finished, it calls
  // the 'done' callback. F accepts a half-open interval [first, last). Block
  // size is chosen based on the iteration cost and resulting parallel
  // efficiency. If block_align is not nullptr, it is called to round up the
  // block size. For n <= 0, f is never called (matching the synchronous
  // parallelFor) and 'done' is invoked immediately.
  //
  // The scheduled closures capture `this`, so the device must outlive the
  // asynchronous work.
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<Index(Index)> block_align,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    if (EIGEN_PREDICT_FALSE(n <= 0)) {
      // Nothing to compute, but the completion callback must still fire.
      done();
      return;
    }
    // Compute small problems directly in the caller thread.
    if (n == 1 || numThreads() == 1 ||
        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      done();
      return;
    }

    // Compute block size and total count of blocks.
    ParallelForBlock block = CalculateParallelForBlock(n, cost, block_align);

    // The context owns f and done; its destructor runs done().
    ParallelForAsyncContext* const ctx =
        new ParallelForAsyncContext(block.count, std::move(f), std::move(done));

    // Recursively divide size into halves until we reach block_size.
    // Division code rounds mid to block_size, so we are guaranteed to get
    // block_count leaves that do actual computations.
    ctx->handle_range = [this, ctx, block](Index firstIdx, Index lastIdx) {
      while (lastIdx - firstIdx > block.size) {
        // Split into halves and schedule the second half on a different thread.
        const Index midIdx = firstIdx + divup((lastIdx - firstIdx) / 2, block.size) * block.size;
        pool_->Schedule(
            [ctx, midIdx, lastIdx]() { ctx->handle_range(midIdx, lastIdx); });
        lastIdx = midIdx;
      }

      // Single block or less, execute directly.
      ctx->f(firstIdx, lastIdx);

      // Delete async context if it was the last block.
      if (ctx->count.fetch_sub(1) == 1) delete ctx;
    };

    if (block.count <= numThreads()) {
      // Avoid a thread hop by running the root of the tree and one block on the
      // main thread.
      ctx->handle_range(0, n);
    } else {
      // Execute the root in the thread pool to avoid running work on more than
      // numThreads() threads.
      pool_->Schedule([ctx, n]() { ctx->handle_range(0, n); });
    }
  }

  // Convenience wrapper for parallelForAsync that does not align blocks.
  void parallelForAsync(Index n, const TensorOpCost& cost,
                        std::function<void(Index, Index)> f,
                        std::function<void()> done) const {
    parallelForAsync(n, cost, nullptr, std::move(f), std::move(done));
  }

  // Thread pool accessor.
  ThreadPoolInterface* getPool() const { return pool_; }

  // Allocator accessor.
  Allocator* allocator() const { return allocator_; }

 private:
  typedef TensorCostModel<ThreadPoolDevice> CostModel;

  // For parallelForAsync we must keep passed in closures on the heap, and
  // delete them only after `done` callback finished.
  struct ParallelForAsyncContext {
    ParallelForAsyncContext(Index block_count,
                            std::function<void(Index, Index)> block_f,
                            std::function<void()> done_callback)
        : count(block_count),
          f(std::move(block_f)),
          done(std::move(done_callback)) {}
    ~ParallelForAsyncContext() { done(); }

    // Number of blocks that have not finished yet.
    std::atomic<Index> count;
    std::function<void(Index, Index)> f;
    std::function<void()> done;

    std::function<void(Index, Index)> handle_range;
  };

  struct ParallelForBlock {
    Index size;   // block size
    Index count;  // number of blocks
  };

  // Parallel efficiency of running block_count blocks on numThreads() threads:
  // block_count divided by the number of thread slots available in
  // ceil(block_count / numThreads()) rounds.
  double ParallelEfficiency(Index block_count) const {
    const Index threads = numThreads();
    return static_cast<double>(block_count) /
           static_cast<double>(divup<Index>(block_count, threads) * threads);
  }

  // Calculates block size based on (1) the iteration cost and (2) parallel
  // efficiency. We want blocks to be not too small to mitigate parallelization
  // overheads; not too large to mitigate tail effect and potential load
  // imbalance and we also want number of blocks to be evenly dividable across
  // threads.
  ParallelForBlock CalculateParallelForBlock(
      const Index n, const TensorOpCost& cost,
      std::function<Index(Index)> block_align) const {
    const double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    const Index max_oversharding_factor = 4;

    // Convert the cost-derived block size to Index without overflow (it is
    // +inf when the task cost is zero). Values >= n are clamped to n, since
    // the result is capped at n anyway. Values < 1 (or NaN) become 1; they
    // can never beat the divup term below, which is >= 1 for n >= 1.
    Index cost_block_size = 1;
    if (block_size_f >= static_cast<double>(n)) {
      cost_block_size = n;
    } else if (block_size_f >= 1.0) {
      cost_block_size = static_cast<Index>(block_size_f);
    }

    Index block_size = numext::mini(
        n, numext::maxi<Index>(
               divup<Index>(n, max_oversharding_factor * numThreads()),
               cost_block_size));
    const Index max_block_size = numext::mini(n, 2 * block_size);

    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }

    Index block_count = divup(n, block_size);

    // Calculate parallel efficiency as fraction of total CPU time used for
    // computations.
    double max_efficiency = ParallelEfficiency(block_count);

    // Now try to increase block size up to max_block_size as long as it
    // doesn't decrease parallel efficiency.
    for (Index prev_block_count = block_count;
         max_efficiency < 1.0 && prev_block_count > 1;) {
      // This is the next block size that divides size into a smaller number
      // of blocks than the current block_size.
      Index coarser_block_size = divup(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached max block size. Stop.
      }
      // Recalculate parallel efficiency.
      const Index coarser_block_count = divup(n, coarser_block_size);
      eigen_assert(coarser_block_count < prev_block_count);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency = ParallelEfficiency(coarser_block_count);
      // Accept a coarser partition even if it is up to 1% less efficient,
      // since fewer blocks mean less scheduling overhead.
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Taking it.
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    return {block_size, block_count};
  }

  ThreadPoolInterface* pool_;
  int num_threads_;
  Allocator* allocator_;
};


}  // end namespace Eigen

#endif  // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H