From ad3e3c44eebaf70cf53babb04e19812317c45341 Mon Sep 17 00:00:00 2001 From: dario Date: Thu, 18 Apr 2024 21:35:27 +0200 Subject: [PATCH] Updated implementation of barrier pattern --- include/sta/rtos/patterns/barrier.hpp | 59 ++++++++++++++- include/sta/rtos/thread.hpp | 7 ++ src/patterns/barrier.cpp | 101 ++++++++++++++++++++++---- src/thread.cpp | 5 ++ 4 files changed, 152 insertions(+), 20 deletions(-) diff --git a/include/sta/rtos/patterns/barrier.hpp b/include/sta/rtos/patterns/barrier.hpp index cc31a16..e7d7e6f 100644 --- a/include/sta/rtos/patterns/barrier.hpp +++ b/include/sta/rtos/patterns/barrier.hpp @@ -11,20 +11,71 @@ #include #include +#include + namespace sta { class Barrier { public: - Barrier(const char* name, uint8_t n_threads); + /** + * @brief Construct a new Barrier object + * + * @param name The name of the mutex used for this barrier. + * @param waitTime The minimum time that needs to pass until the barrier can open again. + */ + Barrier(const char* name, uint32_t waitTime = 0); + /** + * @brief Adds the currently running thread to the threads the barrier waits for. + * + */ + void subscribe(); + + /** + * + * @return uint8_t Returns the number of threads currently subscribed to this barrier + */ + uint8_t getNumSubscribedThreads(); + + /** + * + * @return uint8_t Returns the number of threads that have entered the barrier and are waiting now. + */ + uint8_t getNumEntered(); + + /** + * + * @return uint8_t Returns the number of threads that have left the barrier so far. + */ + uint8_t getNumLeft(); + + void setMinimumWaitDuration(uint32_t duration); + + /** + * @brief Wait for the barrier to open again. Sets this state into waiting state. + * + */ void wait(); + + /** + * @brief Method for an external thread to wait for the barrier to open without subscribing to it. + * + * @note This means that the barrier doesn't wait for this thread. + * + */ + void externalWait(); private: RtosMutex mutex_; RtosEvent flag_; - const uint8_t n_threads_; - uint8_t n_entered_; - uint8_t n_left_; + + uint32_t waitTime_; + uint32_t lastTime_; + + std::set threads_; + uint8_t nThreads_; + uint8_t nEntered_; + uint8_t nLeft_; }; } // namespace sta diff --git a/include/sta/rtos/thread.hpp b/include/sta/rtos/thread.hpp index 981b2da..23dda93 100644 --- a/include/sta/rtos/thread.hpp +++ b/include/sta/rtos/thread.hpp @@ -82,6 +82,13 @@ namespace sta */ RtosThread(const char* name, osPriority_t prio, uint32_t stack_size = 0, uint32_t cb_size = 0); + /** + * @brief Static method for obtaining the ID of the currently running RtosThread. + * + * @return osThreadId_t The ID of the currently running Rtos-Thread. + */ + static osThreadId_t getCurrentlyRunningThreadID(); + /** * @brief Get the currently running instance. * diff --git a/src/patterns/barrier.cpp b/src/patterns/barrier.cpp index 9bbb606..ca4ffde 100644 --- a/src/patterns/barrier.cpp +++ b/src/patterns/barrier.cpp @@ -6,20 +6,72 @@ */ #include +#include + +#include #include namespace sta { - Barrier::Barrier(const char* name, uint8_t n_threads) + Barrier::Barrier(const char * name, uint32_t waitTime /* = 0 */) : mutex_{name}, flag_{}, - n_threads_{n_threads}, - n_entered_{0}, - n_left_{n_threads} + waitTime_{waitTime}, + lastTime_{now()}, + threads_{}, + nThreads_{0}, + nEntered_{0}, + nLeft_{0} { STA_ASSERT(name != nullptr); - STA_ASSERT(n_threads >= 1); + } + + void Barrier::subscribe() + { + mutex_.acquire(); + threads_.insert(RtosThread::getCurrentlyRunningThreadID()); + nThreads_ += 1; + nLeft_ += 1; + mutex_.release(); + } + + uint8_t Barrier::getNumSubscribedThreads() + { + mutex_.acquire(); + uint8_t rslt = nThreads_; + mutex_.release(); + + return rslt; + } + + uint8_t Barrier::getNumEntered() + { + mutex_.acquire(); + uint8_t rslt = nEntered_; + mutex_.release(); + + return rslt; + } + + /** + * + * @return uint8_t Returns the number of threads that have left the barrier so far. + */ + uint8_t Barrier::getNumLeft() + { + mutex_.acquire(); + uint8_t rslt = nLeft_; + mutex_.release(); + + return rslt; + } + + void Barrier::setMinimumWaitDuration(uint32_t duration) + { + mutex_.acquire(); + waitTime_ = duration; + mutex_.release(); } void Barrier::wait() @@ -29,9 +81,13 @@ namespace sta */ mutex_.acquire(); - if (n_entered_ == 0) + // Make sure the current thread is actually subscribed to this barrier. + STA_ASSERT(threads_.find(RtosThread::getCurrentlyRunningThreadID()) != threads_.end()); + + // Is this the first thread entering the barrier? + if (nEntered_ == 0) { - if (n_left_ == n_threads_) + if (nLeft_ == nThreads_) { // First thread to arrive clears the flag. flag_.clear(); @@ -43,7 +99,7 @@ namespace sta mutex_.release(); // Busy waiting until the last thread leaves. - while (n_left_ != n_threads_) + while (nLeft_ != nThreads_) { osThreadYield(); } @@ -56,28 +112,41 @@ namespace sta } // Register this thread entering the barrier. - ++n_entered_; - mutex_.release(); + ++nEntered_; - if (n_entered_ == n_threads_) + if (nEntered_ == nThreads_) { - // If all threads are waiting in the barrier, set the flag allowing threads to leave. - n_entered_ = 0; - n_left_ = 1; + if (now() - lastTime_ < waitTime_) + { + // Let the thread sleep for the remaining time. + osDelay(waitTime_ - (now() - lastTime_)); + } + // If all threads are waiting in the barrier, set the flag allowing threads to leave. + nEntered_ = 0; + nLeft_ = 1; + + mutex_.release(); flag_.set(); } else { + mutex_.release(); + // Wait in the barrier until the flag is set. flag_.wait(); - // Register this thread leaving the barrier. + // Register that this thread left the barrier. mutex_.acquire(); - n_left_++; + nLeft_++; mutex_.release(); } } + + void Barrier::externalWait() + { + flag_.wait(); + } } // namespace sta diff --git a/src/thread.cpp b/src/thread.cpp index 02a7abe..c4040c3 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -12,6 +12,11 @@ namespace sta attribs_{ .name = name, .cb_size = cb_size, .stack_size = stack_size, .priority = prio } {} + osThreadId_t RtosThread::getCurrentlyRunningThreadID() + { + return osThreadGetId(); + } + osThreadId_t RtosThread::getInstance() { return instance_;