C ++ 11: How to implement fast, easy and fair synchronized access to resources

Question

What can I do to get a blocking mechanism that provides a minimal and stable delay, ensuring that a thread cannot reload a resource before another thread acquires and releases it?

The desirability of answers to this question is evaluated as follows:

  • Some combination of built-in features of C ++ 11, working in the MinGW in Windows 7 (note that library <thread>and <mutex>do not work on the Windows platform)

  • Some combination of Windows API functions

  • FairLock modification below, my own attempt to implement such a mechanism

  • Some features provided by the free open source library that does not require the installation process .configure / make / make install (getting this to work in MSYS is more of an adventure than I care)

Background

I am writing an application that is effectively a multi-stage producer / consumer. One thread generates the input consumed by another thread, which produces the output consumed by another thread. The application uses pairs of buffers, so that after the initial delay, all threads can work almost simultaneously.

Windows 7, CriticalSections . CriticalSections (, , Windows ++ 11) , - , , , . - (Encoder) , . , .

, , , , , - . ( ). FairLock Encoder , , CriticalSection, , 60% . 40% 10 100 , .

FairLock

// FairLock.hpp
#ifndef FAIRLOCK_HPP
#define FAIRLOCK_HPP
#include <atomic>
using namespace std;
class FairLock {
    private:
        atomic_bool owned {false};
        atomic<DWORD> lastOwner {0};
    public:
        FairLock(bool owned);
        bool inline hasLock() const;
        bool tryLock();
        void seizeLock();
        void tryRelease();
        void waitForLock();
};
#endif

// FairLock.cpp
#include <windows.h>
#include "FairLock.hpp"
#define ID GetCurrentThreadId()

FairLock::FairLock(bool owned) {
    if (owned) {
        this->owned = true;
        this->lastOwner = ID;
    } else {
        this->owned = false;
        this->lastOwner = 0;
    }
}

bool inline FairLock::hasLock() const {
    return owned && lastOwner == ID;
}

bool FairLock::tryLock() {
    bool success = false;
    DWORD id = ID;
    if (owned) {
        success = lastOwner == id;
    } else if (
        lastOwner != id &&
        owned.compare_exchange_strong(success, true)
    ) {
        lastOwner = id;
        success = true;
    } else {
        success = false;
    }
    return success;
}

void FairLock::seizeLock() {
    bool success = false;
    DWORD id = ID;
    if (!(owned && lastOwner == id)) {
        while (!owned.compare_exchange_strong(success, true)) {
            success = false;
        }
        lastOwner = id;
    }
}

void FairLock::tryRelease() {
    if (hasLock()) {
        owned = false;
    }
}

void FairLock::waitForLock() {
    bool success = false;
    DWORD id = ID;
    if (!(owned && lastOwner == id)) {
        while (lastOwner == id); // spin
        while (!owned.compare_exchange_strong(success, true)) {
            success = false;
        }
        lastOwner = id;
    }
}

FairLock ; !

, ++. . CouchDeveloper . , , , FairLock, , , . , , :

New owner: set owned to true
Old owner: is owned true?  yes
Old owner: am I the last owner? yes
New owner: set me as the last owner

.

, . , .

+3
3

:

*) " " .

: , GCD (Grand Central Dispatch):

using gcd::mutex;
using gcd::semaphore;


// A blocking queue in which each put must wait for a get, and vice 
// versa. A synchronous queue does not have any internal capacity, 
// not even a capacity of one. 

template <typename T>
class simple_synchronous_queue {
public:

    typedef T value_type;

    enum result_type {
        OK = 0,
        TIMEOUT_NOT_DELIVERED = -1,
        TIMEOUT_NOT_PICKED = -2,
        TIMEOUT_NOTHING_OFFERED = -3
    };

    simple_synchronous_queue() 
    : sync_(0), send_(1), recv_(0)
    {
    }

    void put(const T& v) {
        send_.wait();
        new (address()) T(v);
        recv_.signal();
        sync_.wait();
    }

    result_type put(const T& v, double timeout) {
        if (send_.wait(timeout)) {
            new (storage_) T(v);
            recv_.signal();
            if (sync_.wait(timeout)) {
                return OK;
            }
            else {
                return TIMEOUT_NOT_PICKED;
            }
        }
        else {
            return TIMEOUT_NOT_DELIVERED;
        }        
    }

    T get() {
        recv_.wait();
        T result = *address();
        address()->~T();
        sync_.signal();
        send_.signal();
        return result;
    }

    std::pair<result_type, T> get(double timeout) {
        if (recv_.wait(timeout)) {
            std::pair<result_type, T> result = 
                std::pair<result_type, T>(OK, *address());
            address()->~T();
            sync_.signal();
            send_.signal();
            return result;
        }
        else {
            return std::pair<result_type, T>(TIMEOUT_NOTHING_OFFERED, T());
        }
    }    

private:
    using storage_t = typename std::aligned_storage<sizeof(T), std::alignment_of<T>::value>::type;

