MemoryManager.cpp 8.41 KB
/*
 * MemoryManager.cpp
 *
 *  Created on: 15 gru 2018
 *      Author: mariuszo
 */

#include <limits.h>
#include "MemoryManager.h"
#include "pthread_helpers.h"


/**
 * Constructs a manager identified by @p name.
 *
 * The two shared-memory helper members are constructed with the names
 * "<name>.ringbuffer" and "<name>.memorymanagement". All raw pointers into
 * the shared regions start out null; they are assigned by create() or
 * attach().
 *
 * @param name Base name from which both shared-memory object names are derived.
 */
MemoryManagerBase::MemoryManagerBase(const std::string& name)
	: m_name(name),
	  m_shmDataRingBuffer(name+".ringbuffer"),
	  m_shmMemoryManagementData(name+".memorymanagement"),
	  m_dataRingBuffer(nullptr),
	  m_dataSlotsRingBuffer(nullptr),
	  m_memoryManagementData(nullptr)
{
}

/**
 * Destructor. Performs no explicit work of its own: it does not call
 * destroy(), so the shared-memory objects are not unlinked here.
 */
MemoryManagerBase::~MemoryManagerBase() = default;

/**
 * Allocates and initialises both shared-memory regions used by the manager.
 *
 * Two regions are created:
 *  - the data ring buffer (dataRingBufferSize bytes), and
 *  - the management block: one memoryManagementData header immediately
 *    followed by dataSlotsRingBufferSize dataSlot entries.
 *
 * The allocation/release cursors pack a slot index and a byte address into a
 * single 64-bit word. This function derives the split (m_dataSlotIDShift,
 * m_dataSlotIDMask, m_dataSlotAddrMask) from the requested slot count and
 * rejects buffer sizes whose addresses would not fit below the slot-index
 * field.
 *
 * @param dataRingBufferSize      Requested size of the data ring buffer in bytes.
 * @param dataSlotsRingBufferSize Number of slots; must be in [1, SEM_VALUE_MAX].
 *
 * @throws std::logic_error if the slot count is out of range, if the buffer
 *         size cannot be addressed with the remaining address bits, if either
 *         shared-memory region cannot be allocated or locked, or if the slot
 *         semaphore cannot be initialised.
 */
