Producer.cpp
3.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include <signal.h>
#include <string.h>
#include <pthread.h>
#include "Producer.h"
#include "ModuloCounter.h"
#include "ProducerOnInitLock.h"
std::recursive_mutex ProducerOnInitLock::lock;
Producer::Producer(const std::string& name) :
SubsystemBase(name), m_memoryManagerData(m_name), m_synchronizerData(m_name) {
ProducerOnInitLock::lockOnInit();
sem_init(&m_bufferReleaseMonitorStarted, 0, 0);
}
Producer::~Producer() {
ProducerOnInitLock::lockOnInit();
sem_destroy(&m_bufferReleaseMonitorStarted);
ProducerOnInitLock::unlockOnInit();
}
void Producer::create(const size_t& bufferSize, const size_t& dataSlotsRingBufferSize) {
try {
m_memoryManagerData.create(bufferSize, dataSlotsRingBufferSize);
m_synchronizerData.create();
}
catch (...) {
ProducerOnInitLock::unlockOnInit();
throw;
}
ProducerOnInitLock::unlockOnInit();
}
void Producer::init() {
m_synchronizerData.wacquire(m_synchronizerData.getWriteSlotID());
m_bufferReleaseThread=std::thread(&Producer::bufferReleaseTask, this);
semWait(m_bufferReleaseMonitorStarted);
ProducerOnInitLock::unlockOnInit();
m_synchronizerData.unlockSynchronizerData();
}
void Producer::done() {
DataBlock dataBlock(specialid_type::specialid_stop);
commit(dataBlock);
m_synchronizerData.wrelease(m_synchronizerData.getWriteSlotID());
if (m_bufferReleaseThread.joinable())
m_bufferReleaseThread.join();
}
void Producer::allocateDataBlock(DataBlock& dataBlock, const size_t size, const uint64_t& timeoutns) {
dataBlock=m_memoryManagerData.alloc(size, timeoutns);
if (dataBlock)
dataBlock.attach(this);
}
void Producer::releaseDataBlock(const DataBlock& data){
m_memoryManagerData.free(data);
}
bool Producer::commit(DataBlock& dataBlock, const uint64_t& timeoutns) {
const off_t writeSlotID=m_synchronizerData.getWriteSlotID();
const off_t writeSlotIDNext=ModuloCounter<off_t>::next(writeSlotID, m_synchronizerData.slotsRingBufferSize());
if (writeSlotID<0)
return true;
if (!m_synchronizerData.wwait(writeSlotID, timeoutns)) {
pthread_yield();
return false;
}
if (dataBlock.isStop()){
m_synchronizerData.setQueueSlot(writeSlotID, static_cast<off_t>(specialid_type::specialid_stop));
} else {
m_synchronizerData.setQueueSlot(writeSlotID, dataBlock.blockID());
}
m_synchronizerData.wacquire(writeSlotIDNext);
m_synchronizerData.lockSynchronizerData();
m_synchronizerData.wrelease(writeSlotID);
m_synchronizerData.setWriteSlotID(writeSlotIDNext);
m_synchronizerData.unlockSynchronizerData();
dataBlock.detach();
dataBlock.setEmpty();
return true;
}
void Producer::bufferReleaseTask() {
off_t releaseDataSlotID;
ModuloCounter<off_t> monitorSlotID(m_synchronizerData.getWriteSlotID(), m_synchronizerData.slotsRingBufferSize());
m_synchronizerData.facquire(monitorSlotID.val());
semPost(m_bufferReleaseMonitorStarted);
while (1) {
m_synchronizerData.fwait(monitorSlotID.val());
if (m_synchronizerData.getQueueSlot(monitorSlotID.val()).dataSlotID==static_cast<off_t>(specialid_type::specialid_stop))
break;
if ((releaseDataSlotID = m_synchronizerData.getQueueSlot(monitorSlotID.val()).dataSlotID)>=0) {
m_synchronizerData.setQueueSlot(monitorSlotID.val(), static_cast<off_t>(specialid_type::specialid_empty));
m_memoryManagerData.free(releaseDataSlotID);
}
m_synchronizerData.facquire(monitorSlotID.next());
m_synchronizerData.frelease(monitorSlotID++);
}
m_synchronizerData.frelease(monitorSlotID.val());
}