datatypes: add lockfree version of counter and ringbuffer (#24839)

This commit is contained in:
kbkpbot 2025-07-04 23:45:14 +08:00 committed by GitHub
parent 0fdca4b0dc
commit c216e59bfc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 1671 additions and 0 deletions

View file

@ -0,0 +1,67 @@
# Lockfree Library for V
A high-performance, thread-safe collection of lock-free data structures for
the V programming language. Designed for concurrent applications requiring
low-latency and high-throughput data processing.
## Features
- **Truly Lock-Free**: No mutexes or spinlocks
- **Cross-Platform**: Works on Windows, Linux, macOS
- **Configurable**: Tune parameters for specific workloads
- **High Performance**: Optimized for modern multi-core processors
## Data Structures
### 1. Atomic Counter
A thread-safe counter with atomic operations.
```v
import datatypes.lockfree
mut counter := lockfree.new_counter[int](0)
counter.increment()
counter.increment_by(5)
value := counter.get() // 6
counter.decrement()
counter.clear()
```
**Features**:
- Atomic increment/decrement
- Batch operations
- Get current value
- Reset functionality
### 2. Ring Buffer
A circular buffer for producer-consumer scenarios.
```v
import datatypes.lockfree
mut rb := lockfree.new_ringbuffer[int](1024)
rb.push(10)
rb.push(20)
item := rb.pop() // 10
free := rb.remaining()
```
**Features**:
- Single/Multi producer/consumer modes
- Blocking/non-blocking operations
- Batch operations
- Configurable size
- Memory efficient
## Acknowledgements
This library incorporates research and design principles from:
- Intel Threading Building Blocks (TBB)
- Facebook Folly
- Java Concurrent Package
- Dmitry Vyukov's lock-free algorithms
- DPDK rte ring
---

View file

@ -0,0 +1,218 @@
module main
import time
import sync
import os
import flag
import runtime
// Test configuration parameters
const buffer_size = 1024 // Size of the channel
const items_per_thread = 1_000_000 // Items to produce per thread
const warmup_runs = 3 // Number of unmeasured warmup runs before timing starts
const test_runs = 5 // Number of measured runs averaged per scenario
const max_threads = runtime.nr_jobs() // Maximum number of threads (detected at startup)
// ChannelPerfResult holds the aggregated metrics of one benchmark scenario.
struct ChannelPerfResult {
	scenario   string // Test scenario identifier, e.g. 'MPSC (4P1C)'
	throughput f64    // Throughput in million operations per second
	latency    f64    // Average latency in nanoseconds per operation
	cpu_usage  f64    // CPU usage percentage (currently always 0.0; not measured)
}
// Entry point: parses CLI flags, runs every producer/consumer scenario that
// fits into the available cores, and prints a summary table.
fn main() {
	println('Channel Performance Test')
	println('======================================')
	println('Maximum number of threads set to nr_jobs = ${max_threads}')
	mut parser := flag.new_flag_parser(os.args.clone())
	parser.skip_executable()
	want_help := parser.bool('help', 0, false, 'Show this help screen\n')
	debug := parser.bool('debug', 0, false, 'Show debug message, stat of channel')
	if want_help {
		println(parser.usage())
		exit(0)
	}
	mut results := []ChannelPerfResult{}
	// Single Producer Single Consumer baseline.
	results << test_scenario('SPSC', 1, 1, debug)
	thread_counts := [2, 4, 8, 16]
	// Multiple Producers Single Consumer
	for n in thread_counts {
		if n <= max_threads {
			results << test_scenario('MPSC (${n}P1C)', n, 1, debug)
		}
	}
	// Single Producer Multiple Consumers
	for n in thread_counts {
		if n <= max_threads {
			results << test_scenario('SPMC (1P${n}C)', 1, n, debug)
		}
	}
	// Multiple Producers Multiple Consumers (needs 2n cores)
	for n in thread_counts {
		if n * 2 <= max_threads {
			results << test_scenario('MPMC (${n}P${n}C)', n, n, debug)
		}
	}
	print_results(results)
}
// test_scenario benchmarks one producer/consumer configuration and returns
// its aggregated performance metrics.
fn test_scenario(scenario string, producers int, consumers int, debug bool) ChannelPerfResult {
	println('\nTesting scenario: ${scenario}')
	// One shared channel for the whole scenario; closed when done.
	ch := chan int{cap: buffer_size}
	defer {
		ch.close()
	}
	// Warm caches and the scheduler before measuring.
	for _ in 0 .. warmup_runs {
		run_test(ch, producers, consumers, false, debug)
	}
	// Measured runs: accumulate wall time and operation count.
	mut elapsed := time.Duration(0)
	mut ops := 0
	for _ in 0 .. test_runs {
		d, n := run_test(ch, producers, consumers, true, debug)
		elapsed += d
		ops += n
	}
	// Derive throughput (M ops/s) and per-operation latency (ns).
	mean := elapsed / test_runs
	return ChannelPerfResult{
		scenario: scenario
		throughput: f64(ops) / mean.seconds() / 1_000_000
		latency: mean.nanoseconds() / f64(ops / test_runs)
		cpu_usage: 0.0 // Not measured; would need OS support
	}
}
// run_test executes a single benchmark run: it spawns `producers` writer
// threads and `consumers` reader threads over the shared channel, waits for
// all of them, and returns (wall-clock duration, total items transferred).
fn run_test(ch chan int, producers int, consumers int, measure bool, debug bool) (time.Duration, int) {
	mut wg := sync.new_waitgroup()
	total_items := producers * items_per_thread
	// Key fix: Consumers should consume exact producer total
	items_per_consumer := total_items / consumers
	mut remaining := total_items % consumers
	// Start producers
	start_time := time.now()
	for i in 0 .. producers {
		wg.add(1)
		spawn producer_thread(ch, i, mut wg, debug)
	}
	// Start consumers
	mut consumed_counts := []int{len: consumers, init: 0}
	for i in 0 .. consumers {
		wg.add(1)
		// Distribute remaining items to first consumers
		mut count := items_per_consumer
		if remaining > 0 {
			count += 1
			remaining -= 1
		}
		spawn consumer_thread(ch, i, count, mut consumed_counts, mut wg, debug)
	}
	// Wait for all threads to complete
	wg.wait()
	end_time := time.now()
	// Validate results: the per-consumer tallies must add up to the number of
	// items produced, otherwise the run lost or duplicated items.
	mut total_consumed := 0
	for count in consumed_counts {
		total_consumed += count
	}
	if total_consumed != total_items {
		eprintln('Error: Produced ${total_items} items but consumed ${total_consumed}')
	}
	duration := end_time - start_time
	if measure {
		println('Completed ${total_items} items in ${duration.milliseconds()}ms')
	}
	return duration, total_items
}
// producer_thread writes this producer's slice of the value range into the
// channel; each producer owns a disjoint, contiguous range of integers.
fn producer_thread(ch chan int, id int, mut wg sync.WaitGroup, debug bool) {
	defer {
		wg.done()
	}
	first := id * items_per_thread
	for v in first .. first + items_per_thread {
		ch <- v
	}
}
// consumer_thread drains exactly `items_to_consume` values from the channel
// and records the amount in the shared per-consumer tally.
fn consumer_thread(ch chan int, id int, items_to_consume int, mut consumed_counts []int, mut wg sync.WaitGroup, debug bool) {
	defer {
		wg.done()
	}
	mut done := 0
	for done < items_to_consume {
		_ := <-ch
		// Periodic progress output in debug mode (same cadence as before:
		// reports at item index 0, 1M, 2M, ...).
		if debug && done % 1000000 == 0 {
			println('consume item count = ${done}')
		}
		done++
	}
	consumed_counts[id] = items_to_consume
}
// print_results renders the collected metrics as an aligned table and
// reports the scenario with the highest throughput.
fn print_results(results []ChannelPerfResult) {
	println('\nPerformance Results')
	println('====================================================================')
	println('Scenario\t\tThroughput (M ops/s)\tAvg Latency (ns)\tCPU Usage (%)')
	println('--------------------------------------------------------------------')
	for res in results {
		println('${res.scenario:20}\t${res.throughput:8.2f}\t\t\t${res.latency:8.2f}\t\t\t${res.cpu_usage:5.1f}')
	}
	println('====================================================================')
	// Locate the fastest scenario (zero-value struct covers the empty case).
	mut best := ChannelPerfResult{}
	for res in results {
		if res.throughput > best.throughput {
			best = res
		}
	}
	println('\nBest performance: ${best.scenario} with ${best.throughput:.2f} M ops/s')
}

View file

@ -0,0 +1,292 @@
module main
import datatypes.lockfree
import time
import sync
import os
import flag
import runtime
// Test configuration parameters
const buffer_size = 1024 // Size of the ring buffer
const items_per_thread = 1_000_000 // Items to produce per thread
const warmup_runs = 3 // Number of unmeasured warmup runs before timing starts
const test_runs = 5 // Number of measured runs averaged per scenario
const max_threads = runtime.nr_jobs() // Maximum number of threads (detected at startup)
// RingBufferPerfResult holds the aggregated metrics of one benchmark scenario.
struct RingBufferPerfResult {
	scenario   string // Test scenario identifier, e.g. 'MPSC (4P1C)'
	throughput f64    // Throughput in million operations per second
	latency    f64    // Average latency in nanoseconds per operation
	cpu_usage  f64    // CPU usage percentage (currently always 0.0; not measured)
}
// Entry point: parses CLI flags, runs every producer/consumer scenario that
// fits into the available cores, and prints a summary table.
fn main() {
	println('Lock-Free Ring Buffer Performance Test')
	println('======================================')
	println('Maximum number of threads set to nr_jobs = ${max_threads}')
	mut parser := flag.new_flag_parser(os.args.clone())
	parser.skip_executable()
	want_help := parser.bool('help', 0, false, 'Show this help screen\n')
	debug := parser.bool('debug', 0, false, 'Show debug message, stat of ringbuffer')
	batch := parser.bool('batch', 0, true, 'Batch mode, batch size = 32')
	if want_help {
		println(parser.usage())
		exit(0)
	}
	mut results := []RingBufferPerfResult{}
	// Single Producer Single Consumer baseline.
	results << test_scenario('SPSC', 1, 1, batch, debug)
	thread_counts := [2, 4, 8, 16]
	// Multiple Producers Single Consumer
	for n in thread_counts {
		if n <= max_threads {
			results << test_scenario('MPSC (${n}P1C)', n, 1, batch, debug)
		}
	}
	// Single Producer Multiple Consumers
	for n in thread_counts {
		if n <= max_threads {
			results << test_scenario('SPMC (1P${n}C)', 1, n, batch, debug)
		}
	}
	// Multiple Producers Multiple Consumers (needs 2n cores)
	for n in thread_counts {
		if n * 2 <= max_threads {
			results << test_scenario('MPMC (${n}P${n}C)', n, n, batch, debug)
		}
	}
	print_results(results)
}
// test_scenario benchmarks one producer/consumer configuration over a shared
// lock-free ring buffer and returns its aggregated performance metrics.
fn test_scenario(scenario string, producers int, consumers int, batch bool, debug bool) RingBufferPerfResult {
	println('\nTesting scenario: ${scenario}')
	// One ring buffer shared by all threads of this scenario.
	mut rb := lockfree.new_ringbuffer[int](buffer_size)
	// Warm caches and the scheduler before measuring.
	for _ in 0 .. warmup_runs {
		run_test(mut rb, producers, consumers, false, batch, debug)
		rb.clear()
	}
	// Measured runs: accumulate wall time and operation count,
	// resetting the buffer between runs.
	mut elapsed := time.Duration(0)
	mut ops := 0
	for _ in 0 .. test_runs {
		d, n := run_test(mut rb, producers, consumers, true, batch, debug)
		elapsed += d
		ops += n
		rb.clear()
	}
	// Derive throughput (M ops/s) and per-operation latency (ns).
	mean := elapsed / test_runs
	return RingBufferPerfResult{
		scenario: scenario
		throughput: f64(ops) / mean.seconds() / 1_000_000
		latency: mean.nanoseconds() / f64(ops / test_runs)
		cpu_usage: 0.0 // Not measured; would need OS support
	}
}
// run_test executes a single benchmark run: it spawns `producers` writer
// threads and `consumers` reader threads over the shared ring buffer, waits
// for all of them, and returns (wall-clock duration, total items transferred).
fn run_test(mut rb lockfree.RingBuffer[int], producers int, consumers int, measure bool, batch bool, debug bool) (time.Duration, int) {
	mut wg := sync.new_waitgroup()
	total_items := producers * items_per_thread
	// Key fix: Consumers should consume exact producer total
	items_per_consumer := total_items / consumers
	mut remaining := total_items % consumers
	// Start producers
	start_time := time.now()
	for i in 0 .. producers {
		wg.add(1)
		spawn producer_thread(mut rb, i, mut wg, batch, debug)
	}
	// Start consumers
	mut consumed_counts := []int{len: consumers, init: 0}
	for i in 0 .. consumers {
		wg.add(1)
		// Distribute remaining items to first consumers
		mut count := items_per_consumer
		if remaining > 0 {
			count += 1
			remaining -= 1
		}
		spawn consumer_thread(mut rb, i, count, mut consumed_counts, mut wg, batch, debug)
	}
	// Wait for all threads to complete
	wg.wait()
	end_time := time.now()
	// Counters are only collected when built with -d debug_ringbuffer
	if debug {
		println(rb.stat())
	}
	// Validate results: the per-consumer tallies must add up to the number of
	// items produced, otherwise the run lost or duplicated items.
	mut total_consumed := 0
	for count in consumed_counts {
		total_consumed += count
	}
	if total_consumed != total_items {
		eprintln('Error: Produced ${total_items} items but consumed ${total_consumed}')
	}
	duration := end_time - start_time
	if measure {
		println('Completed ${total_items} items in ${duration.milliseconds()}ms')
	}
	return duration, total_items
}
// producer_thread pushes this producer's slice of the value range into the
// ring buffer, either one element at a time or in batches of 32.
fn producer_thread(mut rb lockfree.RingBuffer[int], id int, mut wg sync.WaitGroup, batch bool, debug bool) {
	defer {
		wg.done()
	}
	// Each producer owns a disjoint, contiguous value range.
	first := id * items_per_thread
	last := first + items_per_thread
	if batch {
		// Accumulate values locally and push 32 at a time.
		batch_size := 32
		mut pending := []int{cap: batch_size}
		for v in first .. last {
			pending << v
			if pending.len == batch_size {
				rb.push_many(pending)
				pending.clear()
			}
		}
		// Flush whatever is left over.
		if pending.len > 0 {
			rb.push_many(pending)
		}
	} else {
		for v in first .. last {
			rb.push(v)
		}
	}
}
// consumer_thread removes exactly `items_to_consume` values from the ring
// buffer (batched or one-by-one) and records the amount in the shared tally.
fn consumer_thread(mut rb lockfree.RingBuffer[int], id int, items_to_consume int, mut consumed_counts []int, mut wg sync.WaitGroup, batch bool, debug bool) {
	defer {
		wg.done()
	}
	if batch {
		// Use batch consumption for better performance
		batch_size := 32
		mut count := 0
		mut last_value := -1
		mut batch_buffer := []int{len: batch_size} // Reusable buffer
		// Stop one full batch early so the final (possibly smaller) chunk
		// can be drained exactly below.
		for count < items_to_consume - batch_size {
			// Consume batch using pop_many
			rb.pop_many(mut batch_buffer)
			count += batch_size
			// Debug output
			if debug && count % 1000000 == 0 {
				println('consume item count = ${count}')
			}
			// Check sequence continuity
			// NOTE(review): only meaningful with a single producer — with
			// multiple producers the value ranges interleave; confirm intent.
			validate_batch_detailed(id, batch_buffer, last_value)
			last_value = batch_buffer[batch_buffer.len - 1]
		}
		// Drain the exact remainder in one final pop.
		remaining := items_to_consume - count
		if remaining > 0 {
			mut remaining_buffer := []int{len: remaining}
			rb.pop_many(mut remaining_buffer)
			count += remaining
			validate_batch_detailed(id, remaining_buffer, last_value)
		}
	} else {
		for i in 0 .. items_to_consume {
			_ := rb.pop()
			// Debug output
			if debug && i % 1000000 == 0 {
				println('consume item count = ${i}')
			}
		}
	}
	consumed_counts[id] = items_to_consume
}
// validate_batch_detailed checks that the values inside `batch` form a
// strictly consecutive ascending run starting at batch[0], logging every
// mismatch and returning false if any was found.
// NOTE(review): `prev_last` (last value of the previous batch) is accepted
// but never compared against batch[0]; cross-batch continuity is currently
// unchecked — confirm whether that is intentional for multi-producer runs.
fn validate_batch_detailed(id int, batch []int, prev_last int) bool {
	if batch.len == 0 {
		return true
	}
	mut ok := true
	for i in 1 .. batch.len {
		want := batch[0] + i
		if batch[i] != want {
			eprintln('[Thread ${id}] Sequence error: Position ${i} expected ${want}, got ${batch[i]}')
			ok = false
		}
	}
	return ok
}
// print_results renders the collected metrics as an aligned table and
// reports the scenario with the highest throughput.
fn print_results(results []RingBufferPerfResult) {
	println('\nPerformance Results')
	println('====================================================================')
	println('Scenario\t\tThroughput (M ops/s)\tAvg Latency (ns)\tCPU Usage (%)')
	println('--------------------------------------------------------------------')
	for res in results {
		println('${res.scenario:20}\t${res.throughput:8.2f}\t\t\t${res.latency:8.2f}\t\t\t${res.cpu_usage:5.1f}')
	}
	println('====================================================================')
	// Locate the fastest scenario (zero-value struct covers the empty case).
	mut best := RingBufferPerfResult{}
	for res in results {
		if res.throughput > best.throughput {
			best = res
		}
	}
	println('\nBest performance: ${best.scenario} with ${best.throughput:.2f} M ops/s')
}

View file

@ -0,0 +1,63 @@
module lockfree
import sync.stdatomic
// Counter is a thread-safe atomic counter supporting multiple integer types.
// It provides atomic increment, decrement, and value retrieval operations.
// All mutation goes through the wrapped `stdatomic.AtomicVal`, so no locks
// are taken; `@[noinit]` forces construction through `new_counter`.
@[noinit]
struct Counter[T] {
mut:
	value &stdatomic.AtomicVal[T] // Heap-allocated atomic cell holding the count
}
// new_counter creates a new atomic counter with the specified initial value.
// It only supports integer types (8-bit to 64-bit integers); any other type
// parameter is rejected at compile time.
pub fn new_counter[T](init T) &Counter[T] {
	// Compile-time type check: only integers are supported
	$if T !is $int {
		$compile_error('new_counter(): only integers are supported.')
	}
	return &Counter[T]{
		value: stdatomic.new_atomic[T](init)
	}
}
// increment_by atomically adds a delta value to the counter and returns
// the previous value before the operation (fetch-and-add).
@[inline]
pub fn (mut c Counter[T]) increment_by(delta T) T {
	previous := c.value.add(delta)
	return previous
}
// increment atomically increments the counter by 1 and returns
// the previous value before the operation (fetch-and-add of one).
@[inline]
pub fn (mut c Counter[T]) increment() T {
	return c.value.add(T(1))
}
// decrement_by atomically subtracts a delta value from the counter and returns
// the previous value before the operation (fetch-and-sub).
@[inline]
pub fn (mut c Counter[T]) decrement_by(delta T) T {
	previous := c.value.sub(delta)
	return previous
}
// decrement atomically decrements the counter by 1 and returns
// the previous value before the operation (fetch-and-sub of one).
@[inline]
pub fn (mut c Counter[T]) decrement() T {
	return c.value.sub(T(1))
}
// get atomically retrieves the current value of the counter.
@[inline]
pub fn (mut c Counter[T]) get() T {
	current := c.value.load()
	return current
}
// clear atomically resets the counter to zero.
@[inline]
pub fn (mut c Counter[T]) clear() {
	c.value.store(T(0))
}

View file

@ -0,0 +1,42 @@
import sync
import datatypes.lockfree
// Verifies Counter[u64] under concurrent use: balanced increments and
// decrements from multiple threads must cancel out to zero, and the
// batch/clear operations must behave as documented.
fn test_counter() {
	number_threads := 10
	mut counter := lockfree.new_counter(u64(0))
	mut wg := sync.new_waitgroup()
	// 10 threads, each incrementing 1000 times.
	// (Loop variable renamed to `_`: it was unused and triggered an
	// unused-variable warning.)
	for i in 0 .. number_threads {
		wg.add(1)
		spawn fn (mut c lockfree.Counter[u64], id int, mut wg sync.WaitGroup) {
			for _ in 0 .. 1000 {
				c.increment()
			}
			wg.done()
		}(mut counter, i, mut wg)
	}
	// 10 threads, each decrementing 1000 times.
	for i in 0 .. number_threads {
		wg.add(1)
		spawn fn (mut c lockfree.Counter[u64], id int, mut wg sync.WaitGroup) {
			for _ in 0 .. 1000 {
				c.decrement()
			}
			wg.done()
		}(mut counter, i, mut wg)
	}
	wg.wait()
	// Equal numbers of increments and decrements must net to zero.
	assert counter.get() == u64(0)
	counter.increment_by(100)
	assert counter.get() == u64(100)
	counter.decrement_by(100)
	assert counter.get() == u64(0)
	counter.increment_by(1024)
	counter.clear()
	assert counter.get() == u64(0)
	// A non-zero initial value must be observable immediately.
	mut counter_init := lockfree.new_counter(u64(100))
	assert counter_init.get() == u64(100)
}

View file

@ -0,0 +1,23 @@
module lockfree
import sync.stdatomic as _
// Define cache line size to prevent false sharing between CPU cores;
// used to size the padding fields inside RingBuffer.
const cache_line_size = 64
// next_power_of_two calculates the smallest power of two >= n, using the
// classic bit-smearing trick: copy the highest set bit into every lower
// position of n-1, then add one.
// NOTE(review): for n > 2^31 the result wraps to 0 in u32 arithmetic —
// callers currently pass small buffer sizes, but confirm an upper bound.
@[inline]
fn next_power_of_two(n u32) u32 {
	if n == 0 {
		return 1
	}
	mut x := n - 1
	// Efficient bit manipulation to find next power of two
	x |= x >> 1
	x |= x >> 2
	x |= x >> 4
	x |= x >> 8
	x |= x >> 16
	return x + 1
}

View file

@ -0,0 +1,593 @@
module lockfree
// This design is ported from the DPDK rte_ring library.
// Source: https://doc.dpdk.org/guides/prog_guide/ring_lib.html
// RingBufferMode Operation modes for the ring buffer.
// The numeric values form a bitmask read by is_multiple_producer /
// is_multiple_consumer: bit 1 = multiple producers, bit 0 = multiple consumers.
pub enum RingBufferMode {
	spsc = 0 // Single Producer, Single Consumer (optimized for single-threaded access)
	spmc = 1 // Single Producer, Multiple Consumers (one writer, multiple readers)
	mpsc = 2 // Multiple Producers, Single Consumer (multiple writers, one reader)
	mpmc = 3 // Multiple Producers, Multiple Consumers (default, fully concurrent)
}
// RingBufferStat holds performance counters for ring buffer operations.
// Most counters are only incremented when the module is compiled with
// `-d debug_ringbuffer`, and stat() returns a zeroed struct without that flag.
pub struct RingBufferStat {
pub mut:
	push_full_count      u32 // Times producers encountered full buffer
	push_fail_count      u32 // Times producers failed to reserve space
	push_wait_prev_count u32 // Times producers waited for predecessors
	push_waiting_count   u32 // Current number of producers in waiting state
	pop_empty_count      u32 // Times consumers found empty buffer
	pop_fail_count       u32 // Times consumers failed to reserve items
	pop_wait_prev_count  u32 // Times consumers waited for predecessors
	pop_waiting_count    u32 // Current number of consumers in waiting state
}
// RingBufferParam Configuration parameters for ring buffer creation.
// - mode: concurrency mode; single-producer/consumer modes skip the CAS
//   reservation loop and update their head index directly.
// - max_waiting_prod_cons: Setting this to a larger value may improve performance,
// but in scenarios with many producers/consumers, it could lead to severe contention issues.
@[params]
pub struct RingBufferParam {
pub:
	mode                  RingBufferMode = .mpmc // Default to most concurrent mode
	max_waiting_prod_cons int = 1 // Max allowed waiting producers/consumers before rejecting operations
}
// RingBuffer Lock-free multiple producer/multiple consumer ring buffer.
// The head/tail index pairs follow the DPDK rte_ring scheme: `*_head` is the
// reserved position, `*_tail` the committed one; each hot index sits on its
// own cache line (see the pad fields) to avoid false sharing.
// Requires explicit initialization
@[noinit]
pub struct RingBuffer[T] {
mut:
	mode                  u32 // Current operation mode (from RingBufferMode)
	capacity              u32 // Total capacity (always power of two)
	mask                  u32 // Bitmask for index calculation (capacity - 1)
	clear_flag            u32 // Flag indicating clear operation in progress
	max_waiting_prod_cons u32 // Max allowed waiting producers/consumers
	pad0                  [cache_line_size - 20]u8 // Padding: the 5 u32 fields above occupy 20 bytes
	// Producer state (isolated to prevent false sharing)
	prod_head u32 // Producer head (next write position)
	pad1      [cache_line_size - 4]u8 // Cache line padding
	prod_tail u32 // Producer tail (last committed position)
	pad2      [cache_line_size - 4]u8 // Cache line padding
	// Consumer state (isolated to prevent false sharing)
	cons_head u32 // Consumer head (next read position)
	pad3      [cache_line_size - 4]u8 // Cache line padding
	cons_tail u32 // Consumer tail (last committed position)
	pad4      [cache_line_size - 4]u8 // Cache line padding
	// Data storage area
	slots []T // Array holding actual data elements
	// Performance counters (the *_waiting_count fields are always maintained;
	// the others are mostly updated only with `-d debug_ringbuffer`)
	push_full_count      u32 // Count of full buffer encounters
	push_fail_count      u32 // Count of failed push attempts
	push_wait_prev_count u32 // Count of waits for previous producers
	push_waiting_count   u32 // Current number of waiting producers
	pop_empty_count      u32 // Count of empty buffer encounters
	pop_fail_count       u32 // Count of failed pop attempts
	pop_wait_prev_count  u32 // Count of waits for previous consumers
	pop_waiting_count    u32 // Current number of waiting consumers
}
// new_ringbuffer creates a new lock-free ring buffer.
// Note: The buffer capacity will be expanded to the next power of two
// for efficient modulo operations using bitwise AND.
// The actual capacity may be larger than the requested `size`.
pub fn new_ringbuffer[T](size u32, param RingBufferParam) &RingBuffer[T] {
	// Ensure capacity is power of two for efficient modulo operations
	capacity := next_power_of_two(size)
	mask := capacity - 1
	// Initialize data storage array (pre-sized once; slots are reused in place)
	mut slots := []T{len: int(capacity)}
	rb := &RingBuffer[T]{
		mode: u32(param.mode)
		max_waiting_prod_cons: u32(param.max_waiting_prod_cons)
		capacity: capacity
		mask: mask
		slots: slots
	}
	// Disable Valgrind checking for performance
	$if valgrind ? {
		C.VALGRIND_HG_DISABLE_CHECKING(rb, sizeof(RingBuffer[T]))
	}
	return rb
}
// is_multiple_producer reports whether `mode` allows more than one producer
// (bit 1 of the RingBufferMode bitmask).
@[inline]
fn is_multiple_producer(mode u32) bool {
	return (mode >> 1) & 1 == 1
}
// is_multiple_consumer reports whether `mode` allows more than one consumer
// (bit 0 of the RingBufferMode bitmask).
@[inline]
fn is_multiple_consumer(mode u32) bool {
	return mode & 1 == 1
}
// try_push tries to push a single item non-blocking; returns true on success.
@[inline]
pub fn (mut rb RingBuffer[T]) try_push(item T) bool {
	pushed := rb.try_push_many([item])
	return pushed == 1
}
// try_push_many tries to push multiple items non-blocking.
// It is all-or-nothing: the return value is `items.len` when the whole batch
// was reserved and committed, or 0 when there was no room, contention was too
// high, or a clear() was in progress. Partial pushes never happen.
@[direct_array_access]
pub fn (mut rb RingBuffer[T]) try_push_many(items []T) u32 {
	n := u32(items.len)
	if n == 0 {
		return 0
	}
	// Check if clear operation is in progress or too many producers are waiting
	if C.atomic_load_u32(&rb.clear_flag) != 0 || (is_multiple_producer(rb.mode)
		&& C.atomic_load_u32(&rb.push_waiting_count) > rb.max_waiting_prod_cons) {
		return 0
	}
	capacity := rb.capacity
	mut success := false
	mut attempts := 0
	mut old_head := u32(0)
	mut new_head := u32(0)
	// Attempt to reserve space in the buffer (bounded CAS retry loop: give up
	// after 10 failed attempts instead of spinning indefinitely)
	for !success && attempts < 10 {
		old_head = C.atomic_load_u32(&rb.prod_head)
		// Memory barrier for weak memory models
		$if !x64 && !x32 {
			C.atomic_thread_fence(C.memory_order_acquire)
		}
		// Calculate available space using unsigned arithmetic
		// (u32 wrap-around of the indices is absorbed by the subtraction)
		free_entries := capacity + C.atomic_load_u32(&rb.cons_tail) - old_head
		// Check if there's enough space
		if n > free_entries {
			$if debug_ringbuffer ? {
				C.atomic_fetch_add_u32(&rb.push_full_count, 1)
			}
			return 0
		}
		// Calculate new head position after adding items
		new_head = old_head + n
		if is_multiple_producer(rb.mode) {
			// Atomic compare-and-swap for multiple producers
			$if valgrind ? {
				C.ANNOTATE_HAPPENS_BEFORE(&rb.prod_head)
			}
			success = C.atomic_compare_exchange_weak_u32(&rb.prod_head, &old_head, new_head)
			$if valgrind ? {
				C.ANNOTATE_HAPPENS_AFTER(&rb.prod_head)
			}
		} else {
			// Direct update for single producer
			rb.prod_head = new_head
			success = true
		}
		attempts++
	}
	// Exit if space reservation failed
	if !success {
		$if debug_ringbuffer ? {
			C.atomic_fetch_add_u32(&rb.push_fail_count, 1)
		}
		return 0
	}
	// Write data to the reserved slots; the range [old_head, new_head) is
	// exclusively ours after the reservation, so plain stores are safe here
	for i in 0 .. n {
		index := (old_head + i) & rb.mask
		$if valgrind ? {
			C.VALGRIND_HG_DISABLE_CHECKING(&rb.slots[index], sizeof(T))
			C.ANNOTATE_HAPPENS_BEFORE(&rb.slots[index])
		}
		rb.slots[index] = items[i]
		$if valgrind ? {
			C.ANNOTATE_HAPPENS_AFTER(&rb.slots[index])
		}
	}
	mut add_once := true
	mut backoff := 1
	if is_multiple_producer(rb.mode) {
		// Increment waiting producer count
		C.atomic_fetch_add_u32(&rb.push_waiting_count, 1)
		mut attempts_wait := 1
		// Wait for previous producers to complete their writes: tails must be
		// published in reservation order so consumers never read unwritten slots
		for C.atomic_load_u32(&rb.prod_tail) != old_head {
			// Exponential backoff to reduce CPU contention
			for _ in 0 .. backoff {
				C.cpu_relax() // Low-latency pause instruction
			}
			backoff = int_min(backoff * 2, 1024)
			attempts_wait++
			$if debug_ringbuffer ? {
				if attempts_wait > 100 && add_once {
					C.atomic_fetch_add_u32(&rb.push_wait_prev_count, 1)
					add_once = false
				}
			}
		}
		// Decrement waiting producer count
		C.atomic_fetch_sub_u32(&rb.push_waiting_count, 1)
	}
	// Make data visible to consumers
	$if valgrind ? {
		C.ANNOTATE_HAPPENS_BEFORE(&rb.prod_tail)
	}
	C.atomic_store_u32(&rb.prod_tail, new_head)
	$if valgrind ? {
		C.ANNOTATE_HAPPENS_AFTER(&rb.prod_tail)
	}
	return n
}
// try_pop tries to pop a single item non-blocking; returns none when the
// buffer is empty (or the pop could not be completed).
@[inline]
pub fn (mut rb RingBuffer[T]) try_pop() ?T {
	mut single := []T{len: 1}
	if rb.try_pop_many(mut single) != 1 {
		return none // Buffer empty
	}
	return single[0]
}
// try_pop_many tries to pop multiple items non-blocking.
// It is all-or-nothing: the return value is `items.len` when the whole batch
// was read, or 0 when not enough data was available, contention was too high,
// or a clear() was in progress.
@[direct_array_access]
pub fn (mut rb RingBuffer[T]) try_pop_many(mut items []T) u32 {
	n := u32(items.len)
	if n == 0 {
		return 0
	}
	// Check if clear operation is in progress or too many consumers are waiting
	if C.atomic_load_u32(&rb.clear_flag) != 0 || (is_multiple_consumer(rb.mode)
		&& C.atomic_load_u32(&rb.pop_waiting_count) > rb.max_waiting_prod_cons) {
		return 0
	}
	mut success := false
	mut attempts := 0
	mut old_head := u32(0)
	mut new_head := u32(0)
	// Attempt to reserve data for reading (bounded CAS retry loop: give up
	// after 10 failed attempts instead of spinning indefinitely)
	for !success && attempts < 10 {
		old_head = C.atomic_load_u32(&rb.cons_head)
		// Memory barrier for weak memory models
		$if !x64 && !x32 {
			C.atomic_thread_fence(C.memory_order_acquire)
		}
		// Calculate available items to read
		entries := C.atomic_load_u32(&rb.prod_tail) - old_head
		// Check if enough data is available
		if n > entries {
			$if debug_ringbuffer ? {
				C.atomic_fetch_add_u32(&rb.pop_empty_count, 1)
			}
			return 0
		}
		// Calculate new head position after reading
		new_head = old_head + n
		if is_multiple_consumer(rb.mode) {
			// Atomic compare-and-swap for multiple consumers
			$if valgrind ? {
				C.ANNOTATE_HAPPENS_BEFORE(&rb.cons_head)
			}
			success = C.atomic_compare_exchange_weak_u32(&rb.cons_head, &old_head, new_head)
			$if valgrind ? {
				C.ANNOTATE_HAPPENS_AFTER(&rb.cons_head)
			}
		} else {
			// Direct update for single consumer
			rb.cons_head = new_head
			success = true
		}
		attempts++
	}
	// Exit if data reservation failed
	if !success {
		// Consistency fix: gate this counter behind the debug flag like every
		// other statistics counter (see the symmetric push path); stat() only
		// reads it in debug builds, so the unconditional atomic add was
		// wasted work on the hot path.
		$if debug_ringbuffer ? {
			C.atomic_fetch_add_u32(&rb.pop_fail_count, 1)
		}
		return 0
	}
	// Read data from reserved slots; the range [old_head, new_head) is
	// exclusively ours after the reservation
	for i in 0 .. n {
		index := (old_head + i) & rb.mask
		$if valgrind ? {
			C.ANNOTATE_HAPPENS_BEFORE(&rb.slots[index])
		}
		items[i] = rb.slots[index]
		$if valgrind ? {
			C.ANNOTATE_HAPPENS_AFTER(&rb.slots[index])
		}
	}
	mut add_once := true
	mut backoff := 1
	// For multiple consumers: wait for previous consumers to complete
	if is_multiple_consumer(rb.mode) {
		// Increment waiting consumer count
		C.atomic_fetch_add_u32(&rb.pop_waiting_count, 1)
		mut attempts_wait := 1
		// Wait for previous consumers to complete their reads: tails must be
		// published in reservation order so producers never overwrite slots
		// that are still being read
		for C.atomic_load_u32(&rb.cons_tail) != old_head {
			// Exponential backoff to reduce CPU contention
			for _ in 0 .. backoff {
				C.cpu_relax() // Low-latency pause instruction
			}
			backoff = int_min(backoff * 2, 1024)
			attempts_wait++
			$if debug_ringbuffer ? {
				if attempts_wait > 100 && add_once {
					C.atomic_fetch_add_u32(&rb.pop_wait_prev_count, 1)
					add_once = false
				}
			}
		}
		// Decrement waiting consumer count
		C.atomic_fetch_sub_u32(&rb.pop_waiting_count, 1)
	}
	// Free up buffer space
	$if valgrind ? {
		C.ANNOTATE_HAPPENS_BEFORE(&rb.cons_tail)
	}
	C.atomic_store_u32(&rb.cons_tail, new_head)
	$if valgrind ? {
		C.ANNOTATE_HAPPENS_AFTER(&rb.cons_tail)
	}
	return n
}
// push blocks (with exponential backoff) until the single item has been
// stored in the buffer.
@[inline]
pub fn (mut rb RingBuffer[T]) push(item T) {
	mut pause := 1
	for !rb.try_push(item) {
		// Exponential backoff to reduce contention
		for _ in 0 .. pause {
			C.cpu_relax() // Pause before retry
		}
		pause = int_min(pause * 2, 1024)
	}
}
// pop blocks (with exponential backoff) until a single item could be removed
// from the buffer, and returns it.
@[inline]
pub fn (mut rb RingBuffer[T]) pop() T {
	mut pause := 1
	for {
		if item := rb.try_pop() {
			return item
		}
		// Exponential backoff to reduce contention
		for _ in 0 .. pause {
			C.cpu_relax() // Pause before retry
		}
		pause = int_min(pause * 2, 1024)
	}
	return T(0) // Unreachable; keeps every code path returning a value
}
// push_many blocks (with exponential backoff) until the whole `items` slice
// has been stored; try_push_many is all-or-nothing, so no partial state leaks.
@[inline]
pub fn (mut rb RingBuffer[T]) push_many(items []T) {
	mut pause := 1
	for rb.try_push_many(items) != u32(items.len) {
		// Exponential backoff while the buffer has no room for the batch
		for _ in 0 .. pause {
			C.cpu_relax() // Pause when buffer is full
		}
		pause = int_min(pause * 2, 1024)
	}
}
// pop_many blocks (with exponential backoff) until `result` has been
// completely filled; try_pop_many is all-or-nothing, so no partial reads leak.
@[inline]
pub fn (mut rb RingBuffer[T]) pop_many(mut result []T) {
	want := u32(result.len)
	if want == 0 {
		return
	}
	mut pause := 1
	for rb.try_pop_many(mut result) != want {
		// Exponential backoff while the buffer holds fewer items than requested
		for _ in 0 .. pause {
			C.cpu_relax() // Pause when buffer is empty
		}
		pause = int_min(pause * 2, 1024)
	}
}
// is_empty reports whether no slots are currently occupied (snapshot only).
@[inline]
pub fn (rb RingBuffer[T]) is_empty() bool {
	used := rb.occupied()
	return used == 0
}
// is_full reports whether the buffer has no free slots left (snapshot only).
@[inline]
pub fn (rb RingBuffer[T]) is_full() bool {
	used := rb.occupied()
	return used >= rb.capacity
}
// capacity returns the total capacity of the buffer.
// Note: this is the rounded-up power-of-two size, which may exceed the size
// requested in new_ringbuffer.
@[inline]
pub fn (rb RingBuffer[T]) capacity() u32 {
	return rb.capacity
}
// occupied returns the number of occupied slots.
// The value is a snapshot: concurrent pushes/pops may change it immediately.
@[inline]
pub fn (rb RingBuffer[T]) occupied() u32 {
	// Memory barrier for weak memory models
	$if !x64 && !x32 {
		C.atomic_thread_fence(C.memory_order_acquire)
	}
	prod_tail := C.atomic_load_u32(&rb.prod_tail)
	cons_tail := C.atomic_load_u32(&rb.cons_tail)
	// Handle potential overflow: the indices grow monotonically and wrap at
	// u32 max, so the consumer tail can numerically exceed the producer tail
	used := if prod_tail >= cons_tail {
		prod_tail - cons_tail
	} else {
		(max_u32 - cons_tail) + prod_tail + 1
	}
	return used
}
// remaining returns the number of free slots (snapshot only).
@[inline]
pub fn (rb RingBuffer[T]) remaining() u32 {
	used := rb.occupied()
	return rb.capacity - used
}
// clear clears the ring buffer and resets all pointers.
// Returns false if the clear flag could not be acquired (another clear in
// progress or persistent contention). While clear_flag is set, try_push_many
// and try_pop_many reject all new operations, letting in-flight ones drain.
pub fn (mut rb RingBuffer[T]) clear() bool {
	mut clear_flag := u32(0)
	mut attempts := 0
	max_attempts := 1000
	// Acquire clear flag using atomic CAS
	for {
		if C.atomic_compare_exchange_weak_u32(&rb.clear_flag, &clear_flag, 1) {
			break
		}
		// A failed CAS stores the current value into `clear_flag`;
		// reset the expected value before retrying
		clear_flag = u32(0)
		C.cpu_relax()
		attempts++
		if attempts > max_attempts {
			return false // Failed to acquire clear flag
		}
	}
	// Wait for producers to finish with exponential backoff
	// (head == tail means no reservation is still uncommitted)
	mut backoff := 1
	mut prod_wait := 0
	for {
		prod_head := C.atomic_load_u32(&rb.prod_head)
		prod_tail := C.atomic_load_u32(&rb.prod_tail)
		if prod_head == prod_tail {
			break
		}
		// Exponential backoff wait
		for _ in 0 .. backoff {
			C.cpu_relax()
		}
		backoff = int_min(backoff * 2, 1024)
		prod_wait++
		if prod_wait > max_attempts {
			// Force advance producer tail (gives up on stuck producers)
			C.atomic_store_u32(&rb.prod_tail, prod_head)
			break
		}
	}
	// Wait for consumers to finish with exponential backoff
	backoff = 1
	mut cons_wait := 0
	for {
		cons_head := C.atomic_load_u32(&rb.cons_head)
		cons_tail := C.atomic_load_u32(&rb.cons_tail)
		if cons_head == cons_tail {
			break
		}
		// Exponential backoff wait
		for _ in 0 .. backoff {
			C.cpu_relax()
		}
		backoff = int_min(backoff * 2, 1024)
		cons_wait++
		if cons_wait > max_attempts {
			// Force advance consumer tail (gives up on stuck consumers)
			C.atomic_store_u32(&rb.cons_tail, cons_head)
			break
		}
	}
	// Reset all pointers to zero
	C.atomic_store_u32(&rb.prod_head, 0)
	C.atomic_store_u32(&rb.prod_tail, 0)
	C.atomic_store_u32(&rb.cons_head, 0)
	C.atomic_store_u32(&rb.cons_tail, 0)
	// Reset all statistics counters as well
	C.atomic_store_u32(&rb.push_full_count, 0)
	C.atomic_store_u32(&rb.push_fail_count, 0)
	C.atomic_store_u32(&rb.push_wait_prev_count, 0)
	C.atomic_store_u32(&rb.push_waiting_count, 0)
	C.atomic_store_u32(&rb.pop_empty_count, 0)
	C.atomic_store_u32(&rb.pop_fail_count, 0)
	C.atomic_store_u32(&rb.pop_wait_prev_count, 0)
	C.atomic_store_u32(&rb.pop_waiting_count, 0)
	// Release clear flag
	C.atomic_store_u32(&rb.clear_flag, 0)
	return true // Clear operation successful
}
// stat retrieves current performance statistics of the ring buffer.
//
// The counters are only collected when the module is compiled with
// `-d debug_ringbuffer`; without that flag an all-zero RingBufferStat
// is returned. Reported fields:
// - push_full_count: Times producers encountered full buffer
// - push_fail_count: Times producers failed to reserve space
// - push_wait_prev_count: Times producers waited for predecessors
// - push_waiting_count: Current number of producers in waiting state
// - pop_empty_count: Times consumers found empty buffer
// - pop_fail_count: Times consumers failed to reserve items
// - pop_wait_prev_count: Times consumers waited for predecessors
// - pop_waiting_count: Current number of consumers in waiting state
pub fn (rb RingBuffer[T]) stat() RingBufferStat {
	mut result := RingBufferStat{}
	$if debug_ringbuffer ? {
		result = RingBufferStat{
			push_full_count: C.atomic_load_u32(&rb.push_full_count)
			push_fail_count: C.atomic_load_u32(&rb.push_fail_count)
			push_wait_prev_count: C.atomic_load_u32(&rb.push_wait_prev_count)
			push_waiting_count: C.atomic_load_u32(&rb.push_waiting_count)
			pop_empty_count: C.atomic_load_u32(&rb.pop_empty_count)
			pop_fail_count: C.atomic_load_u32(&rb.pop_fail_count)
			pop_wait_prev_count: C.atomic_load_u32(&rb.pop_wait_prev_count)
			pop_waiting_count: C.atomic_load_u32(&rb.pop_waiting_count)
		}
	}
	return result
}

View file

@ -0,0 +1,373 @@
import datatypes.lockfree
import time
import sync
// Verify FIFO ordering of individual push and pop operations.
fn test_push_and_pop() {
	mut rb := lockfree.new_ringbuffer[int](2)
	rb.push(3)
	rb.push(4)
	assert rb.pop() == 3
	rb.push(5)
	assert rb.pop() == 4
}
// Verify that clear() leaves the buffer in the empty state.
fn test_clear_and_empty() {
	mut rb := lockfree.new_ringbuffer[int](4)
	rb.push(3)
	rb.push(4)
	first := rb.pop()
	assert first == 3
	rb.clear()
	assert rb.is_empty()
}
// capacity() must report the configured size, and is_full() must
// become true once exactly that many items have been pushed.
fn test_capacity_and_is_full() {
	mut rb := lockfree.new_ringbuffer[int](4)
	assert rb.capacity() == 4
	for value in [3, 4, 5, 6] {
		rb.push(value)
	}
	assert rb.is_full()
}
// Test occupied slots vs remaining capacity: after pushing 2 items
// into a 4-slot buffer, exactly 2 slots are used and 2 are free.
fn test_occupied_and_remaining() {
	mut r := lockfree.new_ringbuffer[int](4)
	r.push(3)
	r.push(4)
	// Pin the exact values. The previous `occupied() == remaining()`
	// comparison was coincidental (2 == 4 - 2) and would also pass if
	// both methods returned the same wrong number (e.g. both 0).
	assert r.occupied() == 2
	assert r.remaining() == 2
	// The two views must always account for the full capacity.
	assert r.occupied() + r.remaining() == r.capacity()
}
// Batch push_many/pop_many must round-trip a whole slice in order.
fn test_push_and_pop_many() {
	mut rb := lockfree.new_ringbuffer[int](4)
	input := [1, 2, 3, 4]
	rb.push_many(input)
	assert rb.is_full()
	mut output := []int{len: 4}
	rb.pop_many(mut output)
	assert output == input
}
// Exercise single-producer/single-consumer mode: basic round trips,
// capacity boundaries, and a two-thread 100k-item throughput run.
fn test_spsc_mode() {
	println('===== Testing SPSC Mode =====')
	mut rb := lockfree.new_ringbuffer[int](1024, mode: .spsc)
	// Basic push/pop round trip
	assert rb.try_push(42)
	assert rb.try_push(100)
	assert rb.occupied() == 2
	first := rb.try_pop() or { panic('Expected value') }
	assert first == 42
	second := rb.try_pop() or { panic('Expected value') }
	assert second == 100
	assert rb.is_empty()
	// Fill to capacity; the next push must be rejected
	for i in 0 .. 1024 {
		assert rb.try_push(i)
	}
	assert rb.is_full()
	assert rb.try_push(1024) == false
	// Drain completely, verifying FIFO order
	for i in 0 .. 1024 {
		got := rb.try_pop() or { panic('Expected value') }
		assert got == i
	}
	assert rb.is_empty()
	println('SPSC basic tests passed')
	// Throughput run: one producer thread, one consumer thread
	start := time.now()
	mut producer := spawn fn (mut rb lockfree.RingBuffer[int]) {
		for i in 0 .. 100000 {
			rb.push(i)
		}
	}(mut rb)
	mut consumer := spawn fn (mut rb lockfree.RingBuffer[int]) {
		for i in 0 .. 100000 {
			got := rb.pop()
			assert got == i
		}
	}(mut rb)
	producer.wait()
	consumer.wait()
	duration := time.since(start)
	println('SPSC performance: ${duration} for 100k items')
}
// Test single producer multiple consumers mode.
// One producer pushes total_items sequential ints while 4 consumer
// threads pop concurrently; all popped values are collected into a
// shared array, and the sorted result must be exactly 0..total_items-1
// (every item delivered exactly once — none lost, none duplicated).
fn test_spmc_mode() {
	println('===== Testing SPMC Mode =====')
	mut rb := lockfree.new_ringbuffer[int](1024, mode: .spmc)
	mut wg := sync.new_waitgroup()
	consumers := 4
	items_per_consumer := 25000
	total_items := consumers * items_per_consumer
	// Producer thread
	// NOTE(review): the thread handle is discarded; once consumers have
	// popped total_items the producer has finished pushing, but it is
	// never explicitly joined.
	spawn fn (mut rb lockfree.RingBuffer[int], total int) {
		for i in 0 .. total {
			rb.push(i)
		}
	}(mut rb, total_items)
	// Consumer threads, each popping a fixed share of the items
	shared results := []int{cap: total_items}
	for i in 0 .. consumers {
		wg.add(1)
		spawn fn (id int, mut rb lockfree.RingBuffer[int], shared results []int, count int, mut wg sync.WaitGroup) {
			for _ in 0 .. count {
				item := rb.pop()
				// rb itself is lock-free; the plain V result array is
				// the shared resource that needs the `lock` guard.
				lock results {
					results << item
				}
			}
			wg.done()
		}(i, mut rb, shared results, items_per_consumer, mut wg)
	}
	wg.wait()
	// Result validation: consumers interleave arbitrarily, so sort
	// before comparing against the expected 0..total_items-1 sequence.
	lock results {
		assert results.len == total_items
		results.sort()
		for i in 0 .. total_items {
			assert results[i] == i
		}
	}
	println('SPMC test passed with ${consumers} consumers')
}
// Test multiple producers single consumer mode.
// Four producers push disjoint 25k value ranges while one consumer
// pops all 100k items into a shared array; after sorting, the array
// must be exactly 0..total_items-1 (no item lost or duplicated).
fn test_mpsc_mode() {
	println('===== Testing MPSC Mode =====')
	mut rb := lockfree.new_ringbuffer[int](1024, mode: .mpsc)
	mut wg := sync.new_waitgroup()
	producers := 4
	items_per_producer := 25000
	total_items := producers * items_per_producer
	// Consumer thread
	wg.add(1)
	shared results := []int{cap: total_items}
	spawn fn (mut rb lockfree.RingBuffer[int], shared results []int, total int, mut wg sync.WaitGroup) {
		for _ in 0 .. total {
			item := rb.pop()
			// Single consumer, but results is also read by the
			// validating main thread, hence the `lock` guard.
			lock results {
				results << item
			}
		}
		wg.done()
	}(mut rb, shared results, total_items, mut wg)
	// Producer threads: each pushes the disjoint range
	// [start, start + count) so duplicates/losses show up after sorting.
	for i in 0 .. producers {
		wg.add(1)
		spawn fn (mut rb lockfree.RingBuffer[int], start int, count int, mut wg sync.WaitGroup) {
			for j in 0 .. count {
				rb.push(start + j)
			}
			wg.done()
		}(mut rb, i * items_per_producer, items_per_producer, mut wg)
	}
	wg.wait()
	// Result validation
	lock results {
		assert results.len == total_items
		results.sort()
		for i in 0 .. total_items {
			assert results[i] == i
		}
	}
	println('MPSC test passed with ${producers} producers')
}
// Test multiple producers multiple consumers mode.
// Four producers push disjoint 10k value ranges while four consumers
// pop concurrently; the merged, sorted result must be exactly
// 0..total_items-1 (every item delivered exactly once).
fn test_mpmc_mode() {
	println('===== Testing MPMC Mode =====')
	mut rb := lockfree.new_ringbuffer[int](1024, mode: .mpmc)
	mut wg := sync.new_waitgroup()
	producers := 4
	consumers := 4
	items_per_producer := 10000
	total_items := producers * items_per_producer
	// Result collection
	shared results := []int{cap: total_items}
	// Producer threads: each pushes the disjoint range
	// [start, start + count).
	for i in 0 .. producers {
		wg.add(1)
		spawn fn (mut rb lockfree.RingBuffer[int], start int, count int, mut wg sync.WaitGroup) {
			for j in 0 .. count {
				rb.push(start + j)
			}
			wg.done()
		}(mut rb, i * items_per_producer, items_per_producer, mut wg)
	}
	// Consumer threads
	// NOTE(review): each consumer pops items_per_producer items, which
	// equals total_items / consumers only because producers == consumers.
	for i in 0 .. consumers {
		wg.add(1)
		spawn fn (mut rb lockfree.RingBuffer[int], shared results []int, count int, mut wg sync.WaitGroup) {
			for _ in 0 .. count {
				item := rb.pop()
				lock results {
					results << item
				}
			}
			wg.done()
		}(mut rb, shared results, items_per_producer, mut wg)
	}
	wg.wait()
	// Result validation: sort to undo arbitrary interleaving, then
	// check the full 0..total_items-1 sequence is present.
	lock results {
		assert results.len == total_items
		results.sort()
		for i in 0 .. total_items {
			assert results[i] == i
		}
	}
	println('MPMC test passed with ${producers} producers and ${consumers} consumers')
}
// Test buffer clear functionality: a single-threaded clear of a
// half-full buffer, then clear() racing against concurrent producers.
fn test_clear_function() {
	println('===== Testing Clear Function =====')
	mut rb := lockfree.new_ringbuffer[int](1024, mode: .mpmc)
	// Fill buffer partially
	for i in 0 .. 512 {
		rb.push(i)
	}
	assert rb.occupied() == 512
	// Clear buffer verification
	assert rb.clear() == true
	assert rb.is_empty() == true
	assert rb.try_pop() == none
	// Concurrent clear test
	mut wg := sync.new_waitgroup()
	producers := 4
	items_per_producer := 1000
	// Producer threads
	for i in 0 .. producers {
		wg.add(1)
		spawn fn (mut rb lockfree.RingBuffer[int], id int, count int, mut wg sync.WaitGroup) {
			for j in 0 .. count {
				rb.push(id * 1000 + j)
			}
			wg.done()
		}(mut rb, i, items_per_producer, mut wg)
	}
	// Clear thread. Keep the handle so we can join it below; the
	// original code discarded it, letting the test return while the
	// thread could still be calling clear() on rb.
	clear_thread := spawn fn (mut rb lockfree.RingBuffer[int]) {
		time.sleep(1 * time.millisecond) // Allow producers to start
		for i in 0 .. 5 {
			if rb.clear() {
				println('Clear successful ${i}')
				time.sleep(2 * time.millisecond)
			} else {
				println('Clear failed ${i}')
			}
		}
	}(mut rb)
	wg.wait()
	// Join the clear thread before finishing so rb outlives all users.
	clear_thread.wait()
	println('Clear function test passed')
}
// Boundary behavior on a tiny 4-slot SPSC buffer: empty state, full
// state, single-slot turnover, and reuse after clear().
fn test_edge_cases() {
	println('===== Testing Edge Cases =====')
	mut rb := lockfree.new_ringbuffer[int](4, mode: .spsc)
	// A fresh buffer is empty and fully available
	assert rb.is_empty()
	assert rb.try_pop() == none
	assert rb.remaining() == 4
	// Fill to capacity; further pushes must be rejected
	for value in 1 .. 5 {
		assert rb.try_push(value)
	}
	assert rb.is_full()
	assert rb.try_push(5) == false
	assert rb.remaining() == 0
	// Popping one item frees exactly one slot
	head := rb.try_pop() or { panic('Expected value') }
	assert head == 1
	assert rb.try_push(5)
	assert rb.is_full()
	// Clear and reuse
	assert rb.clear()
	assert rb.is_empty()
	assert rb.try_push(10)
	assert rb.try_pop() or { panic('Expected value') } == 10
	println('Edge cases test passed')
}
// Batch try_push_many/try_pop_many must transfer whole slices at once
// and preserve element order.
fn test_batch_operations() {
	println('===== Testing Batch Operations =====')
	mut rb := lockfree.new_ringbuffer[int](1024, mode: .mpmc)
	// Push 100 sequential values in a single call
	source := []int{len: 100, init: index}
	assert rb.try_push_many(source) == 100
	assert rb.occupied() == 100
	// Pop them back in a single call and verify order
	mut sink := []int{len: 100}
	assert rb.try_pop_many(mut sink) == 100
	for i, value in sink {
		assert value == i
	}
	assert rb.is_empty()
	println('Batch operations test passed')
}