Skip to content

Shared/Sink/Source: Provide getters/setter in addition to operator overloading. #67

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 23, 2022
10 changes: 5 additions & 5 deletions docs/02-data-exchange.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ New values can be inserted naturally by using the assignment operator `=` as if

```C++
/* Thread_1.inot */
counter = 10; /* Store a value into the shared variable in a threadsafe manner. */
counter.push(10); /* Store a value into the shared variable in a threadsafe manner. */
```
If the internal queue is full the oldest element is discarded and the latest element is inserted into the queue.

Retrieving stored data works also very naturally like it would for any POD data type:
```C++
/* Thread_2.inot */
Serial.println(counter); /* Retrieves a value from the shared variable in a threadsafe manner. */
Serial.println(counter.pop()); /* Retrieves a value from the shared variable in a threadsafe manner. */
```

Should the internal queue be empty when trying to read the latest available value then the thread reading the shared variable will be suspended and the next available thread will be scheduled. Once a new value is stored inside the shared variable the suspended thread resumes operation and consumes the value which has been stored in the internal queue.
Expand Down Expand Up @@ -55,16 +55,16 @@ DataProducerThread.counter.connectTo(DataConsumerThread_2.counter);
Whenever a new value is assigned to a data source, i.e.
```C++
/* DataProducerThread.inot */
counter = 10;
counter.push(10);
```
data is automatically copied and stored within the internal queues of all connected data sinks, from where it can be retrieved, i.e.
```C++
/* DataConsumerThread_1.inot */
Serial.println(counter);
Serial.println(counter.pop());
```
```C++
/* DataConsumerThread_2.inot */
Serial.println(counter);
Serial.println(counter.pop());
```
If a thread tries to read from an empty `Sink` the thread is suspended and the next ready thread is scheduled. When a new value is written to a `Source` and consequently copied to a `Sink` the suspended thread is resumed and continuous execution (i.e. read the data and act upon it).

Expand Down
2 changes: 1 addition & 1 deletion examples/Threading_Basics/Shared_Counter/Consumer.inot
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ void loop()
* available, then this thread is suspended until new data
* is available for reading.
*/
Serial.println(counter);
Serial.println(counter.pop());
}
2 changes: 1 addition & 1 deletion examples/Threading_Basics/Shared_Counter/Producer.inot
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ void loop()
* 'counter'. Internally this is stored within a queue in a FIFO
* (First-In/First-Out) manner.
*/
counter = i;
counter.push(i);
i++;
delay(100);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ void setup()

void loop()
{
Serial.println(counter);
Serial.println(counter.pop());
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ void setup()
void loop()
{
static int i = 0;
counter = i;
counter.push(i);
i++;
}
2 changes: 1 addition & 1 deletion examples/Threading_Basics/Source_Sink_LED/Sink_Thread.inot
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ void loop()
* this call will block until new data is inserted from the connected SOURCE. This means
* that the pace is dictated by the SOURCE that sends data every 100 ms.
*/
digitalWrite(LED_BUILTIN, led);
digitalWrite(LED_BUILTIN, led.pop());
}
4 changes: 2 additions & 2 deletions examples/Threading_Basics/Source_Sink_LED/Source_Thread.inot
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ void setup()

void loop()
{
led = true;
led.push(true);
delay(100);
led = false;
led.push(false);
delay(100);
}
26 changes: 20 additions & 6 deletions src/threading/Shared.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ class Shared
{
public:

operator T();
void operator = (T const & other);
T pop();
void push(T const & val);
inline T peek() const { return _val; }

operator T() [[deprecated("Use 'pop()' instead.")]];
void operator = (T const & val) [[deprecated("Use 'push()' instead.")]];

private:

Expand All @@ -57,7 +59,7 @@ class Shared
**************************************************************************************/

template<class T, size_t QUEUE_SIZE>
Shared<T,QUEUE_SIZE>::operator T()
T Shared<T,QUEUE_SIZE>::pop()
{
T * val_ptr = _mailbox.try_get_for(rtos::Kernel::wait_for_u32_forever);
if (val_ptr)
Expand All @@ -70,7 +72,7 @@ Shared<T,QUEUE_SIZE>::operator T()
}

template<class T, size_t QUEUE_SIZE>
void Shared<T,QUEUE_SIZE>::operator = (T const & other)
void Shared<T,QUEUE_SIZE>::push(T const & val)
{
/* If the mailbox is full we are discarding the
* oldest element and then push the new one into
Expand All @@ -82,14 +84,26 @@ void Shared<T,QUEUE_SIZE>::operator = (T const & other)
_mailbox.free(val_ptr);
}

_val = other;
_val = val;

T * val_ptr = _mailbox.try_alloc();
if (val_ptr)
{
*val_ptr = other;
*val_ptr = val;
_mailbox.put(val_ptr);
}
}

template<class T, size_t QUEUE_SIZE>
Shared<T,QUEUE_SIZE>::operator T()
{
return pop();
}

template<class T, size_t QUEUE_SIZE>
void Shared<T,QUEUE_SIZE>::operator = (T const & val)
{
push(val);
}

#endif /* ARDUINO_THREADS_SHARED_HPP_ */
15 changes: 10 additions & 5 deletions src/threading/Sink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,13 @@ class SinkBase

virtual ~SinkBase() { }

virtual operator T() = 0;
virtual T pop() = 0;
virtual void inject(T const & value) = 0;

inline operator T() [[deprecated("Use 'pop()' instead.")]]
{
return pop();
}
};

template<typename T>
Expand All @@ -50,7 +55,7 @@ class SinkNonBlocking : public SinkBase<T>
SinkNonBlocking() { }
virtual ~SinkNonBlocking() { }

virtual operator T() override;
virtual T pop() override;
virtual void inject(T const & value) override;


Expand All @@ -69,7 +74,7 @@ class SinkBlocking : public SinkBase<T>
SinkBlocking(size_t const size);
virtual ~SinkBlocking() { }

virtual operator T() override;
virtual T pop() override;
virtual void inject(T const & value) override;


Expand All @@ -87,7 +92,7 @@ class SinkBlocking : public SinkBase<T>
**************************************************************************************/

template<typename T>
SinkNonBlocking<T>::operator T()
T SinkNonBlocking<T>::pop()
{
_mutex.lock();
return _data;
Expand All @@ -114,7 +119,7 @@ SinkBlocking<T>::SinkBlocking(size_t const size)
{ }

template<typename T>
SinkBlocking<T>::operator T()
T SinkBlocking<T>::pop()
{
_mutex.lock();
while (_data.isEmpty())
Expand Down
16 changes: 12 additions & 4 deletions src/threading/Source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ class Source
public:

void connectTo(SinkBase<T> & sink);
void operator = (T const & other);
void push(T const & val);

void operator = (T const & val) [[deprecated("Use 'push()' instead.")]];

private:
std::list<SinkBase<T> *> _sink_list;
Expand All @@ -60,14 +62,20 @@ void Source<T>::connectTo(SinkBase<T> & sink)
}

template<typename T>
void Source<T>::operator = (T const & value)
void Source<T>::push(T const & val)
{
std::for_each(std::begin(_sink_list),
std::end (_sink_list),
[value](SinkBase<T> * sink)
[val](SinkBase<T> * sink)
{
sink->inject(value);
sink->inject(val);
});
}

template<typename T>
void Source<T>::operator = (T const & val)
{
push(val);
}

#endif /* ARDUINO_THREADS_SOURCE_HPP_ */