// Consumer.cpp 1.92 KB
/*
 * Consumer.cpp
 *
 *  Created on: 15 Dec 2018
 *      Author: mariuszo
 */

#include "Consumer.h"
#include "ProducerOnInitLock.h"

/**
 * Builds a consumer subsystem called @p name.
 *
 * The name is forwarded to the SubsystemBase constructor. The memory-manager
 * and synchronizer members are then both constructed from m_name, which is
 * not declared in this file (presumably the name stored by SubsystemBase;
 * confirm in the header).
 */
Consumer::Consumer(const std::string& name)
	: SubsystemBase(name),
	  m_memoryManagerData(m_name),
	  m_synchronizerData(m_name) {
}

// No explicit teardown is performed here; the members' own destructors run
// as usual after this one.
Consumer::~Consumer() = default;

/**
 * Attaches this consumer to the memory-manager data and the synchronizer data.
 *
 * The whole sequence runs between ProducerOnInitLock::lockOnInit() and
 * ProducerOnInitLock::unlockOnInit(); the init lock is released on both the
 * successful and the failing path.
 *
 * @throws std::logic_error if consumersActiveLock() reports failure.
 *         Anything thrown by the attach() calls is rethrown unchanged.
 */
void Consumer::attach() {
	ProducerOnInitLock::lockOnInit();
	try {
		m_memoryManagerData.attach();
		m_synchronizerData.attach();
		if (!m_synchronizerData.consumersActiveLock()) {
			throw std::logic_error("Producer shut down activated");
		}
	} catch (...) {
		// Give the init lock back before the failure propagates to the caller.
		ProducerOnInitLock::unlockOnInit();
		throw;
	}
	ProducerOnInitLock::unlockOnInit();
}

/**
 * Positions the read cursor and acquires the slot it points at.
 *
 * While the synchronizer data is locked, m_readSlotID is invoked with the
 * synchronizer's current write slot ID and the ring-buffer size, and
 * racquire() is then called for the resulting cursor value.
 *
 * The synchronizer lock is released on every exit path. If anything between
 * lock and unlock throws, the lock is dropped before the exception is
 * rethrown, so a failed init() cannot leave the synchronizer data locked.
 * This mirrors how attach() handles the init lock.
 */
void Consumer::init() {
	m_synchronizerData.lockSynchronizerData();
	try {
		m_readSlotID(m_synchronizerData.getWriteSlotID(), m_synchronizerData.slotsRingBufferSize());
		m_synchronizerData.racquire(m_readSlotID.val());
	}
	catch (...) {
		// Do not leak the lock on failure; rethrow the original exception.
		m_synchronizerData.unlockSynchronizerData();
		throw;
	}
	m_synchronizerData.unlockSynchronizerData();
}
/**
 * Ends this consumer's session: calls rrelease() for the current value of
 * the read cursor, then consumersActiveUnlock() on the synchronizer data.
 */
void Consumer::done() {
	const auto& currentSlot = m_readSlotID.val();
	m_synchronizerData.rrelease(currentSlot);
	m_synchronizerData.consumersActiveUnlock();
}

/**
 * Fetches the next queue item into @p dataBlock.
 *
 * Outcomes, in the order they are checked:
 *  - rwait() on the current read slot does not succeed (called with
 *    @p timeoutns): dataBlock.setTimeout() is called;
 *  - the slot's dataSlotID equals specialid_stop: dataBlock.setStop();
 *  - the slot's dataSlotID equals specialid_empty: dataBlock.setEmpty();
 *  - otherwise: dataBlock is assigned from the memory manager, attached to
 *    this consumer, the read cursor is pre-incremented and racquire() is
 *    called for the new position.
 *
 * In the first three cases the read cursor is left where it was.
 *
 * @param dataBlock  receives either the data or one of the status markers above
 * @param timeoutns  timeout value handed to rwait()
 */
void Consumer::getDataBlock(DataBlock& dataBlock, const uint64_t& timeoutns) {
	if (!m_synchronizerData.rwait(m_readSlotID.val(), timeoutns)) {
		dataBlock.setTimeout();
		return;
	}

	// One lookup of the queued ID serves all the checks below.
	const off_t queuedID = m_synchronizerData.getQueueSlot(m_readSlotID.val()).dataSlotID;

	if (queuedID == static_cast<off_t>(specialid_type::specialid_stop)) {
		dataBlock.setStop();
		return;
	}
	if (queuedID == static_cast<off_t>(specialid_type::specialid_empty)) {
		dataBlock.setEmpty();
		return;
	}

	// Regular payload: hand it out, then move on to the following slot.
	dataBlock = m_memoryManagerData.getDataBlock(queuedID, m_readSlotID.val());
	dataBlock.attach(this);
	m_synchronizerData.racquire(++m_readSlotID);
}
/**
 * Finishes processing of @p data: releases it through releaseDataBlock(),
 * then detaches it and marks it empty.
 *
 * @param data  block to commit; it is modified by detach() and setEmpty()
 */
void Consumer::commit(DataBlock& data) {
	// Release first: releaseDataBlock() inspects `data` (its bool conversion and
	// syncSlotID()), so it has to see the block before it is reset below.
	releaseDataBlock(data);
	data.detach();
	data.setEmpty();
}

/**
 * Calls rrelease() on the synchronizer for the slot recorded in @p data.
 *
 * Does nothing when @p data converts to false.
 */
void Consumer::releaseDataBlock(const DataBlock& data) {
	if (!data) {
		return;
	}
	m_synchronizerData.rrelease(data.syncSlotID());
}