// Producer.cpp 3.36 KB
#include <signal.h>
#include <string.h>
#include <pthread.h>

#include <thread>

#include "Producer.h"
#include "ModuloCounter.h"
#include "ProducerOnInitLock.h"

// Out-of-class definition of the static recursive mutex member of ProducerOnInitLock.
// NOTE(review): presumably this is the mutex that lockOnInit()/unlockOnInit() operate on —
// confirm in ProducerOnInitLock.h.
std::recursive_mutex ProducerOnInitLock::lock;


/**
 * Constructs the producer.
 *
 * Forwards `name` to SubsystemBase and passes m_name to both the memory-manager
 * data and the synchronizer data. The on-init lock taken here is NOT released by
 * the constructor; within this file the unlockOnInit() calls are found in
 * create(), init() and the destructor.
 *
 * @param name Name handed to the SubsystemBase constructor.
 */
Producer::Producer(const std::string& name)
	: SubsystemBase(name),
	  m_memoryManagerData(m_name),
	  m_synchronizerData(m_name) {
	// sem_init arguments: pshared == 0 (semaphore is used between threads of this
	// process) and an initial count of 0, so the first wait blocks until a post.
	const int threadSharedOnly = 0;
	const unsigned int initialCount = 0;

	ProducerOnInitLock::lockOnInit();
	sem_init(&m_bufferReleaseMonitorStarted, threadSharedOnly, initialCount);
}

// Destroys the start-up semaphore while holding the on-init lock.
// NOTE(review): m_bufferReleaseThread is not joined here. If it is a std::thread that is
// still joinable when the object is destroyed, its destructor calls std::terminate —
// presumably callers are expected to call done() first; confirm.
Producer::~Producer() {
	ProducerOnInitLock::lockOnInit();
	sem_destroy(&m_bufferReleaseMonitorStarted);
	ProducerOnInitLock::unlockOnInit();
}

// Creates the memory-manager data, then the synchronizer data.
//
// bufferSize and dataSlotsRingBufferSize are forwarded unchanged to
// m_memoryManagerData.create(). unlockOnInit() is called exactly once on either
// path: after both create() calls succeed, or in the catch-all handler before the
// original exception is rethrown to the caller.
// NOTE(review): presumably this unlock pairs with the lockOnInit() taken in the
// constructor — confirm against ProducerOnInitLock.
void Producer::create(const size_t& bufferSize, const size_t& dataSlotsRingBufferSize) {
	try {
		m_memoryManagerData.create(bufferSize, dataSlotsRingBufferSize);
		m_synchronizerData.create();
	}
	catch (...) {
		// Release the on-init lock before propagating the failure.
		ProducerOnInitLock::unlockOnInit();
		throw;
	}
	ProducerOnInitLock::unlockOnInit();
}

// Starts the producer: write-acquires the current write slot and launches the
// buffer-release thread, returning only after that thread has signalled start-up.
void Producer::init() {
	m_synchronizerData.wacquire(m_synchronizerData.getWriteSlotID());
	m_bufferReleaseThread=std::thread(&Producer::bufferReleaseTask, this);
	// Block until bufferReleaseTask() posts the semaphore, which it does right after
	// its first facquire().
	semWait(m_bufferReleaseMonitorStarted);
	ProducerOnInitLock::unlockOnInit();
	// NOTE(review): no matching lockSynchronizerData() is visible in this function;
	// the lock is presumably taken elsewhere (e.g. while creating/attaching the
	// synchronizer data) — confirm.
	m_synchronizerData.unlockSynchronizerData();
}
// Shuts the producer down: commits a stop-marker block, write-releases the current
// write slot and joins the buffer-release thread if it is joinable.
// NOTE(review): the result of commit() is ignored. commit() returns false when wwait()
// fails, in which case no stop marker is queued and the join() below would presumably
// never return. The timeout used here is commit()'s default argument, declared outside
// this file — confirm it cannot expire.
void Producer::done() {
	DataBlock dataBlock(specialid_type::specialid_stop);
	commit(dataBlock);
	m_synchronizerData.wrelease(m_synchronizerData.getWriteSlotID());
	if (m_bufferReleaseThread.joinable())
		m_bufferReleaseThread.join();
}

/**
 * Requests a block from the memory manager and stores it in `dataBlock`.
 *
 * When the stored block tests true it is attached to this producer; otherwise
 * it is left as returned by alloc().
 *
 * @param dataBlock Receives the result of m_memoryManagerData.alloc().
 * @param size      Forwarded to alloc().
 * @param timeoutns Forwarded to alloc().
 */
