Consumer.cpp
1.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/*
* Consumer.cpp
*
* Created on: 15 Dec 2018
* Author: mariuszo
*/
#include "Consumer.h"
#include "ProducerOnInitLock.h"
/**
 * Constructs a consumer subsystem.
 *
 * @param name  Forwarded to the SubsystemBase constructor. The memory-manager
 *              and synchronizer data members are both constructed from m_name.
 */
Consumer::Consumer(const std::string& name)
    : SubsystemBase(name),
      m_memoryManagerData(m_name),
      m_synchronizerData(m_name) {
}
/** Destructor: performs no work beyond destroying the members. */
Consumer::~Consumer() = default;
/**
 * Attaches this consumer to the memory-manager data and the synchronizer data
 * and then takes the "consumers active" lock.
 *
 * The whole sequence runs between ProducerOnInitLock::lockOnInit() and
 * ProducerOnInitLock::unlockOnInit(); the unlock is performed on both the
 * normal path and the exceptional path.
 *
 * @throws std::logic_error if consumersActiveLock() reports failure.
 *         Any exception thrown by the attach calls is propagated unchanged.
 */
void Consumer::attach() {
    ProducerOnInitLock::lockOnInit();

    try {
        m_memoryManagerData.attach();
        m_synchronizerData.attach();

        if (!m_synchronizerData.consumersActiveLock()) {
            throw std::logic_error("Producer shut down activated");
        }
    } catch (...) {
        // Drop the on-init lock before letting the failure travel upwards.
        ProducerOnInitLock::unlockOnInit();
        throw;
    }

    ProducerOnInitLock::unlockOnInit();
}
/**
 * Positions this consumer's read cursor and read-acquires the slot it points at.
 *
 * While the synchronizer-data lock is held, m_readSlotID is re-seeded through
 * its call operator with the current write slot ID and the slots ring-buffer
 * size, and the resulting slot is passed to racquire().
 *
 * The synchronizer-data lock is released on every exit path: if re-seeding the
 * cursor or racquire() throws, the lock is dropped before the exception is
 * propagated (the same unlock-and-rethrow pattern attach() uses), so a failed
 * init() cannot leave the synchronizer data locked.
 */
void Consumer::init() {
    m_synchronizerData.lockSynchronizerData();

    try {
        m_readSlotID(m_synchronizerData.getWriteSlotID(), m_synchronizerData.slotsRingBufferSize());
        m_synchronizerData.racquire(m_readSlotID.val());
    } catch (...) {
        // Never leave the synchronizer data locked when initialisation fails.
        m_synchronizerData.unlockSynchronizerData();
        throw;
    }

    m_synchronizerData.unlockSynchronizerData();
}
/**
 * Winds the consumer down: read-releases the slot the read cursor currently
 * points at, then calls consumersActiveUnlock() (presumably the counterpart of
 * the consumersActiveLock() call made in attach()).
 */
void Consumer::done() {
    // Release the slot at the current cursor position first ...
    m_synchronizerData.rrelease(m_readSlotID.val());

    // ... then give up the "consumers active" lock.
    m_synchronizerData.consumersActiveUnlock();
}
/**
 * Retrieves the data block at the current read slot into `dataBlock`.
 *
 * Outcomes, in the order they are checked:
 *  - rwait() on the current read slot reports failure: `dataBlock` is marked
 *    as timeout and the read cursor is left unchanged.
 *  - the queue slot's dataSlotID equals specialid_stop: `dataBlock` is marked
 *    as stop and the read cursor is left unchanged.
 *  - the queue slot's dataSlotID equals specialid_empty: `dataBlock` is marked
 *    as empty and the read cursor is left unchanged.
 *  - otherwise: `dataBlock` is assigned the block obtained from the memory
 *    manager, attached to this consumer, and the read cursor is advanced and
 *    the new slot is read-acquired.
 *
 * @param dataBlock  Receives the result (block or one of the status markers).
 * @param timeoutns  Passed through to rwait(); presumably a timeout in
 *                   nanoseconds, judging by the name.
 */
void Consumer::getDataBlock(DataBlock& dataBlock, const uint64_t& timeoutns) {
    // The cursor is not modified until the very end, so read it once.
    const auto readSlot = m_readSlotID.val();

    if (!m_synchronizerData.rwait(readSlot, timeoutns)) {
        dataBlock.setTimeout();
        return;
    }

    const auto& queueSlot = m_synchronizerData.getQueueSlot(readSlot);

    if (queueSlot.dataSlotID == static_cast<off_t>(specialid_type::specialid_stop)) {
        dataBlock.setStop();
        return;
    }

    if (queueSlot.dataSlotID == static_cast<off_t>(specialid_type::specialid_empty)) {
        dataBlock.setEmpty();
        return;
    }

    // Regular payload: hand the block to the caller and bind it to this consumer.
    dataBlock = m_memoryManagerData.getDataBlock(queueSlot.dataSlotID, readSlot);
    dataBlock.attach(this);

    // Move on to the next slot and read-acquire it.
    m_synchronizerData.racquire(++m_readSlotID);
}
/**
 * Finishes this consumer's use of `data`.
 *
 * The block is passed to releaseDataBlock(), then detached, then marked empty,
 * in that order.
 *
 * @param data  The data block being committed; modified in place.
 */
void Consumer::commit(DataBlock& data) {
    this->releaseDataBlock(data);

    data.detach();
    data.setEmpty();
}
/**
 * Read-releases the synchronizer slot associated with `data`.
 *
 * Nothing happens when `data` evaluates to false in a boolean context.
 *
 * @param data  The data block whose syncSlotID() is passed to rrelease().
 */
void Consumer::releaseDataBlock(const DataBlock& data) {
    if (!data) {
        return;
    }

    m_synchronizerData.rrelease(data.syncSlotID());
}