SuperTinyKernel™ RTOS 1.05.3
Lightweight, high-performance, deterministic, bare-metal C++ RTOS for resource-constrained embedded systems. MIT Open Source License.
Loading...
Searching...
No Matches
test_pipe.cpp
Go to the documentation of this file.
1/*
2 * SuperTinyKernel(TM) RTOS: Lightweight High-Performance Deterministic C++ RTOS for Embedded Systems.
3 *
4 * Source: https://github.com/SuperTinyKernel-RTOS
5 *
6 * Copyright (c) 2022-2026 Neutron Code Limited <stk@neutroncode.com>. All Rights Reserved.
7 * License: MIT License, see LICENSE for a full text.
8 */
9
10#include <stk_config.h>
11#include <stk.h>
12#include <sync/stk_sync_pipe.h>
13#include <assert.h>
14#include <string.h>
15
16#include "stktest_context.h"
17
18using namespace stk;
19using namespace stk::test;
20
22
23#define _STK_PIPE_TEST_TASKS_MAX 5
24#define _STK_PIPE_TEST_TIMEOUT 300
25#define _STK_PIPE_TEST_SHORT_SLEEP 10
26#define _STK_PIPE_TEST_LONG_SLEEP 100
27#define _STK_PIPE_CAPACITY 8 // pipe capacity used by all tests
28#ifdef __ARM_ARCH_6M__
29#define _STK_PIPE_STACK_SIZE 128 // ARM Cortex-M0
30#define STK_TASK
31#else
32#define _STK_PIPE_STACK_SIZE 256
33#define STK_TASK static
34#endif
35
36#ifndef _NEW
37inline void *operator new(std::size_t, void *ptr) noexcept { return ptr; }
38inline void operator delete(void *, void *) noexcept { /* nothing for placement delete */ }
39#endif
40
41namespace stk {
42namespace test {
43
47namespace pipe {
48
49// Test results storage
50static volatile int32_t g_TestResult = 0;
51static volatile int32_t g_SharedCounter = 0;
52static volatile bool g_TestComplete = false;
53static volatile int32_t g_InstancesDone = 0;
54
55// Kernel (Pipe uses ConditionVariable internally, so KERNEL_SYNC is required)
57
58// Test pipe (re-constructed per test via ResetTestState)
60
67template <EAccessMode _AccessMode>
68class BasicWriteReadTask : public Task<_STK_PIPE_STACK_SIZE, _AccessMode>
69{
70 uint8_t m_task_id;
71 int32_t m_iterations;
72
73public:
74 BasicWriteReadTask(uint8_t task_id, int32_t iterations) : m_task_id(task_id), m_iterations(iterations)
75 {}
76
77private:
78 void Run()
79 {
80 if (m_task_id == 0)
81 {
82 // Producer: write sequential values
83 for (int32_t i = 0; i < m_iterations; ++i)
85
87
88 printf("basic write/read: counter=%d (expected %d)\n",
89 (int)g_SharedCounter, (int)m_iterations);
90
92 g_TestResult = 1;
93 }
94 else
95 if (m_task_id == 1)
96 {
97 // Consumer: read and verify each value matches expected sequence
98 for (int32_t i = 0; i < m_iterations; ++i)
99 {
100 int32_t value = -1;
101 if (g_TestPipe.Read(value, _STK_PIPE_TEST_TIMEOUT) && (value == i))
103 }
104 }
105 }
106};
107
115template <EAccessMode _AccessMode>
116class WriteBlocksWhenFullTask : public Task<_STK_PIPE_STACK_SIZE, _AccessMode>
117{
118 uint8_t m_task_id;
119
120public:
121 WriteBlocksWhenFullTask(uint8_t task_id, int32_t) : m_task_id(task_id)
122 {}
123
124private:
125 void Run()
126 {
127 if (m_task_id == 0)
128 {
129 // Fill pipe to capacity (all succeed immediately; pipe is empty)
130 for (int32_t i = 0; i < (int32_t)_STK_PIPE_CAPACITY; ++i)
131 {
134 }
135
136 // One more Write() must block until consumer reads a slot
138 ++g_SharedCounter; // CAPACITY + 1: unblocked by consumer
139
141
142 printf("write blocks when full: counter=%d (expected %d)\n",
143 (int)g_SharedCounter, (int)(_STK_PIPE_CAPACITY + 1));
144
145 if (g_SharedCounter == (int32_t)(_STK_PIPE_CAPACITY + 1))
146 g_TestResult = 1;
147 }
148 else
149 if (m_task_id == 1)
150 {
151 // Consumer: wait until pipe is full then drain one element to unblock producer
152 stk::Sleep(_STK_PIPE_TEST_SHORT_SLEEP); // let producer fill the pipe first
153
154 int32_t value = -1;
155 g_TestPipe.Read(value, _STK_PIPE_TEST_TIMEOUT); // frees one slot for producer
156 }
157 }
158};
159
166template <EAccessMode _AccessMode>
167class ReadBlocksWhenEmptyTask : public Task<_STK_PIPE_STACK_SIZE, _AccessMode>
168{
169 uint8_t m_task_id;
170
171public:
172 ReadBlocksWhenEmptyTask(uint8_t task_id, int32_t) : m_task_id(task_id)
173 {}
174
175private:
176 void Run()
177 {
178 if (m_task_id == 0)
179 {
180 // Producer: wait to ensure consumer is blocked, then write
182
184
186
187 printf("read blocks when empty: counter=%d (expected 1)\n", (int)g_SharedCounter);
188
189 if (g_SharedCounter == 1)
190 g_TestResult = 1;
191 }
192 else
193 if (m_task_id == 1)
194 {
195 // Consumer: Read() on an empty pipe must block until producer writes
196 int32_t value = -1;
197 if (g_TestPipe.Read(value, _STK_PIPE_TEST_TIMEOUT) && (value == 42))
198 ++g_SharedCounter; // 1: correctly received the produced value
199 }
200 }
201};
202
210template <EAccessMode _AccessMode>
211class TimeoutTask : public Task<_STK_PIPE_STACK_SIZE, _AccessMode>
212{
213 uint8_t m_task_id;
214
215public:
216 TimeoutTask(uint8_t task_id, int32_t) : m_task_id(task_id)
217 {}
218
219private:
220 void Run()
221 {
222 if (m_task_id == 1)
223 {
224 // Read() on empty pipe with short timeout must expire and return false
225 int32_t value = -1;
226 int64_t start = GetTimeNowMs();
227 bool ok = g_TestPipe.Read(value, 50);
228 int64_t elapsed = GetTimeNowMs() - start;
229
230 if (!ok && elapsed >= 45 && elapsed <= 65)
231 ++g_SharedCounter; // 1: read timeout returned false in correct window
232 }
233 else
234 if (m_task_id == 2)
235 {
236 // Make sure task 1 is trying to read first in order to expire
238
239 // Fill pipe to capacity so Write() has nowhere to go
240 for (int32_t i = 0; i < (int32_t)_STK_PIPE_CAPACITY; ++i)
242
243 // Write() on full pipe with short timeout must expire and return false
244 int64_t start = GetTimeNowMs();
245 bool ok = g_TestPipe.Write(99, 50);
246 int64_t elapsed = GetTimeNowMs() - start;
247
248 if (!ok && elapsed >= 45 && elapsed <= 65)
249 ++g_SharedCounter; // 2: write timeout returned false in correct window
250 }
251
253
254 if (m_task_id == 0)
255 {
256 while (g_InstancesDone < 3)
258
259 printf("timeout: counter=%d (expected 2)\n", (int)g_SharedCounter);
260
261 if (g_SharedCounter == 2)
262 g_TestResult = 1;
263 }
264 }
265};
266
273template <EAccessMode _AccessMode>
274class BulkWriteReadTask : public Task<_STK_PIPE_STACK_SIZE, _AccessMode>
275{
276 uint8_t m_task_id;
278
279public:
280 BulkWriteReadTask(uint8_t task_id, int32_t iterations) : m_task_id(task_id), m_iterations(iterations)
281 {}
282
283private:
284 void Run()
285 {
286 if (m_task_id == 0)
287 {
288 // Producer: build a sequential block and write it all at once
289 int32_t src[_STK_PIPE_CAPACITY] = {0};
290 for (int32_t i = 0; i < (int32_t)_STK_PIPE_CAPACITY; ++i)
291 src[i] = i;
292
293 size_t written = g_TestPipe.WriteBulk(src, _STK_PIPE_CAPACITY, _STK_PIPE_TEST_TIMEOUT);
294
296
297 printf("bulk write/read: written=%d, counter=%d (expected %d)\n",
298 (int)written, (int)g_SharedCounter, (int)_STK_PIPE_CAPACITY);
299
300 if ((written == _STK_PIPE_CAPACITY) && (g_SharedCounter == (int32_t)_STK_PIPE_CAPACITY))
301 g_TestResult = 1;
302 }
303 else
304 if (m_task_id == 1)
305 {
306 // Consumer: read the whole block back and verify each element
307 int32_t dst[_STK_PIPE_CAPACITY] = {0};
308 size_t read_count = g_TestPipe.ReadBulk(dst, _STK_PIPE_CAPACITY, _STK_PIPE_TEST_TIMEOUT);
309
310 if (read_count == _STK_PIPE_CAPACITY)
311 {
312 bool all_correct = true;
313
314 for (int32_t i = 0; i < (int32_t)_STK_PIPE_CAPACITY; ++i)
315 {
316 if (dst[i] != i)
317 {
318 all_correct = false;
319 break;
320 }
321 }
322
323 if (all_correct)
325 }
326 }
327 }
328};
329
337template <EAccessMode _AccessMode>
338class GetSizeIsEmptyTask : public Task<_STK_PIPE_STACK_SIZE, _AccessMode>
339{
340 uint8_t m_task_id;
341
342public:
343 GetSizeIsEmptyTask(uint8_t task_id, int32_t) : m_task_id(task_id)
344 {}
345
346private:
347 void Run()
348 {
349 if (m_task_id == 1)
350 {
351 bool all_ok = true;
352
353 // Pipe must be empty initially
354 if (!g_TestPipe.IsEmpty() || g_TestPipe.GetSize() != 0)
355 all_ok = false;
356
357 // Write elements one by one; GetSize() must track exactly
358 for (int32_t i = 0; i < (int32_t)_STK_PIPE_CAPACITY; ++i)
359 {
361
362 if (g_TestPipe.GetSize() != (size_t)(i + 1))
363 {
364 all_ok = false;
365 break;
366 }
367 }
368
369 // Drain elements one by one; GetSize() must track exactly
370 for (int32_t i = 0; i < (int32_t)_STK_PIPE_CAPACITY; ++i)
371 {
372 int32_t value = -1;
374
375 if (g_TestPipe.GetSize() != (size_t)(_STK_PIPE_CAPACITY - i - 1))
376 {
377 all_ok = false;
378 break;
379 }
380 }
381
382 // Pipe must be empty again after full drain
383 if (!g_TestPipe.IsEmpty())
384 all_ok = false;
385
386 if (all_ok)
388 }
389
390 if (m_task_id == 0)
391 {
393
394 printf("get-size/is-empty: counter=%d (expected 1)\n", (int)g_SharedCounter);
395
396 if (g_SharedCounter == 1)
397 g_TestResult = 1;
398 }
399 }
400};
401
409template <EAccessMode _AccessMode>
410class MultiProducerConsumerTask : public Task<_STK_PIPE_STACK_SIZE, _AccessMode>
411{
412 uint8_t m_task_id;
414
415public:
416 MultiProducerConsumerTask(uint8_t task_id, int32_t iterations) : m_task_id(task_id), m_iterations(iterations)
417 {}
418
419private:
420 void Run()
421 {
422 if (m_task_id == 1 || m_task_id == 2)
423 {
424 // Producers: write m_iterations values each
425 for (int32_t i = 0; i < m_iterations; ++i)
427 }
428 else
429 if (m_task_id == 3 || m_task_id == 4)
430 {
431 // Consumers: read m_iterations values each
432 for (int32_t i = 0; i < m_iterations; ++i)
433 {
434 int32_t value = -1;
435 if (g_TestPipe.Read(value, _STK_PIPE_TEST_TIMEOUT))
437 }
438 }
439
441
442 if (m_task_id == 0)
443 {
444 // Wait for all four producers and consumers to finish
447
448 int32_t expected = 2 * m_iterations; // two consumers * m_iterations reads each
449
450 printf("multi producer/consumer: counter=%d (expected %d)\n",
451 (int)g_SharedCounter, (int)expected);
452
453 if (g_SharedCounter == expected)
454 g_TestResult = 1;
455 }
456 }
457};
458
467template <EAccessMode _AccessMode>
468class StressTestTask : public Task<_STK_PIPE_STACK_SIZE, _AccessMode>
469{
470 uint8_t m_task_id;
472
473public:
474 StressTestTask(uint8_t task_id, int32_t iterations) : m_task_id(task_id), m_iterations(iterations)
475 {}
476
477private:
478 void Run()
479 {
480 int32_t written = 0;
481 int32_t consumed = 0;
482
483 for (int32_t i = 0; i < m_iterations; ++i)
484 {
485 if ((i % 2) == 0)
486 {
487 // Producer role: write value; may block briefly if pipe is full
489 ++written;
490 }
491 else
492 {
493 // Consumer role: read value; may time out if pipe is empty
494 int32_t value = -1;
495 if (g_TestPipe.Read(value, _STK_PIPE_TEST_SHORT_SLEEP))
496 ++consumed;
497 }
498 }
499
500 // Accumulate per-task write and read counts into shared counters atomically
501 // via the pipe's own mutex-free atomic increment (each task adds its own slice)
502 // Using g_SharedCounter for net writes minus reads; net >= 0 if no data is lost
503 g_SharedCounter += (written - consumed);
504
506
508 {
511
512 // Drain any remaining items left in the pipe
513 int32_t remaining = (int32_t)g_TestPipe.GetSize();
514
515 printf("stress test: net_written=%d remaining=%d (expected: remaining >= 0)\n",
516 (int)g_SharedCounter, (int)remaining);
517
518 // net written minus read, plus anything still in the pipe, must be non-negative;
519 // any negative value means reads exceeded writes which indicates data corruption
520 if ((g_SharedCounter + remaining) >= 0)
521 g_TestResult = 1;
522 }
523 }
524};
525
526// Helper function to reset test state
527static void ResetTestState()
528{
529 g_TestResult = 0;
530 g_SharedCounter = 0;
531 g_TestComplete = false;
532 g_InstancesDone = 0;
533
534 // Re-construct the pipe in-place for a clean state; the pipe destructor does not assert
535 // on non-empty state (unlike ConditionVariable), but reconstruction ensures m_head,
536 // m_tail, m_count and both condition variables are fully reset between tests
537 g_TestPipe.~Pipe();
539}
540
541} // namespace pipe
542} // namespace test
543} // namespace stk
544
545static bool NeedsExtendedTasks(const char *test_name)
546{
547 return (strcmp(test_name, "BasicWriteRead") != 0) &&
548 (strcmp(test_name, "WriteBlocksWhenFull") != 0) &&
549 (strcmp(test_name, "ReadBlocksWhenEmpty") != 0) &&
550 (strcmp(test_name, "Timeout") != 0) &&
551 (strcmp(test_name, "BulkWriteRead") != 0) &&
552 (strcmp(test_name, "GetSizeIsEmpty") != 0);
553}
554
558template <class TaskType>
559static int32_t RunTest(const char *test_name, int32_t param = 0)
560{
561 using namespace stk;
562 using namespace stk::test;
563 using namespace stk::test::pipe;
564
565 printf("Test: %s\n", test_name);
566
568
569 // Create tasks based on test type
570 STK_TASK TaskType task0(0, param);
571 STK_TASK TaskType task1(1, param);
572 STK_TASK TaskType task2(2, param);
573 TaskType task3(3, param);
574 TaskType task4(4, param);
575
576 g_Kernel.AddTask(&task0);
577 g_Kernel.AddTask(&task1);
578 g_Kernel.AddTask(&task2);
579
580 if (NeedsExtendedTasks(test_name))
581 {
582 g_Kernel.AddTask(&task3);
583 g_Kernel.AddTask(&task4);
584 }
585
586 g_Kernel.Start();
587
589
590 printf("Result: %s\n", result == TestContext::SUCCESS_EXIT_CODE ? "PASS" : "FAIL");
591 printf("--------------\n");
592
593 return result;
594}
595
599int main(int argc, char **argv)
600{
601 (void)argc;
602 (void)argv;
603
604 using namespace stk::test::pipe;
605
607
608 int total_failures = 0, total_success = 0;
609
610 printf("--------------\n");
611
612 g_Kernel.Initialize();
613
614#ifndef __ARM_ARCH_6M__
615
616 // Test 1: Write()/Read() transfers values correctly in FIFO order (tasks 0-2 only)
618 total_failures++;
619 else
620 total_success++;
621
622 // Test 2: Write() blocks when pipe is full; unblocks when consumer frees a slot (tasks 0-2 only)
624 total_failures++;
625 else
626 total_success++;
627
628 // Test 3: Read() blocks when pipe is empty; unblocks when producer writes data (tasks 0-2 only)
630 total_failures++;
631 else
632 total_success++;
633
634 // Test 4: Write() on full pipe and Read() on empty pipe both time out correctly (tasks 0-2 only)
636 total_failures++;
637 else
638 total_success++;
639
640 // Test 5: WriteBulk()/ReadBulk() transfers a full block with correct element count and values (tasks 0-2 only)
642 total_failures++;
643 else
644 total_success++;
645
646 // Test 6: GetSize() and IsEmpty() accurately reflect pipe state across fill and drain cycle (tasks 0-2 only)
648 total_failures++;
649 else
650 total_success++;
651
652 // Test 7: Two producers and two consumers transfer data concurrently without loss
654 total_failures++;
655 else
656 total_success++;
657
658#endif // __ARM_ARCH_6M__
659
660 // Test 8: Stress test under full five-task contention with alternating producer/consumer roles
662 total_failures++;
663 else
664 total_success++;
665
666 int32_t final_result = (total_failures == 0 ? TestContext::SUCCESS_EXIT_CODE : TestContext::DEFAULT_FAILURE_EXIT_CODE);
667
668 printf("##############\n");
669 printf("Total tests: %d\n", total_failures + total_success);
670 printf("Failures: %d\n", total_failures);
671
673 return final_result;
674}
Top-level STK include. Provides the Kernel class template and all built-in task-switching strategies.
Implementation of synchronization primitive: stk::sync::Pipe.
static int32_t RunTest(const char *test_name, int32_t param=0)
#define STK_TASK
#define _STK_PIPE_TEST_TASKS_MAX
Definition test_pipe.cpp:23
#define _STK_PIPE_TEST_LONG_SLEEP
Definition test_pipe.cpp:26
#define _STK_PIPE_TEST_TIMEOUT
Definition test_pipe.cpp:24
int main(int argc, char **argv)
#define _STK_PIPE_CAPACITY
Definition test_pipe.cpp:27
static int32_t RunTest(const char *test_name, int32_t param=0)
#define _STK_PIPE_TEST_SHORT_SLEEP
Definition test_pipe.cpp:25
static bool NeedsExtendedTasks(const char *test_name)
#define STK_TEST_DECL_ASSERT
Declare assertion redirector in the source file.
Namespace of STK package.
static int64_t GetTimeNowMs()
Get current time in milliseconds since kernel start.
Definition stk_helper.h:281
void Sleep(uint32_t ticks)
Put calling process into a sleep state.
Definition stk_helper.h:298
Namespace of the test inventory.
Namespace of Pipe test.
static Kernel< KERNEL_DYNAMIC|KERNEL_SYNC, 5, SwitchStrategyRR, PlatformDefault > g_Kernel
Definition test_pipe.cpp:56
static sync::Pipe< int32_t, 8 > g_TestPipe
Definition test_pipe.cpp:59
static volatile int32_t g_InstancesDone
Definition test_pipe.cpp:53
static volatile int32_t g_SharedCounter
Definition test_pipe.cpp:51
static volatile bool g_TestComplete
Definition test_pipe.cpp:52
static void ResetTestState()
static volatile int32_t g_TestResult
Definition test_pipe.cpp:50
Concrete implementation of IKernel.
Definition stk.h:83
Task(const Task &)=delete
Thread-safe FIFO communication pipe for inter-task data passing.
Tests basic Write()/Read() functionality in producer-consumer arrangement.
Definition test_pipe.cpp:69
void Run()
Entry point of the user task.
Definition test_pipe.cpp:78
BasicWriteReadTask(uint8_t task_id, int32_t iterations)
Definition test_pipe.cpp:74
Tests that Write() blocks when the pipe is full and unblocks when space is freed.
void Run()
Entry point of the user task.
WriteBlocksWhenFullTask(uint8_t task_id, int32_t)
Tests that Read() blocks when the pipe is empty and unblocks when data arrives.
ReadBlocksWhenEmptyTask(uint8_t task_id, int32_t)
void Run()
Entry point of the user task.
Tests that Write() and Read() return false within the expected time on timeout.
TimeoutTask(uint8_t task_id, int32_t)
void Run()
Entry point of the user task.
Tests WriteBulk()/ReadBulk() for multi-element block transfers.
BulkWriteReadTask(uint8_t task_id, int32_t iterations)
void Run()
Entry point of the user task.
Tests GetSize() and IsEmpty() reflect accurate pipe state.
GetSizeIsEmptyTask(uint8_t task_id, int32_t)
void Run()
Entry point of the user task.
Tests concurrent multi-producer / multi-consumer throughput.
MultiProducerConsumerTask(uint8_t task_id, int32_t iterations)
void Run()
Entry point of the user task.
Stress test of Pipe under full five-task contention.
StressTestTask(uint8_t task_id, int32_t iterations)
void Run()
Entry point of the user task.
static void ShowTestSuitePrologue()
Show text string as prologue before tests start.
@ DEFAULT_FAILURE_EXIT_CODE
default exit code for exit() to denote failure of the test
@ SUCCESS_EXIT_CODE
exit code for exit() to denote the success of the test
static void ShowTestSuiteEpilogue(int32_t result)
Show text string as epilogue after tests end.