You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

198 lines
6.7 KiB
C++

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#ifndef BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
#define BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <new>
#include <type_traits>
#include <utility>
#include <vector>
#include <boost/assert.hpp>
#include <boost/config.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/context.hpp>
// David Chase and Yossi Lev. Dynamic circular work-stealing deque.
// In SPAA 05: Proceedings of the seventeenth annual ACM symposium
// on Parallelism in algorithms and architectures, pages 21-28,
// New York, NY, USA, 2005. ACM.
//
// Nhat Minh Lê, Antoniu Pop, Albert Cohen, and Francesco Zappa Nardelli. 2013.
// Correct and efficient work-stealing for weak memory models.
// In Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice
// of parallel programming (PPoPP '13). ACM, New York, NY, USA, 69-80.
#if BOOST_COMP_CLANG
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-private-field"
#endif
namespace boost {
namespace fibers {
namespace detail {
class context_spmc_queue {
private:
class array {
private:
typedef std::atomic< context * > atomic_type;
typedef atomic_type storage_type;
std::size_t capacity_;
storage_type * storage_;
public:
array( std::size_t capacity) :
capacity_{ capacity },
storage_{ new storage_type[capacity_] } {
for ( std::size_t i = 0; i < capacity_; ++i) {
::new ( static_cast< void * >( std::addressof( storage_[i]) ) ) atomic_type{ nullptr };
}
}
~array() {
for ( std::size_t i = 0; i < capacity_; ++i) {
reinterpret_cast< atomic_type * >( std::addressof( storage_[i]) )->~atomic_type();
}
delete [] storage_;
}
std::size_t capacity() const noexcept {
return capacity_;
}
void push( std::size_t bottom, context * ctx) noexcept {
reinterpret_cast< atomic_type * >(
std::addressof( storage_[bottom % capacity_]) )
->store( ctx, std::memory_order_relaxed);
}
context * pop( std::size_t top) noexcept {
return reinterpret_cast< atomic_type * >(
std::addressof( storage_[top % capacity_]) )
->load( std::memory_order_relaxed);
}
array * resize( std::size_t bottom, std::size_t top) {
std::unique_ptr< array > tmp{ new array{ 2 * capacity_ } };
for ( std::size_t i = top; i != bottom; ++i) {
tmp->push( i, pop( i) );
}
return tmp.release();
}
};
std::atomic< std::size_t > top_{ 0 };
std::atomic< std::size_t > bottom_{ 0 };
std::atomic< array * > array_;
std::vector< array * > old_arrays_{};
char padding_[cacheline_length];
public:
context_spmc_queue( std::size_t capacity = 4096) :
array_{ new array{ capacity } } {
old_arrays_.reserve( 32);
}
~context_spmc_queue() {
for ( array * a : old_arrays_) {
delete a;
}
delete array_.load();
}
context_spmc_queue( context_spmc_queue const&) = delete;
context_spmc_queue & operator=( context_spmc_queue const&) = delete;
bool empty() const noexcept {
std::size_t bottom = bottom_.load( std::memory_order_relaxed);
std::size_t top = top_.load( std::memory_order_relaxed);
return bottom <= top;
}
void push( context * ctx) {
std::size_t bottom = bottom_.load( std::memory_order_relaxed);
std::size_t top = top_.load( std::memory_order_acquire);
array * a = array_.load( std::memory_order_relaxed);
if ( (a->capacity() - 1) < (bottom - top) ) {
// queue is full
// resize
array * tmp = a->resize( bottom, top);
old_arrays_.push_back( a);
std::swap( a, tmp);
array_.store( a, std::memory_order_relaxed);
}
a->push( bottom, ctx);
std::atomic_thread_fence( std::memory_order_release);
bottom_.store( bottom + 1, std::memory_order_relaxed);
}
context * pop() {
std::size_t bottom = bottom_.load( std::memory_order_relaxed) - 1;
array * a = array_.load( std::memory_order_relaxed);
bottom_.store( bottom, std::memory_order_relaxed);
std::atomic_thread_fence( std::memory_order_seq_cst);
std::size_t top = top_.load( std::memory_order_relaxed);
context * ctx = nullptr;
if ( top <= bottom) {
// queue is not empty
ctx = a->pop( bottom);
BOOST_ASSERT( nullptr != ctx);
if ( top == bottom) {
// last element dequeued
if ( ! top_.compare_exchange_strong( top, top + 1,
std::memory_order_seq_cst,
std::memory_order_relaxed) ) {
// lose the race
ctx = nullptr;
}
bottom_.store( bottom + 1, std::memory_order_relaxed);
}
} else {
// queue is empty
bottom_.store( bottom + 1, std::memory_order_relaxed);
}
return ctx;
}
context * steal() {
std::size_t top = top_.load( std::memory_order_acquire);
std::atomic_thread_fence( std::memory_order_seq_cst);
std::size_t bottom = bottom_.load( std::memory_order_acquire);
context * ctx = nullptr;
if ( top < bottom) {
// queue is not empty
array * a = array_.load( std::memory_order_consume);
ctx = a->pop( top);
BOOST_ASSERT( nullptr != ctx);
// do not steal pinned context (e.g. main-/dispatcher-context)
if ( ctx->is_context( type::pinned_context) ) {
return nullptr;
}
if ( ! top_.compare_exchange_strong( top, top + 1,
std::memory_order_seq_cst,
std::memory_order_relaxed) ) {
// lose the race
return nullptr;
}
}
return ctx;
}
};
}}}
#if BOOST_COMP_CLANG
#pragma clang diagnostic pop
#endif
#endif // BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H