mirror of
https://github.com/vlang/v.git
synced 2025-09-13 14:32:26 +03:00
vlib: add a pool module (#24661)
This commit is contained in:
parent
131449e70d
commit
cab97894ae
3 changed files with 1182 additions and 0 deletions
148
vlib/pool/README.md
Normal file
148
vlib/pool/README.md
Normal file
|
@ -0,0 +1,148 @@
|
|||
# Connection Pool Module
|
||||
|
||||
This module provides a robust connection pooling implementation for
|
||||
managing reusable resources like database connections. It handles
|
||||
connection lifecycle, validation, and efficient resource allocation
|
||||
with minimal overhead.
|
||||
|
||||
## Features
|
||||
|
||||
- **Connection Reuse**: Efficiently manages reusable connections
|
||||
- **Health Validation**: Automatic connection validation
|
||||
- **Dynamic Scaling**: Adjusts pool size based on demand
|
||||
- **Intelligent Eviction**: Removes stale connections with priority-based cleanup
|
||||
- **Statistics Tracking**: Provides detailed pool metrics
|
||||
- **Thread Safety**: Fully concurrent-safe implementation
|
||||
- **Graceful Shutdown**: Clean resource termination
|
||||
- **Dynamic Configuration**: Runtime configuration updates
|
||||
|
||||
## Basic Usage
|
||||
|
||||
### Creating a Pool
|
||||
|
||||
```v ignore
|
||||
import db.mysql
|
||||
import pool
|
||||
import time
|
||||
|
||||
// Define your connection factory function
|
||||
fn create_conn() !&pool.ConnectionPoolable {
|
||||
config := mysql.Config{
|
||||
host: '127.0.0.1'
|
||||
port: 3306
|
||||
username: 'root'
|
||||
password: '12345678'
|
||||
dbname: 'mysql'
|
||||
}
|
||||
db := mysql.connect(config)!
|
||||
return &db
|
||||
}
|
||||
|
||||
// Configure pool parameters
|
||||
config := pool.ConnectionPoolConfig{
|
||||
max_conns: 50
|
||||
min_idle_conns: 5
|
||||
max_lifetime: 2 * time.hour
|
||||
idle_timeout: 30 * time.minute
|
||||
get_timeout: 5 * time.second
|
||||
}
|
||||
|
||||
// Create connection pool
|
||||
mut my_pool := pool.new_connection_pool(create_conn, config)!
|
||||
|
||||
// Acquire connection
|
||||
mut conn := my_pool.get()!
|
||||
|
||||
// Use connection
|
||||
// ... your operations ...
|
||||
|
||||
// Return connection to pool
|
||||
my_pool.put(conn)!
|
||||
|
||||
// When application exits
|
||||
my_pool.close()
|
||||
```
|
||||
|
||||
## Configuration Options
|
||||
|
||||
| Parameter | Default Value | Description |
|
||||
|---------------------|-----------------------|--------------------------------------|
|
||||
| `max_conns` | 20 | Maximum connections in pool |
|
||||
| `min_idle_conns` | 5 | Minimum idle connections to maintain |
|
||||
| `max_lifetime` | 1 hour | Max connection lifetime |
|
||||
| `idle_timeout` | 30 minutes | Idle connection timeout |
|
||||
| `get_timeout` | 5 seconds | Connection acquisition timeout |
|
||||
| `retry_base_delay` | 1 second | Base delay for connection retries |
|
||||
| `max_retry_delay` | 30 seconds | Maximum retry delay |
|
||||
| `max_retry_attempts`| 5 | Maximum connection creation attempts |
|
||||
|
||||
## Advanced Features
|
||||
|
||||
### Dynamic Configuration Update
|
||||
|
||||
```v ignore
|
||||
new_config := pool.ConnectionPoolConfig{
|
||||
max_conns: 100
|
||||
min_idle_conns: 10
|
||||
// ... other parameters ...
|
||||
}
|
||||
|
||||
my_pool.update_config(new_config)!
|
||||
```
|
||||
|
||||
### Connection Recovery Signal
|
||||
|
||||
```v ignore
|
||||
// After connection maintenance/outage
|
||||
my_pool.signal_recovery_event()
|
||||
```
|
||||
|
||||
### Statistics Monitoring
|
||||
|
||||
```v ignore
|
||||
stats := my_pool.stats()
|
||||
println("Active connections: ${stats.active_conns}")
|
||||
println("Idle connections: ${stats.idle_conns}")
|
||||
println("Waiting clients: ${stats.waiting_clients}")
|
||||
```
|
||||
|
||||
## Implementation Notes
|
||||
|
||||
1. **Exponential Backoff**: Connection creation uses exponential backoff with jitter
|
||||
2. **Priority Eviction**: Four priority levels for connection cleanup:
|
||||
- `low`: Routine maintenance
|
||||
- `medium`: Connection acquisition failure
|
||||
- `high`: Configuration changes
|
||||
- `urgent`: Connection recovery events
|
||||
3. **Adaptive Cleanup**: Maintenance thread dynamically adjusts processing frequency
|
||||
4. **Wait Queue**: Fair connection allocation to waiting clients
|
||||
5. **Atomic Operations**: Non-blocking statistics tracking
|
||||
|
||||
## Performance Considerations
|
||||
|
||||
- Use appropriate `min_idle_conns` to balance startup time and memory
|
||||
- Set `max_lifetime` according to your backend connection limits
|
||||
- Monitor `creation_errors` statistic to detect connection issues
|
||||
- Use `evicted_count` to identify connection health problems
|
||||
|
||||
## Example Implementation
|
||||
|
||||
```v
|
||||
// ConnectionPoolable connection interface implementation
|
||||
struct MyConnection {
|
||||
// Your connection state
|
||||
}
|
||||
|
||||
fn (mut c MyConnection) validate() !bool {
|
||||
// Connection health check logic
|
||||
return true
|
||||
}
|
||||
|
||||
fn (mut c MyConnection) close() ! {
|
||||
// Physical close logic
|
||||
}
|
||||
|
||||
fn (mut c MyConnection) reset() ! {
|
||||
// Reset connection to initial state
|
||||
}
|
||||
```
|
742
vlib/pool/connection.v
Normal file
742
vlib/pool/connection.v
Normal file
|
@ -0,0 +1,742 @@
|
|||
module pool
|
||||
|
||||
import sync
|
||||
import sync.stdatomic { new_atomic }
|
||||
import time
|
||||
|
||||
// Eviction channel capacity
|
||||
const eviction_ch_cap = 1000
|
||||
|
||||
// ConnectionPoolable defines the interface for connection objects
|
||||
pub interface ConnectionPoolable {
|
||||
mut:
|
||||
// validate checks if the connection is still usable
|
||||
validate() !bool
|
||||
// close terminates the physical connection
|
||||
close() !
|
||||
// reset returns the connection to initial state for reuse
|
||||
reset() !
|
||||
}
|
||||
|
||||
// ConnectionPoolConfig holds configuration settings for the connection pool
|
||||
@[params]
|
||||
pub struct ConnectionPoolConfig {
|
||||
pub mut:
|
||||
max_conns int = 20 // Maximum allowed connections
|
||||
min_idle_conns int = 5 // Minimum idle connections to maintain
|
||||
max_lifetime time.Duration = time.hour // Max lifetime of a connection
|
||||
idle_timeout time.Duration = 30 * time.minute // Time before idle connections are cleaned up
|
||||
get_timeout time.Duration = 5 * time.second // Max time to wait for a connection
|
||||
retry_base_delay time.Duration = 1 * time.second // Base delay for retry backoff
|
||||
max_retry_delay time.Duration = 30 * time.second // Maximum delay for retry backoff
|
||||
max_retry_attempts int = 5 // Maximum retry attempts
|
||||
}
|
||||
|
||||
// ConnectionWrapper contains metadata about a pooled connection
|
||||
struct ConnectionWrapper {
|
||||
mut:
|
||||
conn &ConnectionPoolable // The actual connection object
|
||||
created_at time.Time // When connection was created
|
||||
last_used_at time.Time // Last time connection was used
|
||||
last_valid_at time.Time // Last time connection was validated
|
||||
usage_count int // How many times this connection has been used
|
||||
}
|
||||
|
||||
// EvictionPriority indicates urgency of connection cleanup
|
||||
pub enum EvictionPriority {
|
||||
low // Routine cleanup (connection return)
|
||||
medium // Connection get failure
|
||||
high // Configuration change
|
||||
urgent // Database/Server recovery
|
||||
}
|
||||
|
||||
// ConnectionPool manages a pool of reusable connections
|
||||
pub struct ConnectionPool {
|
||||
mut:
|
||||
config ConnectionPoolConfig
|
||||
// Lock order:
|
||||
// config_mutex > create_mutex > idle_pool_mutex > all_conns_mutex > wait_queue_mutex
|
||||
config_mutex &sync.RwMutex @[required] // Guards configuration changes
|
||||
create_mutex &sync.Mutex // Serializes connection creation
|
||||
idle_pool_mutex &sync.RwMutex @[required] // Protects idle connections
|
||||
all_conns_mutex &sync.RwMutex @[required] // Protects all connections map
|
||||
wait_queue_mutex &sync.RwMutex @[required] // Protects wait queue
|
||||
is_closed &stdatomic.AtomicVal[bool] @[required] // Pool shutdown flag
|
||||
stop_ch chan bool // Signals maintenance thread to stop
|
||||
eviction_ch chan EvictionPriority // Eviction event channel
|
||||
cleanup_thread thread // Background maintenance thread
|
||||
wait_queue []chan bool // Client wait queue for connection acquisition
|
||||
conn_factory fn () !&ConnectionPoolable @[required] // Creates new connections
|
||||
active_count &stdatomic.AtomicVal[int] @[required] // Currently checked-out connections
|
||||
created_at time.Time // Pool creation timestamp
|
||||
all_conns map[voidptr]&ConnectionWrapper // All tracked connections
|
||||
idle_pool []&ConnectionWrapper // Currently idle connections
|
||||
creation_errors &stdatomic.AtomicVal[int] @[required] // Failed creation attempts
|
||||
evicted_count &stdatomic.AtomicVal[int] @[required] // Connections forcibly removed
|
||||
creating_count &stdatomic.AtomicVal[int] @[required] // Connections being created
|
||||
}
|
||||
|
||||
// new_connection_pool creates a new connection pool
|
||||
pub fn new_connection_pool(conn_factory fn () !&ConnectionPoolable, config ConnectionPoolConfig) !&ConnectionPool {
|
||||
// Validate configuration parameters
|
||||
check_config(config)!
|
||||
|
||||
mut p := &ConnectionPool{
|
||||
conn_factory: conn_factory
|
||||
config: config
|
||||
config_mutex: sync.new_rwmutex()
|
||||
create_mutex: sync.new_mutex()
|
||||
idle_pool_mutex: sync.new_rwmutex()
|
||||
all_conns_mutex: sync.new_rwmutex()
|
||||
wait_queue_mutex: sync.new_rwmutex()
|
||||
is_closed: new_atomic(false)
|
||||
stop_ch: chan bool{cap: 1}
|
||||
eviction_ch: chan EvictionPriority{cap: eviction_ch_cap}
|
||||
active_count: new_atomic(0)
|
||||
creation_errors: new_atomic(0)
|
||||
evicted_count: new_atomic(0)
|
||||
creating_count: new_atomic(0)
|
||||
all_conns: map[voidptr]&ConnectionWrapper{}
|
||||
}
|
||||
|
||||
now := time.utc()
|
||||
p.created_at = now
|
||||
|
||||
// Initialize minimum idle connections
|
||||
for _ in 0 .. config.min_idle_conns {
|
||||
conn := p.create_conn_with_retry() or {
|
||||
// Cleanup if initialization fails
|
||||
p.all_conns_mutex.lock()
|
||||
for _, mut wrapper in p.all_conns {
|
||||
wrapper.conn.close() or {}
|
||||
}
|
||||
p.all_conns.clear()
|
||||
p.all_conns_mutex.unlock()
|
||||
return err
|
||||
}
|
||||
wrapper := &ConnectionWrapper{
|
||||
conn: conn
|
||||
created_at: now
|
||||
last_used_at: now
|
||||
last_valid_at: now
|
||||
}
|
||||
p.idle_pool << wrapper
|
||||
p.all_conns_mutex.lock()
|
||||
p.all_conns[conn] = wrapper
|
||||
p.all_conns_mutex.unlock()
|
||||
}
|
||||
|
||||
// Start background maintenance thread
|
||||
p.cleanup_thread = spawn p.background_maintenance()
|
||||
// Initial connection pruning
|
||||
p.prune_connections()
|
||||
return p
|
||||
}
|
||||
|
||||
// create_conn_with_retry creates a connection with exponential backoff retries
|
||||
fn (mut p ConnectionPool) create_conn_with_retry() !&ConnectionPoolable {
|
||||
// Get current configuration
|
||||
p.config_mutex.rlock()
|
||||
max_attempts := p.config.max_retry_attempts
|
||||
base_delay := p.config.retry_base_delay
|
||||
max_delay := p.config.max_retry_delay
|
||||
p.config_mutex.unlock()
|
||||
|
||||
// Serialize connection creation
|
||||
p.create_mutex.lock()
|
||||
defer {
|
||||
p.create_mutex.unlock()
|
||||
}
|
||||
|
||||
mut attempt := 0
|
||||
p.creating_count.add(1)
|
||||
defer {
|
||||
p.creating_count.sub(1)
|
||||
}
|
||||
|
||||
for {
|
||||
mut conn := p.conn_factory() or {
|
||||
// Handle creation error with exponential backoff
|
||||
if attempt >= max_attempts {
|
||||
return error('Connection creation failed after ${attempt} attempts: ${err}')
|
||||
}
|
||||
|
||||
// Calculate next delay with exponential backoff
|
||||
mut delay := base_delay * time.Duration(1 << attempt)
|
||||
if delay > max_delay {
|
||||
delay = max_delay
|
||||
}
|
||||
|
||||
time.sleep(delay)
|
||||
attempt++
|
||||
p.creation_errors.add(1)
|
||||
continue
|
||||
}
|
||||
|
||||
// Validate new connection
|
||||
if !conn.validate() or { false } {
|
||||
conn.close() or {}
|
||||
return error('New connection validation failed')
|
||||
}
|
||||
return conn
|
||||
}
|
||||
return error('Unreachable code')
|
||||
}
|
||||
|
||||
// try_wakeup_waiters attempts to notify waiting clients of available resources
|
||||
fn (mut p ConnectionPool) try_wakeup_waiters() {
|
||||
can_create := p.can_create()
|
||||
p.wait_queue_mutex.lock()
|
||||
defer {
|
||||
p.wait_queue_mutex.unlock()
|
||||
}
|
||||
|
||||
// Notify first client if resources are available
|
||||
if p.wait_queue.len > 0 {
|
||||
if p.idle_pool.len > 0 || can_create {
|
||||
to_wake := p.wait_queue[0]
|
||||
p.wait_queue.delete(0)
|
||||
to_wake <- true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// can_create checks if new connections can be created
|
||||
@[inline]
|
||||
fn (mut p ConnectionPool) can_create() bool {
|
||||
p.config_mutex.rlock()
|
||||
max_conns := p.config.max_conns
|
||||
p.config_mutex.unlock()
|
||||
return p.active_count.load() + p.creating_count.load() < max_conns && !p.is_closed.load()
|
||||
&& p.all_conns.len < max_conns
|
||||
}
|
||||
|
||||
// get acquires a connection from the pool with timeout
|
||||
pub fn (mut p ConnectionPool) get() !&ConnectionPoolable {
|
||||
start_time := time.utc()
|
||||
for {
|
||||
// Check if pool is closed
|
||||
if p.is_closed.load() {
|
||||
return error('Connection pool closed')
|
||||
}
|
||||
|
||||
// Try immediate acquisition
|
||||
if conn := p.try_get() {
|
||||
p.eviction_ch <- .medium
|
||||
return conn
|
||||
}
|
||||
|
||||
// Check if new connection can be created
|
||||
can_create := p.can_create()
|
||||
if can_create {
|
||||
mut new_conn := p.create_conn_with_retry()!
|
||||
|
||||
// Final check before adding to pool
|
||||
if p.is_closed.load() {
|
||||
new_conn.close()!
|
||||
return error('Connection pool closed')
|
||||
}
|
||||
|
||||
p.config_mutex.rlock()
|
||||
max_conns := p.config.max_conns
|
||||
p.config_mutex.unlock()
|
||||
|
||||
p.all_conns_mutex.lock()
|
||||
if p.all_conns.len < max_conns {
|
||||
// Successfully create and add new connection
|
||||
now := time.utc()
|
||||
wrapper := &ConnectionWrapper{
|
||||
conn: new_conn
|
||||
created_at: now
|
||||
last_used_at: now
|
||||
last_valid_at: now
|
||||
}
|
||||
p.all_conns[new_conn] = wrapper
|
||||
p.all_conns_mutex.unlock()
|
||||
p.active_count.add(1)
|
||||
return new_conn
|
||||
} else {
|
||||
// Connection limit reached - close new connection
|
||||
p.all_conns_mutex.unlock()
|
||||
new_conn.close()!
|
||||
}
|
||||
}
|
||||
|
||||
// Second attempt to get connection
|
||||
if conn := p.try_get() {
|
||||
return conn
|
||||
}
|
||||
|
||||
// Calculate remaining time for connection acquisition
|
||||
p.config_mutex.rlock()
|
||||
timeout := p.config.get_timeout
|
||||
p.config_mutex.unlock()
|
||||
elapsed := time.utc() - start_time
|
||||
if elapsed > timeout {
|
||||
return error('Connection acquisition timeout')
|
||||
}
|
||||
remaining := timeout - elapsed
|
||||
|
||||
// Set up notification channel
|
||||
notify_chan := chan bool{cap: 1}
|
||||
defer {
|
||||
notify_chan.close()
|
||||
}
|
||||
|
||||
// Add to wait queue
|
||||
p.wait_queue_mutex.lock()
|
||||
p.wait_queue << notify_chan
|
||||
p.wait_queue_mutex.unlock()
|
||||
|
||||
select {
|
||||
_ := <-notify_chan {
|
||||
// Notification received - retry acquisition
|
||||
}
|
||||
i64(remaining) {
|
||||
// Timeout cleanup
|
||||
p.wait_queue_mutex.lock()
|
||||
for i := 0; i < p.wait_queue.len; i++ {
|
||||
if p.wait_queue[i] == notify_chan {
|
||||
p.wait_queue.delete(i)
|
||||
break
|
||||
}
|
||||
}
|
||||
p.wait_queue_mutex.unlock()
|
||||
if conn := p.try_get() {
|
||||
return conn
|
||||
}
|
||||
return error('Connection acquisition timeout')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return error('Unreachable code')
|
||||
}
|
||||
|
||||
// try_get attempts non-blocking connection acquisition
|
||||
fn (mut p ConnectionPool) try_get() ?&ConnectionPoolable {
|
||||
// Get relevant configuration parameters
|
||||
p.config_mutex.rlock()
|
||||
min_idle := p.config.min_idle_conns
|
||||
max_lifetime := p.config.max_lifetime
|
||||
p.config_mutex.unlock()
|
||||
|
||||
p.idle_pool_mutex.lock()
|
||||
defer {
|
||||
p.idle_pool_mutex.unlock()
|
||||
}
|
||||
|
||||
// Determine eviction priority based on idle count
|
||||
priority := if p.idle_pool.len <= min_idle {
|
||||
EvictionPriority.urgent
|
||||
} else if p.idle_pool.len > min_idle * 2 {
|
||||
EvictionPriority.low
|
||||
} else {
|
||||
EvictionPriority.medium
|
||||
}
|
||||
p.eviction_ch <- priority
|
||||
|
||||
// Process idle connections
|
||||
for p.idle_pool.len > 0 {
|
||||
mut wrapper := p.idle_pool.pop()
|
||||
|
||||
// Check connection lifetime
|
||||
age := time.utc() - wrapper.created_at
|
||||
if age > max_lifetime {
|
||||
// Close expired connection
|
||||
p.all_conns_mutex.lock()
|
||||
p.all_conns.delete(wrapper.conn)
|
||||
p.all_conns_mutex.unlock()
|
||||
wrapper.conn.close() or {}
|
||||
continue
|
||||
}
|
||||
|
||||
// Validate connection
|
||||
if !wrapper.conn.validate() or { false } {
|
||||
// Handle invalid connection
|
||||
p.all_conns_mutex.lock()
|
||||
p.all_conns.delete(wrapper.conn)
|
||||
p.all_conns_mutex.unlock()
|
||||
wrapper.conn.close() or {}
|
||||
continue
|
||||
}
|
||||
|
||||
wrapper.last_valid_at = time.utc()
|
||||
|
||||
// Mark connection as active
|
||||
p.active_count.add(1)
|
||||
wrapper.last_used_at = time.utc()
|
||||
wrapper.usage_count++
|
||||
return wrapper.conn
|
||||
}
|
||||
return none
|
||||
}
|
||||
|
||||
// put returns a connection to the pool
|
||||
pub fn (mut p ConnectionPool) put(conn &ConnectionPoolable) ! {
|
||||
if p.active_count.load() > 0 {
|
||||
// TODO: may need a atomic check here, compare_exchange?
|
||||
p.active_count.sub(1)
|
||||
}
|
||||
|
||||
mut conn_ptr := unsafe { conn }
|
||||
// Handle closed pool case
|
||||
if p.is_closed.load() {
|
||||
conn_ptr.close()!
|
||||
return
|
||||
}
|
||||
|
||||
// Reset connection to initial state
|
||||
conn_ptr.reset() or {
|
||||
conn_ptr.close() or {}
|
||||
p.all_conns_mutex.lock()
|
||||
p.all_conns.delete(conn)
|
||||
p.all_conns_mutex.unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
p.idle_pool_mutex.lock()
|
||||
p.all_conns_mutex.lock()
|
||||
defer {
|
||||
p.all_conns_mutex.unlock()
|
||||
p.idle_pool_mutex.unlock()
|
||||
}
|
||||
|
||||
// Return connection to idle pool
|
||||
if mut wrapper := p.all_conns[conn] {
|
||||
wrapper.last_used_at = time.utc()
|
||||
p.idle_pool << wrapper
|
||||
|
||||
// Determine if eviction is needed
|
||||
p.config_mutex.rlock()
|
||||
low_eviction := p.idle_pool.len > p.config.min_idle_conns
|
||||
p.config_mutex.unlock()
|
||||
|
||||
// Wake any waiting clients
|
||||
p.try_wakeup_waiters()
|
||||
|
||||
// Trigger eviction if needed
|
||||
priority := if low_eviction { EvictionPriority.low } else { EvictionPriority.urgent }
|
||||
p.eviction_ch <- priority
|
||||
} else {
|
||||
// Handle unmanaged connection
|
||||
conn_ptr.close()!
|
||||
return error('Unmanaged connection')
|
||||
}
|
||||
}
|
||||
|
||||
// close shuts down the connection pool and cleans up resources
|
||||
pub fn (mut p ConnectionPool) close() {
|
||||
if p.is_closed.load() {
|
||||
return
|
||||
}
|
||||
p.is_closed.store(true)
|
||||
|
||||
// Signal background thread to stop
|
||||
p.stop_ch <- true
|
||||
p.cleanup_thread.wait()
|
||||
p.stop_ch.close()
|
||||
|
||||
// Close all active connections
|
||||
p.idle_pool_mutex.lock()
|
||||
p.all_conns_mutex.lock()
|
||||
for _, mut wrapper in p.all_conns {
|
||||
wrapper.conn.close() or {}
|
||||
}
|
||||
p.all_conns.clear()
|
||||
p.idle_pool.clear()
|
||||
p.all_conns_mutex.unlock()
|
||||
p.idle_pool_mutex.unlock()
|
||||
|
||||
// Process clients in the wait queue
|
||||
p.wait_queue_mutex.lock()
|
||||
waiters := p.wait_queue.clone()
|
||||
p.wait_queue.clear()
|
||||
p.wait_queue_mutex.unlock()
|
||||
|
||||
for ch in waiters {
|
||||
ch <- true // Notify all waiting clients
|
||||
}
|
||||
|
||||
p.eviction_ch.close()
|
||||
|
||||
// Reset all counters
|
||||
p.active_count.store(0)
|
||||
p.creation_errors.store(0)
|
||||
p.evicted_count.store(0)
|
||||
p.creating_count.store(0)
|
||||
}
|
||||
|
||||
// background_maintenance handles periodic connection cleanup
|
||||
fn (mut p ConnectionPool) background_maintenance() {
|
||||
mut first_trigger_time := u64(0)
|
||||
mut event_count := 0
|
||||
mut min_interval := time.infinite // Dynamic processing interval
|
||||
|
||||
for {
|
||||
// Calculate adaptive processing interval
|
||||
p.config_mutex.rlock()
|
||||
dynamic_interval := if p.config.idle_timeout / 10 > time.second {
|
||||
time.second
|
||||
} else {
|
||||
p.config.idle_timeout / 10
|
||||
}
|
||||
p.config_mutex.unlock()
|
||||
|
||||
interval := if min_interval < dynamic_interval {
|
||||
min_interval
|
||||
} else {
|
||||
dynamic_interval
|
||||
}
|
||||
|
||||
select {
|
||||
_ := <-p.stop_ch {
|
||||
// Termination signal received
|
||||
return
|
||||
}
|
||||
priority := <-p.eviction_ch {
|
||||
// Process event based on priority
|
||||
match priority {
|
||||
.low {
|
||||
event_count++
|
||||
min_interval = 100 * time.millisecond
|
||||
}
|
||||
.medium {
|
||||
event_count += 10
|
||||
min_interval = 10 * time.millisecond
|
||||
}
|
||||
.high {
|
||||
event_count += 50
|
||||
min_interval = 1 * time.millisecond
|
||||
}
|
||||
.urgent {
|
||||
event_count += 1000
|
||||
min_interval = 100 * time.microsecond
|
||||
}
|
||||
}
|
||||
|
||||
// Track first event time
|
||||
if first_trigger_time == 0 {
|
||||
first_trigger_time = time.sys_mono_now()
|
||||
}
|
||||
|
||||
elapsed := time.sys_mono_now() - first_trigger_time
|
||||
|
||||
// Determine if immediate processing is needed
|
||||
if priority == .urgent
|
||||
|| (priority == .high && elapsed > 100 * time.microsecond)
|
||||
|| (priority == .medium && elapsed > 1 * time.millisecond)
|
||||
|| (priority == .low && elapsed > 10 * time.millisecond)
|
||||
|| event_count >= 1000 {
|
||||
p.prune_connections()
|
||||
event_count = 0
|
||||
first_trigger_time = 0
|
||||
min_interval = time.infinite
|
||||
}
|
||||
}
|
||||
i64(interval) {
|
||||
// Periodic maintenance
|
||||
if event_count > 0 || interval == min_interval {
|
||||
p.prune_connections()
|
||||
event_count = 0
|
||||
first_trigger_time = 0
|
||||
min_interval = time.infinite
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// prune_connections removes invalid connections and maintains min idle count
|
||||
fn (mut p ConnectionPool) prune_connections() {
|
||||
// Get current configuration parameters
|
||||
p.config_mutex.rlock()
|
||||
max_lifetime := p.config.max_lifetime
|
||||
idle_timeout := p.config.idle_timeout
|
||||
min_idle := p.config.min_idle_conns
|
||||
p.config_mutex.unlock()
|
||||
|
||||
p.idle_pool_mutex.lock()
|
||||
// Remove stale connections
|
||||
for i := p.idle_pool.len - 1; i >= 0; i-- {
|
||||
mut wrapper := p.idle_pool[i]
|
||||
age := time.utc() - wrapper.created_at
|
||||
idle_time := time.utc() - wrapper.last_used_at
|
||||
|
||||
if age > max_lifetime || idle_time > idle_timeout || !wrapper.conn.validate() or { false } {
|
||||
p.all_conns_mutex.lock()
|
||||
p.all_conns.delete(wrapper.conn)
|
||||
p.all_conns_mutex.unlock()
|
||||
wrapper.conn.close() or {}
|
||||
p.idle_pool.delete(i)
|
||||
p.evicted_count.add(1)
|
||||
} else {
|
||||
wrapper.last_valid_at = time.utc()
|
||||
}
|
||||
}
|
||||
current_idle := p.idle_pool.len
|
||||
p.idle_pool_mutex.unlock()
|
||||
|
||||
// Calculate connections to create
|
||||
to_create := if min_idle > current_idle { min_idle - current_idle } else { 0 }
|
||||
|
||||
// Create needed connections
|
||||
mut new_conns := []&ConnectionPoolable{}
|
||||
if to_create > 0 {
|
||||
for _ in 0 .. to_create {
|
||||
if new_conn := p.create_conn_with_retry() {
|
||||
new_conns << new_conn
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check if pool was closed during creation
|
||||
if p.is_closed.load() {
|
||||
for mut new_conn in new_conns {
|
||||
new_conn.close() or {}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
p.config_mutex.rlock()
|
||||
current_min_idle := p.config.min_idle_conns
|
||||
max_conns := p.config.max_conns
|
||||
p.config_mutex.unlock()
|
||||
|
||||
// Add new connections to the pool
|
||||
p.idle_pool_mutex.lock()
|
||||
p.all_conns_mutex.lock()
|
||||
defer {
|
||||
p.all_conns_mutex.unlock()
|
||||
p.idle_pool_mutex.unlock()
|
||||
}
|
||||
|
||||
actual_needed := if current_min_idle > p.idle_pool.len {
|
||||
current_min_idle - p.idle_pool.len
|
||||
} else {
|
||||
0
|
||||
}
|
||||
available_slots := max_conns - p.all_conns.len
|
||||
mut actual_to_add := if actual_needed > new_conns.len { new_conns.len } else { actual_needed }
|
||||
actual_to_add = if actual_to_add > available_slots { available_slots } else { actual_to_add }
|
||||
|
||||
// Create wrapper for each new connection
|
||||
for i in 0 .. actual_to_add {
|
||||
now := time.utc()
|
||||
wrapper := &ConnectionWrapper{
|
||||
conn: new_conns[i]
|
||||
created_at: now
|
||||
last_used_at: now
|
||||
last_valid_at: now
|
||||
}
|
||||
p.idle_pool << wrapper
|
||||
p.all_conns[new_conns[i]] = wrapper
|
||||
}
|
||||
|
||||
// Close any extra connections
|
||||
for i in actual_to_add .. new_conns.len {
|
||||
new_conns[i].close() or {}
|
||||
}
|
||||
|
||||
// Wake clients if connections were added
|
||||
if actual_to_add > 0 {
|
||||
p.try_wakeup_waiters()
|
||||
}
|
||||
}
|
||||
|
||||
fn check_config(config ConnectionPoolConfig) ! {
|
||||
if config.max_conns <= 0 {
|
||||
return error('max_conns must be positive')
|
||||
}
|
||||
if config.min_idle_conns < 0 {
|
||||
return error('min_idle_conns cannot be negative')
|
||||
}
|
||||
if config.min_idle_conns > config.max_conns {
|
||||
return error('min_idle_conns cannot exceed max_conns')
|
||||
}
|
||||
if config.max_lifetime < 0 {
|
||||
return error('max_lifetime cannot be negative')
|
||||
}
|
||||
if config.idle_timeout < 0 {
|
||||
return error('idle_timeout cannot be negative')
|
||||
}
|
||||
if config.idle_timeout > config.max_lifetime {
|
||||
return error('idle_timeout cannot exceed max_lifetime')
|
||||
}
|
||||
if config.get_timeout < 0 {
|
||||
return error('get_timeout cannot be negative')
|
||||
}
|
||||
if config.retry_base_delay < 0 {
|
||||
return error('retry_base_delay cannot be negative')
|
||||
}
|
||||
if config.max_retry_delay < 0 {
|
||||
return error('max_retry_delay cannot be negative')
|
||||
}
|
||||
if config.max_retry_attempts < 0 {
|
||||
return error('max_retry_attempts cannot be negative')
|
||||
}
|
||||
}
|
||||
|
||||
// update_config changes the connection pool configuration
|
||||
pub fn (mut p ConnectionPool) update_config(config ConnectionPoolConfig) ! {
|
||||
// Validate configuration
|
||||
check_config(config)!
|
||||
// Check pool status
|
||||
if p.is_closed.load() {
|
||||
return error('Connection pool is closed')
|
||||
}
|
||||
|
||||
// Update configuration
|
||||
p.config_mutex.lock()
|
||||
p.config = config
|
||||
p.config_mutex.unlock()
|
||||
|
||||
// Trigger maintenance
|
||||
p.eviction_ch <- .high
|
||||
}
|
||||
|
||||
// signal_recovery_event notifies the pool of recovery event
|
||||
pub fn (mut p ConnectionPool) signal_recovery_event() {
|
||||
p.eviction_ch <- .urgent
|
||||
}
|
||||
|
||||
// send_eviction triggers a cleanup event
|
||||
pub fn (mut p ConnectionPool) send_eviction(priority EvictionPriority) {
|
||||
p.eviction_ch <- priority
|
||||
}
|
||||
|
||||
// ConnectionPoolStats holds statistics about the pool
|
||||
pub struct ConnectionPoolStats {
|
||||
pub:
|
||||
total_conns int // All managed connections
|
||||
active_conns int // Currently checked-out connections
|
||||
idle_conns int // Available connections
|
||||
waiting_clients int // Clients waiting for a connection
|
||||
evicted_count int // Connections forcibly removed
|
||||
creation_errors int // Failed creation attempts
|
||||
created_at time.Time // When pool was created
|
||||
creating_count int // Connections being created
|
||||
}
|
||||
|
||||
// stats retrieves current connection pool statistics
|
||||
pub fn (mut p ConnectionPool) stats() ConnectionPoolStats {
|
||||
p.idle_pool_mutex.rlock()
|
||||
p.all_conns_mutex.rlock()
|
||||
p.wait_queue_mutex.rlock()
|
||||
defer {
|
||||
p.wait_queue_mutex.unlock()
|
||||
p.all_conns_mutex.unlock()
|
||||
p.idle_pool_mutex.unlock()
|
||||
}
|
||||
|
||||
return ConnectionPoolStats{
|
||||
total_conns: p.all_conns.len
|
||||
active_conns: p.active_count.load()
|
||||
idle_conns: p.idle_pool.len
|
||||
waiting_clients: p.wait_queue.len
|
||||
evicted_count: p.evicted_count.load()
|
||||
creation_errors: p.creation_errors.load()
|
||||
created_at: p.created_at
|
||||
creating_count: p.creating_count.load()
|
||||
}
|
||||
}
|
292
vlib/pool/connection_test.v
Normal file
292
vlib/pool/connection_test.v
Normal file
|
@ -0,0 +1,292 @@
|
|||
// vtest build: !msvc // msvc hung, maybe sync/atomic bug
|
||||
import time
|
||||
import sync
|
||||
import pool
|
||||
import rand
|
||||
|
||||
// Mock connection implementation
|
||||
struct MockConn {
|
||||
mut:
|
||||
healthy bool
|
||||
close_flag bool
|
||||
reset_flag bool
|
||||
closed int
|
||||
id string
|
||||
}
|
||||
|
||||
fn (mut c MockConn) validate() !bool {
|
||||
return c.healthy
|
||||
}
|
||||
|
||||
fn (mut c MockConn) close() ! {
|
||||
if c.close_flag {
|
||||
return error('simulated close error')
|
||||
}
|
||||
c.closed++
|
||||
}
|
||||
|
||||
fn (mut c MockConn) reset() ! {
|
||||
if c.reset_flag {
|
||||
return error('simulated reset error')
|
||||
}
|
||||
c.reset_flag = true
|
||||
}
|
||||
|
||||
// Test utility functions
|
||||
fn create_mock_factory(mut arr []&pool.ConnectionPoolable, healthy bool, fail_times int) fn () !&pool.ConnectionPoolable {
|
||||
mut count := 0
|
||||
return fn [mut arr, healthy, fail_times, mut count] () !&pool.ConnectionPoolable {
|
||||
if count < fail_times {
|
||||
count++
|
||||
return error('connection creation failed')
|
||||
}
|
||||
mut conn := &MockConn{
|
||||
healthy: healthy
|
||||
id: rand.uuid_v7()
|
||||
}
|
||||
arr << conn
|
||||
return conn
|
||||
}
|
||||
}
|
||||
|
||||
fn is_same_conn(conn1 &pool.ConnectionPoolable, conn2 &pool.ConnectionPoolable) bool {
|
||||
c1 := conn1 as MockConn
|
||||
c2 := conn2 as MockConn
|
||||
return c1.id == c2.id
|
||||
}
|
||||
|
||||
// Test cases
|
||||
fn test_basic_usage() {
|
||||
for _ in 0 .. 1 {
|
||||
mut test_conns := []&pool.ConnectionPoolable{}
|
||||
factory := create_mock_factory(mut test_conns, true, 0)
|
||||
config := pool.ConnectionPoolConfig{
|
||||
max_conns: 5
|
||||
min_idle_conns: 2
|
||||
idle_timeout: 100 * time.millisecond
|
||||
get_timeout: 50 * time.millisecond
|
||||
}
|
||||
|
||||
mut p := pool.new_connection_pool(factory, config)!
|
||||
|
||||
// Acquire a connection
|
||||
mut conn1 := p.get()!
|
||||
assert p.stats().active_conns == 1
|
||||
|
||||
// Acquire multiple connections
|
||||
mut conns := [p.get()!, p.get()!, p.get()!]
|
||||
assert p.stats().active_conns == 4
|
||||
|
||||
// Return connections
|
||||
for c in conns {
|
||||
p.put(c)!
|
||||
}
|
||||
p.put(conn1)!
|
||||
assert p.stats().active_conns == 0
|
||||
assert p.stats().total_conns >= 4
|
||||
p.close()
|
||||
}
|
||||
}
|
||||
|
||||
fn test_pool_exhaustion() {
|
||||
for i in 0 .. 1 {
|
||||
mut test_conns := []&pool.ConnectionPoolable{}
|
||||
factory := create_mock_factory(mut test_conns, true, 0)
|
||||
config := pool.ConnectionPoolConfig{
|
||||
max_conns: 2
|
||||
min_idle_conns: 1
|
||||
get_timeout: 10 * time.millisecond
|
||||
}
|
||||
|
||||
mut p := pool.new_connection_pool(factory, config)!
|
||||
|
||||
// Acquire all connections
|
||||
c1 := p.get()!
|
||||
c2 := p.get()!
|
||||
assert p.stats().active_conns == 2
|
||||
|
||||
// Attempt to acquire third connection (should timeout)
|
||||
p.get() or { assert err.msg().contains('timeout') }
|
||||
|
||||
// After returning, should be able to acquire again
|
||||
p.put(c2)!
|
||||
c3 := p.get()!
|
||||
assert is_same_conn(c3, c2)
|
||||
assert p.stats().active_conns == 2
|
||||
p.close()
|
||||
}
|
||||
}
|
||||
|
||||
fn test_connection_validation() {
|
||||
mut test_conns := []&pool.ConnectionPoolable{}
|
||||
factory := create_mock_factory(mut test_conns, false, 0) // Create unhealthy connections
|
||||
config := pool.ConnectionPoolConfig{
|
||||
min_idle_conns: 1
|
||||
}
|
||||
|
||||
mut p := pool.new_connection_pool(factory, config) or {
|
||||
assert err.msg().contains('connection validation failed')
|
||||
return
|
||||
}
|
||||
defer {
|
||||
p.close()
|
||||
}
|
||||
}
|
||||
|
||||
fn test_eviction() {
|
||||
mut test_conns := []&pool.ConnectionPoolable{}
|
||||
factory := create_mock_factory(mut test_conns, true, 0)
|
||||
config := pool.ConnectionPoolConfig{
|
||||
max_lifetime: 10 * time.millisecond
|
||||
idle_timeout: 10 * time.millisecond
|
||||
min_idle_conns: 0
|
||||
}
|
||||
|
||||
mut p := pool.new_connection_pool(factory, config)!
|
||||
defer {
|
||||
p.close()
|
||||
}
|
||||
|
||||
// Acquire and return a connection
|
||||
c1 := p.get()!
|
||||
p.put(c1)!
|
||||
|
||||
// Wait longer than timeout thresholds
|
||||
time.sleep(100 * time.millisecond)
|
||||
p.send_eviction(.urgent)
|
||||
time.sleep(10 * time.millisecond)
|
||||
stats := p.stats()
|
||||
assert stats.evicted_count > 0
|
||||
assert stats.total_conns == 0
|
||||
assert stats.idle_conns == 0
|
||||
c2 := p.get()!
|
||||
assert !is_same_conn(c1, c2)
|
||||
assert p.stats().total_conns == 1
|
||||
assert p.stats().evicted_count > 0
|
||||
}
|
||||
|
||||
fn test_retry_mechanism() {
|
||||
mut test_conns := []&pool.ConnectionPoolable{}
|
||||
factory := create_mock_factory(mut test_conns, true, 3) // First 3 attempts fail
|
||||
config := pool.ConnectionPoolConfig{
|
||||
max_retry_attempts: 5
|
||||
retry_base_delay: 10 * time.millisecond
|
||||
min_idle_conns: 0 // No idle connections
|
||||
}
|
||||
|
||||
mut p := pool.new_connection_pool(factory, config)!
|
||||
defer {
|
||||
p.close()
|
||||
}
|
||||
// Should successfully create connection after retries
|
||||
conn := p.get()!
|
||||
assert test_conns.len == 1
|
||||
assert p.stats().creation_errors == 3
|
||||
}
|
||||
|
||||
fn test_concurrent_access() {
|
||||
mut test_conns := []&pool.ConnectionPoolable{}
|
||||
factory := create_mock_factory(mut test_conns, true, 0)
|
||||
config := pool.ConnectionPoolConfig{
|
||||
max_conns: 10
|
||||
}
|
||||
|
||||
mut p := pool.new_connection_pool(factory, config)!
|
||||
defer {
|
||||
p.close()
|
||||
}
|
||||
mut wg := &sync.WaitGroup{}
|
||||
|
||||
for _ in 0 .. 20 {
|
||||
wg.add(1)
|
||||
spawn fn (mut p pool.ConnectionPool, mut wg sync.WaitGroup) ! {
|
||||
defer { wg.done() }
|
||||
conn := p.get()!
|
||||
time.sleep(5 * time.millisecond)
|
||||
p.put(conn)!
|
||||
}(mut p, mut wg)
|
||||
}
|
||||
|
||||
wg.wait()
|
||||
stats := p.stats()
|
||||
assert stats.total_conns <= 10
|
||||
assert stats.active_conns == 0
|
||||
}
|
||||
|
||||
fn test_pool_close() {
|
||||
mut test_conns := []&pool.ConnectionPoolable{}
|
||||
factory := create_mock_factory(mut test_conns, true, 0)
|
||||
|
||||
mut p := pool.new_connection_pool(factory, pool.ConnectionPoolConfig{})!
|
||||
c := p.get()!
|
||||
p.put(c)!
|
||||
assert p.stats().active_conns == 0
|
||||
assert p.stats().idle_conns == 5
|
||||
|
||||
p.close()
|
||||
|
||||
// Attempt to acquire connection after close
|
||||
p.get() or { assert err.msg().contains('closed') }
|
||||
assert p.stats().active_conns == 0
|
||||
assert p.stats().idle_conns == 0
|
||||
|
||||
// Verify connection was closed
|
||||
mock_conn := test_conns[0] as MockConn
|
||||
assert mock_conn.closed == 1
|
||||
}
|
||||
|
||||
fn test_config_update() {
|
||||
mut dummy := []&pool.ConnectionPoolable{}
|
||||
mut p := pool.new_connection_pool(create_mock_factory(mut dummy, true, 0), pool.ConnectionPoolConfig{
|
||||
max_conns: 2
|
||||
min_idle_conns: 1
|
||||
})!
|
||||
defer {
|
||||
p.close()
|
||||
}
|
||||
assert p.stats().idle_conns == 1
|
||||
|
||||
// Modify configuration
|
||||
new_config := pool.ConnectionPoolConfig{
|
||||
max_conns: 5
|
||||
min_idle_conns: 3
|
||||
}
|
||||
|
||||
p.update_config(new_config)!
|
||||
|
||||
// Trigger idle connection replenishment
|
||||
time.sleep(100 * time.millisecond)
|
||||
assert p.stats().idle_conns >= 3
|
||||
}
|
||||
|
||||
fn test_error_handling() {
|
||||
// Test close error handling
|
||||
mut test_conns := []&pool.ConnectionPoolable{}
|
||||
factory := create_mock_factory(mut test_conns, true, 0)
|
||||
mut p := pool.new_connection_pool(factory, pool.ConnectionPoolConfig{})!
|
||||
defer {
|
||||
p.close()
|
||||
}
|
||||
|
||||
// default configuration has 5 idle_conns
|
||||
assert p.stats().idle_conns == 5
|
||||
|
||||
// Bug Fix Needed! msvc generated wrong code for `mut conn := p.get()! as MockConn`
|
||||
mut connx := p.get()!
|
||||
mut conn := connx as MockConn
|
||||
assert p.stats().active_conns == 1
|
||||
// it depend on `background_maintenance` thread to keep 5 idle_conns
|
||||
assert p.stats().idle_conns >= 4
|
||||
conn.close_flag = true
|
||||
p.put(conn) or { assert err.msg().contains('simulated close error') }
|
||||
assert p.stats().active_conns == 0
|
||||
// it depend on `background_maintenance` thread to keep 5 idle_conns
|
||||
assert p.stats().idle_conns >= 4
|
||||
|
||||
// Test reset error handling
|
||||
conn.reset_flag = true
|
||||
p.put(conn) or { assert err.msg().contains('reset') }
|
||||
assert p.stats().active_conns == 0
|
||||
// it depend on `background_maintenance` thread to keep 5 idle_conns
|
||||
assert p.stats().idle_conns >= 4
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue