Commit c341db021ef1e83afa092655b807bee4ed614255

Authored by Mariusz Orlikowski
0 parents

Initial commit

Makefile 0 → 100644
  1 +++ a/Makefile
# Builds the shared-memory producer/consumer demo into a single 'test' binary.
all: test

# pthreads plus librt (POSIX shared memory and named semaphores).
LIBS := -lpthread -lrt

EXECUTABLES := test

SRCS += \
src/Consumer.cpp \
src/DataBlock.cpp \
src/MemoryManager.cpp \
src/Producer.cpp \
src/Synchronizer.cpp \
src/shm.cpp \
src/wrflock_init.cpp \
src/wrflock_destroy.cpp \
src/wrflock_acquire.cpp \
src/wrflock_release.cpp \
src/wrflock_wait.cpp \
test.cpp

OBJS += \
build/Consumer.o \
build/DataBlock.o \
build/MemoryManager.o \
build/Producer.o \
build/Synchronizer.o \
build/shm.o \
build/wrflock_init.o \
build/wrflock_destroy.o \
build/wrflock_acquire.o \
build/wrflock_release.o \
build/wrflock_wait.o \
build/test.o

DEPS += \
build/Consumer.d \
build/DataBlock.d \
build/MemoryManager.d \
build/Producer.d \
build/Synchronizer.d \
build/shm.d \
build/wrflock_init.d \
build/wrflock_destroy.d \
build/wrflock_acquire.d \
build/wrflock_release.d \
build/wrflock_wait.d \
build/test.d

# Pull in compiler-generated header dependencies; '-' ignores the missing
# .d files on the first build.
-include $(DEPS)

test: $(OBJS)
	g++ -std=c++11 -o $@ $(OBJS) $(LIBS)

# Compile sources under src/, emitting a .d dependency file next to each object.
build/%.o: src/%.cpp
	@mkdir -p build
	g++ -std=c++11 -DNDEBUG -O3 -Isrc -Wall -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<"

# Same rule for top-level sources (test.cpp).
build/%.o: %.cpp
	@mkdir -p build
	g++ -std=c++11 -DNDEBUG -O3 -Isrc -Wall -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<"
clean:
	-rm $(EXECUTABLES) $(OBJS) $(DEPS)
	-rmdir build

.PHONY: all clean dependents
... ...
src/Consumer.cpp 0 → 100644
  1 +++ a/src/Consumer.cpp
  1 +/*
  2 + * Consumer.cpp
  3 + *
  4 + * Created on: 15 gru 2018
  5 + * Author: mariuszo
  6 + */
  7 +
  8 +#include "Consumer.h"
  9 +#include "ProducerOnInitLock.h"
  10 +
// Binds this consumer to the producer's shared-memory regions identified by
// 'name'; nothing is mapped until attach() is called.
Consumer::Consumer(const std::string& name) : SubsystemBase(name), m_memoryManagerData(m_name), m_synchronizerData(m_name) {
}

Consumer::~Consumer() {
}
  16 +
  17 +void Consumer::attach() {
  18 + ProducerOnInitLock::lockOnInit();
  19 + try {
  20 + m_memoryManagerData.attach();
  21 + m_synchronizerData.attach();
  22 + if (!m_synchronizerData.consumersActiveLock())
  23 + throw std::logic_error("Producer shut down activated");
  24 + }
  25 + catch (...) {
  26 + ProducerOnInitLock::unlockOnInit();
  27 + throw;
  28 + }
  29 + ProducerOnInitLock::unlockOnInit();
  30 +}
  31 +
// Positions this consumer at the producer's current write slot and claims it.
// The synchronizer-data lock makes reading the write position and acquiring
// the slot atomic with respect to the producer advancing the write slot.
void Consumer::init() {
	m_synchronizerData.lockSynchronizerData();
	// Configure the modulo counter: start at the write slot, wrap at ring size.
	m_readSlotID(m_synchronizerData.getWriteSlotID(), m_synchronizerData.slotsRingBufferSize());
	m_synchronizerData.racquire(m_readSlotID.val());
	m_synchronizerData.unlockSynchronizerData();
}
// Drops the claim on the current slot and deregisters from the active-consumer
// accounting taken in attach().
void Consumer::done() {
	m_synchronizerData.rrelease(m_readSlotID.val());
	m_synchronizerData.consumersActiveUnlock();
}
  42 +
// Fetches the block published in this consumer's current ring slot.
// Sentinel outcomes (timeout / stop / empty) leave the read position where it
// is; on real data the NEXT slot is claimed before returning, so the producer
// can never lap this consumer while the caller processes the block.
void Consumer::getDataBlock(DataBlock& dataBlock, const uint64_t& timeoutns) {
	// Wait until the producer releases the current slot to readers.
	if (!m_synchronizerData.rwait(m_readSlotID.val(), timeoutns)){
		dataBlock.setTimeout();
		return;
	}
	if (m_synchronizerData.getQueueSlot(m_readSlotID.val()).dataSlotID==static_cast<off_t>(specialid_type::specialid_stop)) {
		// Producer is shutting down; do not advance past the stop marker.
		dataBlock.setStop();
		return;
	} else if (m_synchronizerData.getQueueSlot(m_readSlotID.val()).dataSlotID==static_cast<off_t>(specialid_type::specialid_empty)){
		dataBlock.setEmpty();
		return;
	} else {
		// Real data: bind the block to this consumer so commit() (or the
		// block's destructor) releases the slot claim via releaseDataBlock().
		dataBlock=m_memoryManagerData.getDataBlock(m_synchronizerData.getQueueSlot(m_readSlotID.val()).dataSlotID, m_readSlotID.val());
		dataBlock.attach(this);
	}
	// Prefix ++ advances the read position and claims the new slot.
	m_synchronizerData.racquire(++m_readSlotID);
}
// Returns a processed block to the producer side and neutralizes the handle.
void Consumer::commit(DataBlock& data) {
	releaseDataBlock(data);
	data.detach();	// prevent ~DataBlock from releasing the slot a second time
	data.setEmpty();
}

// Releases the read claim this block holds on its synchronizer slot.
// Also invoked from ~DataBlock through the SubsystemBase interface.
void Consumer::releaseDataBlock(const DataBlock& data) {
	if (data){	// only blocks referencing real data hold a slot claim
		m_synchronizerData.rrelease(data.syncSlotID());
	}
}
... ...
src/Consumer.h 0 → 100644
  1 +++ a/src/Consumer.h
  1 +/*
  2 + * Consumer.h
  3 + *
  4 + * Created on: 15 gru 2018
  5 + * Author: mariuszo
  6 + */
  7 +
  8 +#ifndef CONSUMER_H_
  9 +#define CONSUMER_H_
  10 +
  11 +#include <string>
  12 +#include "SubsystemBase.h"
  13 +#include "MemoryManager.h"
  14 +#include "Synchronizer.h"
  15 +#include "ModuloCounter.h"
  16 +
// Read-side endpoint of the shared-memory queue: attaches to the regions a
// Producer of the same name created, then pulls published DataBlocks in order.
class Consumer : public SubsystemBase {
	// Slave (read-only-role) views of the producer's shared-memory regions.
	MemoryManager<access_type::access_slave> m_memoryManagerData;
	Synchronizer<access_type::access_slave> m_synchronizerData;
	// This consumer's position in the synchronizer's slot ring buffer.
	ModuloCounter<off_t> m_readSlotID;

public:
	Consumer(const std::string& name);
	virtual ~Consumer();

	void attach();	// map shared memory and register as an active consumer
	void init();	// snapshot the write position and claim the starting slot
	void done();	// drop the current claim and deregister
	// Fetch the next block; sets a sentinel state on timeout/stop/empty.
	void getDataBlock(DataBlock& dataBlock, const uint64_t& timeoutns = UINT64_MAX);
	void commit(DataBlock& data);	// hand a processed block back
	void releaseDataBlock(const DataBlock& data);	// SubsystemBase hook

};
  34 +
  35 +#endif /* CONSUMER_H_ */
... ...
src/DataBlock.cpp 0 → 100644
  1 +++ a/src/DataBlock.cpp
  1 +/*
 * DataBlock.cpp
  3 + *
  4 + * Created on: 15 gru 2018
  5 + * Author: mariuszo
  6 + */
  7 +
  8 +#include "DataBlock.h"
  9 +#include "SubsystemBase.h"
  10 +
// If the block is still attached to a subsystem (Producer or Consumer), give
// that owner the chance to release the underlying slot/memory; detached or
// sentinel blocks are destroyed without side effects.
DataBlock::~DataBlock() {
	if (m_attachedTo)
		m_attachedTo->releaseDataBlock(*this);
}
  15 +
... ...
src/DataBlock.h 0 → 100644
  1 +++ a/src/DataBlock.h
  1 +/*
 * DataBlock.h
  3 + *
  4 + * Created on: 15 gru 2018
  5 + * Author: mariuszo
  6 + */
  7 +
  8 +#ifndef DATABLOCK_H_
  9 +#define DATABLOCK_H_
  10 +
  11 +#include <assert.h>
  12 +#include <atomic>
  13 +#include "enums.h"
  14 +/*
  15 +#include <unistd.h>
  16 +*/
  17 +class SubsystemBase;
  18 +
// Sentinels stored in dataSlot::dataSlotID while a slot holds no live data.
enum class block_type : off_t {
	block_torelease=-1,	// freed out of order; waiting for the release cascade
	block_empty=-2		// slot available for allocation
};

// One entry of the shared slot table, describing a region of the data ring.
struct dataSlot {
	std::atomic<off_t> dataSlotID;	// own index once published, else a block_type sentinel
	off_t dataAddr;			// byte offset of the payload inside the ring buffer
	size_t dataSize;		// payload size in bytes
};
  29 +
  30 +class DataBlock {
  31 + void* m_address;
  32 + dataSlot* m_dataSlot;
  33 + off_t m_syncSlotID;
  34 + SubsystemBase* m_attachedTo;
  35 +public:
  36 + DataBlock(const DataBlock& data):m_address(data.m_address), m_dataSlot(data.m_dataSlot), m_syncSlotID(data.m_syncSlotID), m_attachedTo(NULL) {}
  37 + DataBlock(const specialid_type& type=specialid_type::specialid_empty):m_address(NULL), m_dataSlot(NULL), m_syncSlotID(static_cast<off_t>(type)), m_attachedTo(NULL) {}
  38 + DataBlock(dataSlot& data, const void* baseAddress, const off_t& slotID=static_cast<off_t>(specialid_type::specialid_none)):m_address((baseAddress)?(uint8_t*)baseAddress+data.dataAddr:NULL), m_dataSlot(&data), m_syncSlotID(slotID), m_attachedTo(NULL){}
  39 + virtual ~DataBlock();
  40 +
  41 + void setType(const specialid_type& type){
  42 + m_address=m_dataSlot=NULL;
  43 + m_syncSlotID=static_cast<off_t>(type);
  44 + }
  45 + void setEmpty(){
  46 + m_address=m_dataSlot=NULL;
  47 + m_syncSlotID=static_cast<off_t>(specialid_type::specialid_empty);
  48 + }
  49 + void setStop(){
  50 + m_address=m_dataSlot=NULL;
  51 + m_syncSlotID=static_cast<off_t>(specialid_type::specialid_stop);
  52 + }
  53 + void setTimeout(){
  54 + m_address=m_dataSlot=NULL;
  55 + m_syncSlotID=static_cast<off_t>(specialid_type::specialid_timeout);
  56 + }
  57 + const void* operator *() const {
  58 + return m_address;
  59 + }
  60 + const size_t size() const {
  61 + if (m_dataSlot)
  62 + return m_dataSlot->dataSize;
  63 + else
  64 + return 0;
  65 + }
  66 + const off_t blockID() const {
  67 + assert(m_dataSlot && m_dataSlot->m_blockID>=0);
  68 + return m_dataSlot->dataSlotID.load(std::memory_order_acquire);
  69 + }
  70 + const off_t& syncSlotID() const {
  71 + return m_syncSlotID;
  72 + }
  73 + bool operator !() const {
  74 + return m_address==NULL || !m_dataSlot || m_dataSlot->dataSlotID<0;
  75 + }
  76 +
  77 + operator bool () const {
  78 + return !!(*this);
  79 + }
  80 +
  81 + bool isType(const specialid_type& type) const {
  82 + return m_syncSlotID==static_cast<off_t>(type);
  83 + }
  84 + bool isEmpty() const {
  85 + return m_syncSlotID==static_cast<off_t>(specialid_type::specialid_empty);
  86 + }
  87 + bool isStop() const {
  88 + return m_syncSlotID==static_cast<off_t>(specialid_type::specialid_stop);
  89 + }
  90 + bool isTimeout() const {
  91 + return m_syncSlotID==static_cast<off_t>(specialid_type::specialid_timeout);
  92 + }
  93 + bool isOutOfMemory() const {
  94 + return m_syncSlotID==static_cast<off_t>(specialid_type::specialid_outofmemory);
  95 + }
  96 + void attach(SubsystemBase* attachedTo) {m_attachedTo=attachedTo;}
  97 + void detach() {m_attachedTo=NULL;}
  98 +};
  99 +
  100 +#endif /* DATABLOCK_H_ */
... ...
src/MemoryManager.cpp 0 → 100644
  1 +++ a/src/MemoryManager.cpp
  1 +/*
  2 + * MemoryManager.cpp
  3 + *
  4 + * Created on: 15 gru 2018
  5 + * Author: mariuszo
  6 + */
  7 +
  8 +#include <limits.h>
  9 +#include "MemoryManager.h"
  10 +#include "pthread_helpers.h"
  11 +
  12 +
// Binds the manager to its two named shared-memory regions
// ("<name>.ringbuffer" and "<name>.memorymanagement"); nothing is mapped
// until create() or attach() runs.
MemoryManagerBase::MemoryManagerBase(const std::string& name): m_name(name), m_shmDataRingBuffer(name+".ringbuffer"), m_shmMemoryManagementData(name+".memorymanagement"), m_dataRingBuffer(NULL), m_dataSlotsRingBuffer(NULL), m_memoryManagementData(NULL){
}

MemoryManagerBase::~MemoryManagerBase(){
}
  18 +
// Creates and initializes both shared-memory regions: the raw data ring buffer
// and the management block (counters + semaphore, followed in the same mapping
// by the dataSlot table).
void MemoryManagerBase::create(const size_t& dataRingBufferSize, const size_t& dataSlotsRingBufferSize) {
	size_t size;

	// The free-slot semaphore counts slots, so the count must fit a semaphore.
	if (dataSlotsRingBufferSize > SEM_VALUE_MAX)
		throw std::logic_error("Data block queue size must not exceed "+std::to_string(SEM_VALUE_MAX));
	if (dataSlotsRingBufferSize <= 0)
		throw std::logic_error("Data block queue size must be positive value");

	// allocData/releaseData pack (slot ID << shift) | byte address into one
	// 64-bit word. Widen the ID field until the slot count fits, shrinking the
	// address field accordingly. The ID field is sized with a spare bit:
	// bit 63 doubles as free()'s in-progress marker in the FULLYPARALLEL path.
	m_dataSlotIDMask= 0xC000000000000000;
	m_dataSlotAddrMask=0x3fffffffffffffff;
	m_dataSlotIDShift=62;
	while (dataSlotsRingBufferSize>(((uint64_t)1)<<(63-m_dataSlotIDShift))){
		--m_dataSlotIDShift;
		m_dataSlotIDMask=(m_dataSlotIDMask>>1)+0x8000000000000000;
		m_dataSlotAddrMask=(m_dataSlotAddrMask>>1);
	}
	if (dataRingBufferSize>m_dataSlotAddrMask)
		throw std::logic_error("Cannot manage addressing for current buffer size and block count");


	// Allocate and pin (mlock) both regions so the ring never page-faults.
	if (!m_shmDataRingBuffer.alloc((size=dataRingBufferSize * sizeof(*m_dataRingBuffer))))
		throw std::logic_error("Cannot allocate shared memory \""+m_shmDataRingBuffer.getName()+"\" size ("+std::to_string(size)+")");
	if (!m_shmDataRingBuffer.mlock())
		throw std::logic_error("Cannot mlock shared memory \""+m_shmDataRingBuffer.getName()+"\"");

	if (!m_shmMemoryManagementData.alloc((size=sizeof(*m_memoryManagementData) + dataSlotsRingBufferSize * sizeof(*m_dataSlotsRingBuffer))))
		throw std::logic_error("Cannot allocate shared memory \""+m_shmMemoryManagementData.getName()+"\" (size "+std::to_string(size)+")");
	if (!m_shmMemoryManagementData.mlock())
		throw std::logic_error("Cannot mlock shared memory \""+m_shmMemoryManagementData.getName()+"\"");

	m_dataRingBuffer = (uint8_t*) (m_shmDataRingBuffer.getAddr());

	m_memoryManagementData = (memoryManagementData*) m_shmMemoryManagementData.getAddr();
	m_memoryManagementData->dataRingBufferSize = m_shmDataRingBuffer.getSize();
	m_memoryManagementData->dataSlotsRingBufferSize = dataSlotsRingBufferSize;
#ifdef FULLYPARALLEL
	// Placement-new: the atomics live in shared memory, which starts as raw bytes.
	new (&(m_memoryManagementData->allocData)) std::atomic<uint64_t>(0);
	new (&(m_memoryManagementData->releaseData)) std::atomic<uint64_t>(0);
#else
	m_memoryManagementData->allocData=0;
	m_memoryManagementData->releaseData=0;
#endif
	new (&(m_memoryManagementData->freeSize)) std::atomic<size_t>(m_shmDataRingBuffer.getSize());
	// NOTE(review): pshared=0 — valid only while alloc()/free() are confined to
	// the producer process (the slave specialization exposes neither); if the
	// semaphore is ever used across processes this must become pshared=1.
	sem_init(&m_memoryManagementData->dataSlotsSemaphore, 0, m_memoryManagementData->dataSlotsRingBufferSize);

	// Slot table immediately follows the management header in the same mapping.
	m_dataSlotsRingBuffer = (dataSlot*) (m_memoryManagementData + 1);
	for (size_t i = 0; i < m_memoryManagementData->dataSlotsRingBufferSize; ++i) {
		new (&(m_dataSlotsRingBuffer[i].dataSlotID)) std::atomic<off_t>(static_cast<off_t>(block_type::block_empty));
		m_dataSlotsRingBuffer[i].dataAddr = 0;
		m_dataSlotsRingBuffer[i].dataSize = 0;
	}

}
  72 +
// Maps already-created shared-memory regions and rebuilds the slot-ID/address
// packing parameters from the shared sizes. The mask computation must mirror
// create() exactly, since both sides decode the same packed words.
void MemoryManagerBase::attach(){
	if (!m_shmMemoryManagementData.attach())
		throw std::logic_error("Cannot attach to shared memory \""+m_shmMemoryManagementData.getName()+"\"");

	if (!m_shmDataRingBuffer.attach())
		throw std::logic_error("Cannot attach to shared memory \""+m_shmDataRingBuffer.getName()+"\"");

	m_memoryManagementData=(memoryManagementData*)m_shmMemoryManagementData.getAddr();
	// Slot table immediately follows the management header in the same mapping.
	m_dataSlotsRingBuffer=(dataSlot*)(m_memoryManagementData+1);

	m_dataRingBuffer=(uint8_t*)(m_shmDataRingBuffer.getAddr());

	// Recompute the ID/address field split from the shared slot count
	// (same algorithm as create()).
	m_dataSlotIDMask= 0xC000000000000000;
	m_dataSlotAddrMask=0x3fffffffffffffff;
	m_dataSlotIDShift=62;
	while (m_memoryManagementData->dataSlotsRingBufferSize>(((uint64_t)1)<<(63-m_dataSlotIDShift))){
		--m_dataSlotIDShift;
		m_dataSlotIDMask=(m_dataSlotIDMask>>1)+0x8000000000000000;
		m_dataSlotAddrMask=(m_dataSlotAddrMask>>1);
	}
	if (m_memoryManagementData->dataRingBufferSize>m_dataSlotAddrMask)
		throw std::logic_error("Cannot manage addressing for current buffer size and block count");
}
  96 +
  97 +void MemoryManagerBase::destroy(){
  98 + for (size_t i = 0; i < m_memoryManagementData->dataSlotsRingBufferSize; ++i) {
  99 + m_dataSlotsRingBuffer[i].dataSlotID.~atomic();
  100 + }
  101 + m_memoryManagementData->freeSize.~atomic();
  102 +#ifdef FULLYPARALLEL
  103 + m_memoryManagementData->releaseData.~atomic();
  104 + m_memoryManagementData->allocData.~atomic();
  105 +#endif
  106 + m_shmMemoryManagementData.unlink();
  107 + m_shmDataRingBuffer.unlink();
  108 +}
  109 +
// Allocates 'size' bytes from the ring buffer. Three phases:
//   1. wait (bounded by timeoutns) for a free slot-table entry;
//   2. CAS-reserve the bytes from the shared freeSize counter;
//   3. CAS-advance the packed allocData word to claim a slot ID and a start
//      address, both wrapping around their respective ring sizes.
// Returns a sentinel DataBlock (timeout / out-of-memory) on failure.
DataBlock MemoryManagerBase::alloc(const size_t& size, const uint64_t& timeoutns) {
	if (!semWait(m_memoryManagementData->dataSlotsSemaphore, timeoutns))
		return DataBlock(specialid_type::specialid_timeout);

	size_t freeSize=m_memoryManagementData->freeSize.load(std::memory_order_relaxed);
	do {
		if (freeSize<size){
			// Give the slot back before reporting out-of-memory.
			semPost(m_memoryManagementData->dataSlotsSemaphore);
			return DataBlock(specialid_type::specialid_outofmemory);
		}
	} while (!m_memoryManagementData->freeSize.compare_exchange_weak(freeSize, freeSize-size, std::memory_order_relaxed, std::memory_order_relaxed));

	uint64_t dataSlotID, newDataSlotID;
	uint64_t addr, newAddr;
#ifdef FULLYPARALLEL
	// Claim (slot ID, address) atomically by advancing the packed word.
	uint64_t allocData=m_memoryManagementData->allocData.load(std::memory_order_relaxed);
	do {
		newDataSlotID=(dataSlotID=(allocData>>m_dataSlotIDShift))+1;
		if (newDataSlotID==m_memoryManagementData->dataSlotsRingBufferSize)
			newDataSlotID=0;
		newAddr=(addr=(allocData&m_dataSlotAddrMask))+size;
		if (newAddr>=m_memoryManagementData->dataRingBufferSize)
			newAddr-=m_memoryManagementData->dataRingBufferSize;
	} while (!m_memoryManagementData->allocData.compare_exchange_weak(allocData, (newDataSlotID<<m_dataSlotIDShift) | newAddr, std::memory_order_relaxed, std::memory_order_relaxed));
#else
	// Single-allocator variant: plain read-modify-write, no CAS needed.
	uint64_t allocData=m_memoryManagementData->allocData;
	newDataSlotID=(dataSlotID=(allocData>>m_dataSlotIDShift))+1;
	if (newDataSlotID==m_memoryManagementData->dataSlotsRingBufferSize)
		newDataSlotID=0;
	newAddr=(addr=(allocData&m_dataSlotAddrMask))+size;
	if (newAddr>=m_memoryManagementData->dataRingBufferSize)
		newAddr-=m_memoryManagementData->dataRingBufferSize;

	m_memoryManagementData->allocData=(newDataSlotID<<m_dataSlotIDShift) | newAddr;
#endif

	// NOTE: local reference shadows the 'dataSlot' type name.
	dataSlot& dataSlot(m_dataSlotsRingBuffer[dataSlotID]);
#ifndef NDEBUG
	if (dataSlot.dataSlotID.load(std::memory_order_relaxed)!=static_cast<off_t>(block_type::block_empty))
		throw std::logic_error("Error in alloc() algorithm");
#endif
	dataSlot.dataAddr=addr;
	dataSlot.dataSize=size;
	// Release-store of the own index publishes the filled-in slot to readers.
	dataSlot.dataSlotID.store(dataSlotID, std::memory_order_release);
	return DataBlock(dataSlot, m_dataRingBuffer);
}
  156 +
  157 +void MemoryManagerBase::free(off_t dataSlotID) {
  158 + uint64_t requiredReleaseData;
  159 + uint64_t newDataSlotID;
  160 + uint64_t newAddr;
  161 +
  162 + dataSlot* currentDataSlot=&(m_dataSlotsRingBuffer[dataSlotID]);
  163 + currentDataSlot->dataSlotID.store(static_cast<off_t>(block_type::block_torelease), std::memory_order_relaxed);
  164 +#ifdef FULLYPARALLEL
  165 + do {
  166 +#endif
  167 + newDataSlotID=dataSlotID+1;
  168 + if (newDataSlotID==m_memoryManagementData->dataSlotsRingBufferSize)
  169 + newDataSlotID=0;
  170 + newAddr=currentDataSlot->dataAddr+currentDataSlot->dataSize;
  171 + requiredReleaseData=((dataSlotID<<m_dataSlotIDShift) | currentDataSlot->dataAddr);
  172 +#ifdef FULLYPARALLEL
  173 + if (newAddr>=m_memoryManagementData->dataRingBufferSize)
  174 + newAddr-=m_memoryManagementData->dataRingBufferSize;
  175 + if (!m_memoryManagementData->releaseData.compare_exchange_strong(requiredReleaseData, (newDataSlotID<<m_dataSlotIDShift)|newAddr|0x8000000000000000, std::memory_order_relaxed, std::memory_order_relaxed))
  176 + break;
  177 + currentDataSlot->dataSlotID.store(static_cast<off_t>(block_type::block_empty), std::memory_order_relaxed);
  178 + m_memoryManagementData->releaseData.fetch_and(~0x8000000000000000, std::memory_order_relaxed);
  179 +#else
  180 +#ifndef NDEBUG
  181 + if (m_memoryManagementData->releaseData!=requiredReleaseData)
  182 + throw std::logic_error("Error in free() algorithm");
  183 +#endif
  184 + m_memoryManagementData->releaseData=(newDataSlotID<<m_dataSlotIDShift)|newAddr;
  185 + currentDataSlot->dataSlotID.store(static_cast<off_t>(block_type::block_empty), std::memory_order_relaxed);
  186 +#endif
  187 + m_memoryManagementData->freeSize.fetch_add(currentDataSlot->dataSize, std::memory_order_relaxed);
  188 + semPost(m_memoryManagementData->dataSlotsSemaphore);
  189 + currentDataSlot=&(m_dataSlotsRingBuffer[dataSlotID=newDataSlotID]);
  190 +#ifdef FULLYPARALLEL
  191 + } while(currentDataSlot->dataSlotID.load(std::memory_order_relaxed)==static_cast<off_t>(block_type::block_torelease));
  192 +#endif
  193 +}
  194 +
... ...
src/MemoryManager.h 0 → 100644
  1 +++ a/src/MemoryManager.h
  1 +/*
  2 + * MemoryManager.h
  3 + *
  4 + * Created on: 15 gru 2018
  5 + * Author: mariuszo
  6 + */
  7 +
  8 +#ifndef MEMORYMANAGER_H_
  9 +#define MEMORYMANAGER_H_
  10 +
  11 +#include <semaphore.h>
  12 +#include <string>
  13 +#include <atomic>
  14 +#include "shm.h"
  15 +#include "DataBlock.h"
  16 +#include "enums.h"
  17 +/*
  18 +#include <limits.h>
  19 +#include <linux/limits.h>
  20 +#include <unistd.h>
  21 +#include <stdint.h>
  22 +
  23 +*/
  24 +#define FULLYPARALLEL
  25 +
  26 +#ifndef FULLYPARALLEL
  27 +#include <mutex>
  28 +#endif
  29 +
// Header of the shared-memory management region; immediately followed in the
// same mapping by dataSlotsRingBufferSize dataSlot entries.
struct memoryManagementData {
	size_t dataRingBufferSize;	// bytes in the data ring buffer
	size_t dataSlotsRingBufferSize;	// number of dataSlot entries
	sem_t dataSlotsSemaphore;	// counts free slot-table entries
#ifndef FULLYPARALLEL
	uint64_t allocData;	// packed (slot ID << shift) | next allocation address
	uint64_t releaseData;	// packed (slot, address) expected to be released next
#else
	std::atomic<uint64_t> allocData;
	std::atomic<uint64_t> releaseData;	// bit 63 doubles as release-in-progress flag
#endif
	std::atomic<size_t> freeSize;	// bytes currently unreserved in the data ring
};
  43 +
  44 +
  45 +class MemoryManagerBase {
  46 + const std::string& m_name;
  47 +
  48 + shmRing m_shmDataRingBuffer;
  49 + shm m_shmMemoryManagementData;
  50 +
  51 + uint8_t* m_dataRingBuffer;
  52 + dataSlot* m_dataSlotsRingBuffer;
  53 + memoryManagementData* m_memoryManagementData;
  54 + uint64_t m_dataSlotIDMask;
  55 + uint8_t m_dataSlotIDShift;
  56 + uint64_t m_dataSlotAddrMask;
  57 +protected:
  58 + MemoryManagerBase(const std::string& name);
  59 + virtual ~MemoryManagerBase();
  60 +
  61 + void attach();
  62 + void create(const size_t& dataRingBufferSize, const size_t& dataSlotsRingbufferSize);
  63 + void destroy();
  64 +
  65 + inline DataBlock getDataBlock(const off_t& dataSlotID, const off_t& slotID) { return DataBlock(m_dataSlotsRingBuffer[dataSlotID], m_dataRingBuffer, slotID);}
  66 +
  67 + DataBlock alloc(const size_t& size, const uint64_t& timeoutns=UINT64_MAX);
  68 + inline void free(const DataBlock& dataBlock) { free(dataBlock.blockID()); }
  69 + void free(off_t dataSlotID);
  70 +public:
  71 + inline const size_t& dataSlotsRingBufferSize() const {return m_memoryManagementData->dataSlotsRingBufferSize;}
  72 +};
  73 +
  74 +template <access_type A> class MemoryManager : public MemoryManagerBase {
  75 +public:
  76 + MemoryManager(const std::string& name): MemoryManagerBase(name) {}
  77 + virtual ~MemoryManager() {}
  78 + using MemoryManagerBase::attach;
  79 + using MemoryManagerBase::getDataBlock;
  80 +};
  81 +
  82 +template <> class MemoryManager<access_type::access_master> : public MemoryManagerBase {
  83 +public:
  84 + MemoryManager(const std::string& name) : MemoryManagerBase(name){}
  85 + virtual ~MemoryManager(){
  86 + destroy();
  87 + }
  88 +
  89 + using MemoryManagerBase::create;
  90 + using MemoryManagerBase::destroy;
  91 + using MemoryManagerBase::alloc;
  92 + using MemoryManagerBase::free;
  93 +};
  94 +
  95 +#endif /* MEMORYMANAGER_H_ */
... ...
src/ModuloCounter.h 0 → 100644
  1 +++ a/src/ModuloCounter.h
  1 +/*
  2 + * ModuloCounter.h
  3 + *
  4 + * Created on: 8 mar 2019
  5 + * Author: mariuszo
  6 + */
  7 +
  8 +#ifndef MODULOCOUNTER_H_
  9 +#define MODULOCOUNTER_H_
  10 +/*
  11 +#include <type_traits>
  12 +#include <limits>
  13 +#include <atomic>
  14 +#include <assert.h>
  15 +*/
// Primary template: only integer types get a usable specialization below.
template <typename T, class Enable = void> class ModuloCounter {};

// Counter over the range [0, modulo-1] that wraps around on both ends.
// A default-constructed counter is "unconfigured" (limit == -1) and must be
// set up via operator()(value, modulo) before arithmetic is used on it.
template <typename T> class ModuloCounter<T, typename std::enable_if<std::numeric_limits<T>::is_integer>::type> {
	T m_value;	// current position, 0..m_limit
	T m_limit;	// modulo-1; -1 means unconfigured
public:
	ModuloCounter(const T& value=-1, const T& modulo=0):m_value(value), m_limit(modulo-1){
		assert(m_limit>=-1);
		assert(m_limit==-1 || (m_value>=0 && m_value<=m_limit));
	}

	// (Re)configure position and modulo in one call.
	inline void operator()(const T& value, const T& modulo) {
		m_value=value;
		m_limit=modulo-1;
		assert(m_limit>=0);
		assert(m_value>=0 && m_value<=m_limit);
	}

	// Assign a new position. Note: the asserts validate the *previous* state.
	T& operator=(const T& value){
		assert(m_limit>=0);
		assert(m_value>=0 && m_value<=m_limit);
		m_value=value;
		return m_value;
	}

	// Prefix increment: wraps to 0 past the limit, returns the new position.
	inline T& operator++(){
		assert(m_limit>=0);
		m_value = (m_value!=m_limit) ? m_value+1 : 0;
		return m_value;
	}
	// Prefix decrement: wraps to the limit below 0, returns the new position.
	inline T& operator--(){
		assert(m_limit>=0);
		m_value = (m_value!=0) ? m_value-1 : m_limit;
		return m_value;
	}

	// Postfix forms: wrap like the prefix forms but return the old position.
	inline T operator++(int){
		assert(m_limit>=0);
		const T before(m_value);
		++(*this);
		return before;
	}
	inline T operator--(int){
		assert(m_limit>=0);
		const T before(m_value);
		--(*this);
		return before;
	}
	// Current position.
	inline const T& val() const{
		return m_value;
	}
	// Neighbouring positions, computed without mutating the counter.
	inline T prev() const {
		assert(m_limit>=0);
		return (m_value!=0) ? m_value-1 : m_limit;
	}
	inline static T prev(const T& value, const T& modulo) {
		assert(modulo>0);
		assert(value>=0 && value<modulo);
		return (value!=0) ? value-1 : modulo-1;
	}
	inline T next() const {
		assert(m_limit>=0);
		return (m_value!=m_limit) ? m_value+1 : 0;
	}
	inline static T next(const T& value, const T& modulo) {
		assert(modulo>0);
		assert(value>=0 && value<modulo);
		return (value!=modulo-1) ? value+1 : 0;
	}
};
  103 +
// Primary template: only integer types get a usable specialization below.
template <typename T, class Enable = void> class AtomicModuloCounter {};

// Thread-safe counter over [0, modulo-1] with wrap-around, implemented with
// CAS loops on an atomic value. m_limit is not atomic: configure it before
// sharing the counter between threads.
template <typename T> class AtomicModuloCounter<T, typename std::enable_if<std::numeric_limits<T>::is_integer>::type> {
	std::atomic<T> m_value;	// current position, 0..m_limit
	T m_limit;		// modulo-1; -1 means unconfigured
	AtomicModuloCounter& operator=(const AtomicModuloCounter&) = delete;
	AtomicModuloCounter& operator=(const AtomicModuloCounter&) volatile = delete;
public:
	AtomicModuloCounter(const T& value=-1, const T& modulo=0):m_value(value), m_limit(modulo-1){
		assert(m_limit>=-1);
		assert(m_limit==-1 || (m_value>=0 && m_value<=m_limit));
	}

	// (Re)configure position and modulo in one call (not thread-safe).
	inline void operator()(const T& value, const T& modulo) {
		m_value=value;
		m_limit=modulo-1;
		assert(m_limit>=0);
		assert(m_value>=0 && m_value<=m_limit);
	}

	// BUGFIX: returns T by value. std::atomic<T>::operator= yields a prvalue
	// T, so the original 'T& operator=' (returning 'm_value=value') could not
	// bind the reference and failed to compile whenever instantiated.
	T operator=(const T& value){
		assert(m_limit>=0);
		assert(m_value>=0 && m_value<=m_limit);
		return m_value=value;
	}

	// Prefix increment: CAS-wraps past the limit, returns the NEW value.
	inline T operator++(){
		assert(m_limit>=0);
		T val=m_value.load(std::memory_order_relaxed);
		T newval;
		do {
			newval=(val!=m_limit)?val+1:0;
		} while (!m_value.compare_exchange_weak(val, newval, std::memory_order_release, std::memory_order_relaxed));
		return newval;
	}
	// Prefix decrement: CAS-wraps below zero, returns the NEW value.
	inline T operator--(){
		assert(m_limit>=0);
		T val=m_value.load(std::memory_order_relaxed);
		T newval;
		do {
			newval=(val!=0)?val-1:m_limit;
		} while (!m_value.compare_exchange_weak(val, newval, std::memory_order_release, std::memory_order_relaxed));
		return newval;
	}

	// BUGFIX: postfix operators must take a dummy 'int' parameter. The
	// original 'operator++(T)' / 'operator--(T)' is only a valid postfix
	// declaration when T happens to be int; for the off_t used at the call
	// sites it is ill-formed once instantiated. Both return the OLD value.
	inline T operator++(int){
		assert(m_limit>=0);
		T val=m_value.load(std::memory_order_relaxed);
		do {
		} while (!m_value.compare_exchange_weak(val, (val!=m_limit)?val+1:0, std::memory_order_release, std::memory_order_relaxed));
		return val;
	}
	inline T operator--(int){
		assert(m_limit>=0);
		T val=m_value.load(std::memory_order_relaxed);
		do {
		} while (!m_value.compare_exchange_weak(val, (val!=0)?val-1:m_limit, std::memory_order_release, std::memory_order_relaxed));
		return val;
	}
	// Current value (acquire load).
	inline const T val() const{
		return m_value.load(std::memory_order_acquire);
	}
};
  166 +
  167 +#endif /* MODULOCOUNTER_H_ */
... ...
src/Producer.cpp 0 → 100644
  1 +++ a/src/Producer.cpp
  1 +#include <signal.h>
  2 +#include <string.h>
  3 +#include <pthread.h>
  4 +
  5 +#include "Producer.h"
  6 +#include "ModuloCounter.h"
  7 +#include "ProducerOnInitLock.h"
  8 +
// Definition of the process-wide on-init lock declared in ProducerOnInitLock.h.
std::recursive_mutex ProducerOnInitLock::lock;
  10 +
  11 +
// Takes the process-wide on-init lock so consumers block in attach() until the
// producer finishes initializing; the lock is released during create()/init()
// (or their failure paths). Also prepares the semaphore the release-monitor
// thread posts once it has started (see bufferReleaseTask()).
Producer::Producer(const std::string& name) :
		SubsystemBase(name), m_memoryManagerData(m_name), m_synchronizerData(m_name) {
	ProducerOnInitLock::lockOnInit();
	sem_init(&m_bufferReleaseMonitorStarted, 0, 0);
}

Producer::~Producer() {
	// Serialize teardown against concurrent construction/attachment.
	ProducerOnInitLock::lockOnInit();
	sem_destroy(&m_bufferReleaseMonitorStarted);
	ProducerOnInitLock::unlockOnInit();
}
  23 +
  24 +void Producer::create(const size_t& bufferSize, const size_t& dataSlotsRingBufferSize) {
  25 + try {
  26 + m_memoryManagerData.create(bufferSize, dataSlotsRingBufferSize);
  27 + m_synchronizerData.create();
  28 + }
  29 + catch (...) {
  30 + ProducerOnInitLock::unlockOnInit();
  31 + throw;
  32 + }
  33 + ProducerOnInitLock::unlockOnInit();
  34 +}
  35 +
// Claims the initial write slot, starts the background free-side monitor, and
// only then lets consumers attach by releasing the on-init lock.
void Producer::init() {
	m_synchronizerData.wacquire(m_synchronizerData.getWriteSlotID());
	// Wait until bufferReleaseTask() has claimed its starting slot — it posts
	// m_bufferReleaseMonitorStarted once ready.
	m_bufferReleaseThread=std::thread(&Producer::bufferReleaseTask, this);
	semWait(m_bufferReleaseMonitorStarted);
	ProducerOnInitLock::unlockOnInit();
	// NOTE(review): the matching lockSynchronizerData() appears to happen
	// inside Synchronizer::create() — confirm.
	m_synchronizerData.unlockSynchronizerData();
}
// Publishes a 'stop' sentinel so consumers and the release monitor wind down,
// then releases the write claim and joins the monitor thread.
void Producer::done() {
	DataBlock dataBlock(specialid_type::specialid_stop);
	commit(dataBlock);
	m_synchronizerData.wrelease(m_synchronizerData.getWriteSlotID());
	if (m_bufferReleaseThread.joinable())
		m_bufferReleaseThread.join();
}
  50 +
// Allocates 'size' bytes from the shared ring buffer. On failure the out-param
// carries a sentinel state (timeout / out-of-memory) instead of data.
void Producer::allocateDataBlock(DataBlock& dataBlock, const size_t size, const uint64_t& timeoutns) {
	dataBlock=m_memoryManagerData.alloc(size, timeoutns);
	if (dataBlock)	// attach only real blocks, so ~DataBlock can auto-free them
		dataBlock.attach(this);
}
// Producer side of the SubsystemBase hook: return the block's bytes to the ring.
void Producer::releaseDataBlock(const DataBlock& data){
	m_memoryManagerData.free(data);
}
  59 +
  60 +bool Producer::commit(DataBlock& dataBlock, const uint64_t& timeoutns) {
  61 + const off_t writeSlotID=m_synchronizerData.getWriteSlotID();
  62 + const off_t writeSlotIDNext=ModuloCounter<off_t>::next(writeSlotID, m_synchronizerData.slotsRingBufferSize());
  63 +
  64 + if (writeSlotID<0)
  65 + return true;
  66 +
  67 + if (!m_synchronizerData.wwait(writeSlotID, timeoutns)) {
  68 + pthread_yield();
  69 + return false;
  70 + }
  71 + if (dataBlock.isStop()){
  72 + m_synchronizerData.setQueueSlot(writeSlotID, static_cast<off_t>(specialid_type::specialid_stop));
  73 + } else {
  74 + m_synchronizerData.setQueueSlot(writeSlotID, dataBlock.blockID());
  75 + }
  76 + m_synchronizerData.wacquire(writeSlotIDNext);
  77 + m_synchronizerData.lockSynchronizerData();
  78 + m_synchronizerData.wrelease(writeSlotID);
  79 + m_synchronizerData.setWriteSlotID(writeSlotIDNext);
  80 + m_synchronizerData.unlockSynchronizerData();
  81 + dataBlock.detach();
  82 + dataBlock.setEmpty();
  83 + return true;
  84 +}
  85 +
  86 +
// Background task: trails the write position, freeing each data block's memory
// once every consumer has released the corresponding ring slot. Terminates
// when it reaches the 'stop' sentinel published by done().
void Producer::bufferReleaseTask() {
	off_t releaseDataSlotID;
	// Start monitoring at the current write slot.
	ModuloCounter<off_t> monitorSlotID(m_synchronizerData.getWriteSlotID(), m_synchronizerData.slotsRingBufferSize());

	m_synchronizerData.facquire(monitorSlotID.val());
	semPost(m_bufferReleaseMonitorStarted);	// init() blocks until this post

	while (1) {
		// Block until the slot reaches the free side (writer and readers done).
		m_synchronizerData.fwait(monitorSlotID.val());
		if (m_synchronizerData.getQueueSlot(monitorSlotID.val()).dataSlotID==static_cast<off_t>(specialid_type::specialid_stop))
			break;
		// Only non-negative IDs refer to real data slots; sentinels are skipped.
		if ((releaseDataSlotID = m_synchronizerData.getQueueSlot(monitorSlotID.val()).dataSlotID)>=0) {
			m_synchronizerData.setQueueSlot(monitorSlotID.val(), static_cast<off_t>(specialid_type::specialid_empty));
			m_memoryManagerData.free(releaseDataSlotID);
		}
		// Claim the next slot before releasing the current one (post-increment
		// hands frelease() the old index while advancing the counter).
		m_synchronizerData.facquire(monitorSlotID.next());
		m_synchronizerData.frelease(monitorSlotID++);
	}
	m_synchronizerData.frelease(monitorSlotID.val());
}
... ...
src/Producer.h 0 → 100644
  1 +++ a/src/Producer.h
  1 +/*
  2 + * Producer.h
  3 + *
  4 + * Created on: 15 gru 2018
  5 + * Author: mariuszo
  6 + */
  7 +
  8 +#ifndef PRODUCER_H_
  9 +#define PRODUCER_H_
  10 +
  11 +
  12 +#include <string>
  13 +#include <thread>
  14 +#include "SubsystemBase.h"
  15 +#include "MemoryManager.h"
  16 +#include "Synchronizer.h"
  17 +/*
  18 +#include <unistd.h>
  19 +
  20 +*/
// Write-side endpoint of the shared-memory queue: owns (creates/destroys) the
// shared regions, allocates blocks for callers, and publishes them in order.
class Producer : public SubsystemBase {
	// Master (owning) views of the shared-memory regions.
	MemoryManager<access_type::access_master> m_memoryManagerData;
	Synchronizer<access_type::access_master> m_synchronizerData;

	// Background thread that frees block memory once consumers are done,
	// plus the semaphore it posts after claiming its first slot.
	std::thread m_bufferReleaseThread;
	sem_t m_bufferReleaseMonitorStarted;

public:
	Producer(const std::string& name);
	virtual ~Producer();

	void create(const size_t& bufferSize, const size_t& dataSlotsRingBufferSize);
	void init();	// claim first slot, start the release monitor, admit consumers
	void done();	// publish 'stop' and join the release monitor
	void allocateDataBlock(DataBlock& dataBlock, const size_t size, const uint64_t& timeoutns=UINT64_MAX);
	void releaseDataBlock(const DataBlock& data);	// SubsystemBase hook
	bool commit(DataBlock& dataBlock, const uint64_t& timeoutns=UINT64_MAX);
private:
	void bufferReleaseTask();	// body of m_bufferReleaseThread
};
  41 +#endif /* PRODUCER_H_ */
... ...
src/ProducerOnInitLock.h 0 → 100644
  1 +++ a/src/ProducerOnInitLock.h
  1 +/*
  2 + * ProducerOnInitLock.h
  3 + *
  4 + * Created on: 15 gru 2018
  5 + * Author: mariuszo
  6 + */
  7 +
  8 +#ifndef PRODUCERONINITLOCK_H_
  9 +#define PRODUCERONINITLOCK_H_
  10 +
  11 +#include <mutex>
  12 +
// Process-wide recursive lock used to serialize producer initialization.
class ProducerOnInitLock {
public:
	// NOTE(review): static member — the single out-of-class definition must
	// live in some .cpp not visible here; confirm it exists exactly once.
	static std::recursive_mutex lock;
	ProducerOnInitLock(){
	}
	virtual ~ProducerOnInitLock(){}
	// Manual lock/unlock pair (no RAII) — callers must balance these.
	static void lockOnInit(){
		lock.lock();
	}
	static void unlockOnInit(){
		lock.unlock();
	}
};
  26 +
  27 +#endif /* PRODUCERONINITLOCK_H_ */
... ...
src/SubsystemBase.h 0 → 100644
  1 +++ a/src/SubsystemBase.h
  1 +/*
  2 + * subsystemBase.h
  3 + *
  4 + * Created on: 30 sty 2019
  5 + * Author: mariuszo
  6 + */
  7 +
  8 +#ifndef SUBSYSTEMBASE_H_
  9 +#define SUBSYSTEMBASE_H_
  10 +
  11 +#include <string>
  12 +#include "DataBlock.h"
  13 +
// Common base for producer/consumer subsystems: carries the shared-memory
// object name and the interface for returning a DataBlock.
class SubsystemBase {
protected:
	const std::string m_name;	// name of the shared-memory region this subsystem uses
public:
	SubsystemBase(const std::string& name) : m_name(name) {}
	virtual ~SubsystemBase(){}
	// Return a previously obtained data block to its owner.
	virtual void releaseDataBlock(const DataBlock& data) = 0;
};
  22 +
  23 +#endif /* SUBSYSTEMBASE_H_ */
... ...
src/Synchronizer.cpp 0 → 100644
  1 +++ a/src/Synchronizer.cpp
  1 +/*
  2 + * Synchronizer.cpp
  3 + *
  4 + * Created on: 15 gru 2018
  5 + * Author: mariuszo
  6 + */
  7 +
  8 +#include <sys/mman.h>
  9 +#include "Synchronizer.h"
  10 +#include "MemoryManager.h"
  11 +
  12 +std::map<std::string, SynchronizerBase*> SynchronizerBase::synchronizersMap;
  13 +std::mutex SynchronizerBase::synchronizersMapLock;
  14 +
// Members start empty; create() or attach() populates them.
SynchronizerBase::SynchronizerBase(const std::string& name) : m_name(name), m_slotsRingBufferSize(0), m_syncSlotsRingBuffer(NULL), m_synchronizerData(NULL) {
}

// Teardown is delegated to destroy(), invoked by the master specialization.
SynchronizerBase::~SynchronizerBase() {
}
  20 +
  21 +void SynchronizerBase::create(){
  22 + synchronizersMapLock.lock();
  23 + try {
  24 + if (synchronizersMap.find(m_name)!=synchronizersMap.end()){
  25 + synchronizersMapLock.unlock();
  26 + throw std::logic_error("Process data memory\""+m_name+"\" already exists");
  27 + }
  28 +
  29 + MemoryManager<access_type::access_slave> memoryManagerData(m_name);
  30 + memoryManagerData.attach();
  31 +
  32 + m_slotsRingBufferSize = memoryManagerData.dataSlotsRingBufferSize();
  33 +
  34 + m_synchronizerData=new synchronizerData;
  35 + m_synchronizerData->writeSlotID=0;
  36 + sem_init(&(m_synchronizerData->lock),0,0);
  37 + m_synchronizerData->consumersLock=PTHREAD_RWLOCK_INITIALIZER;
  38 + mlock(m_synchronizerData,sizeof(*m_synchronizerData));
  39 +
  40 + m_syncSlotsRingBuffer=new syncSlot[m_slotsRingBufferSize];
  41 + mlock(m_syncSlotsRingBuffer,sizeof(m_slotsRingBufferSize*sizeof(*m_syncSlotsRingBuffer)));
  42 + for (size_t i = 0; i < m_slotsRingBufferSize; ++i) {
  43 + if (wrflock_init(&(m_syncSlotsRingBuffer[i].lock), WRFLOCK_FWAITYIELD, 0)!=0)
  44 + throw std::logic_error("WRF lock init error");
  45 + m_syncSlotsRingBuffer[i].dataSlotID=static_cast<off_t>(specialid_type::specialid_empty);
  46 + }
  47 + synchronizersMap[m_name]=this;
  48 + }
  49 + catch (...) {
  50 + synchronizersMapLock.unlock();
  51 + throw;
  52 + }
  53 + synchronizersMapLock.unlock();
  54 +}
  55 +
  56 +void SynchronizerBase::attach(){
  57 + synchronizersMapLock.lock();
  58 + try {
  59 + if (synchronizersMap.find(m_name)==synchronizersMap.end()){
  60 + synchronizersMapLock.unlock();
  61 + throw std::logic_error("Synchronizer data \""+m_name+"\" does not exists");
  62 + }
  63 +
  64 + MemoryManager<access_type::access_slave> ringbufferData(m_name);
  65 + ringbufferData.attach();
  66 +
  67 + m_slotsRingBufferSize = ringbufferData.dataSlotsRingBufferSize();
  68 +
  69 + m_synchronizerData=synchronizersMap[m_name]->m_synchronizerData;
  70 + m_syncSlotsRingBuffer=synchronizersMap[m_name]->m_syncSlotsRingBuffer;
  71 + }
  72 + catch (...) {
  73 + synchronizersMapLock.unlock();
  74 + throw;
  75 + }
  76 + synchronizersMapLock.unlock();
  77 +}
  78 +
  79 +void SynchronizerBase::destroy(){
  80 + synchronizersMapLock.lock();
  81 + if (m_syncSlotsRingBuffer){
  82 + for (size_t i=0;i<m_slotsRingBufferSize;i++){
  83 + wrflock_destroy(&(m_syncSlotsRingBuffer[i].lock));
  84 + }
  85 + munlock(m_syncSlotsRingBuffer,m_slotsRingBufferSize*sizeof(*m_syncSlotsRingBuffer));
  86 + delete [] m_syncSlotsRingBuffer;
  87 + }
  88 + if (m_synchronizerData){
  89 + sem_destroy(&(m_synchronizerData->lock));
  90 + munlock(m_synchronizerData,sizeof(*m_synchronizerData));
  91 + delete m_synchronizerData;
  92 + }
  93 + assert (synchronizersMap.find(m_name)!=synchronizersMap.end());
  94 + synchronizersMap.erase(m_name);
  95 + synchronizersMapLock.unlock();
  96 +}
... ...
src/Synchronizer.h 0 → 100644
  1 +++ a/src/Synchronizer.h
  1 +/*
  2 + * Synchronizer.h
  3 + *
  4 + * Created on: 15 gru 2018
  5 + * Author: mariuszo
  6 + */
  7 +
  8 +#ifndef SYNCHRONIZER_H_
  9 +#define SYNCHRONIZER_H_
  10 +
  11 +#include <errno.h>
  12 +#include <assert.h>
  13 +#include <pthread.h>
  14 +#include <semaphore.h>
  15 +#include <map>
  16 +#include <mutex>
  17 +#include "wrflock.h"
  18 +#include "pthread_helpers.h"
  19 +#include "enums.h"
  20 +/*
  21 +#include <linux/limits.h>
  22 +#include <sys/mman.h>
  23 +#include <time.h>
  24 +#include <unistd.h>
  25 +#include <string>
  26 +#include <atomic>
  27 +#include <stdint.h>
  28 +#include <memory>
  29 +
  30 +#include <iostream>
  31 +
  32 +*/
  33 +#ifndef NS_IN_S
  34 +#define NS_IN_S 1000000000
  35 +#endif
  36 +
// One entry of the synchronization ring: a write/read/free lock plus the
// ID of the data slot currently queued here (or a specialid_type sentinel).
struct syncSlot {
	wrflock_t lock;	// write/read/free phase lock for this slot
	off_t dataSlotID;	// >=0: data slot ID; <0: specialid_type sentinel
};
  41 +
// Shared synchronizer state (one instance per named queue).
struct synchronizerData {
	sem_t lock; //lock for data blocks access
	pthread_rwlock_t consumersLock; //each consumer do rdlock to prevent producer finish prematurely
	std::atomic<off_t> writeSlotID; // current producer write position in the ring (release/acquire published)
};
  47 +
// Shared base for the master/slave Synchronizer specializations below.
// The master creates and owns a ring of wrflock-protected sync slots plus
// the shared synchronizerData block; slaves in the same process attach to
// them through a process-wide name registry.
class SynchronizerBase {
	static std::map<std::string, SynchronizerBase*> synchronizersMap;	// name -> creating instance
	static std::mutex synchronizersMapLock;	// guards synchronizersMap

	const std::string m_name;	// registry key (shared with MemoryManager)
	size_t m_slotsRingBufferSize;	// number of sync slots in the ring
	syncSlot* m_syncSlotsRingBuffer;	// slot array (owned by the creator)
	synchronizerData* m_synchronizerData;	// shared state (owned by the creator)
protected:
	SynchronizerBase(const std::string& name);
	virtual ~SynchronizerBase();

	void create();	// allocate ring + shared data, register under m_name
	void attach();	// borrow structures created by the in-process master
	void destroy();	// tear down what create() built

	// Publish the write position with release semantics so slot contents
	// written before the store are visible to acquire-loads in readers.
	inline void setWriteSlotID(const off_t& slotID) {
		m_synchronizerData->writeSlotID.store(slotID, std::memory_order_release);
	}

	inline const off_t getWriteSlotID() const {
		assert(m_synchronizerData->writeSlotID>=0);
		return m_synchronizerData->writeSlotID.load(std::memory_order_acquire);
	}

	// Consumer registers as active (non-blocking read-lock try).
	bool consumersActiveLock(){

		return rdLock(m_synchronizerData->consumersLock,0)==0;
	}

	bool consumersActiveUnlock(){
		return rwUnlock(m_synchronizerData->consumersLock)==0;
	}

	// Producer takes the write lock, i.e. waits until no consumer is active.
	bool consumersInactiveLock(const uint64_t& timeoutns=UINT64_MAX){
		return wrLock(m_synchronizerData->consumersLock, timeoutns)==0;
	}

	bool consumersInactiveUnlock(){
		return rwUnlock(m_synchronizerData->consumersLock)==0;
	}
public:
	inline const size_t& slotsRingBufferSize() const {
		return m_slotsRingBufferSize;
	}

	// Associate a data-slot ID (or specialid_type sentinel) with a ring slot.
	inline void setQueueSlot(const off_t& slotID, const off_t& blockID) noexcept {
		m_syncSlotsRingBuffer[slotID].dataSlotID=blockID;
	}

	inline syncSlot& getQueueSlot(const off_t& slotID) const noexcept {
		return m_syncSlotsRingBuffer[slotID];
	}

	inline synchronizerData& getSynchronizerData() const noexcept {
		return *m_synchronizerData;
	}

	// Binary-semaphore style mutual exclusion over the shared data.
	inline void lockSynchronizerData() noexcept {
		semWait(m_synchronizerData->lock);
	}

	inline void unlockSynchronizerData() noexcept {
		semPost(m_synchronizerData->lock);
	}

	// wrflock wrappers: w = writer phase, r = reader phase, f = free phase.
	// Each returns true on success.
	inline bool wacquire(const off_t& slotID) noexcept {
		return wrflock_wacquire(&(m_syncSlotsRingBuffer[slotID].lock))>=0;
	}

	inline bool racquire(const off_t& slotID) noexcept {
		return wrflock_racquire(&(m_syncSlotsRingBuffer[slotID].lock))>=0;
	}

	inline bool facquire(const off_t& slotID) noexcept {
		return wrflock_facquire(&(m_syncSlotsRingBuffer[slotID].lock))>=0;
	}

	inline bool wrelease(const off_t& slotID) noexcept {
		return wrflock_wrelease(&(m_syncSlotsRingBuffer[slotID].lock))>=0;
	}

	inline bool rrelease(const off_t& slotID) noexcept {
		return wrflock_rrelease(&(m_syncSlotsRingBuffer[slotID].lock))>=0;
	}

	inline bool frelease(const off_t& slotID) noexcept {
		return wrflock_frelease(&(m_syncSlotsRingBuffer[slotID].lock))>=0;
	}

	// Wait until the slot's writer phase is granted.
	// timeoutns==UINT64_MAX: block forever (EINTR retried); 0: non-blocking
	// try; otherwise a relative timeout in nanoseconds.
	// NOTE(review): tv_nsec+=timeoutns can overflow `long` for very large
	// finite timeouts — confirm callers pass sub-second-scale values.
	inline bool wwait(const off_t& slotID, const uint64_t& timeoutns=UINT64_MAX) noexcept {
		if (timeoutns==UINT64_MAX){
			while (1) {
				if (wrflock_wwait(&(m_syncSlotsRingBuffer[slotID].lock))<0){
					if (errno==EINTR)
						continue;
					return false;
				}
				return true;
			};
		} else if (timeoutns==0) {
			while(1){
				if (wrflock_wtrywait(&(m_syncSlotsRingBuffer[slotID].lock))<0){
					return false;
				}
				return true;
			}
		} else {
			timespec time;
			clock_gettime(CLOCK_REALTIME, &time);
			time.tv_nsec+=timeoutns;
			time.tv_sec+=(time.tv_nsec/NS_IN_S);
			time.tv_nsec%=NS_IN_S;
			while (1) {
				if (wrflock_wtimewait(&(m_syncSlotsRingBuffer[slotID].lock),&time)<0){
					if (errno==EINTR)
						continue;
					return false;
				}
				return true;
			}
		}
	}

	// Same contract as wwait(), for the reader phase.
	inline bool rwait(const off_t& slotID, const uint64_t& timeoutns=UINT64_MAX) noexcept {
		if (timeoutns==UINT64_MAX){
			while (1) {
				if (wrflock_rwait(&(m_syncSlotsRingBuffer[slotID].lock))<0){
					if (errno==EINTR)
						continue;
					return false;
				}
				return true;
			}
		} else if (timeoutns==0) {
			while(1){
				if (wrflock_rtrywait(&(m_syncSlotsRingBuffer[slotID].lock))<0){
					return false;
				}
				return true;
			}
		} else {
			timespec time;
			clock_gettime(CLOCK_REALTIME, &time);
			time.tv_nsec+=timeoutns;
			time.tv_sec+=(time.tv_nsec/NS_IN_S);
			time.tv_nsec%=NS_IN_S;
			while (1) {
				if (wrflock_rtimewait(&(m_syncSlotsRingBuffer[slotID].lock),&time)<0){
					if (errno==EINTR)
						continue;
					return false;
				}
				return true;
			}
		}
	}
	// Same contract as wwait(), for the free phase.
	inline bool fwait(const off_t& slotID, const uint64_t& timeoutns=UINT64_MAX) noexcept {
		if (timeoutns==UINT64_MAX){
			while (1) {
				if (wrflock_fwait(&(m_syncSlotsRingBuffer[slotID].lock))<0){
					if (errno==EINTR)
						continue;
					return false;
				}
				return true;
			}
		} else if (timeoutns==0) {
			while(1){
				if (wrflock_ftrywait(&(m_syncSlotsRingBuffer[slotID].lock))<0){
					return false;
				}
				return true;
			}
		} else {
			timespec time;
			clock_gettime(CLOCK_REALTIME, &time);
			time.tv_nsec+=timeoutns;
			time.tv_sec+=(time.tv_nsec/NS_IN_S);
			time.tv_nsec%=NS_IN_S;
			while (1) {
				if (wrflock_ftimewait(&(m_syncSlotsRingBuffer[slotID].lock),&time)<0){
					if (errno==EINTR)
						continue;
					return false;
				}
				return true;
			}
		}
	}

};
  240 +
// Primary template is intentionally empty: only the two specializations
// below are usable.
template <access_type R> class Synchronizer {};

// Master side: creates and owns the synchronizer structures.
template <> class Synchronizer<access_type::access_master>: public SynchronizerBase {
public:
	Synchronizer(const std::string& name): SynchronizerBase(name){}
	virtual ~Synchronizer() {
		// NOTE(review): destroy() is also exposed below — an explicit call
		// followed by destruction runs destroy() twice; confirm destroy()
		// is safe to call repeatedly.
		destroy();
	}
	using SynchronizerBase::create;
	using SynchronizerBase::destroy;
	using SynchronizerBase::setWriteSlotID;
	using SynchronizerBase::getWriteSlotID;
	using SynchronizerBase::consumersInactiveLock;
	using SynchronizerBase::consumersInactiveUnlock;
	using SynchronizerBase::getSynchronizerData;
};

// Slave side: borrows structures created by the in-process master.
template <> class Synchronizer<access_type::access_slave>: public SynchronizerBase {
public:
	Synchronizer(const std::string& name): SynchronizerBase(name){}
	virtual ~Synchronizer() {}

	using SynchronizerBase::attach;
	using SynchronizerBase::getWriteSlotID;
	using SynchronizerBase::consumersActiveLock;
	using SynchronizerBase::consumersActiveUnlock;
};
  268 +
  269 +
  270 +#endif /* SYNCHRONIZER_H_ */
... ...
src/enums.h 0 → 100644
  1 +++ a/src/enums.h
  1 +/*
  2 + enums.h
  3 + *
  4 + * Created on: 15 gru 2018
  5 + * Author: mariuszo
  6 + */
  7 +
  8 +#ifndef ENUMS_H_
  9 +#define ENUMS_H_
  10 +
  11 +#include <unistd.h>
  12 +
// Role of a process/object relative to a shared resource.
enum class access_type {
	access_master,	// creates and owns the resource
	access_slave	// attaches to an existing resource
};


// Sentinel values stored in syncSlot::dataSlotID (real slot IDs are >= 0).
enum class specialid_type : off_t {
	specialid_outofmemory = -5,
	specialid_timeout = -4,
	specialid_stop = -3,	// shutdown marker (checked by the buffer-release monitor)
	specialid_empty = -2,	// slot currently holds no data block
	specialid_none = -1
};
  26 +
  27 +#endif /* ENUMS_H_ */
... ...
src/futex.h 0 → 100644
  1 +++ a/src/futex.h
  1 +#ifndef FUTEXX_H
  2 +#define FUTEXX_H
  3 +
  4 +#include <linux/futex.h>
  5 +#include <sys/syscall.h>
  6 +#include <unistd.h>
  7 +#include <stdio.h>
  8 +#include <time.h>
  9 +
// Thin wrapper over the raw futex(2) syscall (glibc exposes no libc wrapper).
static __always_inline int
futex(const int *uaddr, const int futex_op, const int val,
	const struct timespec *timeout, const int *uaddr2, const unsigned int val3) {
	return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3);
}
  15 +
// Unrecoverable futex failure: report and hard-exit without running atexit
// handlers (_exit), since process state can no longer be trusted.
static __always_inline __attribute__ ((__noreturn__)) void
futex_fatal_error (const int err) {
	fprintf(stderr, "The futex facility returned an unexpected error code. (%d, %d)\n", err, errno);
	_exit (127);
}
  21 +
// FUTEX_WAIT_BITSET with absolute CLOCK_REALTIME timeout. Expected/benign
// outcomes (EAGAIN/EINTR/ETIMEDOUT) are returned to the caller; anything
// else indicates a glibc/application bug and aborts the process.
static __always_inline int
futex_wait_bitset (const int *futex_word,
	const int expected,
	const struct timespec *abstime, const unsigned int bitset, const int priv) {
	if (abstime != NULL && abstime->tv_sec < 0)
		return ETIMEDOUT;
	const int op = (FUTEX_WAIT_BITSET | FUTEX_CLOCK_REALTIME | (priv ? FUTEX_PRIVATE_FLAG : 0) );
	const int err=futex(futex_word, op, expected, abstime, NULL, bitset);

	if (err>=0)
		return err;

	switch (errno) {
	case 0:
	case EAGAIN:
	case EINTR:
	case ETIMEDOUT:
		return err;
	case EFAULT: /* Must have been caused by a glibc or application bug. */
	case EINVAL: /* Either due to wrong alignment or due to the timeout not
			being normalized. Must have been caused by a glibc or
			application bug. */
	case ENOSYS: /* Must have been caused by a glibc bug. */
	default:
		futex_fatal_error (err);
	}
}
  49 +
// FUTEX_WAKE_BITSET: wake up to processes_to_wake waiters matching bitset.
// Returns the syscall result; EFAULT/EINVAL can be legitimate memory-reuse
// races and are passed through, other failures abort.
static __always_inline int
futex_wake_bitset (const int *futex_word, const int processes_to_wake, const unsigned int bitset, const int priv) {
	const int op = (FUTEX_WAKE_BITSET | (priv ? FUTEX_PRIVATE_FLAG : 0) );
	const int err=futex(futex_word, op, processes_to_wake, NULL, NULL, bitset);

	if (err >= 0)
		return err;
	switch (errno) {
	case EFAULT: /* Could have happened due to memory reuse. */
	case EINVAL: /* Could be either due to incorrect alignment (a bug in
			glibc or in the application) or due to memory being
			reused for a PI futex. We cannot distinguish between the
			two causes, and one of them is correct use, so we do not
			act in this case. */
		return err;
	case ENOSYS: /* Must have been caused by a glibc bug. */
	/* No other errors are documented at this time. */
	default:
		futex_fatal_error (err);
	}
}
  71 +
// Plain FUTEX_WAIT with a RELATIVE timeout (unlike futex_wait_bitset, which
// takes an absolute one). Benign wakeup reasons are returned; real errors abort.
static __always_inline int
futex_wait (const int *futex_word,
	const int expected,
	const struct timespec *abstime, const int priv) {
	if (abstime != NULL && abstime->tv_sec < 0)
		return ETIMEDOUT;
	const int op = (FUTEX_WAIT | (priv ? FUTEX_PRIVATE_FLAG : 0) );
	const int err=futex(futex_word, op, expected, abstime, NULL, 0);

	if (err>=0)
		return err;

	switch (errno) {
	case 0:
	case EAGAIN:
	case EINTR:
	case ETIMEDOUT:
		return err;
	case EFAULT: /* Must have been caused by a glibc or application bug. */
	case EINVAL: /* Either due to wrong alignment or due to the timeout not
			being normalized. Must have been caused by a glibc or
			application bug. */
	case ENOSYS: /* Must have been caused by a glibc bug. */
	default:
		futex_fatal_error (err);
	}
}
  99 +
// Plain FUTEX_WAKE: wake up to processes_to_wake waiters on futex_word.
static __always_inline int
futex_wake (const int *futex_word, const int processes_to_wake, const int priv) {
	const int op = (FUTEX_WAKE | (priv ? FUTEX_PRIVATE_FLAG : 0) );
	const int err=futex(futex_word, op, processes_to_wake, NULL, NULL, 0);

	if (err >= 0)
		return err;
	switch (errno) {
	case EFAULT: /* Could have happened due to memory reuse. */
	case EINVAL: /* Could be either due to incorrect alignment (a bug in
			glibc or in the application) or due to memory being
			reused for a PI futex. We cannot distinguish between the
			two causes, and one of them is correct use, so we do not
			act in this case. */
		return err;
	case ENOSYS: /* Must have been caused by a glibc bug. */
	/* No other errors are documented at this time. */
	default:
		futex_fatal_error (err);
	}
}
  121 +
  122 +#endif /*FUTEXX_H*/
0 123 \ No newline at end of file
... ...
src/pthread_helpers.h 0 → 100644
  1 +++ a/src/pthread_helpers.h
  1 +/*
  2 + * pthread_helpers.h
  3 + *
  4 + * Created on: 11 sty 2019
  5 + * Author: mariuszo
  6 + */
  7 +
  8 +#ifndef PTHREAD_HELPERS_H_
  9 +#define PTHREAD_HELPERS_H_
  10 +/*
  11 +#include <pthread.h>
  12 +#include <semaphore.h>
  13 +#include <errno.h>
  14 +#include <math.h>
  15 +//#include <string.h>
  16 +//#include <stdio.h>
  17 +#include <stdint.h>
  18 +#include <time.h>
  19 +*/
  20 +#ifndef NS_IN_S
  21 +#define NS_IN_S 1000000000
  22 +#endif
  23 +
// Post (increment) a POSIX semaphore; true on success.
inline bool semPost(sem_t& sem){
	const int rc = sem_post(&sem);
	return rc == 0;
}
  27 +
/**
 * Wait on a POSIX semaphore.
 *  - timeoutns == 0: non-blocking try (EAGAIN is mapped to ETIMEDOUT)
 *  - timeoutns == UINT64_MAX: block forever
 *  - otherwise: relative timeout in nanoseconds
 * EINTR is retried transparently. Returns true on success; on failure
 * errno holds the reason.
 */
inline bool semWait(sem_t& sem, const uint64_t& timeoutns=UINT64_MAX){
	constexpr uint64_t nsInS = 1000000000ULL;
	if (timeoutns==0) {
		while(1){
			if (sem_trywait(&sem)!=0){
				if (errno==EINTR)
					continue;
				else if (errno==EAGAIN)
					errno=ETIMEDOUT;
				return false;
			}
			return true;
		}
	} else if (timeoutns==UINT64_MAX){
		while (1) {
			if (sem_wait(&sem)!=0){
				if (errno==EINTR)
					continue;
				return false;
			}
			return true;
		};
	} else {
		timespec time;
		clock_gettime(CLOCK_REALTIME, &time);
		// BUG FIX: add whole seconds and the sub-second remainder separately
		// so a large timeout cannot overflow tv_nsec (a long).
		time.tv_sec+=(time_t)(timeoutns/nsInS);
		time.tv_nsec+=(long)(timeoutns%nsInS);
		if (time.tv_nsec>=(long)nsInS){
			++time.tv_sec;
			time.tv_nsec-=(long)nsInS;
		}
		while (1) {
			if (sem_timedwait(&(sem),&time)!=0){
				if (errno==EINTR)
					continue;
				return false;
			}
			return true;
		}
	}
}
  65 +
/**
 * Write-lock a pthread rwlock.
 *  - timeoutns == 0: non-blocking try
 *  - timeoutns == UINT64_MAX: block forever (retrying on any error)
 *  - otherwise: relative timeout in nanoseconds
 * Returns 0 on success or a pthread error code (e.g. EBUSY, ETIMEDOUT).
 */
inline int wrLock(pthread_rwlock_t& lock, const uint64_t& timeoutns=UINT64_MAX){
	constexpr uint64_t nsInS = 1000000000ULL;
	if (timeoutns==0) {
		return pthread_rwlock_trywrlock(&lock);
	} else if (timeoutns==UINT64_MAX){
		while (1) {
			if (pthread_rwlock_wrlock(&lock)==0)
				return 0;
		};
	} else {
		int err;
		timespec time;
		clock_gettime(CLOCK_REALTIME, &time);
		// BUG FIX: split seconds/nanoseconds so large timeouts cannot
		// overflow tv_nsec (a long).
		time.tv_sec+=(time_t)(timeoutns/nsInS);
		time.tv_nsec+=(long)(timeoutns%nsInS);
		if (time.tv_nsec>=(long)nsInS){
			++time.tv_sec;
			time.tv_nsec-=(long)nsInS;
		}
		while (1) {
			if ((err=pthread_rwlock_timedwrlock(&lock,&time))==0)
				return 0;
			else if (err==ETIMEDOUT)
				return err;
			// NOTE(review): other errors (e.g. EDEADLK) retry forever, as in
			// the original design — confirm this is intended.
		}
	}
}
  89 +
  90 +
/**
 * Read-lock a pthread rwlock; same timeout convention as wrLock():
 * 0 = try, UINT64_MAX = block forever, else relative ns timeout.
 * Returns 0 on success or a pthread error code.
 */
inline int rdLock(pthread_rwlock_t& lock, const uint64_t& timeoutns=UINT64_MAX){
	constexpr uint64_t nsInS = 1000000000ULL;
	if (timeoutns==0) {
		return pthread_rwlock_tryrdlock(&lock);
	} else if (timeoutns==UINT64_MAX){
		while (1) {
			if (pthread_rwlock_rdlock(&lock)==0)
				return 0;
		};
	} else {
		int err;
		timespec time;
		clock_gettime(CLOCK_REALTIME, &time);
		// BUG FIX: split seconds/nanoseconds so large timeouts cannot
		// overflow tv_nsec (a long).
		time.tv_sec+=(time_t)(timeoutns/nsInS);
		time.tv_nsec+=(long)(timeoutns%nsInS);
		if (time.tv_nsec>=(long)nsInS){
			++time.tv_sec;
			time.tv_nsec-=(long)nsInS;
		}
		while (1) {
			if ((err=pthread_rwlock_timedrdlock(&lock,&time))==0)
				return 0;
			else if (err==ETIMEDOUT)
				return err;
		}
	}
}
// Release a read or write hold on a pthread rwlock; returns the pthread
// error code (0 on success).
inline int rwUnlock(pthread_rwlock_t& lock){
	const int rc = pthread_rwlock_unlock(&lock);
	return rc;
}
  117 +
/**
 * Lock a pthread mutex; same timeout convention as the helpers above
 * (0 = try, UINT64_MAX = forever, else relative ns). A robust-mutex owner
 * death (EOWNERDEAD) is made consistent and reported to the caller.
 * Returns 0 on success or a pthread error code.
 */
inline int mutexLock(pthread_mutex_t& lock, const uint64_t& timeoutns=UINT64_MAX){
	constexpr uint64_t nsInS = 1000000000ULL;
	if (timeoutns==0) {
		int err;
		if ((err=pthread_mutex_trylock(&lock))==EOWNERDEAD)
			pthread_mutex_consistent(&lock);
		return err;
	} else if (timeoutns==UINT64_MAX){
		int err;
		while (1) {
			if ((err=pthread_mutex_lock(&lock))==0)
				return 0;
			if (err==EOWNERDEAD){
				pthread_mutex_consistent(&lock);
				return err;
			}
		};
	} else {
		int err;
		timespec time;
		clock_gettime(CLOCK_REALTIME, &time);
		// BUG FIX: split seconds/nanoseconds so large timeouts cannot
		// overflow tv_nsec (a long).
		time.tv_sec+=(time_t)(timeoutns/nsInS);
		time.tv_nsec+=(long)(timeoutns%nsInS);
		if (time.tv_nsec>=(long)nsInS){
			++time.tv_sec;
			time.tv_nsec-=(long)nsInS;
		}
		while (1) {
			if ((err=pthread_mutex_timedlock(&lock,&time))==0)
				return 0;
			else if (err==ETIMEDOUT)
				return err;
			else if (err==EOWNERDEAD){
				pthread_mutex_consistent(&lock);
				return err;
			}
		}
	}
}
// Unlock a pthread mutex; returns the pthread error code (0 on success).
inline int mutexUnlock(pthread_mutex_t& lock){
	const int rc = pthread_mutex_unlock(&lock);
	return rc;
}
  156 +
  157 +#endif /* PTHREAD_HELPERS_H_ */
... ...
src/shm.cpp 0 → 100644
  1 +++ a/src/shm.cpp
  1 +/*
  2 + * shm.cpp
  3 + *
  4 + * Created on: 15 gru 2018
  5 + * Author: mariuszo
  6 + */
  7 +
  8 +#include <sys/mman.h>
  9 +#include <sys/stat.h>
  10 +#include <limits.h>
  11 +#include <unistd.h>
  12 +#include <errno.h>
  13 +#include <string.h>
  14 +#include "shm.h"
  15 +/*
  16 +#include <memory.h>
  17 +#include <stdlib.h>
  18 +*/
  19 +std::map<std::string, shm::shmInUse> shm::inProcessMapping;
  20 +std::recursive_mutex shm::inProcessMappingLock;
  21 +