void MemoryManagerBase::create(const size_t& dataRingBufferSize, const size_t& dataSlotsRingBufferSize) {
	size_t size;

	// The slot count is also the initial value of the slot semaphore, hence the SEM_VALUE_MAX limit.
	if (dataSlotsRingBufferSize > SEM_VALUE_MAX)
		throw std::logic_error("Data block queue size must not exceed "+std::to_string(SEM_VALUE_MAX));
	if (dataSlotsRingBufferSize == 0)
		throw std::logic_error("Data block queue size must be positive value");

	// Start with the slot index occupying bits 62..63 and widen the field one bit
	// at a time until the slot count is no larger than 2^(63-shift). Valid slot
	// indices therefore never set bit 63, which free() uses as a marker bit in
	// FULLYPARALLEL builds.
	m_dataSlotIDMask=  0xC000000000000000;
	m_dataSlotAddrMask=0x3fffffffffffffff;
	m_dataSlotIDShift=62;
	while (dataSlotsRingBufferSize>(((uint64_t)1)<<(63-m_dataSlotIDShift))){
		--m_dataSlotIDShift;
		m_dataSlotIDMask=(m_dataSlotIDMask>>1)+0x8000000000000000;
		m_dataSlotAddrMask=(m_dataSlotAddrMask>>1);
	}
	if (dataRingBufferSize>m_dataSlotAddrMask)
		throw std::logic_error("Cannot manage addressing for current buffer size and block count");


	// Region 1: raw payload bytes.
	if (!m_shmDataRingBuffer.alloc((size=dataRingBufferSize * sizeof(*m_dataRingBuffer))))
		throw std::logic_error("Cannot allocate shared memory \""+m_shmDataRingBuffer.getName()+"\" size ("+std::to_string(size)+")");
	if (!m_shmDataRingBuffer.mlock())
		throw std::logic_error("Cannot mlock shared memory \""+m_shmDataRingBuffer.getName()+"\"");

	// Region 2: management header followed by the slot array.
	if (!m_shmMemoryManagementData.alloc((size=sizeof(*m_memoryManagementData) + dataSlotsRingBufferSize * sizeof(*m_dataSlotsRingBuffer))))
		throw std::logic_error("Cannot allocate shared memory \""+m_shmMemoryManagementData.getName()+"\" (size "+std::to_string(size)+")");
	if (!m_shmMemoryManagementData.mlock())
		throw std::logic_error("Cannot mlock shared memory \""+m_shmMemoryManagementData.getName()+"\"");

	m_dataRingBuffer = (uint8_t*) (m_shmDataRingBuffer.getAddr());

	m_memoryManagementData = (memoryManagementData*) m_shmMemoryManagementData.getAddr();
	// The size reported by the shared-memory object (not the requested size) is recorded.
	m_memoryManagementData->dataRingBufferSize = m_shmDataRingBuffer.getSize();
	m_memoryManagementData->dataSlotsRingBufferSize = dataSlotsRingBufferSize;
#ifdef FULLYPARALLEL
	// The region is raw memory: atomics must be constructed in place.
	new (&(m_memoryManagementData->allocData)) std::atomic<uint64_t>(0);
	new (&(m_memoryManagementData->releaseData)) std::atomic<uint64_t>(0);
#else
	m_memoryManagementData->allocData=0;
	m_memoryManagementData->releaseData=0;
#endif
	new (&(m_memoryManagementData->freeSize)) std::atomic<size_t>(m_shmDataRingBuffer.getSize());
	// The semaphore lives in the shared region that attach() maps, so it must be
	// initialised as process-shared (pshared != 0); with pshared == 0 its use from
	// another process is undefined. A process-shared semaphore still works for
	// threads of the creating process.
	if (sem_init(&m_memoryManagementData->dataSlotsSemaphore, 1, m_memoryManagementData->dataSlotsRingBufferSize) != 0)
		throw std::logic_error("Cannot initialize semaphore in shared memory \""+m_shmMemoryManagementData.getName()+"\"");

	// The slot array starts right after the management header; mark every slot empty.
	m_dataSlotsRingBuffer = (dataSlot*) (m_memoryManagementData + 1);
	for (size_t i = 0; i < m_memoryManagementData->dataSlotsRingBufferSize; ++i) {
		new (&(m_dataSlotsRingBuffer[i].dataSlotID)) std::atomic<off_t>(static_cast<off_t>(block_type::block_empty));
		m_dataSlotsRingBuffer[i].dataAddr = 0;
		m_dataSlotsRingBuffer[i].dataSize = 0;
	}

}

void MemoryManagerBase::attach(){
	if (!m_shmMemoryManagementData.attach())
		throw std::logic_error("Cannot attach to shared memory \""+m_shmMemoryManagementData.getName()+"\"");

	if (!m_shmDataRingBuffer.attach())
		throw std::logic_error("Cannot attach to shared memory \""+m_shmDataRingBuffer.getName()+"\"");

	m_memoryManagementData=(memoryManagementData*)m_shmMemoryManagementData.getAddr();
	m_dataSlotsRingBuffer=(dataSlot*)(m_memoryManagementData+1);

	m_dataRingBuffer=(uint8_t*)(m_shmDataRingBuffer.getAddr());

	m_dataSlotIDMask=  0xC000000000000000;
	m_dataSlotAddrMask=0x3fffffffffffffff;
	m_dataSlotIDShift=62;
	while (m_memoryManagementData->dataSlotsRingBufferSize>(((uint64_t)1)<<(63-m_dataSlotIDShift))){
		--m_dataSlotIDShift;
		m_dataSlotIDMask=(m_dataSlotIDMask>>1)+0x8000000000000000;
		m_dataSlotAddrMask=(m_dataSlotAddrMask>>1);
	}
	if (m_memoryManagementData->dataRingBufferSize>m_dataSlotAddrMask)
		throw std::logic_error("Cannot manage addressing for current buffer size and block count");
}

/**
 * Tears down the shared state: runs the destructors of the atomics that
 * create() constructed in place, then unlinks both shared-memory objects.
 *
 * Dereferences m_memoryManagementData without a null check, so it must be
 * called only after create() or attach() has set the pointers.
 *
 * NOTE(review): the semaphore initialised in create() is not passed to
 * sem_destroy() here - confirm whether that is intentional.
 */
