mirror of
https://github.com/vlang/v.git
synced 2025-09-13 14:32:26 +03:00
examples: fix some of the instructions in examples/thread_safety/
(#22571)
This commit is contained in:
parent
dd0b6a6480
commit
7a0febb12d
3 changed files with 81 additions and 45 deletions
|
@ -20,15 +20,7 @@ Key points:
|
||||||
By using atomic operations and proper thread synchronization, the code ensures that the shared counter is
|
By using atomic operations and proper thread synchronization, the code ensures that the shared counter is
|
||||||
incremented safely and correctly by multiple threads.
|
incremented safely and correctly by multiple threads.
|
||||||
*/
|
*/
|
||||||
$if windows {
|
import sync as _
|
||||||
#include "@VEXEROOT/thirdparty/stdatomic/win/atomic.h"
|
|
||||||
} $else {
|
|
||||||
#include "@VEXEROOT/thirdparty/stdatomic/nix/atomic.h"
|
|
||||||
}
|
|
||||||
|
|
||||||
// Declare the atomic functions
|
|
||||||
fn C.atomic_fetch_add_u32(&u32, u32) u32
|
|
||||||
fn C.atomic_load_u32(&u32) u32
|
|
||||||
|
|
||||||
// Function to increment the atomic counter
|
// Function to increment the atomic counter
|
||||||
fn increment(atomic_counter &u32) {
|
fn increment(atomic_counter &u32) {
|
||||||
|
|
|
@ -1,49 +1,94 @@
|
||||||
/*
|
/*
|
||||||
This example demonstrates thread safety using channels in V.
|
This example demonstrates thread safety using a queue of callbacks.
|
||||||
|
|
||||||
### Functions:
|
### Functions:
|
||||||
- `producer(ch chan int)`: This function simulates a producer that sends integers from 1 to 99 to
|
- `producer`: creates a callback and adds it to the queue.
|
||||||
the channel `ch`. It prints each produced item.
|
- `consumer`: consumes a callback from the queue and runs it.
|
||||||
- `consumer(ch chan int)`: This function simulates a consumer that receives integers from the
|
- `heavy_processing`: a heavy processing function that is added to the queue.
|
||||||
channel `ch`.
|
|
||||||
|
|
||||||
### Thread Safety:
|
### Thread Safety:
|
||||||
- The use of channels ensures thread safety by providing a synchronized way to communicate between
|
- The `fn producer` function is protected by a mutex. It locks the mutex before adding a callback
|
||||||
the producer and consumer threads.
|
to the queue and unlocks it after adding the callback.
|
||||||
- Channels in V are designed to handle concurrent access, preventing race conditions and ensuring
|
- The `fn consumer` function is also protected by the same mutex. It locks the mutex before
|
||||||
that data is safely passed between threads.
|
consuming a callback from the queue and unlocks it after consuming the callback.
|
||||||
- The `select` statement in the consumer function allows it to handle timeouts gracefully,
|
- The `heavy_processing` function is added to the queue by the main thread before the producer
|
||||||
ensuring that the program does not hang if the producer is not ready.
|
threads start producing callbacks. The main thread is the only thread that adds this function to
|
||||||
|
the queue, so it doesn't need to be protected by a mutex.
|
||||||
*/
|
*/
|
||||||
import time
|
import time
|
||||||
|
import sync
|
||||||
|
|
||||||
fn producer(ch chan int) {
|
type Callback = fn (id string)
|
||||||
for i in 1 .. 100 {
|
|
||||||
ch <- i
|
fn producer(producer_name string, mut arr []Callback, mut mtx sync.Mutex) {
|
||||||
|
for i in 1 .. 5 {
|
||||||
|
mtx.lock()
|
||||||
|
arr << fn [producer_name, i] (consumer_name string) {
|
||||||
|
println('task ${i} created by producer ${producer_name}: consumed by ${consumer_name}')
|
||||||
|
time.sleep(500 * time.millisecond)
|
||||||
|
}
|
||||||
println('Produced: ${i}')
|
println('Produced: ${i}')
|
||||||
|
time.sleep(50 * time.millisecond)
|
||||||
|
mtx.unlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn consumer(ch chan int) {
|
fn consumer(consumer_name string, mut arr []Callback, mut mtx sync.Mutex) {
|
||||||
for {
|
for {
|
||||||
select {
|
mtx.lock()
|
||||||
item := <-ch {
|
if arr.len > 0 {
|
||||||
println('Consumed: ${item}')
|
callback := arr[0]
|
||||||
}
|
arr.delete(0)
|
||||||
500 * time.millisecond {
|
|
||||||
println('Timeout: No producers were ready within 0.5s')
|
mtx.unlock()
|
||||||
break
|
callback(consumer_name) // run after unlocking to allow other threads to consume
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
println('- No items to consume')
|
||||||
|
mtx.unlock()
|
||||||
|
|
||||||
|
// time.sleep(500 * time.millisecond)
|
||||||
|
// continue // uncomment to run forever
|
||||||
|
|
||||||
|
break // uncomment to stop after consuming all items
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn heavy_processing(queue_id string) {
|
||||||
|
println('One more: ${queue_id}')
|
||||||
|
time.sleep(500 * time.millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
ch := chan int{cap: 10}
|
mut mtx := sync.new_mutex()
|
||||||
|
mut arr := []Callback{}
|
||||||
|
|
||||||
producer_thread := spawn producer(ch)
|
producer_threads := [
|
||||||
consumer_thread := spawn consumer(ch)
|
spawn producer('Paula', mut &arr, mut mtx),
|
||||||
|
spawn producer('Adriano', mut &arr, mut mtx),
|
||||||
|
spawn producer('Kaka', mut &arr, mut mtx),
|
||||||
|
spawn producer('Hitalo', mut &arr, mut mtx),
|
||||||
|
spawn producer('Jonh', mut &arr, mut mtx),
|
||||||
|
]
|
||||||
|
|
||||||
producer_thread.wait()
|
mut consumer_threads := [
|
||||||
consumer_thread.wait()
|
spawn consumer('consumer number 0', mut &arr, mut mtx),
|
||||||
|
]
|
||||||
|
|
||||||
|
// spawn 16 consumers
|
||||||
|
for i in 1 .. 16 {
|
||||||
|
consumer_threads << spawn consumer('consumer number ${i}', mut &arr, mut mtx)
|
||||||
|
}
|
||||||
|
|
||||||
|
mtx.lock()
|
||||||
|
arr << heavy_processing
|
||||||
|
mtx.unlock()
|
||||||
|
|
||||||
|
for t in producer_threads {
|
||||||
|
t.wait()
|
||||||
|
}
|
||||||
|
for t in consumer_threads {
|
||||||
|
t.wait()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,17 +1,16 @@
|
||||||
### Run
|
### Run
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
v -prod -autofree ./queue.v -o ./queue.c && \
|
v -prod -gc none -cc gcc ./queue.v && \
|
||||||
gcc ./queue.c -o ./queue.out && \
|
./queue
|
||||||
./queue.out
|
|
||||||
```
|
```
|
||||||
|
|
||||||
### Valgrind
|
### Valgrind
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
# Helgrind: a tool for detecting synchronisation errors in programs that use the POSIX pthreads threading primitives.
|
# Helgrind: a tool for detecting synchronisation errors in programs that use the POSIX pthreads threading primitives.
|
||||||
valgrind --tool=helgrind ./queue.out
|
valgrind --tool=helgrind ./queue
|
||||||
|
|
||||||
# DRD: a tool for detecting errors in multithreaded programs. The tool works for any program that uses the POSIX threading primitives or that uses threading concepts built on top of the POSIX threading primitives.
|
# DRD: a tool for detecting errors in multithreaded programs. The tool works for any program that uses the POSIX threading primitives or that uses threading concepts built on top of the POSIX threading primitives.
|
||||||
valgrind --tool=drd ./queue.out
|
valgrind --tool=drd ./queue
|
||||||
```
|
```
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue