mirror of
https://github.com/vlang/v.git
synced 2025-09-13 22:42:26 +03:00
vlib: add an arrays.parallel
module, containing parallel.run/3
and parallel.amap/3
implementations (#22090)
This commit is contained in:
parent
481b842fb6
commit
a31cd37b66
3 changed files with 179 additions and 0 deletions
|
@ -230,9 +230,11 @@ const skip_on_musl = [
|
||||||
'vlib/gg/draw_fns_api_test.v',
|
'vlib/gg/draw_fns_api_test.v',
|
||||||
'vlib/v/tests/skip_unused/gg_code.vv',
|
'vlib/v/tests/skip_unused/gg_code.vv',
|
||||||
'vlib/v/tests/c_struct_with_reserved_field_name_test.v',
|
'vlib/v/tests/c_struct_with_reserved_field_name_test.v',
|
||||||
|
'vlib/arrays/parallel/parallel_test.v',
|
||||||
]
|
]
|
||||||
const skip_on_ubuntu_musl = [
|
const skip_on_ubuntu_musl = [
|
||||||
'do_not_remove',
|
'do_not_remove',
|
||||||
|
'vlib/arrays/parallel/parallel_test.v',
|
||||||
//'vlib/v/gen/js/jsgen_test.v',
|
//'vlib/v/gen/js/jsgen_test.v',
|
||||||
'vlib/net/http/cookie_test.v',
|
'vlib/net/http/cookie_test.v',
|
||||||
'vlib/net/http/status_test.v',
|
'vlib/net/http/status_test.v',
|
||||||
|
|
111
vlib/arrays/parallel/parallel.v
Normal file
111
vlib/arrays/parallel/parallel.v
Normal file
|
@ -0,0 +1,111 @@
|
||||||
|
module parallel
|
||||||
|
|
||||||
|
import sync
|
||||||
|
import runtime
|
||||||
|
|
||||||
|
// Params contains the optional parameters that can be passed to `run` and `amap`.
|
||||||
|
@[params]
|
||||||
|
pub struct Params {
|
||||||
|
pub mut:
|
||||||
|
workers int // 0 by default, so that VJOBS will be used, through runtime.nr_jobs()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn limited_workers(max_workers int, ilen int) int {
|
||||||
|
// create a limited amount of workers to handle the load
|
||||||
|
workers := if max_workers != 0 { max_workers } else { runtime.nr_jobs() }
|
||||||
|
if ilen < workers {
|
||||||
|
return ilen
|
||||||
|
}
|
||||||
|
return workers
|
||||||
|
}
|
||||||
|
|
||||||
|
// run lets the user run an array of input with a
|
||||||
|
// user provided function in parallel. It limits the number of
|
||||||
|
// worker threads to min(num_workers, num_cpu)
|
||||||
|
// The function aborts if an error is encountered.
|
||||||
|
// Example: parallel.run([1, 2, 3, 4, 5], 2, fn (i) { println(i) })
|
||||||
|
pub fn run[T](input []T, worker fn (T), opt Params) {
|
||||||
|
if input.len == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
workers := limited_workers(opt.workers, input.len)
|
||||||
|
ch := chan T{cap: workers * 2}
|
||||||
|
mut wg := sync.new_waitgroup()
|
||||||
|
wg.add(input.len)
|
||||||
|
for _ in 0 .. workers {
|
||||||
|
spawn fn [ch, worker, mut wg] [T]() {
|
||||||
|
for {
|
||||||
|
task := <-ch or { break }
|
||||||
|
worker(task)
|
||||||
|
wg.done()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// put the input into the channel
|
||||||
|
for i in input {
|
||||||
|
ch <- i
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for all tasks to complete
|
||||||
|
wg.wait()
|
||||||
|
ch.close() // this will signal all the workers to exit, and we can return, without having to wait for them to finish
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Task[T, R] {
|
||||||
|
idx int
|
||||||
|
input T
|
||||||
|
result R
|
||||||
|
}
|
||||||
|
|
||||||
|
// amap lets the user run an array of input with a
|
||||||
|
// user provided function in parallel. It limits the number of
|
||||||
|
// worker threads to max number of cpus.
|
||||||
|
// The worker function can return a value. The returning array maintains the input order.
|
||||||
|
// Any error handling should have happened within the worker function.
|
||||||
|
// Example: squares := parallel.amap([1, 2, 3, 4, 5], 2, fn (i) { return i * i })
|
||||||
|
pub fn amap[T, R](input []T, worker fn (T) R, opt Params) []R {
|
||||||
|
if input.len == 0 {
|
||||||
|
return []
|
||||||
|
}
|
||||||
|
mut tasks := []Task[T, R]{len: input.len}
|
||||||
|
// the tasks array will be passed to the closure of each worker by reference, so that it could
|
||||||
|
// then modify the same tasks:
|
||||||
|
mut tasks_ref := &tasks
|
||||||
|
|
||||||
|
workers := limited_workers(opt.workers, input.len)
|
||||||
|
// use a buffered channel for transfering the tasks, that has enough space to keep all the workers busy,
|
||||||
|
// without blocking the main thread needlessly
|
||||||
|
ch := chan Task[T, R]{cap: workers * 2}
|
||||||
|
mut wg := sync.new_waitgroup()
|
||||||
|
wg.add(input.len)
|
||||||
|
for _ in 0 .. workers {
|
||||||
|
spawn fn [ch, worker, mut wg, mut tasks_ref] [T, R]() {
|
||||||
|
for {
|
||||||
|
mut task := <-ch or { break }
|
||||||
|
unsafe {
|
||||||
|
tasks_ref[task.idx] = Task[T, R]{
|
||||||
|
idx: task.idx
|
||||||
|
input: task.input
|
||||||
|
result: worker(task.input)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wg.done()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// put the input into the channel
|
||||||
|
for idx, inp in input {
|
||||||
|
ch <- Task[T, R]{
|
||||||
|
idx: idx
|
||||||
|
input: inp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for all tasks to complete
|
||||||
|
wg.wait()
|
||||||
|
ch.close()
|
||||||
|
tasks.sort(a.idx < b.idx)
|
||||||
|
return tasks.map(it.result)
|
||||||
|
}
|
66
vlib/arrays/parallel/parallel_test.v
Normal file
66
vlib/arrays/parallel/parallel_test.v
Normal file
|
@ -0,0 +1,66 @@
|
||||||
|
import arrays.parallel
|
||||||
|
import rand
|
||||||
|
import time
|
||||||
|
|
||||||
|
fn test_parallel_run_with_empty_arrays() {
|
||||||
|
parallel.run([]int{}, fn (x int) {})
|
||||||
|
parallel.run([]u8{}, fn (x u8) {})
|
||||||
|
parallel.run([]u32{}, fn (x u32) {}, workers: 1000)
|
||||||
|
assert true
|
||||||
|
}
|
||||||
|
|
||||||
|
fn test_parallel_amap_with_empty_arrays() {
|
||||||
|
assert parallel.amap([]int{}, fn (x int) u8 {
|
||||||
|
return 0
|
||||||
|
}) == []
|
||||||
|
assert parallel.amap([]u8{}, fn (x u8) int {
|
||||||
|
return 0
|
||||||
|
}) == []
|
||||||
|
assert parallel.amap([]u8{}, fn (x u8) int {
|
||||||
|
return 0
|
||||||
|
}, workers: 1000) == []
|
||||||
|
assert true
|
||||||
|
}
|
||||||
|
|
||||||
|
fn test_parallel_run() {
|
||||||
|
counters := []int{len: 10, init: index}
|
||||||
|
dump(counters)
|
||||||
|
mut res := []string{len: 10}
|
||||||
|
mut pres := &res
|
||||||
|
parallel.run(counters, fn [mut pres] (i int) {
|
||||||
|
delay := rand.intn(250) or { 250 }
|
||||||
|
time.sleep(delay * time.millisecond)
|
||||||
|
unsafe {
|
||||||
|
pres[i] = 'task ${i}, delay=${delay}ms'
|
||||||
|
}
|
||||||
|
assert true
|
||||||
|
})
|
||||||
|
dump(res)
|
||||||
|
assert res.len == counters.len
|
||||||
|
}
|
||||||
|
|
||||||
|
fn test_parallel_amap() {
|
||||||
|
input := [1, 2, 3, 4, 5, 6, 7, 8, 9]
|
||||||
|
dump(input)
|
||||||
|
dump(input.len)
|
||||||
|
output := parallel.amap(input, fn (i int) int {
|
||||||
|
delay := rand.intn(250) or { 250 }
|
||||||
|
time.sleep(delay * time.millisecond)
|
||||||
|
return i * i
|
||||||
|
})
|
||||||
|
dump(output)
|
||||||
|
dump(output.len)
|
||||||
|
assert input.len == output.len
|
||||||
|
|
||||||
|
for i, _ in output {
|
||||||
|
assert output[i] == input[i] * input[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
// unordered output validation
|
||||||
|
assert output.len == input.len
|
||||||
|
op_sorted := output.sorted()
|
||||||
|
dump(op_sorted)
|
||||||
|
for i, op in op_sorted {
|
||||||
|
assert op == input[i] * input[i]
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue