Refactored template class for synchronised resources

This commit is contained in:
Nav
2023-06-02 00:16:58 +01:00
parent 10611d3ad3
commit 80cf6930cc
9 changed files with 146 additions and 135 deletions

View File

@@ -7,8 +7,7 @@ namespace Bloom
using namespace Bloom::Events; using namespace Bloom::Events;
std::set<Events::EventType> EventListener::getRegisteredEventTypes() { std::set<Events::EventType> EventListener::getRegisteredEventTypes() {
const auto lock = this->registeredEventTypes.acquireLock(); return *(this->registeredEventTypes.accessor());
return this->registeredEventTypes.getValue();
} }
void EventListener::registerEvent(SharedGenericEventPointer event) { void EventListener::registerEvent(SharedGenericEventPointer event) {
@@ -17,8 +16,8 @@ namespace Bloom
+ this->name + this->name
); );
const auto queueLock = this->eventQueueByEventType.acquireLock(); auto eventQueueByTypeAccessor = this->eventQueueByEventType.accessor();
auto& eventQueueByType = this->eventQueueByEventType.getValue(); auto& eventQueueByType = *(eventQueueByTypeAccessor);
eventQueueByType[event->getType()].push(std::move(event)); eventQueueByType[event->getType()].push(std::move(event));
this->eventQueueByEventTypeCV.notify_all(); this->eventQueueByEventTypeCV.notify_all();
@@ -30,8 +29,9 @@ namespace Bloom
void EventListener::waitAndDispatch(int msTimeout) { void EventListener::waitAndDispatch(int msTimeout) {
{ {
auto queueLock = this->eventQueueByEventType.acquireLock(); auto queueLock = this->eventQueueByEventType.lock();
const auto& eventQueueByType = this->eventQueueByEventType.getValue(); const auto& eventQueueByType = this->eventQueueByEventType.unsafeReference();
const auto registeredEventTypes = this->getRegisteredEventTypes(); const auto registeredEventTypes = this->getRegisteredEventTypes();
std::optional<SharedGenericEventPointer> event; std::optional<SharedGenericEventPointer> event;
@@ -62,8 +62,12 @@ namespace Bloom
auto callbacks = std::vector<std::function<void(const Events::Event&)>>(); auto callbacks = std::vector<std::function<void(const Events::Event&)>>();
{ {
const auto mappingLock = this->eventTypeToCallbacksMapping.acquireLock(); const auto callbackMappingAccessor = this->eventTypeToCallbacksMapping.accessor();
callbacks = this->eventTypeToCallbacksMapping.getValue().find(event->getType())->second;
const auto callbacksIt = callbackMappingAccessor->find(event->getType());
if (callbacksIt != callbackMappingAccessor->end()) {
callbacks = callbacksIt->second;
}
} }
for (auto& callback : callbacks) { for (auto& callback : callbacks) {
@@ -80,11 +84,10 @@ namespace Bloom
} }
std::vector<SharedGenericEventPointer> EventListener::getEvents() { std::vector<SharedGenericEventPointer> EventListener::getEvents() {
const auto queueLock = this->eventQueueByEventType.acquireLock(); auto eventQueueByType = this->eventQueueByEventType.accessor();
auto& eventQueueByType = this->eventQueueByEventType.getValue();
std::vector<SharedGenericEventPointer> output; std::vector<SharedGenericEventPointer> output;
for (auto& eventQueue: eventQueueByType) { for (auto& eventQueue: *eventQueueByType) {
while (!eventQueue.second.empty()) { while (!eventQueue.second.empty()) {
output.push_back(std::move(eventQueue.second.front())); output.push_back(std::move(eventQueue.second.front()));
eventQueue.second.pop(); eventQueue.second.pop();
@@ -103,7 +106,6 @@ namespace Bloom
} }
void EventListener::clearAllCallbacks() { void EventListener::clearAllCallbacks() {
const auto lock = this->eventTypeToCallbacksMapping.acquireLock(); this->eventTypeToCallbacksMapping.accessor()->clear();
this->eventTypeToCallbacksMapping.getValue().clear();
} }
} }

View File

@@ -14,7 +14,7 @@
#include <set> #include <set>
#include "src/EventManager/Events/Events.hpp" #include "src/EventManager/Events/Events.hpp"
#include "src/Helpers/SyncSafe.hpp" #include "src/Helpers/Synchronised.hpp"
#include "src/Helpers/NotifierInterface.hpp" #include "src/Helpers/NotifierInterface.hpp"
namespace Bloom namespace Bloom
@@ -55,13 +55,8 @@ namespace Bloom
*/ */
std::set<Events::EventType> getRegisteredEventTypes(); std::set<Events::EventType> getRegisteredEventTypes();
template <class EventType>
bool isEventTypeRegistered() {
return this->registeredEventTypes.getValue().contains(EventType::type);
}
bool isEventTypeRegistered(Events::EventType eventType) { bool isEventTypeRegistered(Events::EventType eventType) {
return this->registeredEventTypes.getValue().contains(eventType); return this->registeredEventTypes.accessor()->contains(eventType);
}; };
/** /**
@@ -75,14 +70,12 @@ namespace Bloom
*/ */
template<class EventType> template<class EventType>
void registerEventType() { void registerEventType() {
const auto registeredEventTypesLock = this->registeredEventTypes.acquireLock(); this->registeredEventTypes.accessor()->insert(EventType::type);
this->registeredEventTypes.getValue().insert(EventType::type);
} }
template<class EventType> template<class EventType>
void deRegisterEventType() { void deRegisterEventType() {
const auto registeredEventTypesLock = this->registeredEventTypes.acquireLock(); this->registeredEventTypes.accessor()->erase(EventType::type);
this->registeredEventTypes.getValue().erase(EventType::type);
} }
/** /**
@@ -117,8 +110,8 @@ namespace Bloom
} }
; ;
const auto mappingLock = this->eventTypeToCallbacksMapping.acquireLock(); auto mappingAccessor = this->eventTypeToCallbacksMapping.accessor();
auto& mapping = this->eventTypeToCallbacksMapping.getValue(); auto& mapping = *(mappingAccessor);
mapping[EventType::type].push_back(parentCallback); mapping[EventType::type].push_back(parentCallback);
this->template registerEventType<EventType>(); this->template registerEventType<EventType>();
@@ -137,24 +130,19 @@ namespace Bloom
); );
{ {
const auto mappingLock = this->eventTypeToCallbacksMapping.acquireLock(); auto mappingAccessor = this->eventTypeToCallbacksMapping.accessor();
auto& mapping = this->eventTypeToCallbacksMapping.getValue(); auto& mapping = *(mappingAccessor);
if (mapping.contains(EventType::type)) { if (mapping.contains(EventType::type)) {
mapping.at(EventType::type).clear(); mapping.at(EventType::type).clear();
} }
} }
{ this->registeredEventTypes.accessor()->erase(EventType::type);
auto registeredEventTypesLock = this->registeredEventTypes.acquireLock();
this->registeredEventTypes.getValue().erase(EventType::type);
}
const auto queueLock = this->eventQueueByEventType.acquireLock(); auto eventQueueByType = this->eventQueueByEventType.accessor();
auto& eventQueueByType = this->eventQueueByEventType.getValue(); if (eventQueueByType->contains(EventType::type)) {
eventQueueByType->erase(EventType::type);
if (eventQueueByType.contains(EventType::type)) {
eventQueueByType.erase(EventType::type);
} }
} }
@@ -210,8 +198,8 @@ namespace Bloom
ReturnType output = std::nullopt; ReturnType output = std::nullopt;
auto queueLock = this->eventQueueByEventType.acquireLock(); auto queueLock = this->eventQueueByEventType.lock();
auto& eventQueueByType = this->eventQueueByEventType.getValue(); auto& eventQueueByType = this->eventQueueByEventType.unsafeReference();
auto eventTypes = std::set<Events::EventType>({EventTypeA::type}); auto eventTypes = std::set<Events::EventType>({EventTypeA::type});
auto eventTypesToDeRegister = std::set<Events::EventType>(); auto eventTypesToDeRegister = std::set<Events::EventType>();
@@ -233,12 +221,11 @@ namespace Bloom
} }
{ {
auto registeredEventTypesLock = this->registeredEventTypes.acquireLock(); auto registeredEventTypes = this->registeredEventTypes.accessor();
auto& registeredEventTypes = this->registeredEventTypes.getValue();
for (const auto& eventType : eventTypes) { for (const auto& eventType : eventTypes) {
if (!registeredEventTypes.contains(eventType)) { if (!registeredEventTypes->contains(eventType)) {
registeredEventTypes.insert(eventType); registeredEventTypes->insert(eventType);
eventTypesToDeRegister.insert(eventType); eventTypesToDeRegister.insert(eventType);
} }
} }
@@ -270,11 +257,10 @@ namespace Bloom
} }
if (!eventTypesToDeRegister.empty()) { if (!eventTypesToDeRegister.empty()) {
auto registeredEventTypesLock = this->registeredEventTypes.acquireLock(); auto registeredEventTypes = this->registeredEventTypes.accessor();
auto& registeredEventTypes = this->registeredEventTypes.getValue();
for (const auto& eventType : eventTypesToDeRegister) { for (const auto& eventType : eventTypesToDeRegister) {
registeredEventTypes.erase(eventType); registeredEventTypes->erase(eventType);
} }
} }
@@ -346,7 +332,7 @@ namespace Bloom
* Events are grouped by event type, and removed from their queue just *before* the dispatching to * Events are grouped by event type, and removed from their queue just *before* the dispatching to
* registered handlers begins. * registered handlers begins.
*/ */
SyncSafe<std::map<Events::EventType, std::queue<Events::SharedGenericEventPointer>>> eventQueueByEventType; Synchronised<std::map<Events::EventType, std::queue<Events::SharedGenericEventPointer>>> eventQueueByEventType;
std::condition_variable eventQueueByEventTypeCV; std::condition_variable eventQueueByEventTypeCV;
/** /**
@@ -357,8 +343,8 @@ namespace Bloom
* we perform a downcast before invoking the callback. See EventListener::registerCallbackForEventType() * we perform a downcast before invoking the callback. See EventListener::registerCallbackForEventType()
* for more) * for more)
*/ */
SyncSafe<std::map<Events::EventType, std::vector<std::function<void(const Events::Event&)>>>> eventTypeToCallbacksMapping; Synchronised<std::map<Events::EventType, std::vector<std::function<void(const Events::Event&)>>>> eventTypeToCallbacksMapping;
SyncSafe<std::set<Events::EventType>> registeredEventTypes; Synchronised<std::set<Events::EventType>> registeredEventTypes;
NotifierInterface* interruptEventNotifier = nullptr; NotifierInterface* interruptEventNotifier = nullptr;

View File

@@ -1,42 +0,0 @@
#pragma once
#include <mutex>
namespace Bloom
{
/**
* Template for synchronization safe types.
*
* Just a convenient template that allows us to create thread safe types without having to write
* the bloat of mutexes, unique_locks, etc etc.
*
* @tparam Type
*/
template<typename Type>
class SyncSafe
{
public:
SyncSafe() = default;
explicit SyncSafe(Type value)
: value(value)
{}
void setValue(const Type& value) {
auto lock = std::unique_lock(this->mutex);
this->value = value;
}
Type& getValue() {
return this->value;
}
std::unique_lock<std::mutex> acquireLock() {
return std::unique_lock(this->mutex);
}
private:
Type value;
std::mutex mutex;
};
}

View File

@@ -0,0 +1,78 @@
#pragma once
#include <mutex>
#include <utility>
namespace Bloom
{
/**
* Wrapper for synchronised access to a resource.
*
* @tparam Type
*/
template<typename Type>
class Synchronised
{
public:
class Accessor
{
public:
constexpr Accessor(std::mutex& mutex, Type& value)
: lock(std::unique_lock(mutex))
, value(value)
{}
constexpr Type* operator -> () noexcept {
return &(this->value);
}
constexpr const Type* operator -> () const noexcept {
return &(this->value);
}
constexpr Type& operator * () noexcept {
return this->value;
}
constexpr const Type& operator * () const noexcept {
return this->value;
}
private:
std::unique_lock<std::mutex> lock;
Type& value;
};
Synchronised() = default;
explicit Synchronised(Type value)
: value(std::move(value))
{}
Accessor accessor() {
return Accessor(this->mutex, this->value);
}
/**
* Don't use this unless you already hold a raw (not managed by an Accessor) lock to the contained value.
*
* This should only be used in instances where you need to hold a raw lock, like in the `stop_waiting`
* predicate function for a call to std::condition_variable::wait().
*
* In all other instances, you should use Synchronised::accessor().
*
* @return
*/
Type& unsafeReference() {
return this->value;
}
std::unique_lock<std::mutex> lock() {
return std::unique_lock(this->mutex);
}
private:
Type value;
std::mutex mutex;
};
}

View File

@@ -36,26 +36,21 @@ namespace Bloom
void InsightWorker::queueTask(const QSharedPointer<InsightWorkerTask>& task) { void InsightWorker::queueTask(const QSharedPointer<InsightWorkerTask>& task) {
task->moveToThread(nullptr); task->moveToThread(nullptr);
{ InsightWorker::queuedTasksById.accessor()->emplace(task->id, task);
const auto taskQueueLock = InsightWorker::queuedTasksById.acquireLock();
InsightWorker::queuedTasksById.getValue().emplace(task->id, task);
}
emit InsightSignals::instance()->taskQueued(task); emit InsightSignals::instance()->taskQueued(task);
} }
void InsightWorker::executeTasks() { void InsightWorker::executeTasks() {
static const auto getQueuedTask = [] () -> std::optional<QSharedPointer<InsightWorkerTask>> { static const auto getQueuedTask = [] () -> std::optional<QSharedPointer<InsightWorkerTask>> {
const auto taskQueueLock = InsightWorker::queuedTasksById.acquireLock(); auto queuedTasks = InsightWorker::queuedTasksById.accessor();
auto& queuedTasks = InsightWorker::queuedTasksById.getValue();
if (!queuedTasks.empty()) { if (!queuedTasks->empty()) {
const auto taskGroupsLock = InsightWorker::taskGroupsInExecution.acquireLock(); auto taskGroupsInExecution = InsightWorker::taskGroupsInExecution.accessor();
auto& taskGroupsInExecution = InsightWorker::taskGroupsInExecution.getValue();
const auto canExecuteTask = [&taskGroupsInExecution] (const QSharedPointer<InsightWorkerTask>& task) { const auto canExecuteTask = [&taskGroupsInExecution] (const QSharedPointer<InsightWorkerTask>& task) {
for (const auto taskGroup : task->taskGroups()) { for (const auto taskGroup : task->taskGroups()) {
if (taskGroupsInExecution.contains(taskGroup)) { if (taskGroupsInExecution->contains(taskGroup)) {
return false; return false;
} }
} }
@@ -63,11 +58,11 @@ namespace Bloom
return true; return true;
}; };
for (auto [queuedTaskId, task] : queuedTasks) { for (auto [queuedTaskId, task] : *queuedTasks) {
if (canExecuteTask(task)) { if (canExecuteTask(task)) {
const auto taskGroups = task->taskGroups(); const auto taskGroups = task->taskGroups();
taskGroupsInExecution.insert(taskGroups.begin(), taskGroups.end()); taskGroupsInExecution->insert(taskGroups.begin(), taskGroups.end());
queuedTasks.erase(queuedTaskId); queuedTasks->erase(queuedTaskId);
return task; return task;
} }
} }
@@ -84,11 +79,10 @@ namespace Bloom
task->execute(this->targetControllerService); task->execute(this->targetControllerService);
{ {
const auto taskGroupsLock = InsightWorker::taskGroupsInExecution.acquireLock(); auto taskGroupsInExecution = InsightWorker::taskGroupsInExecution.accessor();
auto& taskGroupsInExecution = InsightWorker::taskGroupsInExecution.getValue();
for (const auto& taskGroup : task->taskGroups()) { for (const auto& taskGroup : task->taskGroups()) {
taskGroupsInExecution.erase(taskGroup); taskGroupsInExecution->erase(taskGroup);
} }
} }

View File

@@ -9,7 +9,7 @@
#include "Tasks/InsightWorkerTask.hpp" #include "Tasks/InsightWorkerTask.hpp"
#include "src/Helpers/SyncSafe.hpp" #include "src/Helpers/Synchronised.hpp"
#include "src/Services/TargetControllerService.hpp" #include "src/Services/TargetControllerService.hpp"
namespace Bloom namespace Bloom
@@ -36,8 +36,8 @@ namespace Bloom
private: private:
static inline std::atomic<std::uint8_t> lastWorkerId = 0; static inline std::atomic<std::uint8_t> lastWorkerId = 0;
static inline SyncSafe<std::map<InsightWorkerTask::IdType, QSharedPointer<InsightWorkerTask>>> queuedTasksById = {}; static inline Synchronised<std::map<InsightWorkerTask::IdType, QSharedPointer<InsightWorkerTask>>> queuedTasksById = {};
static inline SyncSafe<TaskGroups> taskGroupsInExecution = {}; static inline Synchronised<TaskGroups> taskGroupsInExecution = {};
Services::TargetControllerService targetControllerService = Services::TargetControllerService(); Services::TargetControllerService targetControllerService = Services::TargetControllerService();

View File

@@ -4,7 +4,7 @@
#include "src/Helpers/Thread.hpp" #include "src/Helpers/Thread.hpp"
#include "src/EventManager/EventManager.hpp" #include "src/EventManager/EventManager.hpp"
#include "src/Helpers/SyncSafe.hpp" #include "src/Helpers/Synchronised.hpp"
namespace Bloom namespace Bloom
{ {

View File

@@ -94,14 +94,12 @@ namespace Bloom::TargetController
if (atomicSessionId.has_value()) { if (atomicSessionId.has_value()) {
// This command is part of an atomic session - put it in the dedicated queue // This command is part of an atomic session - put it in the dedicated queue
const auto commandQueueLock = TargetControllerComponent::atomicSessionCommandQueue.acquireLock(); TargetControllerComponent::atomicSessionCommandQueue.accessor()->push(std::move(command));
TargetControllerComponent::atomicSessionCommandQueue.getValue().push(std::move(command));
TargetControllerComponent::notifier.notify(); TargetControllerComponent::notifier.notify();
return; return;
} }
const auto commandQueueLock = TargetControllerComponent::commandQueue.acquireLock(); TargetControllerComponent::commandQueue.accessor()->push(std::move(command));
TargetControllerComponent::commandQueue.getValue().push(std::move(command));
TargetControllerComponent::notifier.notify(); TargetControllerComponent::notifier.notify();
} }
@@ -112,7 +110,8 @@ namespace Bloom::TargetController
auto response = std::unique_ptr<Response>(nullptr); auto response = std::unique_ptr<Response>(nullptr);
const auto predicate = [commandId, &response] { const auto predicate = [commandId, &response] {
auto& responsesByCommandId = TargetControllerComponent::responsesByCommandId.getValue(); // We will already hold the lock here, so we can use Synchronised::unsafeReference() here.
auto& responsesByCommandId = TargetControllerComponent::responsesByCommandId.unsafeReference();
auto responseIt = responsesByCommandId.find(commandId); auto responseIt = responsesByCommandId.find(commandId);
if (responseIt != responsesByCommandId.end()) { if (responseIt != responsesByCommandId.end()) {
@@ -125,7 +124,7 @@ namespace Bloom::TargetController
return false; return false;
}; };
auto responsesByCommandIdLock = TargetControllerComponent::responsesByCommandId.acquireLock(); auto responsesByCommandIdLock = TargetControllerComponent::responsesByCommandId.lock();
if (timeout.has_value()) { if (timeout.has_value()) {
TargetControllerComponent::responsesByCommandIdCv.wait_for( TargetControllerComponent::responsesByCommandIdCv.wait_for(
@@ -395,14 +394,11 @@ namespace Bloom::TargetController
void TargetControllerComponent::processQueuedCommands() { void TargetControllerComponent::processQueuedCommands() {
auto commands = std::queue<std::unique_ptr<Command>>(); auto commands = std::queue<std::unique_ptr<Command>>();
if (this->activeAtomicSession.has_value()) { commands.swap(
const auto queueLock = TargetControllerComponent::atomicSessionCommandQueue.acquireLock(); this->activeAtomicSession.has_value()
commands.swap(TargetControllerComponent::atomicSessionCommandQueue.getValue()); ? *(TargetControllerComponent::atomicSessionCommandQueue.accessor())
: *(TargetControllerComponent::commandQueue.accessor())
} else { );
const auto queueLock = TargetControllerComponent::commandQueue.acquireLock();
commands.swap(TargetControllerComponent::commandQueue.getValue());
}
while (!commands.empty()) { while (!commands.empty()) {
const auto command = std::move(commands.front()); const auto command = std::move(commands.front());
@@ -455,10 +451,7 @@ namespace Bloom::TargetController
CommandIdType commandId, CommandIdType commandId,
std::unique_ptr<Response> response std::unique_ptr<Response> response
) { ) {
const auto responseMappingLock = TargetControllerComponent::responsesByCommandId.acquireLock(); TargetControllerComponent::responsesByCommandId.accessor()->emplace(commandId, std::move(response));
TargetControllerComponent::responsesByCommandId.getValue().insert(
std::pair(commandId, std::move(response))
);
TargetControllerComponent::responsesByCommandIdCv.notify_all(); TargetControllerComponent::responsesByCommandIdCv.notify_all();
} }
@@ -550,9 +543,9 @@ namespace Bloom::TargetController
} }
{ {
const auto commandQueueLock = TargetControllerComponent::atomicSessionCommandQueue.acquireLock(); auto commandQueue = TargetControllerComponent::atomicSessionCommandQueue.accessor();
auto empty = std::queue<std::unique_ptr<Commands::Command>>(); auto empty = std::queue<std::unique_ptr<Commands::Command>>();
TargetControllerComponent::atomicSessionCommandQueue.getValue().swap(empty); commandQueue->swap(empty);
} }
this->activeAtomicSession.reset(); this->activeAtomicSession.reset();

View File

@@ -13,7 +13,7 @@
#include <QJsonArray> #include <QJsonArray>
#include "src/Helpers/Thread.hpp" #include "src/Helpers/Thread.hpp"
#include "src/Helpers/SyncSafe.hpp" #include "src/Helpers/Synchronised.hpp"
#include "src/Helpers/ConditionVariableNotifier.hpp" #include "src/Helpers/ConditionVariableNotifier.hpp"
#include "TargetControllerState.hpp" #include "TargetControllerState.hpp"
@@ -103,7 +103,7 @@ namespace Bloom::TargetController
); );
private: private:
static inline SyncSafe<std::queue<std::unique_ptr<Commands::Command>>> commandQueue; static inline Synchronised<std::queue<std::unique_ptr<Commands::Command>>> commandQueue;
/** /**
* We have a dedicated queue for atomic sessions. * We have a dedicated queue for atomic sessions.
@@ -111,9 +111,9 @@ namespace Bloom::TargetController
* During an atomic session, all commands for the session are placed into this dedicated queue. * During an atomic session, all commands for the session are placed into this dedicated queue.
* The TargetController will only serve commands from this dedicated queue, until the atomic session ends. * The TargetController will only serve commands from this dedicated queue, until the atomic session ends.
*/ */
static inline SyncSafe<std::queue<std::unique_ptr<Commands::Command>>> atomicSessionCommandQueue; static inline Synchronised<std::queue<std::unique_ptr<Commands::Command>>> atomicSessionCommandQueue;
static inline SyncSafe< static inline Synchronised<
std::map<Commands::CommandIdType, std::unique_ptr<Responses::Response>> std::map<Commands::CommandIdType, std::unique_ptr<Responses::Response>>
> responsesByCommandId; > responsesByCommandId;