void Producer::allocateDataBlock(DataBlock& dataBlock, const size_t size, const uint64_t& timeoutns) {
	dataBlock = m_memoryManagerData.alloc(size, timeoutns);

	// Nothing to attach when the allocation did not yield a usable block.
	if (!dataBlock)
		return;

	dataBlock.attach(this);
}
// Returns a data block to the memory manager by forwarding it to
// m_memoryManagerData.free().
void Producer::releaseDataBlock(const DataBlock& data){
	m_memoryManagerData.free(data);
}

/**
 * Publishes a data block into the current write slot and advances the write slot.
 *
 * Sequence on success:
 *  1. wait for the current write slot via wwait();
 *  2. store either the stop marker (for a stop block) or the block id in that slot;
 *  3. write-acquire the next slot, then, under the synchronizer-data lock,
 *     write-release the current slot and set the next slot as the write slot;
 *  4. detach the block from this producer and mark it empty.
 *
 * @param dataBlock Block to publish; detached and set empty on success.
 * @param timeoutns Forwarded to wwait().
 * @return true when the block was queued, and also when the current write slot id is
 *         negative (nothing is queued in that case); false when wwait() returns false.
 */
bool Producer::commit(DataBlock& dataBlock, const uint64_t& timeoutns) {
	const off_t writeSlotID=m_synchronizerData.getWriteSlotID();
	const off_t writeSlotIDNext=ModuloCounter<off_t>::next(writeSlotID, m_synchronizerData.slotsRingBufferSize());

	if (writeSlotID<0)
		return true;

	if (!m_synchronizerData.wwait(writeSlotID, timeoutns)) {
		// Give other threads a chance to run before the caller retries.
		// std::this_thread::yield() replaces the non-standard, deprecated pthread_yield().
		std::this_thread::yield();
		return false;
	}
	if (dataBlock.isStop()){
		m_synchronizerData.setQueueSlot(writeSlotID, static_cast<off_t>(specialid_type::specialid_stop));
	} else {
		m_synchronizerData.setQueueSlot(writeSlotID, dataBlock.blockID());
	}
	// Acquire the next slot before releasing the current one.
	m_synchronizerData.wacquire(writeSlotIDNext);
	m_synchronizerData.lockSynchronizerData();
	m_synchronizerData.wrelease(writeSlotID);
	m_synchronizerData.setWriteSlotID(writeSlotIDNext);
	m_synchronizerData.unlockSynchronizerData();
	dataBlock.detach();
	dataBlock.setEmpty();
	return true;
}


/**
 * Body of the thread started by init().
 *
 * Starting at the write slot current at thread start, it repeatedly waits on the
 * monitored slot, frees the data slot recorded there (when its id is non-negative)
 * and moves on to the next slot. The loop ends when the monitored slot holds the
 * stop marker; the slot held at that point is released before returning.
 */
void Producer::bufferReleaseTask() {
	ModuloCounter<off_t> monitorSlotID(m_synchronizerData.getWriteSlotID(), m_synchronizerData.slotsRingBufferSize());

	// Take the first monitored slot, then let init() continue.
	m_synchronizerData.facquire(monitorSlotID.val());
	semPost(m_bufferReleaseMonitorStarted);

	for (;;) {
		m_synchronizerData.fwait(monitorSlotID.val());

		const bool stopRequested =
			m_synchronizerData.getQueueSlot(monitorSlotID.val()).dataSlotID==static_cast<off_t>(specialid_type::specialid_stop);
		if (stopRequested)
			break;

		off_t releaseDataSlotID = m_synchronizerData.getQueueSlot(monitorSlotID.val()).dataSlotID;
		if (releaseDataSlotID>=0) {
			// Mark the queue slot empty first, then hand the data slot back.
			m_synchronizerData.setQueueSlot(monitorSlotID.val(), static_cast<off_t>(specialid_type::specialid_empty));
			m_memoryManagerData.free(releaseDataSlotID);
		}

		// Acquire the following slot before releasing the one just processed;
		// the post-increment advances the counter as part of the release call.
		m_synchronizerData.facquire(monitorSlotID.next());
		m_synchronizerData.frelease(monitorSlotID++);
	}

	m_synchronizerData.frelease(monitorSlotID.val());
}