SuperTinyKernel™ RTOS 1.05.3
Lightweight, high-performance, deterministic, bare-metal C++ RTOS for resource-constrained embedded systems. MIT Open Source License.
Loading...
Searching...
No Matches
stk_sync_pipe.h
Go to the documentation of this file.
1/*
2 * SuperTinyKernel(TM) RTOS: Lightweight High-Performance Deterministic C++ RTOS for Embedded Systems.
3 *
4 * Source: https://github.com/SuperTinyKernel-RTOS
5 *
6 * Copyright (c) 2022-2026 Neutron Code Limited <stk@neutroncode.com>. All Rights Reserved.
7 * License: MIT License, see LICENSE for a full text.
8 */
9
10#ifndef STK_SYNC_PIPE_H_
11#define STK_SYNC_PIPE_H_
12
13#include <type_traits>
14#include <cstring> // for memcpy
15
16#include "stk_sync_cv.h"
17
21
22namespace stk {
23namespace sync {
24
57template <typename T, size_t N>
58class Pipe
59{
60public:
63 explicit Pipe() : m_buffer(), m_head(0U), m_tail(0U), m_count(0U), m_cv_empty(), m_cv_full()
64 {}
65
76 bool Write(const T &data, Timeout timeout = WAIT_INFINITE)
77 {
79
80 while (m_count == N)
81 {
82 if (!m_cv_full.Wait(cs_, timeout))
83 return false;
84 }
85
86 m_buffer[m_head] = data;
87 m_head = (m_head + 1U) % N;
88 m_count += 1U;
89
90 // notify consumer that data is available
91 m_cv_empty.NotifyOne();
92
93 return true;
94 }
95
103 bool TryWrite(const T &data) { return Write(data, NO_WAIT); }
104
127 size_t WriteBulk(const T *src, size_t count, Timeout timeout = WAIT_INFINITE)
128 {
129 if ((src == nullptr) || (count == 0U))
130 return 0U;
131
132 size_t written = 0U;
134
135 while (written < count)
136 {
137 // wait until there is at least 1 slot available
138 while (m_count == N)
139 {
140 if (!m_cv_full.Wait(cs_, timeout))
141 return written; // Return partial count on timeout
142 }
143
144 // calculate how many we can copy in this contiguous stretch
145 size_t available = N - m_count;
146 size_t to_write = ((count - written) < available ? (count - written) : available);
147
148 // copy from source
149 // note: if value type is not scalar or queue is small we copy with a for loop,
150 // otherwise using faster memcpy version for large scalar arrays
151 if (!std::is_scalar<T>::value && (N < 8))
152 {
153 for (size_t i = 0; i < to_write; ++i)
154 {
155 m_buffer[m_head] = src[written++];
156 m_head = (m_head + 1U) % N;
157 m_count += 1U;
158 }
159 }
160 else
161 {
162 size_t first_part = N - m_head;
163 if (to_write <= first_part)
164 {
165 memcpy(&m_buffer[m_head], &src[written], (to_write * sizeof(T)));
166 }
167 else
168 {
169 memcpy(&m_buffer[m_head], &src[written], (first_part * sizeof(T)));
170 memcpy(&m_buffer[0], &src[written + first_part], ((to_write - first_part) * sizeof(T)));
171 }
172 written += to_write;
173 m_head = (m_head + to_write) % N;
174 m_count += to_write;
175 }
176
177 // notify consumers that data is ready
178 m_cv_empty.NotifyAll();
179 }
180
181 return written;
182 }
183
195 size_t TryWriteBulk(const T *src, size_t count) { return WriteBulk(src, count, NO_WAIT); }
196
207 bool Read(T &data, Timeout timeout = WAIT_INFINITE)
208 {
210
211 while (m_count == 0U)
212 {
213 if (!m_cv_empty.Wait(cs_, timeout))
214 return false;
215 }
216
217 data = m_buffer[m_tail];
218 m_tail = (m_tail + 1U) % N;
219 m_count -= 1U;
220
221 // notify producer that space is available
222 m_cv_full.NotifyOne();
223
224 return true;
225 }
226
235 bool TryRead(T &data) { return Read(data, NO_WAIT); }
236
260 size_t ReadBulk(T *dst, size_t count, Timeout timeout = WAIT_INFINITE)
261 {
262 if ((dst == nullptr) || (count == 0U))
263 return 0U;
264
265 size_t read_count = 0U;
266
268
269 while (read_count < count)
270 {
271 // wait until there is at least 1 element available
272 while (m_count == 0U)
273 {
274 if (!m_cv_empty.Wait(cs_, timeout))
275 return read_count; // return partial count on timeout
276 }
277
278 // determine how many we can pull in this stretch
279 size_t to_read = (count - read_count) < m_count ? (count - read_count) : m_count;
280
281 // copy to destination
282 // note: if value type is not scalar or queue is small we copy with a for loop,
283 // otherwise using faster memcpy version for large scalar arrays
284 if (!std::is_scalar<T>::value || (N < 8))
285 {
286 for (size_t i = 0U; i < to_read; ++i)
287 {
288 dst[read_count++] = m_buffer[m_tail];
289 m_tail = (m_tail + 1) % N;
290 m_count -= 1U;
291 }
292 }
293 else
294 {
295 size_t first_part = N - m_tail;
296 if (to_read <= first_part)
297 {
298 memcpy(&dst[read_count], &m_buffer[m_tail], (to_read * sizeof(T)));
299 }
300 else
301 {
302 memcpy(&dst[read_count], &m_buffer[m_tail], (first_part * sizeof(T)));
303 memcpy(&dst[read_count + first_part], &m_buffer[0], ((to_read - first_part) * sizeof(T)));
304 }
305 read_count += to_read;
306 m_tail = (m_tail + to_read) % N;
307 m_count -= to_read;
308 }
309
310 // notify producers that space is now available
311 m_cv_full.NotifyAll();
312 }
313
314 return read_count;
315 }
316
328 size_t TryReadBulk(T *dst, size_t count) { return ReadBulk(dst, count, NO_WAIT); }
329
337 size_t GetSize() const { return m_count; }
338
346 bool IsEmpty() const { return (GetSize() == 0); }
347
348private:
350
// storage for the elements: fixed array of N slots, indexed modulo N by the read/write methods
351 T m_buffer[N];
// index of the slot the next write stores into (advanced by Write/WriteBulk)
352 size_t m_head;
// index of the slot the next read takes from (advanced by Read/ReadBulk)
353 size_t m_tail;
// number of elements currently stored (0 = empty, N = full)
354 size_t m_count;
357};
358
359} // namespace sync
360} // namespace stk
361
362#endif /* STK_SYNC_PIPE_H_ */
363
Implementation of synchronization primitive: stk::sync::ConditionVariable.
Namespace of STK package.
const Timeout WAIT_INFINITE
Timeout value: block indefinitely until the synchronization object is signaled.
Definition stk_common.h:139
int32_t Timeout
Timeout time (ticks).
Definition stk_common.h:133
const Timeout NO_WAIT
Timeout value: return immediately if the synchronization object is not yet signaled (non-blocking poll).
Definition stk_common.h:145
Synchronization primitives for task coordination and resource protection.
RAII-style low-level synchronization primitive for atomic code execution. Used as a building block for ...
Definition stk_sync_cs.h:54
Condition Variable primitive for signaling between tasks based on specific predicates.
Definition stk_sync_cv.h:68
bool TryRead(T &data)
Attempt to read data from the pipe.
Pipe()
Constructor.
size_t GetSize() const
Get the current number of elements in the pipe.
size_t WriteBulk(const T *src, size_t count, Timeout timeout=WAIT_INFINITE)
Write multiple elements to the pipe.
size_t TryWriteBulk(const T *src, size_t count)
Attempt to write multiple elements to the pipe.
bool Write(const T &data, Timeout timeout=WAIT_INFINITE)
Write data to the pipe.
bool Read(T &data, Timeout timeout=WAIT_INFINITE)
Read data from the pipe.
bool IsEmpty() const
Check if queue is currently empty.
bool TryWrite(const T &data)
Attempt to write data to the pipe.
STK_NONCOPYABLE_CLASS(Pipe)
size_t TryReadBulk(T *dst, size_t count)
Attempt to read multiple elements from the pipe.
size_t ReadBulk(T *dst, size_t count, Timeout timeout=WAIT_INFINITE)
Read multiple elements from the pipe.