// Ring-mapping members start empty; alloc() or attach() populates them.
shmRing::shmRing(const std::string& name, int flags, mode_t mode):shm(name, flags, mode), m_placeholder(NULL), m_placeholderSize(0), m_data2(NULL) {
}
  24 +
  25 +bool shmRing::alloc(const size_t& size) {
  26 + int fd=-1;
  27 + int prot=PROT_READ;
  28 + const size_t pageSize=(size_t)sysconf(_SC_PAGESIZE);
  29 +
  30 + inProcessMappingLock.lock();
  31 + if (inProcessMapping.find(m_name)!=inProcessMapping.end()){
  32 + perror("ERROR: SHM object already created");
  33 + inProcessMappingLock.unlock();
  34 + return false;
  35 + }
  36 +
  37 + if (size%pageSize != 0)
  38 + m_size=((size)/pageSize+1)*pageSize;
  39 + m_placeholderSize=2*m_size+2*pageSize;
  40 + shm_unlink(m_name.c_str());
  41 + while (1) {
  42 + fd=shm_open(("/"+m_name).c_str(), m_flags | O_CREAT | O_EXCL, m_mode);
  43 + if (fd == -1){
  44 + perror(strerror(errno));
  45 + break;
  46 + }
  47 + if (ftruncate(fd, m_size) == -1){
  48 + perror(strerror(errno));
  49 + break;
  50 + }
  51 + if (m_flags|O_RDWR)
  52 + prot|=PROT_WRITE;
  53 +
  54 + m_placeholder=mmap(NULL,m_placeholderSize, PROT_NONE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  55 + if (m_placeholder == MAP_FAILED){
  56 + perror(strerror(errno));
  57 + break;
  58 + }
  59 + m_data=mmap((uint8_t*)m_placeholder+pageSize, m_size, prot, MAP_SHARED|MAP_FIXED, fd, 0);
  60 + if (m_data == MAP_FAILED){
  61 + perror(strerror(errno));
  62 + break;
  63 + }
  64 + m_data2=mmap((uint8_t*)m_data+m_size,m_size, prot, MAP_SHARED|MAP_FIXED, fd, 0);
  65 + if (m_data2 == MAP_FAILED){
  66 + perror(strerror(errno));
  67 + break;
  68 + }
  69 + close(fd);
  70 + memset(m_data2,0,m_size);
  71 + inProcessMapping[m_name].obj=this;
  72 + ++inProcessMapping[m_name].count;
  73 + inProcessMapping[m_name].unlink=true;
  74 + inProcessMappingLock.unlock();
  75 + return true;
  76 + }
  77 + if (fd !=-1) {
  78 + close(fd);
  79 + fd=-1;
  80 + shm_unlink(("/"+m_name).c_str());
  81 + }
  82 + if (m_data2)
  83 + munmap(m_data2,m_size);
  84 + if (m_data)
  85 + munmap(m_data,m_size);
  86 + if (m_placeholder)
  87 + munmap(m_placeholder,m_placeholderSize);
  88 + m_size=0;
  89 + m_data=NULL;
  90 + m_data2=NULL;
  91 + m_placeholder=NULL;
  92 + inProcessMappingLock.unlock();
  93 + return false;
  94 +}
  95 +
  96 +bool shmRing::attach() {
  97 + if (m_size!=0 || m_data!=NULL) {
  98 + perror("ERROR: SHM object already initialized");
  99 + return false;
  100 + }
  101 + inProcessMappingLock.lock();
  102 + if (inProcessMapping.find(m_name)!=inProcessMapping.end()) {
  103 + m_size = ((shmRing*)(inProcessMapping[m_name].obj))->m_size;
  104 + m_data = ((shmRing*)(inProcessMapping[m_name].obj))->m_data;
  105 + m_data2 = ((shmRing*)(inProcessMapping[m_name].obj))->m_data2;
  106 + ++inProcessMapping[m_name].count;
  107 + inProcessMappingLock.unlock();
  108 + return true;
  109 + }
  110 +
  111 + int fd = -1;
  112 + int prot = PROT_READ;
  113 + const size_t pageSize = (size_t)sysconf(_SC_PAGESIZE);
  114 + struct stat buf;
  115 +
  116 + while (1) {
  117 + fd = shm_open(("/"+m_name).c_str(), m_flags, 0);
  118 + if (fd==-1) {
  119 + perror(strerror(errno));
  120 + break;
  121 + }
  122 + if (fstat(fd, &buf)==-1) {
  123 + perror(strerror(errno));
  124 + break;
  125 + }
  126 + if (m_flags|O_RDWR)
  127 + prot |= PROT_WRITE;
  128 +
  129 + m_size = buf.st_size;
  130 + m_placeholderSize=2*m_size+2*pageSize;
  131 +
  132 + m_placeholder = mmap(NULL, m_placeholderSize, PROT_NONE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);
  133 + if (m_placeholder==MAP_FAILED) {
  134 + perror(strerror(errno));
  135 + break;
  136 + }
  137 +
  138 + m_data = mmap((uint8_t*)m_placeholder+pageSize, m_size, prot, MAP_SHARED|MAP_FIXED, fd, 0);
  139 + if (m_data==MAP_FAILED) {
  140 + perror(strerror(errno));
  141 + break;
  142 + }
  143 + m_data2 = mmap((uint8_t*)m_data+m_size, m_size, prot, MAP_SHARED|MAP_FIXED, fd, 0);
  144 + if (m_data2==MAP_FAILED) {
  145 + perror(strerror(errno));
  146 + break;
  147 + }
  148 + close(fd);
  149 + fd = -1;
  150 + inProcessMapping[m_name].obj = this;
  151 + ++inProcessMapping[m_name].count;
  152 + inProcessMapping[m_name].unlink = false;
  153 + inProcessMappingLock.unlock();
  154 + return true;
  155 + }
  156 + if (m_data2)
  157 + munmap(m_data2, m_size);
  158 + if (m_data)
  159 + munmap(m_data, m_size);
  160 + if (m_placeholder)
  161 + munmap(m_placeholder, m_placeholderSize);
  162 + m_size = 0;
  163 + m_data = NULL;
  164 + m_data2 = NULL;
  165 + m_placeholder = NULL;
  166 + inProcessMappingLock.unlock();
  167 + return false;
  168 +}
  169 +
  170 +bool shmRing::mlock() {
  171 + return (::mlock(m_data, m_size*2) != -1);
  172 +}
  173 +
  174 +void shmRing::munlock() {
  175 + if (::munlock(m_data, m_size*2) == -1)
  176 + throw; //TODO
  177 +}
  178 +
  179 +void shmRing::free() {
  180 + inProcessMappingLock.lock();
  181 +
  182 + if (m_data)
  183 + munmap(m_data, m_size);
  184 + if (m_data)
  185 + munmap(m_data,m_size);
  186 + if (m_placeholder)
  187 + munmap(m_placeholder,m_placeholderSize);
  188 + m_size=0;
  189 + m_data=NULL;
  190 + m_data2=NULL;
  191 + m_placeholder=NULL;
  192 + inProcessMappingLock.unlock();
  193 +}
  194 +
// NAME_MAX-16 leaves headroom for the leading '/' and internal suffixes.
shm::shm(const std::string& name, int flags, mode_t mode):m_name(name), m_flags(flags), m_mode(mode), m_data(NULL), m_size(0) {
	if (m_name.length()>NAME_MAX-16)
		throw std::logic_error("SHM name too long");
}
  199 +
  200 +void shm::free() {
  201 + inProcessMappingLock.lock();
  202 +
  203 + if ((--inProcessMapping[m_name].count)==0) {
  204 + if (m_data)
  205 + munmap(m_data, m_size);
  206 + if (m_name!="" && inProcessMapping[m_name].unlink)
  207 + shm_unlink(m_name.c_str());
  208 + inProcessMapping.erase(m_name);
  209 + }
  210 + inProcessMappingLock.unlock();
  211 +}
  212 +
  213 +void shm::unlink() {
  214 + shm_unlink(m_name.c_str());
  215 +}
  216 +
  217 +bool shm::alloc(const size_t& size) {
  218 + int fd=-1;
  219 + int prot=PROT_READ;
  220 + inProcessMappingLock.lock();
  221 + if (inProcessMapping.find(m_name)!=inProcessMapping.end()){
  222 + perror("ERROR: SHM object already created");
  223 + inProcessMappingLock.unlock();
  224 + return false;
  225 + }
  226 +
  227 + m_size=size;
  228 + shm_unlink(m_name.c_str());
  229 + while (1) {
  230 + fd=shm_open(("/"+m_name).c_str(), m_flags | O_CREAT | O_EXCL, m_mode);
  231 + if (fd == -1)
  232 + break;
  233 + if (ftruncate(fd, size) == -1)
  234 + break;
  235 + if (m_flags|O_RDWR)
  236 + prot|=PROT_WRITE;
  237 + m_data=mmap(NULL,m_size, prot, MAP_SHARED, fd, 0);
  238 + if (m_data == MAP_FAILED)
  239 + break;
  240 + close(fd);
  241 + memset(m_data,0,m_size);
  242 + inProcessMapping[m_name].obj=this;
  243 + ++inProcessMapping[m_name].count;
  244 + inProcessMapping[m_name].unlink=true;
  245 + inProcessMappingLock.unlock();
  246 + return true;
  247 + }
  248 + if (fd !=-1) {
  249 + close(fd);
  250 + fd=-1;
  251 + shm_unlink(("/"+m_name).c_str());
  252 + }
  253 + m_size=0;
  254 + m_data=NULL;
  255 + inProcessMappingLock.unlock();
  256 + return false;
  257 +}
  258 +
  259 +bool shm::attach(){
  260 + if (m_size != 0 || m_data != NULL) {
  261 + perror("ERROR: SHM object already initialized");
  262 + return false;
  263 + }
  264 + inProcessMappingLock.lock();
  265 + if (inProcessMapping.find(m_name)==inProcessMapping.end()){
  266 + int fd=-1;
  267 + int prot=PROT_READ;
  268 + struct stat buf;
  269 +
  270 + fd=shm_open(("/"+m_name).c_str(), m_flags, 0);
  271 + if (fd == -1) {
  272 + perror(strerror(errno));
  273 + inProcessMappingLock.unlock();
  274 + return false;;
  275 + }
  276 + if (fstat(fd, &buf) == -1) {
  277 + close(fd);
  278 + fd=-1;
  279 + perror(strerror(errno));
  280 + inProcessMappingLock.unlock();
  281 + return false;;
  282 + }
  283 + if (m_flags|O_RDWR)
  284 + prot|=PROT_WRITE;
  285 + m_size=buf.st_size;
  286 + m_data=mmap(NULL,m_size, prot, MAP_SHARED, fd, 0);
  287 + if (m_data == MAP_FAILED){
  288 + m_data=NULL;
  289 + close(fd);
  290 + fd=-1;
  291 + perror(strerror(errno));
  292 + inProcessMappingLock.unlock();
  293 + return false;;
  294 + }
  295 + close(fd);
  296 + fd=-1;
  297 + inProcessMapping[m_name].obj=this;
  298 + ++inProcessMapping[m_name].count;
  299 + inProcessMapping[m_name].unlink=false;
  300 + } else {
  301 + m_size=inProcessMapping[m_name].obj->m_size;
  302 + m_data=inProcessMapping[m_name].obj->m_data;
  303 + ++inProcessMapping[m_name].count;
  304 + }
  305 + inProcessMappingLock.unlock();
  306 + return true;
  307 +}
  308 +
  309 +bool shm::mlock() {
  310 + return (::mlock(m_data, m_size) != -1);
  311 +}
  312 +
  313 +void shm::munlock() {
  314 + if (::munlock(m_data, m_size) == -1)
  315 + throw; //TODO
  316 +}
... ...
src/shm.h 0 → 100644
  1 +++ a/src/shm.h
  1 +/*
  2 + * shm.h
  3 + *
  4 + * Created on: 15 gru 2018
  5 + * Author: mariuszo
  6 + */
  7 +
  8 +#ifndef SHM_H_
  9 +#define SHM_H_
  10 +
#include <cstdint>
#include <string>
#include <map>
#include <mutex>
#include <fcntl.h>
#include <typeinfo>
// Wrapper around a named POSIX shared-memory object (shm_open/mmap).
// A process-wide registry (inProcessMapping, guarded by
// inProcessMappingLock) deduplicates mappings of the same name within one
// process and reference-counts them.
class shm {
protected:
	// Registry entry: the object owning the mapping, the in-process
	// reference count, and whether shm_unlink should run on last release.
	struct shmInUse {
		shm* obj;
		uint32_t count;
		bool unlink;
		shmInUse():obj(NULL),count(0),unlink(false){}
	};
	static std::map<std::string, shmInUse> inProcessMapping;
	static std::recursive_mutex inProcessMappingLock;
	const std::string m_name;	// object name; '/' is prepended at shm_open time
	int m_flags;	// open(2)-style flags passed to shm_open
	mode_t m_mode;	// permission bits used on creation
	void* m_data;	// mapped base address, NULL when unmapped
	size_t m_size;	// mapping length in bytes, 0 when unmapped
public:
	shm(const std::string& name, int flags=O_RDWR, mode_t mode=S_IRUSR|S_IWUSR);
	// NOTE(review): virtual dispatch is disabled inside destructors, so
	// this always calls shm::free(), never a derived override such as
	// shmRing::free(); derived classes must release their own resources.
	virtual ~shm() {free();}

	virtual bool alloc(const size_t& size);	// create + map (see shm.cpp)
	virtual bool attach();	// map an existing object (see shm.cpp)
	void unlink();
	void* getAddr(const off_t& offset=0) const {return (uint8_t*)m_data+offset;}
	// Typed accessor: offset is counted in T elements, not bytes.
	template <typename T> T* getAddr(const off_t& offset=0) const {return (T*)getAddr(offset*sizeof(T));}
	size_t getSize() const {return m_size;}
	const std::string& getName() const {return m_name;}
	virtual bool mlock();	// pin mapping into RAM
	virtual void munlock();
	virtual void free();	// unmap and drop the registry reference
	friend class shmLog;
	friend class shmTimeLog;
};
  50 +
// Shared-memory variant intended for ring buffers: keeps an extra reserved
// address range and a second data pointer (implementation in shm.cpp, not
// visible here).
class shmRing : public shm {
protected:
	void* m_placeholder;	// reserved address range — presumably for a double mapping; confirm in shm.cpp
	size_t m_placeholderSize;
	void* m_data2;	// second view of the buffer
public:
	shmRing(const std::string& name, int flags=O_RDWR, mode_t mode=S_IRUSR|S_IWUSR);
	// NOTE(review): base ~shm() calls shm::free() non-virtually, so
	// shmRing::free() is NOT invoked during destruction.
	virtual ~shmRing() {}

	virtual bool alloc(const size_t& size);
	virtual bool attach();
	virtual bool mlock();
	virtual void munlock();
	virtual void free();
	friend class shmLog;
	friend class shmTimeLog;
};
  68 +
  69 +template <typename T> class shm_t:private shm {
  70 + size_t elements;
  71 +public:
  72 + shm_t(const std::string& name, int flags=O_RDWR, mode_t mode=S_IRUSR|S_IWUSR):shm(name, flags, mode), elements(0){}
  73 + void unlink() { shm::unlink();}
  74 + bool alloc(const size_t& size){
  75 + elements=size;
  76 + if (!shm::alloc(size*sizeof(T)+sizeof(size_t)))
  77 + return false;
  78 + *((size_t*)m_data)=typeid(T).hash_code();
  79 + return true;
  80 + }
  81 + bool attach(){
  82 + if (!shm::attach())
  83 + return false;
  84 + if (*((size_t*)m_data)!=typeid(T).hash_code())
  85 + return false;
  86 + elements=m_size/sizeof(T);
  87 + return true;
  88 + }
  89 + T* getAddr(const off_t& offset=0) const {return (T*)shm::getAddr(offset*sizeof(T)+sizeof(size_t));}
  90 + const size_t& getSize() const {return elements;}
  91 + bool mlock(){return shm::mlock();}
  92 + void munlock(){shm::munlock();}
  93 + void free(){shm::free();}
  94 + T* operator->() const {return getAddr();}
  95 + T& operator[](const off_t& id) const {return *getAddr(id);}
  96 + friend class shmLog;
  97 + friend class shmTimeLog;
  98 +};
  99 +
  100 +#endif /* SHM_H_ */
... ...
src/wrflock.h 0 → 100644
  1 +++ a/src/wrflock.h
  1 +#ifndef _WRFLOCK_H
  2 +#define _WRFLOCK_H 1
  3 +
  4 +#include <atomic>
  5 +#include <linux/futex.h>
  6 +
  7 +#define WRFLOCK_WWAITBLOCK 1
  8 +#define WRFLOCK_WWAITYIELD 2
  9 +#define WRFLOCK_RWAITBLOCK 4
  10 +#define WRFLOCK_RWAITYIELD 8
  11 +#define WRFLOCK_FWAITBLOCK 16
  12 +#define WRFLOCK_FWAITYIELD 32
// wrflock: a write/read/free phase lock packed into one 64-bit word,
// accessible either whole (data64) or as two 32-bit halves (data32[]):
// one half holds the reader counter, the other the state and flag bits.
// The *_OFFSET constants and MAKEFLAGS64/MAKESHIFT64 hide the
// endian-dependent placement of the halves inside the 64-bit word.
struct wrflock_t
{
# if __BYTE_ORDER == __LITTLE_ENDIAN
# define WRFLOCK_COUNTERS_OFFSET 0
# define WRFLOCK_STATE_OFFSET 1
# define MAKEFLAGS64(flags, le) (le?((((uint64_t)(flags))<<32)):(((uint64_t)(flags))))
# define MAKESHIFT64(shift, le) (le?(shift+32):(shift))
# elif __BYTE_ORDER == __BIG_ENDIAN
# define WRFLOCK_COUNTERS_OFFSET 1
# define WRFLOCK_STATE_OFFSET 0
# define MAKEFLAGS64(flags, le) (le?((((uint64_t)(flags)))):((((uint64_t)(flags))<<32)))
# define MAKESHIFT64(shift, le) (le?(shift):(shift+32))
# else
# error Unsupported byte order.
# endif

// Configuration bits (state half): process-private futex flag and the
// per-role busy-wait (sched_yield instead of futex) flags.
# define WRFLOCK_PRIVATE_MASK_32 ((uint32_t)0x04000000)
# define WRFLOCK_PRIVATE_MASK_64 (MAKEFLAGS64(WRFLOCK_PRIVATE_MASK_32,0) | MAKEFLAGS64(WRFLOCK_PRIVATE_MASK_32,1))
# define WRFLOCK_WWAITYIELD_MASK_32 ((uint32_t)0x00010000)
# define WRFLOCK_WWAITYIELD_MASK_64 MAKEFLAGS64(WRFLOCK_WWAITYIELD_MASK_32,1)
# define WRFLOCK_RWAITYIELD_MASK_32 ((uint32_t)0x00020000)
# define WRFLOCK_RWAITYIELD_MASK_64 MAKEFLAGS64(WRFLOCK_RWAITYIELD_MASK_32,1)
# define WRFLOCK_FWAITYIELD_MASK_32 ((uint32_t)0x00040000)
# define WRFLOCK_FWAITYIELD_MASK_64 MAKEFLAGS64(WRFLOCK_FWAITYIELD_MASK_32,1)

// Acquire registration bits (one per role) plus the 16-bit reader counter
// living in the counters half.
# define WRACQUIRE_VALUE_SHIFT_32 28
# define RDACQUIRE_VALUE_SHIFT_32 29
# define FRACQUIRE_VALUE_SHIFT_32 30
# define RDACQUIRE_COUNTER_SHIFT_32 0
# define WRACQUIRE_VALUE_SHIFT_64 MAKESHIFT64(WRACQUIRE_VALUE_SHIFT_32,1)
# define RDACQUIRE_VALUE_SHIFT_64 MAKESHIFT64(RDACQUIRE_VALUE_SHIFT_32,1)
# define FRACQUIRE_VALUE_SHIFT_64 MAKESHIFT64(FRACQUIRE_VALUE_SHIFT_32,1)
# define RDACQUIRE_COUNTER_SHIFT_64 MAKESHIFT64(RDACQUIRE_COUNTER_SHIFT_32,0)

# define WRACQUIRE_VALUE_MASK_32 ((uint32_t)0x10000000)
# define RDACQUIRE_VALUE_MASK_32 ((uint32_t)0x20000000)
# define FRACQUIRE_VALUE_MASK_32 ((uint32_t)0x40000000)
# define RDACQUIRE_COUNTER_MASK_32 ((uint32_t)0x0000FFFF)
// Set by a writer that registered while the free phase was pending:
// newly-arriving readers must wait for the next loop of the state machine.
# define RDNXTLOOP_FLAG_MASK_32 ((uint32_t)0x02000000)
# define WRACQUIRE_VALUE_MASK_64 MAKEFLAGS64(WRACQUIRE_VALUE_MASK_32,1)
# define RDACQUIRE_VALUE_MASK_64 MAKEFLAGS64(RDACQUIRE_VALUE_MASK_32,1)
# define FRACQUIRE_VALUE_MASK_64 MAKEFLAGS64(FRACQUIRE_VALUE_MASK_32,1)
# define RDACQUIRE_COUNTER_MASK_64 MAKEFLAGS64(RDACQUIRE_COUNTER_MASK_32,0)
# define RDNXTLOOP_FLAG_MASK_64 MAKEFLAGS64(RDNXTLOOP_FLAG_MASK_32,1)

// "Next state" barrier bits: which role may take over when released.
// NOTE(review): NEXTSTATE_FREE_MASK_32/64 are referenced in the
// NEXTSTATE_VALUE_MASK_* definitions below but never defined anywhere in
// this header — any expansion of NEXTSTATE_VALUE_MASK_32/64 will fail to
// compile. They appear unused in the visible sources; confirm intent.
# define NEXTSTATE_WRITE_MASK_32 ((uint32_t)0x00000010)
# define NEXTSTATE_READFREE_MASK_32 ((uint32_t)0x00000020)
# define NEXTSTATE_VALUE_MASK_32 (NEXTSTATE_WRITE_MASK_32|NEXTSTATE_READFREE_MASK_32|NEXTSTATE_FREE_MASK_32)
# define NEXTSTATE_WRITE_MASK_64 MAKEFLAGS64(NEXTSTATE_WRITE_MASK_32,1)
# define NEXTSTATE_READFREE_MASK_64 MAKEFLAGS64(NEXTSTATE_READFREE_MASK_32,1)
# define NEXTSTATE_VALUE_MASK_64 (NEXTSTATE_WRITE_MASK_64|NEXTSTATE_READFREE_MASK_64|NEXTSTATE_FREE_MASK_64)

// "Current state" bits: which role currently holds the lock.
# define CURRSTATE_WRITE_MASK_32 ((uint32_t)0x00000001)
# define CURRSTATE_READ_MASK_32 ((uint32_t)0x00000002)
# define CURRSTATE_FREE_MASK_32 ((uint32_t)0x00000004)
# define CURRSTATE_VALUE_MASK_32 (CURRSTATE_WRITE_MASK_32|CURRSTATE_READ_MASK_32|CURRSTATE_FREE_MASK_32)
# define CURRSTATE_WRITE_MASK_64 MAKEFLAGS64(CURRSTATE_WRITE_MASK_32,1)
# define CURRSTATE_READ_MASK_64 MAKEFLAGS64(CURRSTATE_READ_MASK_32,1)
# define CURRSTATE_FREE_MASK_64 MAKEFLAGS64(CURRSTATE_FREE_MASK_32,1)
# define CURRSTATE_VALUE_MASK_64 (CURRSTATE_WRITE_MASK_64|CURRSTATE_READ_MASK_64|CURRSTATE_FREE_MASK_64)


	// The lock word: one 64-bit atomic overlaid with two 32-bit atomics so
	// the counter half and the state half can be CASed independently.
	union {
		std::atomic<uint64_t> data64;
		std::atomic<uint32_t> data32[2];
	};
};
  81 +
// All functions return 0 on success, -1 with errno set on failure.
// Prefix letter selects the role: w = writer, r = reader, f = freer.

// Lifecycle.
int wrflock_init (wrflock_t *lock, int waittype, int pshared);
int wrflock_destroy (wrflock_t *lock);
// Register intent to enter the given phase.
int wrflock_wacquire (wrflock_t *lock);
int wrflock_racquire (wrflock_t *lock);
int wrflock_facquire (wrflock_t *lock);
// Leave the phase, advancing the state machine and waking waiters.
int wrflock_wrelease (wrflock_t *lock);
int wrflock_rrelease (wrflock_t *lock);
int wrflock_frelease (wrflock_t *lock);
// Block until the corresponding phase becomes current.
int wrflock_wwait (wrflock_t *lock);
int wrflock_rwait (wrflock_t *lock);
int wrflock_fwait (wrflock_t *lock);
// Non-blocking variants (EAGAIN when the phase is not current).
int wrflock_wtrywait (wrflock_t *lock);
int wrflock_rtrywait (wrflock_t *lock);
int wrflock_ftrywait (wrflock_t *lock);
// Deadline variants (absolute CLOCK_REALTIME timespec; ETIMEDOUT).
int wrflock_wtimewait (wrflock_t *lock, const struct timespec *abstime);
int wrflock_rtimewait (wrflock_t *lock, const struct timespec *abstime);
int wrflock_ftimewait (wrflock_t *lock, const struct timespec *abstime);
  99 +
  100 +
  101 +#endif /* rmwlock.h */
... ...
src/wrflock_acquire.cpp 0 → 100644
  1 +++ a/src/wrflock_acquire.cpp
  1 +#include <errno.h>
  2 +#include <sched.h>
  3 +#include "futex.h"
  4 +#include "wrflock.h"
  5 +
// Register this thread as the (single) writer. Fails with EOVERFLOW if a
// writer is already registered. If no barrier is pending, the write phase
// is entered immediately (CURRSTATE_WRITE set).
int wrflock_wacquire (wrflock_t *lock) {
	uint32_t newdata32;
	uint32_t data32 = lock->data32[WRFLOCK_STATE_OFFSET].load(std::memory_order_relaxed);
	do {
		// Only one writer may be registered at a time.
		if (data32 & WRACQUIRE_VALUE_MASK_32) {
			errno=(EOVERFLOW);
			return -1;
		}
		newdata32=data32 | WRACQUIRE_VALUE_MASK_32;
		if (newdata32 & FRACQUIRE_VALUE_MASK_32) { // writer reached free barrier - new readers go to the next loop
			newdata32|=RDNXTLOOP_FLAG_MASK_32;
		}
		if (newdata32 & NEXTSTATE_WRITE_MASK_32) { // no barrier, can take write lock immediately
			newdata32=(newdata32 ^ (NEXTSTATE_WRITE_MASK_32 | CURRSTATE_WRITE_MASK_32));
		}
	} while (!lock->data32[WRFLOCK_STATE_OFFSET].compare_exchange_weak(data32, newdata32, std::memory_order_relaxed, std::memory_order_relaxed));
	return 0;
}
  24 +
  25 +int wrflock_racquire (wrflock_t *lock) {
  26 + uint32_t newdata32;
  27 + uint32_t data32 = lock->data32[WRFLOCK_STATE_OFFSET].load(std::memory_order_relaxed);
  28 + while (data32 & RDNXTLOOP_FLAG_MASK_32) {
  29 + if (data32 & WRFLOCK_RWAITYIELD_MASK_32)
  30 + sched_yield();
  31 + else
  32 + futex_wait_bitset((int*)(&lock->data32[WRFLOCK_STATE_OFFSET]), data32, NULL, FUTEX_BITSET_MATCH_ANY, (data32 & WRFLOCK_PRIVATE_MASK_32)!=0);
  33 + data32 = lock->data32[WRFLOCK_STATE_OFFSET].load(std::memory_order_relaxed);
  34 + }
  35 + do {
  36 + if ((data32 & RDACQUIRE_COUNTER_MASK_32) == RDACQUIRE_COUNTER_MASK_32) {
  37 + errno=(EOVERFLOW);
  38 + return -1;
  39 + }
  40 + newdata32 = data32 + ((uint32_t)1 << RDACQUIRE_COUNTER_SHIFT_32);
  41 + } while (!lock->data32[WRFLOCK_COUNTERS_OFFSET].compare_exchange_weak(data32, newdata32, std::memory_order_relaxed, std::memory_order_relaxed));
  42 + data32 = lock->data32[WRFLOCK_STATE_OFFSET].load(std::memory_order_relaxed);
  43 + do {
  44 + newdata32 = data32 | RDACQUIRE_VALUE_MASK_32;
  45 + if (newdata32 & NEXTSTATE_READFREE_MASK_32) {
  46 + newdata32 = (newdata32 ^ (NEXTSTATE_READFREE_MASK_32 | CURRSTATE_READ_MASK_32));
  47 + }
  48 + } while (!lock->data32[WRFLOCK_STATE_OFFSET].compare_exchange_weak(data32, newdata32, std::memory_order_relaxed, std::memory_order_relaxed));
  49 + return 0;
  50 +}
  51 +
// Register this thread as the (single) freer. Fails with EOVERFLOW if a
// freer is already registered. Enters the free phase immediately when the
// read/free barrier is armed.
int wrflock_facquire (wrflock_t *lock) {
	uint32_t newdata32;
	uint32_t data32 = lock->data32[WRFLOCK_STATE_OFFSET].load(std::memory_order_relaxed);
	do {
		// Only one freer may be registered at a time.
		if (data32 & FRACQUIRE_VALUE_MASK_32) {
			errno=(EOVERFLOW);
			return -1;
		}
		newdata32 = data32 | FRACQUIRE_VALUE_MASK_32;
		if (newdata32 & NEXTSTATE_READFREE_MASK_32) {
			newdata32 = (newdata32 ^ (NEXTSTATE_READFREE_MASK_32 | CURRSTATE_FREE_MASK_32));
		}
	} while (!lock->data32[WRFLOCK_STATE_OFFSET].compare_exchange_weak(data32, newdata32, std::memory_order_relaxed, std::memory_order_relaxed));
	return 0;
}
... ...
src/wrflock_destroy.cpp 0 → 100644
  1 +++ a/src/wrflock_destroy.cpp
  1 +#include "wrflock.h"
  2 +
  3 +int wrflock_destroy (wrflock_t *lock) {
  4 + return 0;
  5 +}
... ...
src/wrflock_init.cpp 0 → 100644
  1 +++ a/src/wrflock_init.cpp
  1 +#include "wrflock.h"
  2 +
  3 +int wrflock_init (wrflock_t *lock, int waittype, int pshared) {
  4 + lock->data64 = (pshared==0?WRFLOCK_PRIVATE_MASK_64:0) | NEXTSTATE_WRITE_MASK_64;
  5 + lock->data64 |= (waittype&WRFLOCK_WWAITYIELD)?WRFLOCK_WWAITYIELD_MASK_64:0;
  6 + lock->data64 |= (waittype&WRFLOCK_RWAITYIELD)?WRFLOCK_RWAITYIELD_MASK_64:0;
  7 + lock->data64 |= (waittype&WRFLOCK_FWAITYIELD)?WRFLOCK_FWAITYIELD_MASK_64:0;
  8 + return 0;
  9 +}
... ...
src/wrflock_release.cpp 0 → 100644
  1 +++ a/src/wrflock_release.cpp
  1 +#include <errno.h>
  2 +#include <limits.h>
  3 +
  4 +#include "futex.h"
  5 +#include "wrflock.h"
  6 +#include <stdio.h>
  7 +
// Release the write lock and advance the state machine: hand over to
// readers if any registered, else to the freer, else arm the read/free
// barrier. Wakes futex waiters unless the woken role is in yield mode.
int wrflock_wrelease (wrflock_t *lock) {
	uint32_t newdata32;
	uint32_t data32=lock->data32[WRFLOCK_STATE_OFFSET].load(std::memory_order_relaxed);
	do {
		// Not holding the write registration: release underflow.
		if (!(data32 & WRACQUIRE_VALUE_MASK_32)) {
			errno=(EOVERFLOW);
			return -1;
		}
		newdata32=data32 & (~(WRACQUIRE_VALUE_MASK_32|CURRSTATE_WRITE_MASK_32|RDNXTLOOP_FLAG_MASK_32));
		if (newdata32 & RDACQUIRE_VALUE_MASK_32) {
			newdata32|=CURRSTATE_READ_MASK_32;
		} else if (newdata32 & FRACQUIRE_VALUE_MASK_32) {
			newdata32|=CURRSTATE_FREE_MASK_32;
		} else {
			newdata32|=NEXTSTATE_READFREE_MASK_32;
		}
	} while (!lock->data32[WRFLOCK_STATE_OFFSET].compare_exchange_weak(data32, newdata32, std::memory_order_release, std::memory_order_relaxed));
	// NOTE(review): RDNXTLOOP_FLAG_MASK_32 is always cleared in newdata32
	// above, so testing it here is dead code; readers parked in
	// wrflock_racquire on that flag probably need to be woken based on the
	// OLD value (data32 & RDNXTLOOP_FLAG_MASK_32) — confirm against the
	// intended design.
	if ((!(newdata32&WRFLOCK_RWAITYIELD_MASK_32) && (newdata32&(CURRSTATE_READ_MASK_32|RDNXTLOOP_FLAG_MASK_32))) || (!(newdata32&WRFLOCK_FWAITYIELD_MASK_32) && newdata32&CURRSTATE_FREE_MASK_32))
		return futex_wake_bitset((int *) &lock->data32[WRFLOCK_STATE_OFFSET], INT_MAX, FUTEX_BITSET_MATCH_ANY, (data32 & WRFLOCK_PRIVATE_MASK_32)!=0);
	return 0;
}
  29 +
  30 +
// Drop one reader reference. Operates on the full 64-bit word because the
// reader counter and the state bits live in different 32-bit halves and
// must be updated together when the last reader leaves.
int wrflock_rrelease (wrflock_t *lock) {
	uint64_t newdata64;
	uint64_t data64=lock->data64.load(std::memory_order_relaxed);
	do {
		// No readers registered: release underflow.
		if (!(data64 & RDACQUIRE_COUNTER_MASK_64)) {
			errno=(EOVERFLOW);
			return -1;
		}
		newdata64=data64 - ((uint64_t)1 << RDACQUIRE_COUNTER_SHIFT_64);
		// Last reader out: clear the read registration and advance either
		// to the free phase (freer registered) or arm the read/free barrier.
		if (!(newdata64 & RDACQUIRE_COUNTER_MASK_64)) {
			newdata64=newdata64 & (~(RDACQUIRE_VALUE_MASK_64));
			if (newdata64 & FRACQUIRE_VALUE_MASK_64)
				newdata64=(newdata64 ^ (CURRSTATE_READ_MASK_64 | CURRSTATE_FREE_MASK_64));
			else
				newdata64=(newdata64 ^ (CURRSTATE_READ_MASK_64 | NEXTSTATE_READFREE_MASK_64));
		}
	} while (!lock->data64.compare_exchange_weak(data64, newdata64, std::memory_order_release, std::memory_order_relaxed));
	// Wake the freer unless it is busy-waiting (yield mode).
	if (!(newdata64&WRFLOCK_FWAITYIELD_MASK_64) && newdata64&CURRSTATE_FREE_MASK_64)
		return futex_wake_bitset((int *) &lock->data32[WRFLOCK_STATE_OFFSET], INT_MAX, FUTEX_BITSET_MATCH_ANY, (data64 & WRFLOCK_PRIVATE_MASK_64)!=0);
	return 0;
}
  52 +
// Release the free phase: hand over to a registered writer or arm the
// write barrier. Wakes the writer unless it is in yield mode.
int wrflock_frelease (wrflock_t *lock) {
	uint32_t newdata32;
	uint32_t data32=lock->data32[WRFLOCK_STATE_OFFSET].load(std::memory_order_relaxed);
	do {
		// No freer registered: release underflow.
		if (!(data32 & FRACQUIRE_VALUE_MASK_32)) {
			errno=(EOVERFLOW);
			return -1;
		}
		newdata32=data32 & (~(FRACQUIRE_VALUE_MASK_32|CURRSTATE_FREE_MASK_32));
		if (newdata32 & WRACQUIRE_VALUE_MASK_32) {
			newdata32|=CURRSTATE_WRITE_MASK_32;
		} else {
			newdata32|=NEXTSTATE_WRITE_MASK_32;
		}
	} while (!lock->data32[WRFLOCK_STATE_OFFSET].compare_exchange_weak(data32, newdata32, std::memory_order_release, std::memory_order_relaxed));
	if (!(newdata32&WRFLOCK_WWAITYIELD_MASK_32) && newdata32&CURRSTATE_WRITE_MASK_32)
		return futex_wake_bitset((int *) &lock->data32[WRFLOCK_STATE_OFFSET], INT_MAX, FUTEX_BITSET_MATCH_ANY, (data32 & WRFLOCK_PRIVATE_MASK_32)!=0);
	return 0;
}
  72 +
... ...
src/wrflock_wait.cpp 0 → 100644
  1 +++ a/src/wrflock_wait.cpp
  1 +#include <sched.h>
  2 +#include <errno.h>
  3 +#include "futex.h"
  4 +#include "wrflock.h"
  5 +#include <stdio.h>
  6 +
  7 +
// Wait (with optional absolute CLOCK_REALTIME deadline) until the lock
// enters the write phase. Returns 0 on success; -1 with errno set to
// EINVAL (bad timespec), ETIMEDOUT, or EINTR.
int wrflock_wtimewait (struct wrflock_t *lock, const struct timespec *abstime)
{
	uint32_t data32;
	int err;
	if (abstime && (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)) {
		errno=(EINVAL);
		return -1;
	}
	while (1) {
		data32 = lock->data32[WRFLOCK_STATE_OFFSET].load(std::memory_order_relaxed);
		if (data32 & CURRSTATE_WRITE_MASK_32) {
			// Pairs with the releasing thread's memory_order_release CAS.
			std::atomic_thread_fence(std::memory_order_acquire);
			return 0;
		}
		if (!(data32&WRFLOCK_WWAITYIELD_MASK_32)){
			// Kernel sleep; spurious wakeups simply re-check the state.
			err=futex_wait_bitset((int*)(&lock->data32[WRFLOCK_STATE_OFFSET]), data32, abstime, FUTEX_BITSET_MATCH_ANY, (data32 & WRFLOCK_PRIVATE_MASK_32)!=0);
			if (err<0 && (errno == ETIMEDOUT || errno == EINTR))
				return err;
		} else {
			// Busy-wait mode: emulate the deadline by polling the clock.
			if (abstime){
				timespec time;
				clock_gettime(CLOCK_REALTIME, &time);
				if ((time.tv_sec>abstime->tv_sec) || (time.tv_sec==abstime->tv_sec && time.tv_nsec>=abstime->tv_nsec)) {
					errno=ETIMEDOUT;
					return -1;
				}
			}
			sched_yield();
		}
	}
}
  39 +
// Wait (with optional absolute CLOCK_REALTIME deadline) until the lock
// enters the read phase. Returns 0 on success; -1 with errno set to
// EINVAL (bad timespec), ETIMEDOUT, or EINTR.
int wrflock_rtimewait (struct wrflock_t *lock, const struct timespec *abstime)
{
	uint32_t data32;
	int err;
	if (abstime && (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)) {
		errno=(EINVAL);
		return -1;
	}
	while (1) {
		data32 = lock->data32[WRFLOCK_STATE_OFFSET].load(std::memory_order_relaxed);
		if (data32 & CURRSTATE_READ_MASK_32){
			// Pairs with the releasing thread's memory_order_release CAS.
			std::atomic_thread_fence(std::memory_order_acquire);
			return 0;
		}
		if (!(data32&WRFLOCK_RWAITYIELD_MASK_32)){
			// Kernel sleep; spurious wakeups simply re-check the state.
			err=futex_wait_bitset((int*)(&lock->data32[WRFLOCK_STATE_OFFSET]), data32, abstime, FUTEX_BITSET_MATCH_ANY, (data32 & WRFLOCK_PRIVATE_MASK_32)!=0);
			if (err<0 && (errno == ETIMEDOUT || errno == EINTR))
				return err;
		} else {
			// Busy-wait mode: emulate the deadline by polling the clock.
			if (abstime){
				timespec time;
				clock_gettime(CLOCK_REALTIME, &time);
				if ((time.tv_sec>abstime->tv_sec) || (time.tv_sec==abstime->tv_sec && time.tv_nsec>=abstime->tv_nsec)){
					errno=ETIMEDOUT;
					return -1;
				}
			}
			sched_yield();
		}
	}
}
  71 +
// Wait (with optional absolute CLOCK_REALTIME deadline) until the lock
// enters the free phase. Returns 0 on success; -1 with errno set to
// EINVAL (bad timespec), ETIMEDOUT, or EINTR.
int wrflock_ftimewait (struct wrflock_t *lock, const struct timespec *abstime)
{
	uint32_t data32;
	int err=0;
	if (abstime && (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)) {
		errno=(EINVAL);
		return -1;
	}
	while (1) {
		data32 = lock->data32[WRFLOCK_STATE_OFFSET].load(std::memory_order_relaxed);
		if (data32 & CURRSTATE_FREE_MASK_32) {
			// Pairs with the releasing thread's memory_order_release CAS.
			std::atomic_thread_fence(std::memory_order_acquire);
			return 0;
		}
		if (!(data32&WRFLOCK_FWAITYIELD_MASK_32)) {
			// Kernel sleep; spurious wakeups simply re-check the state.
			err=futex_wait_bitset((int*)(&lock->data32[WRFLOCK_STATE_OFFSET]), data32, abstime, FUTEX_BITSET_MATCH_ANY, (data32 & WRFLOCK_PRIVATE_MASK_32)!=0);
			if (err<0 && (errno == ETIMEDOUT || errno == EINTR))
				return err;
		} else {
			// Busy-wait mode: emulate the deadline by polling the clock.
			if (abstime){
				timespec time;
				clock_gettime(CLOCK_REALTIME, &time);
				if ((time.tv_sec>abstime->tv_sec) || (time.tv_sec==abstime->tv_sec && time.tv_nsec>=abstime->tv_nsec)) {
					errno=ETIMEDOUT;
					return -1;
				}
			}
			sched_yield();
		}
	}
}
  103 +
  104 +int wrflock_wwait (wrflock_t *lock){
  105 + return wrflock_wtimewait (lock, NULL);
  106 +}
  107 +
  108 +int wrflock_rwait (wrflock_t *lock){
  109 + return wrflock_rtimewait (lock, NULL);
  110 +}
  111 +
  112 +int wrflock_fwait (wrflock_t *lock){
  113 + return wrflock_ftimewait (lock, NULL);
  114 +}
  115 +
  116 +int wrflock_wtrywait (wrflock_t *lock){
  117 + const uint32_t data32 = lock->data32[WRFLOCK_STATE_OFFSET].load(std::memory_order_acquire);
  118 + if (!(data32 & CURRSTATE_WRITE_MASK_32)){
  119 + errno=(EAGAIN);
  120 + return -1;
  121 + }
  122 + return 0;
  123 +}
  124 +
  125 +int wrflock_rtrywait (wrflock_t *lock){
  126 + const uint32_t data32 = lock->data32[WRFLOCK_STATE_OFFSET].load(std::memory_order_acquire);
  127 + if (!(data32 & CURRSTATE_READ_MASK_32)){
  128 + errno=(EAGAIN);
  129 + return -1;
  130 + }
  131 + return 0;
  132 +}
  133 +
  134 +int wrflock_ftrywait (wrflock_t *lock){
  135 + const uint32_t data32 = lock->data32[WRFLOCK_STATE_OFFSET].load(std::memory_order_acquire);
  136 + if (!(data32 & CURRSTATE_FREE_MASK_32)){
  137 + errno=(EAGAIN);
  138 + return -1;
  139 + }
  140 + return 0;
  141 +}
  142 +
... ...
test.cpp 0 → 100644
  1 +++ a/test.cpp
  1 +#include <sched.h>
  2 +#include <chrono>
  3 +#include <vector>
  4 +#include <algorithm>
  5 +#include <iostream>
  6 +#include <thread>
  7 +#include "Consumer.h"
  8 +#include "Producer.h"
  9 +
  10 +
// Benchmark parameters.
#define LOOPS 10000000
//0
#define CONSUMERS 2	// number of consumer threads
#define NSPS 1000000000	// nanoseconds per second

// Producer shared with the producer thread; name matches the consumers'.
Producer p("test");

using namespace std;
void testProducerTask();
void testConsumerTask();

std::atomic<bool> stopc(false);	// NOTE(review): declared but never read in this file
std::atomic<uint32_t> cid(0);	// hands out sequential consumer ids

std::vector<double> cltime[CONSUMERS];	// per-consumer latency samples, seconds
  26 +
// Spawns one producer and CONSUMERS consumer threads, joins them, then
// prints min/max/avg/99th-percentile latency per consumer.
int main() {
	double min,max,avg;
	sched_param sch_params;
	sch_params.sched_priority=1;
	// Prefer the SCHED_FIFO real-time class to reduce scheduling jitter.
	if (pthread_setschedparam(pthread_self(),SCHED_FIFO,&sch_params)!=0)
		std::cout<<"Error priority"<<std::endl;

	thread testProducerThread;
	thread testConsumerThread[CONSUMERS];

	// Arguments' semantics defined by Producer::create (not visible here);
	// presumably buffer size and block count — confirm in Producer.h.
	p.create(1000000, 20000);

	testProducerThread=thread(testProducerTask);
	for (uint32_t i=0;i<CONSUMERS;i++)
		testConsumerThread[i]=thread(testConsumerTask);
	testProducerThread.join();
	for (uint32_t i=0;i<CONSUMERS;i++)
		testConsumerThread[i].join();

	// Per-consumer statistics over the LOOPS timing samples.
	for (uint32_t i=0;i<CONSUMERS;i++){
		min=2e9;
		max=avg=0;
		sort(cltime[i].begin(),cltime[i].end());
		for (uint64_t t=0;t<LOOPS;t++){
			// 157e-9 s subtracted from every sample — presumably a
			// calibrated constant measurement overhead; confirm.
			cltime[i][t]-=157e-9;
			if (cltime[i][t]<min)
				min=cltime[i][t];
			if (cltime[i][t]>max)
				max=cltime[i][t];
			avg+=cltime[i][t];
		}
		cout<<"MIN: "<<min<<endl;
		cout<<"MAX: "<<max<<endl;
		cout<<"AVG: "<<avg/LOOPS<<endl;
		// 99th percentile taken from the sorted samples.
		cout<<"99%: "<<cltime[i][(off_t)((double)LOOPS*0.99)]<<endl;
	}
	return 0;
}
  65 +
// Producer thread: runs three LOOPS-sized phases —
//   1) VALIDATION/WARMUP: writes a sequence number into each block,
//   2) TIMINGS: writes a timestamp that consumers subtract for latency,
//   3) SPEED: commits blocks without payload to measure raw throughput.
void testProducerTask(){
	std::chrono::high_resolution_clock::time_point t[2];
	DataBlock b;

	p.init();

	// Give consumers time to attach before producing.
	sleep(2);

	cout<<"VALIDATION/WARMUP"<<endl;
	for (int64_t i=0; i<LOOPS; ++i){
		// Spin until a 100-byte block is available.
		do {
			p.allocateDataBlock(b, 100);
		} while (!b);
		((uint32_t*)(*b))[0]=i;
		p.commit(b);
	}
	cout<<"TIMINGS"<<endl;
	for (int64_t i=0; i<LOOPS; ++i){
		do {
			p.allocateDataBlock(b, 100);
		} while (!b);
		// Consumers compute latency as (receive time - this timestamp).
		((std::chrono::high_resolution_clock::time_point*)(*b))[0]=std::chrono::high_resolution_clock::now();
		p.commit(b);
	}
	cout<<"SPEED"<<endl;
	t[0]=std::chrono::high_resolution_clock::now();
	for (int64_t i=0; i<LOOPS; ++i){
		do {
			p.allocateDataBlock(b, 100);
		} while (!b);
		p.commit(b);
	}
	t[1]=std::chrono::high_resolution_clock::now();
	p.done();
	double time=std::chrono::duration_cast<std::chrono::duration<double> >(t[1]-t[0]).count();
	cout<<"Time: "<<time<<", OPps: "<<LOOPS/time<<endl;
}
  103 +
  104 +void testConsumerTask(){
  105 + int clientid;
  106 + std::chrono::high_resolution_clock::time_point ts;
  107 + DataBlock b;
  108 + Consumer c("test");
  109 + int64_t i=-LOOPS;
  110 + clientid=cid++;
  111 + std::vector<double>& time=cltime[clientid];
  112 + time.reserve(LOOPS);
  113 + for (int64_t i=0; i<LOOPS; ++i){
  114 + time[i]=0.0;
  115 + }
  116 + c.attach();
  117 + cout<<"Consumer "<<clientid<<endl;
  118 + c.init();
  119 + while(1){
  120 + c.getDataBlock(b);
  121 + if (b.isStop())
  122 + break;
  123 + if (!b)
  124 + continue;
  125 + ts=std::chrono::high_resolution_clock::now();
  126 + if (i<0){
  127 + if (((uint32_t*)(*b))[0]!=i+LOOPS)
  128 + cout<<"MISMATCH "<<((uint32_t*)(*b))[0]<<" "<<i+LOOPS<<endl;
  129 + }
  130 + else if (i>=0 && i<LOOPS){
  131 + time[i]=std::chrono::duration_cast<std::chrono::duration<double> >(ts-((std::chrono::high_resolution_clock::time_point*)(*b))[0]).count();
  132 + }
  133 + c.commit(b);
  134 + ++i;
  135 + }
  136 + c.done();
  137 +}
... ...