// Synchronizer.h 6.49 KB
/*
 * Synchronizer.h
 *
 *  Created on: 15 Dec 2018
 *      Author: mariuszo
 */

#ifndef SYNCHRONIZER_H_
#define SYNCHRONIZER_H_

#include <errno.h>
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdint.h>
#include <sys/types.h>
#include <time.h>
#include <atomic>
#include <map>
#include <mutex>
#include <string>
#include "wrflock.h"
#include "pthread_helpers.h"
#include "enums.h"
/*
#include <linux/limits.h>
#include <sys/mman.h>
#include <time.h>
#include <unistd.h>
#include <string>
#include <atomic>
#include <stdint.h>
#include <memory>

#include <iostream>

*/
// Number of nanoseconds in one second. Used below to normalise the timespec
// deadline built for the timed wait calls. Guarded so that another header may
// provide the same macro first.
#ifndef NS_IN_S
#define NS_IN_S 1000000000
#endif

// One entry of the slot ring buffer managed by SynchronizerBase.
// NOTE(review): presumably placed in memory shared between processes, in which
// case the field order and types are part of the layout contract - confirm
// before changing them.
struct syncSlot  {
	wrflock_t lock;		// per-slot lock; passed to the wrflock_* calls made by SynchronizerBase
	off_t dataSlotID;	// value written by SynchronizerBase::setQueueSlot() (its blockID argument)
};

// Control record accessed through SynchronizerBase (one per synchronizer object).
// NOTE(review): presumably placed in memory shared between processes - confirm
// before changing the field order or types.
struct synchronizerData {
	sem_t lock;				// lock for data blocks access (taken/released via lockSynchronizerData()/unlockSynchronizerData())
	pthread_rwlock_t consumersLock;		// each consumer takes a read lock to prevent the producer from finishing prematurely
	std::atomic<off_t> writeSlotID;		// slot ID published with setWriteSlotID() and read with getWriteSlotID()
};

class SynchronizerBase {
	static std::map<std::string, SynchronizerBase*> synchronizersMap;
	static std::mutex synchronizersMapLock;

	const std::string m_name;
	size_t m_slotsRingBufferSize;
	syncSlot* m_syncSlotsRingBuffer;
	synchronizerData* m_synchronizerData;
protected:
	SynchronizerBase(const std::string& name);
	virtual ~SynchronizerBase();

	void create();
	void attach();
	void destroy();

	inline void setWriteSlotID(const off_t& slotID) {
		m_synchronizerData->writeSlotID.store(slotID, std::memory_order_release);
	}

	inline const off_t getWriteSlotID() const {
		assert(m_synchronizerData->writeSlotID>=0);
		return m_synchronizerData->writeSlotID.load(std::memory_order_acquire);
	}

	bool consumersActiveLock(){
		
		return rdLock(m_synchronizerData->consumersLock,0)==0;
	}
	
	bool consumersActiveUnlock(){
		return rwUnlock(m_synchronizerData->consumersLock)==0;
	}
	
	bool consumersInactiveLock(const uint64_t& timeoutns=UINT64_MAX){
		return wrLock(m_synchronizerData->consumersLock, timeoutns)==0;
	}

	bool consumersInactiveUnlock(){
		return rwUnlock(m_synchronizerData->consumersLock)==0;
	}
public:
	inline const size_t& slotsRingBufferSize() const {
		return m_slotsRingBufferSize;
	}
	
	inline void setQueueSlot(const off_t& slotID, const off_t& blockID) noexcept {
		m_syncSlotsRingBuffer[slotID].dataSlotID=blockID;
	}

	inline syncSlot& getQueueSlot(const off_t& slotID) const  noexcept {
		return m_syncSlotsRingBuffer[slotID];
	}

	inline synchronizerData& getSynchronizerData() const  noexcept {
		return *m_synchronizerData;
	}

	inline void lockSynchronizerData() noexcept {
		semWait(m_synchronizerData->lock);
	}

	inline void unlockSynchronizerData() noexcept {
		semPost(m_synchronizerData->lock);
	}

	inline bool wacquire(const off_t& slotID) noexcept {
		return wrflock_wacquire(&(m_syncSlotsRingBuffer[slotID].lock))>=0;
	}

	inline bool racquire(const off_t& slotID) noexcept {
		return wrflock_racquire(&(m_syncSlotsRingBuffer[slotID].lock))>=0;
	}

	inline bool facquire(const off_t& slotID) noexcept {
		return wrflock_facquire(&(m_syncSlotsRingBuffer[slotID].lock))>=0;
	}

	inline bool wrelease(const off_t& slotID) noexcept {
		return wrflock_wrelease(&(m_syncSlotsRingBuffer[slotID].lock))>=0;
	}

