cart-elc

Source code for CART-ELC
git clone git://git.laack.co/cart-elc.git
Log | Files | Refs | README | LICENSE

RunQueue.h (9366B)


      1 // This file is part of Eigen, a lightweight C++ template library
      2 // for linear algebra.
      3 //
      4 // Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
      5 //
      6 // This Source Code Form is subject to the terms of the Mozilla
      7 // Public License v. 2.0. If a copy of the MPL was not distributed
      8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
      9 
     10 #ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
     11 #define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
     12 
     13 namespace Eigen {
     14 
     15 // RunQueue is a fixed-size, partially non-blocking deque of Work items.
     16 // Operations on front of the queue must be done by a single thread (owner),
     17 // operations on back of the queue can be done by multiple threads concurrently.
     18 //
     19 // Algorithm outline:
     20 // All remote threads operating on the queue back are serialized by a mutex.
     21 // This ensures that at most two threads access state: owner and one remote
     22 // thread (Size aside). The algorithm ensures that the occupied region of the
     23 // underlying array is logically continuous (can wraparound, but no stray
     24 // occupied elements). Owner operates on one end of this region, remote thread
     25 // operates on the other end. Synchronization between these threads
     26 // (potential consumption of the last element and take up of the last empty
     27 // element) happens by means of state variable in each element. States are:
     28 // empty, busy (in process of insertion or removal) and ready. Threads claim
     29 // elements (empty->busy and ready->busy transitions) by means of a CAS
     30 // operation. The finishing transitions (busy->empty and busy->ready) are done
     31 // with plain store as the element is exclusively owned by the current thread.
     32 //
     33 // Note: we could permit only pointers as elements, then we would not need
     34 // separate state variable as null/non-null pointer value would serve as state,
     35 // but that would require malloc/free per operation for large, complex values
     36 // (and this is designed to store std::function<()>).
     37 template <typename Work, unsigned kSize>
     38 class RunQueue {
     39  public:
          // Constructs an empty queue: both packed indices start at 0 and every
          // slot is marked kEmpty.
     40   RunQueue() : front_(0), back_(0) {
            // kSize is a template constant, so its constraints are enforced at
            // compile time instead of being asserted on every construction.
     41     // require power-of-two for fast masking
     42     static_assert((kSize & (kSize - 1)) == 0, "kSize must be a power of 2");
     43     static_assert(kSize > 2, "kSize must be greater than 2");
            // leave enough space for counter
     44     static_assert(kSize <= (64 << 10), "kSize must be <= (64 << 10)");
     45     for (unsigned i = 0; i < kSize; i++)
     46       array_[i].state.store(kEmpty, std::memory_order_relaxed);
     47   }
     48 
          // The queue is expected to be drained before destruction; this is
          // checked with an assertion on Size().
     49   ~RunQueue() { eigen_plain_assert(Size() == 0); }
     50 
     51   // PushFront inserts w at the beginning of the queue.
     52   // If queue is full returns w, otherwise returns default-constructed Work.
          // Does not take mutex_; front operations are reserved for the owner thread.
     53   Work PushFront(Work w) {
     54     unsigned front = front_.load(std::memory_order_relaxed);
     55     Elem* e = &array_[front & kMask];
     56     uint8_t s = e->state.load(std::memory_order_relaxed);
            // Claim the slot (empty->busy). A slot that is not empty, or a lost
            // CAS, is reported to the caller as "queue full".
     57     if (s != kEmpty ||
     58         !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
     59       return w;
            // Advance the index by one and add kSize << 1, which increments the
            // modification counter held in the bits above kMask2.
     60     front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
     61     e->w = std::move(w);
            // Publish the element (busy->ready).
     62     e->state.store(kReady, std::memory_order_release);
     63     return Work();
     64   }
     65 
     66   // PopFront removes and returns the first element in the queue.
     67   // If the queue was empty returns default-constructed Work.
          // Does not take mutex_; front operations are reserved for the owner thread.
     68   Work PopFront() {
     69     unsigned front = front_.load(std::memory_order_relaxed);
            // The most recently pushed front element lives one slot before front.
     70     Elem* e = &array_[(front - 1) & kMask];
     71     uint8_t s = e->state.load(std::memory_order_relaxed);
            // Claim the slot (ready->busy); otherwise there is nothing to pop.
     72     if (s != kReady ||
     73         !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
     74       return Work();
     75     Work w = std::move(e->w);
     76     e->state.store(kEmpty, std::memory_order_release);
            // Step the low (index) bits back by one modulo 2 * kSize while keeping
            // the counter bits above kMask2 unchanged.
     77     front = ((front - 1) & kMask2) | (front & ~kMask2);
     78     front_.store(front, std::memory_order_relaxed);
     79     return w;
     80   }
     81 
     82   // PushBack adds w at the end of the queue.
     83   // If queue is full returns w, otherwise returns default-constructed Work.
          // Holds mutex_ for the whole operation.
     84   Work PushBack(Work w) {
     85     std::unique_lock<std::mutex> lock(mutex_);
     86     unsigned back = back_.load(std::memory_order_relaxed);
            // The next free back slot is the one just before back.
     87     Elem* e = &array_[(back - 1) & kMask];
     88     uint8_t s = e->state.load(std::memory_order_relaxed);
            // Claim the slot (empty->busy). A slot that is not empty, or a lost
            // CAS, is reported to the caller as "queue full".
     89     if (s != kEmpty ||
     90         !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
     91       return w;
            // Step the low (index) bits back by one modulo 2 * kSize while keeping
            // the bits above kMask2 unchanged.
     92     back = ((back - 1) & kMask2) | (back & ~kMask2);
     93     back_.store(back, std::memory_order_relaxed);
     94     e->w = std::move(w);
            // Publish the element (busy->ready).
     95     e->state.store(kReady, std::memory_order_release);
     96     return Work();
     97   }
     98 
     99   // PopBack removes and returns the last element in the queue.
          // Returns default-constructed Work if the queue is empty or the back
          // element could not be claimed. Holds mutex_ after the initial Empty()
          // check.
    100   Work PopBack() {
            // Cheap early exit without taking the mutex.
    101     if (Empty()) return Work();
    102     std::unique_lock<std::mutex> lock(mutex_);
    103     unsigned back = back_.load(std::memory_order_relaxed);
    104     Elem* e = &array_[back & kMask];
    105     uint8_t s = e->state.load(std::memory_order_relaxed);
            // Claim the slot (ready->busy); otherwise report nothing to pop.
    106     if (s != kReady ||
    107         !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
    108       return Work();
    109     Work w = std::move(e->w);
    110     e->state.store(kEmpty, std::memory_order_release);
            // Advance the index by one and add kSize << 1 to the bits above kMask2.
    111     back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
    112     return w;
    113   }
    114 
    115   // PopBackHalf removes and returns half last elements in the queue.
    116   // Returns number of elements removed.
          // Removed elements are appended to *result, which must be non-null when
          // the queue is not empty. At most (Size() - 1) / 2 + 1 elements are
          // taken. Holds mutex_ after the initial Empty() check.
    117   unsigned PopBackHalf(std::vector<Work>* result) {
            // Cheap early exit without taking the mutex.
    118     if (Empty()) return 0;
    119     std::unique_lock<std::mutex> lock(mutex_);
    120     unsigned back = back_.load(std::memory_order_relaxed);
    121     unsigned size = Size();
            // mid is the position furthest from back that this call tries to take.
    122     unsigned mid = back;
    123     if (size > 1) mid = back + (size - 1) / 2;
            // n counts removed elements; start remembers the first position that
            // was successfully claimed.
    124     unsigned n = 0;
    125     unsigned start = 0;
            // Walk from mid down to back (inclusive). The signed cast ends the loop
            // once mid has moved below back.
    126     for (; static_cast<int>(mid - back) >= 0; mid--) {
    127       Elem* e = &array_[mid & kMask];
    128       uint8_t s = e->state.load(std::memory_order_relaxed);
    129       if (n == 0) {
                // Nothing claimed yet: try ready->busy on this slot, and move on to
                // the next position towards back if that fails.
    130         if (s != kReady || !e->state.compare_exchange_strong(
    131                                s, kBusy, std::memory_order_acquire))
    132           continue;
    133         start = mid;
    134       } else {
    135         // Note: no need to store temporal kBusy, we exclusively own these
    136         // elements.
    137         eigen_plain_assert(s == kReady);
    138       }
    139       result->push_back(std::move(e->w));
    140       e->state.store(kEmpty, std::memory_order_release);
    141       n++;
    142     }
            // The new back is one past the first claimed position; kSize << 1 is
            // added to the bits above kMask2, as in PopBack.
    143     if (n != 0)
    144       back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
    145     return n;
    146   }
    147 
    148   // Size returns current queue size.
    149   // Can be called by any thread at any time.
    150   unsigned Size() const { return SizeOrNotEmpty<true>(); }
    151 
    152   // Empty tests whether container is empty.
    153   // Can be called by any thread at any time.
    154   bool Empty() const { return SizeOrNotEmpty<false>() == 0; }
    155 
    156   // Delete all the elements from the queue.
          // Pops from the front until Empty() reports true; the popped values are
          // discarded.
    157   void Flush() {
    158     while (!Empty()) {
    159       PopFront();
    160     }
    161   }
    162 
    163  private:
          // kMask selects the array slot; kMask2 selects the rolling index, which
          // is one bit wider than the slot index.
    164   static const unsigned kMask = kSize - 1;
    165   static const unsigned kMask2 = (kSize << 1) - 1;
          // One queue slot: the stored value plus its state (kEmpty/kBusy/kReady).
    166   struct Elem {
    167     std::atomic<uint8_t> state;
    168     Work w;
    169   };
          // Slot states stored in Elem::state.
    170   enum {
    171     kEmpty,
    172     kBusy,
    173     kReady,
    174   };
          // Serializes PushBack, PopBack and PopBackHalf.
    175   std::mutex mutex_;
    176   // Low log(kSize) + 1 bits in front_ and back_ contain rolling index of
    177   // front/back, respectively. The remaining bits contain modification counters
    178   // that are incremented on Push operations. This allows us to (1) distinguish
    179   // between empty and full conditions (if we would use log(kSize) bits for
    180   // position, these conditions would be indistinguishable); (2) obtain
    181   // consistent snapshot of front_/back_ for Size operation using the
    182   // modification counters.
    183   std::atomic<unsigned> front_;
    184   std::atomic<unsigned> back_;
    185   Elem array_[kSize];
    186 
    187   // SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false,
    188   // only whether the size is 0 is guaranteed to be correct.
    189   // Can be called by any thread at any time.
    190   template<bool NeedSizeEstimate>
    191   unsigned SizeOrNotEmpty() const {
    192     // Emptiness plays critical role in thread pool blocking. So we go to great
    193     // effort to not produce false positives (claim non-empty queue as empty).
    194     unsigned front = front_.load(std::memory_order_acquire);
    195     for (;;) {
    196       // Capture a consistent snapshot of front/back.
    197       unsigned back = back_.load(std::memory_order_acquire);
    198       unsigned front1 = front_.load(std::memory_order_relaxed);
              // front_ changed while back_ was being read: retry with the new value.
    199       if (front != front1) {
    200         front = front1;
    201         std::atomic_thread_fence(std::memory_order_acquire);
    202         continue;
    203       }
    204       if (NeedSizeEstimate) {
    205         return CalculateSize(front, back);
    206       } else {
    207         // This value will be 0 if the queue is empty, and undefined otherwise.
    208         unsigned maybe_zero = ((front ^ back) & kMask2);
    209         // Queue size estimate must agree with maybe zero check on the queue
    210         // empty/non-empty state.
    211         eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
    212         return maybe_zero;
    213       }
    214     }
    215   }
    216 
          // Computes the element count from a front/back snapshot using only the
          // rolling-index bits (kMask2); the result is clamped to at most kSize.
    217   EIGEN_ALWAYS_INLINE
    218   unsigned CalculateSize(unsigned front, unsigned back) const {
    219     int size = (front & kMask2) - (back & kMask2);
    220     // Fix overflow.
    221     if (size < 0) size += 2 * kSize;
    222     // Order of modification in push/pop is crafted to make the queue look
    223     // larger than it is during concurrent modifications. E.g. push can
    224     // increment size before the corresponding pop has decremented it.
    225     // So the computed size can be up to kSize + 1, fix it.
    226     if (size > static_cast<int>(kSize)) size = kSize;
    227     return static_cast<unsigned>(size);
    228   }
    229 
          // Non-copyable.
    230   RunQueue(const RunQueue&) = delete;
    231   void operator=(const RunQueue&) = delete;
    232 };
    233 
    234 }  // namespace Eigen
    235 
    236 #endif  // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_