void MemoryManagerBase::destroy(){
	// Per-slot atomics first.
	auto* const firstSlot = m_dataSlotsRingBuffer;
	auto* const pastLastSlot = firstSlot + m_memoryManagementData->dataSlotsRingBufferSize;
	for (auto* slot = firstSlot; slot != pastLastSlot; ++slot)
		slot->dataSlotID.~atomic();

	// Then the atomics in the management header.
	m_memoryManagementData->freeSize.~atomic();
#ifdef FULLYPARALLEL
	m_memoryManagementData->releaseData.~atomic();
	m_memoryManagementData->allocData.~atomic();
#endif

	// Finally remove the shared-memory objects themselves.
	m_shmMemoryManagementData.unlink();
	m_shmDataRingBuffer.unlink();
}

/**
 * Reserves a slot and `size` bytes of the data ring buffer.
 *
 * Steps:
 *  1. Wait on the slot semaphore (initialised in create() with the slot count).
 *  2. Subtract `size` from the shared free-byte counter with a CAS loop.
 *  3. Advance the packed allocation cursor (slot index in the high bits,
 *     byte address in the low bits), wrapping both at their ring sizes.
 *  4. Fill in the claimed slot and publish it.
 *
 * @param size      Number of bytes requested.
 * @param timeoutns Timeout handed to semWait(); presumably nanoseconds,
 *                  judging by the name - confirm against pthread_helpers.h.
 * @return A DataBlock for the claimed slot, or a DataBlock constructed from
 *         specialid_timeout (semWait() returned false) or
 *         specialid_outofmemory (fewer than `size` free bytes).
 * @throws std::logic_error in builds without NDEBUG if the claimed slot is
 *         not marked block_empty.
 */
DataBlock MemoryManagerBase::alloc(const size_t& size, const uint64_t& timeoutns) {
	// Step 1: take one slot token; give up if semWait() reports failure.
	if (!semWait(m_memoryManagementData->dataSlotsSemaphore, timeoutns))
		return DataBlock(specialid_type::specialid_timeout);

	// Step 2: reserve the bytes. On CAS failure freeSize is reloaded with the
	// current value and the check is repeated.
	size_t freeSize=m_memoryManagementData->freeSize.load(std::memory_order_relaxed);
	do {
		if (freeSize<size){
			// Not enough room: hand the slot token back before reporting failure.
			semPost(m_memoryManagementData->dataSlotsSemaphore);
			return DataBlock(specialid_type::specialid_outofmemory);
		}
	} while (!m_memoryManagementData->freeSize.compare_exchange_weak(freeSize, freeSize-size, std::memory_order_relaxed, std::memory_order_relaxed));

	// Step 3: dataSlotID/addr are the values claimed by this call;
	// newDataSlotID/newAddr are where the cursor points afterwards.
	uint64_t dataSlotID, newDataSlotID;
	uint64_t addr, newAddr;
#ifdef FULLYPARALLEL
	// Slot index and address are advanced together by a single CAS on the packed word.
	uint64_t allocData=m_memoryManagementData->allocData.load(std::memory_order_relaxed);
	do {
		newDataSlotID=(dataSlotID=(allocData>>m_dataSlotIDShift))+1;
		if (newDataSlotID==m_memoryManagementData->dataSlotsRingBufferSize)
			newDataSlotID=0;
		newAddr=(addr=(allocData&m_dataSlotAddrMask))+size;
		// The next address wraps at the end of the ring buffer.
		if (newAddr>=m_memoryManagementData->dataRingBufferSize)
			newAddr-=m_memoryManagementData->dataRingBufferSize;
	} while (!m_memoryManagementData->allocData.compare_exchange_weak(allocData, (newDataSlotID<<m_dataSlotIDShift) | newAddr, std::memory_order_relaxed, std::memory_order_relaxed));
#else
	// Same cursor arithmetic as above, but as a plain read-modify-write.
	// NOTE(review): presumably relies on a single allocating thread - confirm.
	uint64_t allocData=m_memoryManagementData->allocData;
	newDataSlotID=(dataSlotID=(allocData>>m_dataSlotIDShift))+1;
	if (newDataSlotID==m_memoryManagementData->dataSlotsRingBufferSize)
		newDataSlotID=0;
	newAddr=(addr=(allocData&m_dataSlotAddrMask))+size;
	if (newAddr>=m_memoryManagementData->dataRingBufferSize)
		newAddr-=m_memoryManagementData->dataRingBufferSize;

	m_memoryManagementData->allocData=(newDataSlotID<<m_dataSlotIDShift) | newAddr;
#endif

	// Step 4: note that the local reference below shadows the type name `dataSlot`.
	dataSlot& dataSlot(m_dataSlotsRingBuffer[dataSlotID]);
#ifndef NDEBUG
	// Sanity check: the slot being claimed must have been marked empty.
	if (dataSlot.dataSlotID.load(std::memory_order_relaxed)!=static_cast<off_t>(block_type::block_empty))
		throw std::logic_error("Error in alloc() algorithm");
#endif
	dataSlot.dataAddr=addr;
	dataSlot.dataSize=size;
	// Release store: the address and size written above are ordered before the slot index becomes visible.
	dataSlot.dataSlotID.store(dataSlotID, std::memory_order_release);
	return DataBlock(dataSlot, m_dataRingBuffer);
}