	inline bool rrelease(const off_t& slotID) noexcept {
		return wrflock_rrelease(&(m_syncSlotsRingBuffer[slotID].lock))>=0;
	}

	inline bool frelease(const off_t& slotID) noexcept {
		return wrflock_frelease(&(m_syncSlotsRingBuffer[slotID].lock))>=0;
	}
	
	inline bool wwait(const off_t& slotID, const uint64_t& timeoutns=UINT64_MAX) noexcept {
		if (timeoutns==UINT64_MAX){
			while (1) {
				if (wrflock_wwait(&(m_syncSlotsRingBuffer[slotID].lock))<0){
					if (errno==EINTR)
						continue;
					return false;
				}
				return true;
			};
		} else if (timeoutns==0) {
			while(1){
				if (wrflock_wtrywait(&(m_syncSlotsRingBuffer[slotID].lock))<0){
					return false;
				}
				return true;
			}
		} else {
			timespec  time;
			clock_gettime(CLOCK_REALTIME, &time);
			time.tv_nsec+=timeoutns;
			time.tv_sec+=(time.tv_nsec/NS_IN_S);
			time.tv_nsec%=NS_IN_S;
			while (1) {
				if (wrflock_wtimewait(&(m_syncSlotsRingBuffer[slotID].lock),&time)<0){
					if (errno==EINTR)
						continue;
					return false;
				}
				return true;
			}
		}
	}

	inline bool rwait(const off_t& slotID, const uint64_t& timeoutns=UINT64_MAX) noexcept {
		if (timeoutns==UINT64_MAX){
			while (1) {
				if (wrflock_rwait(&(m_syncSlotsRingBuffer[slotID].lock))<0){
					if (errno==EINTR)
						continue;
					return false;
				}
				return true;
			}
		} else if (timeoutns==0) {
			while(1){
				if (wrflock_rtrywait(&(m_syncSlotsRingBuffer[slotID].lock))<0){
					return false;
				}
				return true;
			}
		} else {
			timespec  time;
			clock_gettime(CLOCK_REALTIME, &time);
			time.tv_nsec+=timeoutns;
			time.tv_sec+=(time.tv_nsec/NS_IN_S);
			time.tv_nsec%=NS_IN_S;
			while (1) {
				if (wrflock_rtimewait(&(m_syncSlotsRingBuffer[slotID].lock),&time)<0){
					if (errno==EINTR)
						continue;
					return false;
				}
				return true;
			}
		}
	}
	inline bool fwait(const off_t& slotID, const uint64_t& timeoutns=UINT64_MAX) noexcept {
		if (timeoutns==UINT64_MAX){
			while (1) {
				if (wrflock_fwait(&(m_syncSlotsRingBuffer[slotID].lock))<0){
					if (errno==EINTR)
						continue;
					return false;
				}
				return true;
			}
		} else if (timeoutns==0) {
			while(1){
				if (wrflock_ftrywait(&(m_syncSlotsRingBuffer[slotID].lock))<0){
					return false;
				}
				return true;
			}
		} else {
			timespec  time;
			clock_gettime(CLOCK_REALTIME, &time);
			time.tv_nsec+=timeoutns;
			time.tv_sec+=(time.tv_nsec/NS_IN_S);
			time.tv_nsec%=NS_IN_S;
			while (1) {
				if (wrflock_ftimewait(&(m_syncSlotsRingBuffer[slotID].lock),&time)<0){
					if (errno==EINTR)
						continue;
					return false;
				}
				return true;
			}
		}
	}

};

// Primary template: intentionally empty. Only the explicit specializations
// declared in this header provide any functionality.
template <access_type R>
class Synchronizer {
};

template <> class Synchronizer<access_type::access_master>: public SynchronizerBase {
public:
	Synchronizer(const std::string& name): SynchronizerBase(name){}
	virtual ~Synchronizer() {
		destroy();
	}
	using SynchronizerBase::create;
	using SynchronizerBase::destroy;
	using SynchronizerBase::setWriteSlotID;
	using SynchronizerBase::getWriteSlotID;
	using SynchronizerBase::consumersInactiveLock;
	using SynchronizerBase::consumersInactiveUnlock;
	using SynchronizerBase::getSynchronizerData;
};

template <> class Synchronizer<access_type::access_slave>: public SynchronizerBase {
public:
	Synchronizer(const std::string& name): SynchronizerBase(name){}
	virtual ~Synchronizer() {}

	using SynchronizerBase::attach;
	using SynchronizerBase::getWriteSlotID;
	using SynchronizerBase::consumersActiveLock;
	using SynchronizerBase::consumersActiveUnlock;
};


#endif /* SYNCHRONIZER_H_ */