diff --git a/vlib/pool/README.md b/vlib/pool/README.md new file mode 100644 index 0000000000..1e75784550 --- /dev/null +++ b/vlib/pool/README.md @@ -0,0 +1,148 @@ +# Connection Pool Module + +This module provides a robust connection pooling implementation for +managing reusable resources like database connections. It handles +connection lifecycle, validation, and efficient resource allocation +with minimal overhead. + +## Features + +- **Connection Reuse**: Efficiently manages reusable connections +- **Health Validation**: Automatic connection validation +- **Dynamic Scaling**: Adjusts pool size based on demand +- **Intelligent Eviction**: Removes stale connections with priority-based cleanup +- **Statistics Tracking**: Provides detailed pool metrics +- **Thread Safety**: Fully concurrent-safe implementation +- **Graceful Shutdown**: Clean resource termination +- **Dynamic Configuration**: Runtime configuration updates + +## Basic Usage + +### Creating a Pool + +```v ignore +import db.mysql +import pool +import time + +// Define your connection factory function +fn create_conn() !&pool.ConnectionPoolable { + config := mysql.Config{ + host: '127.0.0.1' + port: 3306 + username: 'root' + password: '12345678' + dbname: 'mysql' + } + db := mysql.connect(config)! + return &db +} + +// Configure pool parameters +config := pool.ConnectionPoolConfig{ + max_conns: 50 + min_idle_conns: 5 + max_lifetime: 2 * time.hour + idle_timeout: 30 * time.minute + get_timeout: 5 * time.second +} + +// Create connection pool +mut my_pool := pool.new_connection_pool(create_conn, config)! + +// Acquire connection +mut conn := my_pool.get()! + +// Use connection +// ... your operations ... + +// Return connection to pool +my_pool.put(conn)! + +// When application exits +my_pool.close() +``` + +## Configuration Options + +| Parameter | Default Value | Description | +|---------------------|-----------------------|--------------------------------------| +| `max_conns` | 20 | Maximum connections in pool | +| `min_idle_conns` | 5 | Minimum idle connections to maintain | +| `max_lifetime` | 1 hour | Max connection lifetime | +| `idle_timeout` | 30 minutes | Idle connection timeout | +| `get_timeout` | 5 seconds | Connection acquisition timeout | +| `retry_base_delay` | 1 second | Base delay for connection retries | +| `max_retry_delay` | 30 seconds | Maximum retry delay | +| `max_retry_attempts`| 5 | Maximum connection creation attempts | + +## Advanced Features + +### Dynamic Configuration Update + +```v ignore +new_config := pool.ConnectionPoolConfig{ + max_conns: 100 + min_idle_conns: 10 + // ... other parameters ... +} + +my_pool.update_config(new_config)! +``` + +### Connection Recovery Signal + +```v ignore +// After connection maintenance/outage +my_pool.signal_recovery_event() +``` + +### Statistics Monitoring + +```v ignore +stats := my_pool.stats() +println("Active connections: ${stats.active_conns}") +println("Idle connections: ${stats.idle_conns}") +println("Waiting clients: ${stats.waiting_clients}") +``` + +## Implementation Notes + +1. **Exponential Backoff**: Connection creation uses exponential backoff with jitter +2. **Priority Eviction**: Four priority levels for connection cleanup: + - `low`: Routine maintenance + - `medium`: Connection acquisition failure + - `high`: Configuration changes + - `urgent`: Connection recovery events +3. **Adaptive Cleanup**: Maintenance thread dynamically adjusts processing frequency +4. **Wait Queue**: Fair connection allocation to waiting clients +5. 
**Atomic Operations**: Non-blocking statistics tracking + +## Performance Considerations + +- Use appropriate `min_idle_conns` to balance startup time and memory +- Set `max_lifetime` according to your backend connection limits +- Monitor `creation_errors` statistic to detect connection issues +- Use `evicted_count` to identify connection health problems + +## Example Implementation + +```v +// ConnectionPoolable connection interface implementation +struct MyConnection { + // Your connection state +} + +fn (mut c MyConnection) validate() !bool { + // Connection health check logic + return true +} + +fn (mut c MyConnection) close() ! { + // Physical close logic +} + +fn (mut c MyConnection) reset() ! { + // Reset connection to initial state +} +``` diff --git a/vlib/pool/connection.v b/vlib/pool/connection.v new file mode 100644 index 0000000000..f483146d9f --- /dev/null +++ b/vlib/pool/connection.v @@ -0,0 +1,742 @@ +module pool + +import sync +import sync.stdatomic { new_atomic } +import time + +// Eviction channel capacity +const eviction_ch_cap = 1000 + +// ConnectionPoolable defines the interface for connection objects +pub interface ConnectionPoolable { +mut: + // validate checks if the connection is still usable + validate() !bool + // close terminates the physical connection + close() ! + // reset returns the connection to initial state for reuse + reset() ! +} + +// ConnectionPoolConfig holds configuration settings for the connection pool +@[params] +pub struct ConnectionPoolConfig { +pub mut: + max_conns int = 20 // Maximum allowed connections + min_idle_conns int = 5 // Minimum idle connections to maintain + max_lifetime time.Duration = time.hour // Max lifetime of a connection + idle_timeout time.Duration = 30 * time.minute // Time before idle connections are cleaned up + get_timeout time.Duration = 5 * time.second // Max time to wait for a connection + retry_base_delay time.Duration = 1 * time.second // Base delay for retry backoff + max_retry_delay time.Duration = 30 * time.second // Maximum delay for retry backoff + max_retry_attempts int = 5 // Maximum retry attempts +} + +// ConnectionWrapper contains metadata about a pooled connection +struct ConnectionWrapper { +mut: + conn &ConnectionPoolable // The actual connection object + created_at time.Time // When connection was created + last_used_at time.Time // Last time connection was used + last_valid_at time.Time // Last time connection was validated + usage_count int // How many times this connection has been used +} + +// EvictionPriority indicates urgency of connection cleanup +pub enum EvictionPriority { + low // Routine cleanup (connection return) + medium // Connection get failure + high // Configuration change + urgent // Database/Server recovery +} + +// ConnectionPool manages a pool of reusable connections +pub struct ConnectionPool { +mut: + config ConnectionPoolConfig + // Lock order: + // config_mutex > create_mutex > idle_pool_mutex > all_conns_mutex > wait_queue_mutex + config_mutex &sync.RwMutex @[required] // Guards configuration changes + create_mutex &sync.Mutex // Serializes connection creation + idle_pool_mutex &sync.RwMutex @[required] // Protects idle connections + all_conns_mutex &sync.RwMutex @[required] // Protects all connections map + wait_queue_mutex &sync.RwMutex @[required] // Protects wait queue + is_closed &stdatomic.AtomicVal[bool] @[required] // Pool shutdown flag + stop_ch chan bool // Signals maintenance thread to stop + eviction_ch chan EvictionPriority // Eviction event channel + 
cleanup_thread thread // Background maintenance thread + wait_queue []chan bool // Client wait queue for connection acquisition + conn_factory fn () !&ConnectionPoolable @[required] // Creates new connections + active_count &stdatomic.AtomicVal[int] @[required] // Currently checked-out connections + created_at time.Time // Pool creation timestamp + all_conns map[voidptr]&ConnectionWrapper // All tracked connections + idle_pool []&ConnectionWrapper // Currently idle connections + creation_errors &stdatomic.AtomicVal[int] @[required] // Failed creation attempts + evicted_count &stdatomic.AtomicVal[int] @[required] // Connections forcibly removed + creating_count &stdatomic.AtomicVal[int] @[required] // Connections being created +} + +// new_connection_pool creates a new connection pool +pub fn new_connection_pool(conn_factory fn () !&ConnectionPoolable, config ConnectionPoolConfig) !&ConnectionPool { + // Validate configuration parameters + check_config(config)! + + mut p := &ConnectionPool{ + conn_factory: conn_factory + config: config + config_mutex: sync.new_rwmutex() + create_mutex: sync.new_mutex() + idle_pool_mutex: sync.new_rwmutex() + all_conns_mutex: sync.new_rwmutex() + wait_queue_mutex: sync.new_rwmutex() + is_closed: new_atomic(false) + stop_ch: chan bool{cap: 1} + eviction_ch: chan EvictionPriority{cap: eviction_ch_cap} + active_count: new_atomic(0) + creation_errors: new_atomic(0) + evicted_count: new_atomic(0) + creating_count: new_atomic(0) + all_conns: map[voidptr]&ConnectionWrapper{} + } + + now := time.utc() + p.created_at = now + + // Initialize minimum idle connections + for _ in 0 .. config.min_idle_conns { + conn := p.create_conn_with_retry() or { + // Cleanup if initialization fails + p.all_conns_mutex.lock() + for _, mut wrapper in p.all_conns { + wrapper.conn.close() or {} + } + p.all_conns.clear() + p.all_conns_mutex.unlock() + return err + } + wrapper := &ConnectionWrapper{ + conn: conn + created_at: now + last_used_at: now + last_valid_at: now + } + p.idle_pool << wrapper + p.all_conns_mutex.lock() + p.all_conns[conn] = wrapper + p.all_conns_mutex.unlock() + } + + // Start background maintenance thread + p.cleanup_thread = spawn p.background_maintenance() + // Initial connection pruning + p.prune_connections() + return p +} + +// create_conn_with_retry creates a connection with exponential backoff retries +fn (mut p ConnectionPool) create_conn_with_retry() !&ConnectionPoolable { + // Get current configuration + p.config_mutex.rlock() + max_attempts := p.config.max_retry_attempts + base_delay := p.config.retry_base_delay + max_delay := p.config.max_retry_delay + p.config_mutex.unlock() + + // Serialize connection creation + p.create_mutex.lock() + defer { + p.create_mutex.unlock() + } + + mut attempt := 0 + p.creating_count.add(1) + defer { + p.creating_count.sub(1) + } + + for { + mut conn := p.conn_factory() or { + // Handle creation error with exponential backoff + if attempt >= max_attempts { + return error('Connection creation failed after ${attempt} attempts: ${err}') + } + + // Calculate next delay with exponential backoff + mut delay := base_delay * time.Duration(1 << attempt) + if delay > max_delay { + delay = max_delay + } + + time.sleep(delay) + attempt++ + p.creation_errors.add(1) + continue + } + + // Validate new connection + if !conn.validate() or { false } { + conn.close() or {} + return error('New connection validation failed') + } + return conn + } + return error('Unreachable code') +} + +// try_wakeup_waiters attempts to notify waiting clients 
of available resources +fn (mut p ConnectionPool) try_wakeup_waiters() { + can_create := p.can_create() + p.wait_queue_mutex.lock() + defer { + p.wait_queue_mutex.unlock() + } + + // Notify first client if resources are available + if p.wait_queue.len > 0 { + if p.idle_pool.len > 0 || can_create { + to_wake := p.wait_queue[0] + p.wait_queue.delete(0) + to_wake <- true + } + } +} + +// can_create checks if new connections can be created +@[inline] +fn (mut p ConnectionPool) can_create() bool { + p.config_mutex.rlock() + max_conns := p.config.max_conns + p.config_mutex.unlock() + return p.active_count.load() + p.creating_count.load() < max_conns && !p.is_closed.load() + && p.all_conns.len < max_conns +} + +// get acquires a connection from the pool with timeout +pub fn (mut p ConnectionPool) get() !&ConnectionPoolable { + start_time := time.utc() + for { + // Check if pool is closed + if p.is_closed.load() { + return error('Connection pool closed') + } + + // Try immediate acquisition + if conn := p.try_get() { + p.eviction_ch <- .medium + return conn + } + + // Check if new connection can be created + can_create := p.can_create() + if can_create { + mut new_conn := p.create_conn_with_retry()! + + // Final check before adding to pool + if p.is_closed.load() { + new_conn.close()! + return error('Connection pool closed') + } + + p.config_mutex.rlock() + max_conns := p.config.max_conns + p.config_mutex.unlock() + + p.all_conns_mutex.lock() + if p.all_conns.len < max_conns { + // Successfully create and add new connection + now := time.utc() + wrapper := &ConnectionWrapper{ + conn: new_conn + created_at: now + last_used_at: now + last_valid_at: now + } + p.all_conns[new_conn] = wrapper + p.all_conns_mutex.unlock() + p.active_count.add(1) + return new_conn + } else { + // Connection limit reached - close new connection + p.all_conns_mutex.unlock() + new_conn.close()! 
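+				// Another client filled the pool while this connection was being created,
+				// so the extra connection is discarded and the loop falls through to retry.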
+ } + } + + // Second attempt to get connection + if conn := p.try_get() { + return conn + } + + // Calculate remaining time for connection acquisition + p.config_mutex.rlock() + timeout := p.config.get_timeout + p.config_mutex.unlock() + elapsed := time.utc() - start_time + if elapsed > timeout { + return error('Connection acquisition timeout') + } + remaining := timeout - elapsed + + // Set up notification channel + notify_chan := chan bool{cap: 1} + defer { + notify_chan.close() + } + + // Add to wait queue + p.wait_queue_mutex.lock() + p.wait_queue << notify_chan + p.wait_queue_mutex.unlock() + + select { + _ := <-notify_chan { + // Notification received - retry acquisition + } + i64(remaining) { + // Timeout cleanup + p.wait_queue_mutex.lock() + for i := 0; i < p.wait_queue.len; i++ { + if p.wait_queue[i] == notify_chan { + p.wait_queue.delete(i) + break + } + } + p.wait_queue_mutex.unlock() + if conn := p.try_get() { + return conn + } + return error('Connection acquisition timeout') + } + } + } + + return error('Unreachable code') +} + +// try_get attempts non-blocking connection acquisition +fn (mut p ConnectionPool) try_get() ?&ConnectionPoolable { + // Get relevant configuration parameters + p.config_mutex.rlock() + min_idle := p.config.min_idle_conns + max_lifetime := p.config.max_lifetime + p.config_mutex.unlock() + + p.idle_pool_mutex.lock() + defer { + p.idle_pool_mutex.unlock() + } + + // Determine eviction priority based on idle count + priority := if p.idle_pool.len <= min_idle { + EvictionPriority.urgent + } else if p.idle_pool.len > min_idle * 2 { + EvictionPriority.low + } else { + EvictionPriority.medium + } + p.eviction_ch <- priority + + // Process idle connections + for p.idle_pool.len > 0 { + mut wrapper := p.idle_pool.pop() + + // Check connection lifetime + age := time.utc() - wrapper.created_at + if age > max_lifetime { + // Close expired connection + p.all_conns_mutex.lock() + p.all_conns.delete(wrapper.conn) + p.all_conns_mutex.unlock() + wrapper.conn.close() or {} + continue + } + + // Validate connection + if !wrapper.conn.validate() or { false } { + // Handle invalid connection + p.all_conns_mutex.lock() + p.all_conns.delete(wrapper.conn) + p.all_conns_mutex.unlock() + wrapper.conn.close() or {} + continue + } + + wrapper.last_valid_at = time.utc() + + // Mark connection as active + p.active_count.add(1) + wrapper.last_used_at = time.utc() + wrapper.usage_count++ + return wrapper.conn + } + return none +} + +// put returns a connection to the pool +pub fn (mut p ConnectionPool) put(conn &ConnectionPoolable) ! { + if p.active_count.load() > 0 { + // TODO: may need a atomic check here, compare_exchange? + p.active_count.sub(1) + } + + mut conn_ptr := unsafe { conn } + // Handle closed pool case + if p.is_closed.load() { + conn_ptr.close()! 
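+		// The pool is already shut down, so the connection is closed here instead of
+		// being returned to the idle pool.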
+ return + } + + // Reset connection to initial state + conn_ptr.reset() or { + conn_ptr.close() or {} + p.all_conns_mutex.lock() + p.all_conns.delete(conn) + p.all_conns_mutex.unlock() + return err + } + + p.idle_pool_mutex.lock() + p.all_conns_mutex.lock() + defer { + p.all_conns_mutex.unlock() + p.idle_pool_mutex.unlock() + } + + // Return connection to idle pool + if mut wrapper := p.all_conns[conn] { + wrapper.last_used_at = time.utc() + p.idle_pool << wrapper + + // Determine if eviction is needed + p.config_mutex.rlock() + low_eviction := p.idle_pool.len > p.config.min_idle_conns + p.config_mutex.unlock() + + // Wake any waiting clients + p.try_wakeup_waiters() + + // Trigger eviction if needed + priority := if low_eviction { EvictionPriority.low } else { EvictionPriority.urgent } + p.eviction_ch <- priority + } else { + // Handle unmanaged connection + conn_ptr.close()! + return error('Unmanaged connection') + } +} + +// close shuts down the connection pool and cleans up resources +pub fn (mut p ConnectionPool) close() { + if p.is_closed.load() { + return + } + p.is_closed.store(true) + + // Signal background thread to stop + p.stop_ch <- true + p.cleanup_thread.wait() + p.stop_ch.close() + + // Close all active connections + p.idle_pool_mutex.lock() + p.all_conns_mutex.lock() + for _, mut wrapper in p.all_conns { + wrapper.conn.close() or {} + } + p.all_conns.clear() + p.idle_pool.clear() + p.all_conns_mutex.unlock() + p.idle_pool_mutex.unlock() + + // Process clients in the wait queue + p.wait_queue_mutex.lock() + waiters := p.wait_queue.clone() + p.wait_queue.clear() + p.wait_queue_mutex.unlock() + + for ch in waiters { + ch <- true // Notify all waiting clients + } + + p.eviction_ch.close() + + // Reset all counters + p.active_count.store(0) + p.creation_errors.store(0) + p.evicted_count.store(0) + p.creating_count.store(0) +} + +// background_maintenance handles periodic connection cleanup +fn (mut p ConnectionPool) background_maintenance() { + mut first_trigger_time := u64(0) + mut event_count := 0 + mut min_interval := time.infinite // Dynamic processing interval + + for { + // Calculate adaptive processing interval + p.config_mutex.rlock() + dynamic_interval := if p.config.idle_timeout / 10 > time.second { + time.second + } else { + p.config.idle_timeout / 10 + } + p.config_mutex.unlock() + + interval := if min_interval < dynamic_interval { + min_interval + } else { + dynamic_interval + } + + select { + _ := <-p.stop_ch { + // Termination signal received + return + } + priority := <-p.eviction_ch { + // Process event based on priority + match priority { + .low { + event_count++ + min_interval = 100 * time.millisecond + } + .medium { + event_count += 10 + min_interval = 10 * time.millisecond + } + .high { + event_count += 50 + min_interval = 1 * time.millisecond + } + .urgent { + event_count += 1000 + min_interval = 100 * time.microsecond + } + } + + // Track first event time + if first_trigger_time == 0 { + first_trigger_time = time.sys_mono_now() + } + + elapsed := time.sys_mono_now() - first_trigger_time + + // Determine if immediate processing is needed + if priority == .urgent + || (priority == .high && elapsed > 100 * time.microsecond) + || (priority == .medium && elapsed > 1 * time.millisecond) + || (priority == .low && elapsed > 10 * time.millisecond) + || event_count >= 1000 { + p.prune_connections() + event_count = 0 + first_trigger_time = 0 + min_interval = time.infinite + } + } + i64(interval) { + // Periodic maintenance + if event_count > 0 || interval == 
min_interval { + p.prune_connections() + event_count = 0 + first_trigger_time = 0 + min_interval = time.infinite + } + } + } + } +} + +// prune_connections removes invalid connections and maintains min idle count +fn (mut p ConnectionPool) prune_connections() { + // Get current configuration parameters + p.config_mutex.rlock() + max_lifetime := p.config.max_lifetime + idle_timeout := p.config.idle_timeout + min_idle := p.config.min_idle_conns + p.config_mutex.unlock() + + p.idle_pool_mutex.lock() + // Remove stale connections + for i := p.idle_pool.len - 1; i >= 0; i-- { + mut wrapper := p.idle_pool[i] + age := time.utc() - wrapper.created_at + idle_time := time.utc() - wrapper.last_used_at + + if age > max_lifetime || idle_time > idle_timeout || !wrapper.conn.validate() or { false } { + p.all_conns_mutex.lock() + p.all_conns.delete(wrapper.conn) + p.all_conns_mutex.unlock() + wrapper.conn.close() or {} + p.idle_pool.delete(i) + p.evicted_count.add(1) + } else { + wrapper.last_valid_at = time.utc() + } + } + current_idle := p.idle_pool.len + p.idle_pool_mutex.unlock() + + // Calculate connections to create + to_create := if min_idle > current_idle { min_idle - current_idle } else { 0 } + + // Create needed connections + mut new_conns := []&ConnectionPoolable{} + if to_create > 0 { + for _ in 0 .. to_create { + if new_conn := p.create_conn_with_retry() { + new_conns << new_conn + } + } + } + + // Check if pool was closed during creation + if p.is_closed.load() { + for mut new_conn in new_conns { + new_conn.close() or {} + } + return + } + + p.config_mutex.rlock() + current_min_idle := p.config.min_idle_conns + max_conns := p.config.max_conns + p.config_mutex.unlock() + + // Add new connections to the pool + p.idle_pool_mutex.lock() + p.all_conns_mutex.lock() + defer { + p.all_conns_mutex.unlock() + p.idle_pool_mutex.unlock() + } + + actual_needed := if current_min_idle > p.idle_pool.len { + current_min_idle - p.idle_pool.len + } else { + 0 + } + available_slots := max_conns - p.all_conns.len + mut actual_to_add := if actual_needed > new_conns.len { new_conns.len } else { actual_needed } + actual_to_add = if actual_to_add > available_slots { available_slots } else { actual_to_add } + + // Create wrapper for each new connection + for i in 0 .. actual_to_add { + now := time.utc() + wrapper := &ConnectionWrapper{ + conn: new_conns[i] + created_at: now + last_used_at: now + last_valid_at: now + } + p.idle_pool << wrapper + p.all_conns[new_conns[i]] = wrapper + } + + // Close any extra connections + for i in actual_to_add .. new_conns.len { + new_conns[i].close() or {} + } + + // Wake clients if connections were added + if actual_to_add > 0 { + p.try_wakeup_waiters() + } +} + +fn check_config(config ConnectionPoolConfig) ! 
{ + if config.max_conns <= 0 { + return error('max_conns must be positive') + } + if config.min_idle_conns < 0 { + return error('min_idle_conns cannot be negative') + } + if config.min_idle_conns > config.max_conns { + return error('min_idle_conns cannot exceed max_conns') + } + if config.max_lifetime < 0 { + return error('max_lifetime cannot be negative') + } + if config.idle_timeout < 0 { + return error('idle_timeout cannot be negative') + } + if config.idle_timeout > config.max_lifetime { + return error('idle_timeout cannot exceed max_lifetime') + } + if config.get_timeout < 0 { + return error('get_timeout cannot be negative') + } + if config.retry_base_delay < 0 { + return error('retry_base_delay cannot be negative') + } + if config.max_retry_delay < 0 { + return error('max_retry_delay cannot be negative') + } + if config.max_retry_attempts < 0 { + return error('max_retry_attempts cannot be negative') + } +} + +// update_config changes the connection pool configuration +pub fn (mut p ConnectionPool) update_config(config ConnectionPoolConfig) ! { + // Validate configuration + check_config(config)! + // Check pool status + if p.is_closed.load() { + return error('Connection pool is closed') + } + + // Update configuration + p.config_mutex.lock() + p.config = config + p.config_mutex.unlock() + + // Trigger maintenance + p.eviction_ch <- .high +} + +// signal_recovery_event notifies the pool of recovery event +pub fn (mut p ConnectionPool) signal_recovery_event() { + p.eviction_ch <- .urgent +} + +// send_eviction triggers a cleanup event +pub fn (mut p ConnectionPool) send_eviction(priority EvictionPriority) { + p.eviction_ch <- priority +} + +// ConnectionPoolStats holds statistics about the pool +pub struct ConnectionPoolStats { +pub: + total_conns int // All managed connections + active_conns int // Currently checked-out connections + idle_conns int // Available connections + waiting_clients int // Clients waiting for a connection + evicted_count int // Connections forcibly removed + creation_errors int // Failed creation attempts + created_at time.Time // When pool was created + creating_count int // Connections being created +} + +// stats retrieves current connection pool statistics +pub fn (mut p ConnectionPool) stats() ConnectionPoolStats { + p.idle_pool_mutex.rlock() + p.all_conns_mutex.rlock() + p.wait_queue_mutex.rlock() + defer { + p.wait_queue_mutex.unlock() + p.all_conns_mutex.unlock() + p.idle_pool_mutex.unlock() + } + + return ConnectionPoolStats{ + total_conns: p.all_conns.len + active_conns: p.active_count.load() + idle_conns: p.idle_pool.len + waiting_clients: p.wait_queue.len + evicted_count: p.evicted_count.load() + creation_errors: p.creation_errors.load() + created_at: p.created_at + creating_count: p.creating_count.load() + } +} diff --git a/vlib/pool/connection_test.v b/vlib/pool/connection_test.v new file mode 100644 index 0000000000..44e79dd5c2 --- /dev/null +++ b/vlib/pool/connection_test.v @@ -0,0 +1,292 @@ +// vtest build: !msvc // msvc hung, maybe sync/atomic bug +import time +import sync +import pool +import rand + +// Mock connection implementation +struct MockConn { +mut: + healthy bool + close_flag bool + reset_flag bool + closed int + id string +} + +fn (mut c MockConn) validate() !bool { + return c.healthy +} + +fn (mut c MockConn) close() ! { + if c.close_flag { + return error('simulated close error') + } + c.closed++ +} + +fn (mut c MockConn) reset() ! 
{ + if c.reset_flag { + return error('simulated reset error') + } + c.reset_flag = true +} + +// Test utility functions +fn create_mock_factory(mut arr []&pool.ConnectionPoolable, healthy bool, fail_times int) fn () !&pool.ConnectionPoolable { + mut count := 0 + return fn [mut arr, healthy, fail_times, mut count] () !&pool.ConnectionPoolable { + if count < fail_times { + count++ + return error('connection creation failed') + } + mut conn := &MockConn{ + healthy: healthy + id: rand.uuid_v7() + } + arr << conn + return conn + } +} + +fn is_same_conn(conn1 &pool.ConnectionPoolable, conn2 &pool.ConnectionPoolable) bool { + c1 := conn1 as MockConn + c2 := conn2 as MockConn + return c1.id == c2.id +} + +// Test cases +fn test_basic_usage() { + for _ in 0 .. 1 { + mut test_conns := []&pool.ConnectionPoolable{} + factory := create_mock_factory(mut test_conns, true, 0) + config := pool.ConnectionPoolConfig{ + max_conns: 5 + min_idle_conns: 2 + idle_timeout: 100 * time.millisecond + get_timeout: 50 * time.millisecond + } + + mut p := pool.new_connection_pool(factory, config)! + + // Acquire a connection + mut conn1 := p.get()! + assert p.stats().active_conns == 1 + + // Acquire multiple connections + mut conns := [p.get()!, p.get()!, p.get()!] + assert p.stats().active_conns == 4 + + // Return connections + for c in conns { + p.put(c)! + } + p.put(conn1)! + assert p.stats().active_conns == 0 + assert p.stats().total_conns >= 4 + p.close() + } +} + +fn test_pool_exhaustion() { + for i in 0 .. 1 { + mut test_conns := []&pool.ConnectionPoolable{} + factory := create_mock_factory(mut test_conns, true, 0) + config := pool.ConnectionPoolConfig{ + max_conns: 2 + min_idle_conns: 1 + get_timeout: 10 * time.millisecond + } + + mut p := pool.new_connection_pool(factory, config)! + + // Acquire all connections + c1 := p.get()! + c2 := p.get()! + assert p.stats().active_conns == 2 + + // Attempt to acquire third connection (should timeout) + p.get() or { assert err.msg().contains('timeout') } + + // After returning, should be able to acquire again + p.put(c2)! + c3 := p.get()! + assert is_same_conn(c3, c2) + assert p.stats().active_conns == 2 + p.close() + } +} + +fn test_connection_validation() { + mut test_conns := []&pool.ConnectionPoolable{} + factory := create_mock_factory(mut test_conns, false, 0) // Create unhealthy connections + config := pool.ConnectionPoolConfig{ + min_idle_conns: 1 + } + + mut p := pool.new_connection_pool(factory, config) or { + assert err.msg().contains('connection validation failed') + return + } + defer { + p.close() + } +} + +fn test_eviction() { + mut test_conns := []&pool.ConnectionPoolable{} + factory := create_mock_factory(mut test_conns, true, 0) + config := pool.ConnectionPoolConfig{ + max_lifetime: 10 * time.millisecond + idle_timeout: 10 * time.millisecond + min_idle_conns: 0 + } + + mut p := pool.new_connection_pool(factory, config)! + defer { + p.close() + } + + // Acquire and return a connection + c1 := p.get()! + p.put(c1)! + + // Wait longer than timeout thresholds + time.sleep(100 * time.millisecond) + p.send_eviction(.urgent) + time.sleep(10 * time.millisecond) + stats := p.stats() + assert stats.evicted_count > 0 + assert stats.total_conns == 0 + assert stats.idle_conns == 0 + c2 := p.get()! 
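+	// The evicted connection must not be handed out again; get() creates a fresh one.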
+ assert !is_same_conn(c1, c2) + assert p.stats().total_conns == 1 + assert p.stats().evicted_count > 0 +} + +fn test_retry_mechanism() { + mut test_conns := []&pool.ConnectionPoolable{} + factory := create_mock_factory(mut test_conns, true, 3) // First 3 attempts fail + config := pool.ConnectionPoolConfig{ + max_retry_attempts: 5 + retry_base_delay: 10 * time.millisecond + min_idle_conns: 0 // No idle connections + } + + mut p := pool.new_connection_pool(factory, config)! + defer { + p.close() + } + // Should successfully create connection after retries + conn := p.get()! + assert test_conns.len == 1 + assert p.stats().creation_errors == 3 +} + +fn test_concurrent_access() { + mut test_conns := []&pool.ConnectionPoolable{} + factory := create_mock_factory(mut test_conns, true, 0) + config := pool.ConnectionPoolConfig{ + max_conns: 10 + } + + mut p := pool.new_connection_pool(factory, config)! + defer { + p.close() + } + mut wg := &sync.WaitGroup{} + + for _ in 0 .. 20 { + wg.add(1) + spawn fn (mut p pool.ConnectionPool, mut wg sync.WaitGroup) ! { + defer { wg.done() } + conn := p.get()! + time.sleep(5 * time.millisecond) + p.put(conn)! + }(mut p, mut wg) + } + + wg.wait() + stats := p.stats() + assert stats.total_conns <= 10 + assert stats.active_conns == 0 +} + +fn test_pool_close() { + mut test_conns := []&pool.ConnectionPoolable{} + factory := create_mock_factory(mut test_conns, true, 0) + + mut p := pool.new_connection_pool(factory, pool.ConnectionPoolConfig{})! + c := p.get()! + p.put(c)! + assert p.stats().active_conns == 0 + assert p.stats().idle_conns == 5 + + p.close() + + // Attempt to acquire connection after close + p.get() or { assert err.msg().contains('closed') } + assert p.stats().active_conns == 0 + assert p.stats().idle_conns == 0 + + // Verify connection was closed + mock_conn := test_conns[0] as MockConn + assert mock_conn.closed == 1 +} + +fn test_config_update() { + mut dummy := []&pool.ConnectionPoolable{} + mut p := pool.new_connection_pool(create_mock_factory(mut dummy, true, 0), pool.ConnectionPoolConfig{ + max_conns: 2 + min_idle_conns: 1 + })! + defer { + p.close() + } + assert p.stats().idle_conns == 1 + + // Modify configuration + new_config := pool.ConnectionPoolConfig{ + max_conns: 5 + min_idle_conns: 3 + } + + p.update_config(new_config)! + + // Trigger idle connection replenishment + time.sleep(100 * time.millisecond) + assert p.stats().idle_conns >= 3 +} + +fn test_error_handling() { + // Test close error handling + mut test_conns := []&pool.ConnectionPoolable{} + factory := create_mock_factory(mut test_conns, true, 0) + mut p := pool.new_connection_pool(factory, pool.ConnectionPoolConfig{})! + defer { + p.close() + } + + // default configuration has 5 idle_conns + assert p.stats().idle_conns == 5 + + // Bug Fix Needed! msvc generated wrong code for `mut conn := p.get()! as MockConn` + mut connx := p.get()! 
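+	// Two-step smartcast as a workaround for the msvc codegen issue noted above.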
+	mut conn := connx as MockConn
+	assert p.stats().active_conns == 1
+	// it depends on the `background_maintenance` thread to keep 5 idle_conns
+	assert p.stats().idle_conns >= 4
+	conn.close_flag = true
+	p.put(conn) or { assert err.msg().contains('simulated close error') }
+	assert p.stats().active_conns == 0
+	// it depends on the `background_maintenance` thread to keep 5 idle_conns
+	assert p.stats().idle_conns >= 4
+
+	// Test reset error handling
+	conn.reset_flag = true
+	p.put(conn) or { assert err.msg().contains('reset') }
+	assert p.stats().active_conns == 0
+	// it depends on the `background_maintenance` thread to keep 5 idle_conns
+	assert p.stats().idle_conns >= 4
+}