    T* address() return static_cast<T*>(static_cast<void*>(&storage_));
    }

    storage_t   storage_;
    semaphore   sync_;
    semaphore   send_;
    semaphore   recv_;
};

*) : , .....;)

+1

++ 11, condition_variable -per- thread, , , (- Coliru):

class FairMutex {
private:
  class waitnode {
    std::condition_variable cv_;
    waitnode* next_ = nullptr;
    FairMutex& fmtx_;
  public:
    waitnode(FairMutex& fmtx) : fmtx_(fmtx) {
      *fmtx.tail_ = this;
      fmtx.tail_ = &next_;
    }

    ~waitnode() {
      for (waitnode** p = &fmtx_.waiters_; *p; p = &(*p)->next_) {
        if (*p == this) {
          *p = next_;
          if (!next_) {
            fmtx_.tail_ = &fmtx_.waiters_;
          }
          break;
        }
      }
    }

    void wait(std::unique_lock<std::mutex>& lk) {
      while (fmtx_.held_ || fmtx_.waiters_ != this) {
        cv_.wait(lk);
      }
    }

    void notify() {
      cv_.notify_one();
    }
  };

  waitnode* waiters_ = nullptr;
  waitnode** tail_ = &waiters_;
  std::mutex mtx_;
  bool held_ = false;

public:
  void lock() {
    auto lk = std::unique_lock<std::mutex>{mtx_};
    if (held_ || waiters_) {
      waitnode{*this}.wait(lk);
    }
    held_ = true;
  }

  bool try_lock() {
    if (mtx_.try_lock()) {
      std::lock_guard<std::mutex> lk(mtx_, std::adopt_lock);
      if (!held_ && !waiters_) {
        held_ = true;
        return true;
      }
    }
    return false;
  }

  void unlock() {
    std::lock_guard<std::mutex> lk(mtx_);
    held_ = false;
    if (waiters_ != nullptr) {
      waiters_->notify();
    }
  }
};

FairMutex Lockable, . , , .

+4

I accepted the answer of CouchDeveloper, as it showed me the correct path. I wrote a Windows-specific implementation of the synchronous queue in C ++ 11 and added this answer so that others can consider / use it if they want to.

// SynchronousQueue.hpp
#ifndef SYNCHRONOUSQUEUE_HPP
#define SYNCHRONOUSQUEUE_HPP

#include <atomic>
#include <exception>
#include <windows>

using namespace std;

class CouldNotEnterException: public exception {};
class NoPairedCallException: public exception {};

template typename<T>
class SynchronousQueue {
    private:
        atomic_bool valueReady {false};

        CRITICAL_SECTION getCriticalSection;
        CRITICAL_SECTION putCriticalSection;

        DWORD wait {0};

        HANDLE getSemaphore;
        HANDLE putSemaphore;

        const T* address {nullptr};

    public:
        SynchronousQueue(DWORD waitMS): wait {waitMS}, address {nullptr} {
            initializeCriticalSection(&getCriticalSection);
            initializeCriticalSection(&putCriticalSection);

            getSemaphore = CreateSemaphore(nullptr, 0, 1, nullptr);
            putSemaphore = CreateSemaphore(nullptr, 0, 1, nullptr);
        }

        ~SynchronousQueue() {
            EnterCriticalSection(&getCriticalSection);
            EnterCriticalSection(&putCriticalSection);

            CloseHandle(getSemaphore);
            CloseHandle(putSemaphore);

            DeleteCriticalSection(&putCriticalSection);
            DeleteCriticalSection(&getCriticalSection);
        }

        void put(const T& value) {
            if (!TryEnterCriticalSection(&putCriticalSection)) {
                throw CouldNotEnterException();
            }

            ReleaseSemaphore(putSemaphore, (LONG) 1, nullptr);

            if (WaitForSingleObject(getSemaphore, wait) != WAIT_OBJECT_0) {
                if (WaitForSingleObject(putSemaphore, 0) == WAIT_OBJECT_0) {
                    LeaveCriticalSection(&putCriticalSection);
                    throw NoPairedCallException();
                } else {
                    WaitForSingleObject(getSemaphore, 0);
                }
            }

            address = &value;
            valueReady = true;
            while (valueReady);

            LeaveCriticalSection(&putCriticalSection);
        }

        T get() {
            if (!TryEnterCriticalSection(&getCriticalSection)) {
                throw CouldNotEnterException();
            }

            ReleaseSemaphore(getSemaphore, (LONG) 1, nullptr);

            if (WaitForSingleObject(putSemaphore, wait) != WAIT_OBJECT_0) {
                if (WaitForSingleObject(getSemaphore, 0) == WAIT_OBJECT_0) {
                    LeaveCriticalSection(&getCriticalSection);
                    throw NoPairedCallException();
                } else {
                    WaitForSingleObject(putSemaphore, 0);
                }
            }

            while (!valueReady);
            T toReturn = *address;
            valueReady = false;

            LeaveCriticalSection(&getCriticalSection);

            return toReturn;
        }
};

#endif
0
source

All Articles