/**
 * Releases the block held in slot `dataSlotID`.
 *
 * The slot is first marked block_torelease. The release cursor (a packed
 * slot index + byte address, like the allocation cursor) is then advanced
 * past it, the slot is marked block_empty, its bytes are added back to the
 * free-byte counter and one slot token is posted to the semaphore.
 *
 * FULLYPARALLEL builds: the cursor is advanced only if it currently points
 * at this slot (compare-exchange). If it does not, the slot stays marked
 * block_torelease and the function returns. After a successful release the
 * loop continues with the following slots for as long as they are marked
 * block_torelease. While a slot is being released, bit 63 of releaseData is
 * set and cleared again afterwards.
 *
 * Other builds: exactly one slot is released, and the cursor is expected to
 * point at it.
 *
 * The wrap of the next address at the end of the ring buffer is applied in
 * both build modes, matching the cursor arithmetic in alloc(). Without it the
 * non-parallel cursor would hold an unwrapped address after the first
 * wrap-around and no longer match the address stored in the next slot.
 *
 * @param dataSlotID Index of the slot to release.
 * @throws std::logic_error in non-FULLYPARALLEL builds without NDEBUG if the
 *         release cursor does not point at the slot being released.
 */
void MemoryManagerBase::free(off_t dataSlotID) {
	uint64_t requiredReleaseData;
	uint64_t newDataSlotID;
	uint64_t newAddr;

	dataSlot* currentDataSlot=&(m_dataSlotsRingBuffer[dataSlotID]);
	currentDataSlot->dataSlotID.store(static_cast<off_t>(block_type::block_torelease), std::memory_order_relaxed);
#ifdef FULLYPARALLEL
	do {
#endif
		// Cursor position after this slot: next slot index and next byte address, both wrapped.
		newDataSlotID=dataSlotID+1;
		if (newDataSlotID==m_memoryManagementData->dataSlotsRingBufferSize)
			newDataSlotID=0;
		newAddr=currentDataSlot->dataAddr+currentDataSlot->dataSize;
		if (newAddr>=m_memoryManagementData->dataRingBufferSize)
			newAddr-=m_memoryManagementData->dataRingBufferSize;
		// Cursor value that means "this slot is the next one to be released".
		requiredReleaseData=((dataSlotID<<m_dataSlotIDShift) | currentDataSlot->dataAddr);
#ifdef FULLYPARALLEL
		// Advance the cursor only if it points at this slot; bit 63 is set while the slot is being emptied.
		if (!m_memoryManagementData->releaseData.compare_exchange_strong(requiredReleaseData, (newDataSlotID<<m_dataSlotIDShift)|newAddr|0x8000000000000000, std::memory_order_relaxed, std::memory_order_relaxed))
			break;
		currentDataSlot->dataSlotID.store(static_cast<off_t>(block_type::block_empty), std::memory_order_relaxed);
		m_memoryManagementData->releaseData.fetch_and(~0x8000000000000000, std::memory_order_relaxed);
#else
#ifndef NDEBUG
		if (m_memoryManagementData->releaseData!=requiredReleaseData)
			throw std::logic_error("Error in free() algorithm");
#endif
		m_memoryManagementData->releaseData=(newDataSlotID<<m_dataSlotIDShift)|newAddr;
		currentDataSlot->dataSlotID.store(static_cast<off_t>(block_type::block_empty), std::memory_order_relaxed);
#endif
		// Return the bytes and the slot token.
		m_memoryManagementData->freeSize.fetch_add(currentDataSlot->dataSize, std::memory_order_relaxed);
		semPost(m_memoryManagementData->dataSlotsSemaphore);
		// Move on to the following slot (only examined further in FULLYPARALLEL builds).
		currentDataSlot=&(m_dataSlotsRingBuffer[dataSlotID=newDataSlotID]);
#ifdef FULLYPARALLEL
	} while(currentDataSlot->dataSlotID.load(std::memory_order_relaxed)==static_cast<off_t>(block_type::block_torelease));
#endif
}