diff --git a/docs/02-data-exchange.md b/docs/02-data-exchange.md index e30aeaf..5471f8e 100644 --- a/docs/02-data-exchange.md +++ b/docs/02-data-exchange.md @@ -18,14 +18,14 @@ New values can be inserted naturally by using the assignment operator `=` as if ```C++ /* Thread_1.inot */ -counter = 10; /* Store a value into the shared variable in a threadsafe manner. */ +counter.push(10); /* Store a value into the shared variable in a threadsafe manner. */ ``` If the internal queue is full the oldest element is discarded and the latest element is inserted into the queue. Retrieving stored data works also very naturally like it would for any POD data type: ```C++ /* Thread_2.inot */ -Serial.println(counter); /* Retrieves a value from the shared variable in a threadsafe manner. */ +Serial.println(counter.pop()); /* Retrieves a value from the shared variable in a threadsafe manner. */ ``` Should the internal queue be empty when trying to read the latest available value then the thread reading the shared variable will be suspended and the next available thread will be scheduled. Once a new value is stored inside the shared variable the suspended thread resumes operation and consumes the value which has been stored in the internal queue. @@ -55,16 +55,16 @@ DataProducerThread.counter.connectTo(DataConsumerThread_2.counter); Whenever a new value is assigned to a data source, i.e. ```C++ /* DataProducerThread.inot */ -counter = 10; +counter.push(10); ``` data is automatically copied and stored within the internal queues of all connected data sinks, from where it can be retrieved, i.e. ```C++ /* DataConsumerThread_1.inot */ -Serial.println(counter); +Serial.println(counter.pop()); ``` ```C++ /* DataConsumerThread_2.inot */ -Serial.println(counter); +Serial.println(counter.pop()); ``` If a thread tries to read from an empty `Sink` the thread is suspended and the next ready thread is scheduled. When a new value is written to a `Source` and consequently copied to a `Sink` the suspended thread is resumed and continuous execution (i.e. read the data and act upon it). diff --git a/examples/Threading_Basics/Shared_Counter/Consumer.inot b/examples/Threading_Basics/Shared_Counter/Consumer.inot index c03efd9..c6a89fd 100644 --- a/examples/Threading_Basics/Shared_Counter/Consumer.inot +++ b/examples/Threading_Basics/Shared_Counter/Consumer.inot @@ -12,5 +12,5 @@ void loop() * available, then this thread is suspended until new data * is available for reading. */ - Serial.println(counter); + Serial.println(counter.pop()); } diff --git a/examples/Threading_Basics/Shared_Counter/Producer.inot b/examples/Threading_Basics/Shared_Counter/Producer.inot index 7475653..59578d4 100644 --- a/examples/Threading_Basics/Shared_Counter/Producer.inot +++ b/examples/Threading_Basics/Shared_Counter/Producer.inot @@ -10,7 +10,7 @@ void loop() * 'counter'. Internally this is stored within a queue in a FIFO * (First-In/First-Out) manner. */ - counter = i; + counter.push(i); i++; delay(100); } diff --git a/examples/Threading_Basics/Source_Sink_Counter/Consumer.inot b/examples/Threading_Basics/Source_Sink_Counter/Consumer.inot index 5ae30b5..21aafec 100644 --- a/examples/Threading_Basics/Source_Sink_Counter/Consumer.inot +++ b/examples/Threading_Basics/Source_Sink_Counter/Consumer.inot @@ -9,5 +9,5 @@ void setup() void loop() { - Serial.println(counter); + Serial.println(counter.pop()); } diff --git a/examples/Threading_Basics/Source_Sink_Counter/Producer.inot b/examples/Threading_Basics/Source_Sink_Counter/Producer.inot index 3938a1c..c359c40 100644 --- a/examples/Threading_Basics/Source_Sink_Counter/Producer.inot +++ b/examples/Threading_Basics/Source_Sink_Counter/Producer.inot @@ -9,6 +9,6 @@ void setup() void loop() { static int i = 0; - counter = i; + counter.push(i); i++; } diff --git a/examples/Threading_Basics/Source_Sink_LED/Sink_Thread.inot b/examples/Threading_Basics/Source_Sink_LED/Sink_Thread.inot index 369de6b..e4dd975 100644 --- a/examples/Threading_Basics/Source_Sink_LED/Sink_Thread.inot +++ b/examples/Threading_Basics/Source_Sink_LED/Sink_Thread.inot @@ -12,5 +12,5 @@ void loop() * this call will block until new data is inserted from the connected SOURCE. This means * that the pace is dictated by the SOURCE that sends data every 100 ms. */ - digitalWrite(LED_BUILTIN, led); + digitalWrite(LED_BUILTIN, led.pop()); } diff --git a/examples/Threading_Basics/Source_Sink_LED/Source_Thread.inot b/examples/Threading_Basics/Source_Sink_LED/Source_Thread.inot index 78be7f4..dc8f864 100644 --- a/examples/Threading_Basics/Source_Sink_LED/Source_Thread.inot +++ b/examples/Threading_Basics/Source_Sink_LED/Source_Thread.inot @@ -8,8 +8,8 @@ void setup() void loop() { - led = true; + led.push(true); delay(100); - led = false; + led.push(false); delay(100); } diff --git a/src/threading/Shared.hpp b/src/threading/Shared.hpp index 8b35cc9..45f70ae 100644 --- a/src/threading/Shared.hpp +++ b/src/threading/Shared.hpp @@ -40,10 +40,12 @@ class Shared { public: - operator T(); - void operator = (T const & other); + T pop(); + void push(T const & val); inline T peek() const { return _val; } + operator T() [[deprecated("Use 'pop()' instead.")]]; + void operator = (T const & val) [[deprecated("Use 'push()' instead.")]]; private: @@ -57,7 +59,7 @@ class Shared **************************************************************************************/ template -Shared::operator T() +T Shared::pop() { T * val_ptr = _mailbox.try_get_for(rtos::Kernel::wait_for_u32_forever); if (val_ptr) @@ -70,7 +72,7 @@ Shared::operator T() } template -void Shared::operator = (T const & other) +void Shared::push(T const & val) { /* If the mailbox is full we are discarding the * oldest element and then push the new one into @@ -82,14 +84,26 @@ void Shared::operator = (T const & other) _mailbox.free(val_ptr); } - _val = other; + _val = val; T * val_ptr = _mailbox.try_alloc(); if (val_ptr) { - *val_ptr = other; + *val_ptr = val; _mailbox.put(val_ptr); } } +template +Shared::operator T() +{ + return pop(); +} + +template +void Shared::operator = (T const & val) +{ + push(val); +} + #endif /* ARDUINO_THREADS_SHARED_HPP_ */ diff --git a/src/threading/Sink.hpp b/src/threading/Sink.hpp index 539ecb2..eb159b3 100644 --- a/src/threading/Sink.hpp +++ b/src/threading/Sink.hpp @@ -38,8 +38,13 @@ class SinkBase virtual ~SinkBase() { } - virtual operator T() = 0; + virtual T pop() = 0; virtual void inject(T const & value) = 0; + + inline operator T() [[deprecated("Use 'pop()' instead.")]] + { + return pop(); + } }; template @@ -50,7 +55,7 @@ class SinkNonBlocking : public SinkBase SinkNonBlocking() { } virtual ~SinkNonBlocking() { } - virtual operator T() override; + virtual T pop() override; virtual void inject(T const & value) override; @@ -69,7 +74,7 @@ class SinkBlocking : public SinkBase SinkBlocking(size_t const size); virtual ~SinkBlocking() { } - virtual operator T() override; + virtual T pop() override; virtual void inject(T const & value) override; @@ -87,7 +92,7 @@ class SinkBlocking : public SinkBase **************************************************************************************/ template -SinkNonBlocking::operator T() +T SinkNonBlocking::pop() { _mutex.lock(); return _data; @@ -114,7 +119,7 @@ SinkBlocking::SinkBlocking(size_t const size) { } template -SinkBlocking::operator T() +T SinkBlocking::pop() { _mutex.lock(); while (_data.isEmpty()) diff --git a/src/threading/Source.hpp b/src/threading/Source.hpp index d75ac8c..0f64af5 100644 --- a/src/threading/Source.hpp +++ b/src/threading/Source.hpp @@ -43,7 +43,9 @@ class Source public: void connectTo(SinkBase & sink); - void operator = (T const & other); + void push(T const & val); + + void operator = (T const & val) [[deprecated("Use 'push()' instead.")]]; private: std::list *> _sink_list; @@ -60,14 +62,20 @@ void Source::connectTo(SinkBase & sink) } template -void Source::operator = (T const & value) +void Source::push(T const & val) { std::for_each(std::begin(_sink_list), std::end (_sink_list), - [value](SinkBase * sink) + [val](SinkBase * sink) { - sink->inject(value); + sink->inject(val); }); } +template +void Source::operator = (T const & val) +{ + push(val); +} + #endif /* ARDUINO_THREADS_SOURCE_HPP_ */