From af947f1af5e87f9d3214b25a73001b2355241914 Mon Sep 17 00:00:00 2001 From: kbkpbot Date: Tue, 29 Apr 2025 14:10:13 +0800 Subject: [PATCH] db: mysql,pg,sqlite add transaction support (fix #24290) (#24352) --- vlib/db/mysql/README.md | 62 +++++++++++++++++++++------ vlib/db/mysql/_cdefs.c.v | 3 ++ vlib/db/mysql/mysql.c.v | 81 ++++++++++++++++++++++++++++++++++-- vlib/db/mysql/mysql_test.v | 45 +++++++++++++++++++- vlib/db/pg/pg.c.v | 74 ++++++++++++++++++++++++++++++++ vlib/db/pg/pg_test.v | 40 +++++++++++++++++- vlib/db/sqlite/sqlite.c.v | 50 +++++++++++++++++++++- vlib/db/sqlite/sqlite_test.v | 31 ++++++++++---- 8 files changed, 358 insertions(+), 28 deletions(-) diff --git a/vlib/db/mysql/README.md b/vlib/db/mysql/README.md index ec26b4ade7..e43f52157a 100644 --- a/vlib/db/mysql/README.md +++ b/vlib/db/mysql/README.md @@ -41,23 +41,59 @@ more user friendly errors for that situation. import db.mysql // Create connection -mut connection := mysql.Connection{ +config := mysql.Config{ + host: '127.0.0.1' + port: 3306 username: 'root' + password: '12345678' dbname: 'mysql' } + // Connect to server -connection.connect()? -// Change the default database -connection.select_db('db_users')? +mut db := mysql.connect(config)! // Do a query -get_users_query_result := connection.query('SELECT * FROM users')? -// Get the result as maps -for user in get_users_query_result.maps() { - // Access the name of user - println(user['name']) +res := db.query('select * from users')! +rows := res.rows() +for row in rows { + println(row.vals) } -// Free the query result -get_users_query_result.free() // Close the connection if needed -connection.close() -``` \ No newline at end of file +db.close() +``` + +## Transaction + +```v oksyntax +import db.mysql + +// Create connection +config := mysql.Config{ + host: '127.0.0.1' + port: 3306 + username: 'root' + password: '12345678' + dbname: 'mysql' +} + +mut db := mysql.connect(config)! +// turn off `autocommit` first +db.autocommit(false)! +// begin a new transaction +db.begin()! +mut result_code := db.exec_none('insert into users (username) values ("tom")') +assert result_code == 0 +// make a savepoint +db.savepoint('savepoint1')! +result_code = db.exec_none('insert into users (username) values ("kitty")') +assert result_code == 0 +// rollback to `savepoint1` +db.rollback_to('savepoint1')! +result_code = db.exec_none('insert into users (username) values ("mars")') +assert result_code == 0 +db.commit()! +res := db.query('select * from users')! +rows := res.rows() +dump(rows) +// Close the connection if needed +db.close() +``` diff --git a/vlib/db/mysql/_cdefs.c.v b/vlib/db/mysql/_cdefs.c.v index 7cd3676443..36841a6059 100644 --- a/vlib/db/mysql/_cdefs.c.v +++ b/vlib/db/mysql/_cdefs.c.v @@ -86,6 +86,9 @@ fn C.mysql_autocommit(mysql &C.MYSQL, mode bool) int // C.mysql_commit commits the current transaction. fn C.mysql_commit(mysql &C.MYSQL) int +// C.mysql_rollback rollback the current transaction. +fn C.mysql_rollback(mysql &C.MYSQL) int + // C.mysql_refresh flush tables or caches, or resets replication server information. fn C.mysql_refresh(mysql &C.MYSQL, options u32) int diff --git a/vlib/db/mysql/mysql.c.v b/vlib/db/mysql/mysql.c.v index b05626956d..31d19a54b3 100644 --- a/vlib/db/mysql/mysql.c.v +++ b/vlib/db/mysql/mysql.c.v @@ -38,6 +38,13 @@ pub enum ConnectionFlag { client_remember_options // (1 << 31) Don't reset the options after an unsuccessful connect } +pub enum MySQLTransactionLevel { + read_uncommitted + read_committed + repeatable_read + serializable +} + struct SQLError { MessageError } @@ -188,7 +195,7 @@ pub fn (mut db DB) autocommit(mode bool) ! { } // commit commits the current transaction. -pub fn (db &DB) commit() ! { +pub fn (mut db DB) commit() ! { db.check_connection_is_established()! result := C.mysql_commit(db.conn) @@ -197,6 +204,71 @@ pub fn (db &DB) commit() ! { } } +@[params] +pub struct MySQLTransactionParam { + transaction_level MySQLTransactionLevel = .repeatable_read +} + +// begin begins a new transaction. +pub fn (mut db DB) begin(param MySQLTransactionParam) ! { + db.check_connection_is_established()! + db.set_transaction_level(param.transaction_level)! + result := db.exec_none('START TRANSACTION') + if result != 0 { + db.throw_mysql_error()! + } +} + +// set_transaction_level set level for the transaction +pub fn (mut db DB) set_transaction_level(level MySQLTransactionLevel) ! { + db.check_connection_is_established()! + mut sql_stmt := 'SET TRANSACTION ISOLATION LEVEL ' + match level { + .read_uncommitted { sql_stmt += 'READ UNCOMMITTED' } + .read_committed { sql_stmt += 'READ COMMITTED' } + .repeatable_read { sql_stmt += 'REPEATABLE READ' } + .serializable { sql_stmt += 'SERIALIZABLE' } + } + result := db.exec_none(sql_stmt) + if result != 0 { + db.throw_mysql_error()! + } +} + +// rollback rollbacks the current transaction. +pub fn (mut db DB) rollback() ! { + db.check_connection_is_established()! + result := C.mysql_rollback(db.conn) + + if result != 0 { + db.throw_mysql_error()! + } +} + +// rollback_to rollbacks to a specified savepoint. +pub fn (mut db DB) rollback_to(savepoint string) ! { + if !savepoint.is_identifier() { + return error('savepoint should be a identifier string') + } + db.check_connection_is_established()! + result := db.exec_none('ROLLBACK TO SAVEPOINT ${savepoint}') + if result != 0 { + db.throw_mysql_error()! + } +} + +// savepoint create a new savepoint. +pub fn (mut db DB) savepoint(savepoint string) ! { + if !savepoint.is_identifier() { + return error('savepoint should be a identifier string') + } + db.check_connection_is_established()! + result := db.exec_none('SAVEPOINT ${savepoint}') + if result != 0 { + db.throw_mysql_error()! + } +} + // tables returns a list of the names of the tables in the current database, // that match the simple regular expression specified by the `wildcard` parameter. // The `wildcard` parameter may contain the wildcard characters `%` or `_`. @@ -500,8 +572,11 @@ pub fn (stmt &StmtHandle) execute(params []string) ![]Row { binds[i].buffer = data binds[i].buffer_length = l code = C.mysql_stmt_fetch_column(stmt.stmt, unsafe { &binds[i] }, i, 0) - - row.vals << unsafe { data.vstring() } + if *(binds[i].is_null) { + row.vals << '' + } else { + row.vals << unsafe { data.vstring() } + } } rows << row } diff --git a/vlib/db/mysql/mysql_test.v b/vlib/db/mysql/mysql_test.v index b4415b0753..6869f9db76 100644 --- a/vlib/db/mysql/mysql_test.v +++ b/vlib/db/mysql/mysql_test.v @@ -11,11 +11,11 @@ fn test_mysql() { host: '127.0.0.1' port: 3306 username: 'root' - password: '' + password: '12345678' dbname: 'mysql' } - db := mysql.connect(config)! + mut db := mysql.connect(config)! mut response := db.exec('drop table if exists users')! assert response == []mysql.Row{} @@ -87,4 +87,45 @@ fn test_mysql() { assert response[0] == mysql.Row{ vals: ['4', 'blaze', ''] } + + // transaction test + // turn off `autocommit` first + db.autocommit(false)! + // begin a new transaction + db.begin()! + result_code = db.exec_none('insert into users (username) values ("tom")') + assert result_code == 0 + // make a savepoint + db.savepoint('savepoint1')! + result_code = db.exec_none('insert into users (username) values ("kitty")') + assert result_code == 0 + // rollback to `savepoint1` + db.rollback_to('savepoint1')! + result_code = db.exec_none('insert into users (username) values ("mars")') + assert result_code == 0 + db.commit()! + response = db.exec_param_many('select * from users', [''])! + assert response == [ + mysql.Row{ + vals: ['1', 'jackson', ''] + }, + mysql.Row{ + vals: ['2', 'shannon', ''] + }, + mysql.Row{ + vals: ['3', 'bailey', ''] + }, + mysql.Row{ + vals: ['4', 'blaze', ''] + }, + mysql.Row{ + vals: ['5', 'Hi', ''] + }, + mysql.Row{ + vals: ['6', 'tom', ''] + }, + mysql.Row{ + vals: ['8', 'mars', ''] + }, + ] } diff --git a/vlib/db/pg/pg.c.v b/vlib/db/pg/pg.c.v index e183e54e64..9ed89d5ac3 100644 --- a/vlib/db/pg/pg.c.v +++ b/vlib/db/pg/pg.c.v @@ -124,6 +124,8 @@ fn C.PQconnectdb(const_conninfo &char) &C.PGconn fn C.PQstatus(const_conn &C.PGconn) int +fn C.PQtransactionStatus(const_conn &C.PGconn) int + fn C.PQerrorMessage(const_conn &C.PGconn) &char fn C.PQexec(res &C.PGconn, const_query &char) &C.PGresult @@ -436,3 +438,75 @@ fn pg_stmt_worker(db DB, query string, data orm.QueryData, where orm.QueryData) param_vals.data, param_lens.data, param_formats.data, 0) // here, the last 0 means require text results, 1 - binary results return db.handle_error_or_result(res, 'orm_stmt_worker') } + +pub enum PQTransactionLevel { + read_uncommitted + read_committed + repeatable_read + serializable +} + +@[params] +pub struct PQTransactionParam { + transaction_level PQTransactionLevel = .repeatable_read +} + +// begin begins a new transaction. +pub fn (db DB) begin(param PQTransactionParam) ! { + mut sql_stmt := 'BEGIN TRANSACTION ISOLATION LEVEL ' + match param.transaction_level { + .read_uncommitted { sql_stmt += 'READ UNCOMMITTED' } + .read_committed { sql_stmt += 'READ COMMITTED' } + .repeatable_read { sql_stmt += 'REPEATABLE READ' } + .serializable { sql_stmt += 'SERIALIZABLE' } + } + _ := C.PQexec(db.conn, &char(sql_stmt.str)) + e := unsafe { C.PQerrorMessage(db.conn).vstring() } + if e != '' { + return error('pg exec error: "${e}"') + } +} + +// commit commits the current transaction. +pub fn (db DB) commit() ! { + _ := C.PQexec(db.conn, c'COMMIT;') + e := unsafe { C.PQerrorMessage(db.conn).vstring() } + if e != '' { + return error('pg exec error: "${e}"') + } +} + +// rollback rollbacks the current transaction. +pub fn (db DB) rollback() ! { + _ := C.PQexec(db.conn, c'ROLLBACK;') + e := unsafe { C.PQerrorMessage(db.conn).vstring() } + if e != '' { + return error('pg exec error: "${e}"') + } +} + +// rollback_to rollbacks to a specified savepoint. +pub fn (db DB) rollback_to(savepoint string) ! { + if !savepoint.is_identifier() { + return error('savepoint should be a identifier string') + } + sql_stmt := 'ROLLBACK TO SAVEPOINT ${savepoint};' + _ := C.PQexec(db.conn, &char(sql_stmt.str)) + e := unsafe { C.PQerrorMessage(db.conn).vstring() } + if e != '' { + return error('pg exec error: "${e}"') + } +} + +// savepoint create a new savepoint. +pub fn (db DB) savepoint(savepoint string) ! { + if !savepoint.is_identifier() { + return error('savepoint should be a identifier string') + } + sql_stmt := 'SAVEPOINT ${savepoint};' + _ := C.PQexec(db.conn, &char(sql_stmt.str)) + e := unsafe { C.PQerrorMessage(db.conn).vstring() } + if e != '' { + return error('pg exec error: "${e}"') + } +} diff --git a/vlib/db/pg/pg_test.v b/vlib/db/pg/pg_test.v index 081d47a4d7..6bfa0e3828 100644 --- a/vlib/db/pg/pg_test.v +++ b/vlib/db/pg/pg_test.v @@ -10,7 +10,7 @@ fn test_large_exec() { return } - db := pg.connect(pg.Config{ user: 'postgres', password: 'secret', dbname: 'postgres' })! + db := pg.connect(pg.Config{ user: 'postgres', password: '12345678', dbname: 'postgres' })! defer { db.close() } @@ -31,7 +31,12 @@ WHERE } fn test_prepared() { - db := pg.connect(pg.Config{ user: 'postgres', password: 'secret', dbname: 'postgres' })! + $if !network ? { + eprintln('> Skipping test ${@FN}, since `-d network` is not passed.') + eprintln('> This test requires a working postgres server running on localhost.') + return + } + db := pg.connect(pg.Config{ user: 'postgres', password: '12345678', dbname: 'postgres' })! defer { db.close() } @@ -42,3 +47,34 @@ fn test_prepared() { assert result.len == 1 } + +fn test_transaction() { + $if !network ? { + eprintln('> Skipping test ${@FN}, since `-d network` is not passed.') + eprintln('> This test requires a working postgres server running on localhost.') + return + } + + db := pg.connect(pg.Config{ user: 'postgres', password: '12345678', dbname: 'postgres' })! + defer { + db.close() + } + db.exec('drop table if exists users')! + db.exec('create table if not exists users ( + id SERIAL PRIMARY KEY, + username TEXT, + last_name TEXT NULL DEFAULT NULL + )')! + db.begin()! + db.exec("insert into users (username) values ('jackson')")! + db.savepoint('savepoint1')! + db.exec("insert into users (username) values ('kitty')")! + db.rollback_to('savepoint1')! + db.exec("insert into users (username) values ('mars')")! + db.commit()! + rows := db.exec('select * from users')! + for row in rows { + // We just need to access the memory to ensure it's properly allocated + dump(row.str()) + } +} diff --git a/vlib/db/sqlite/sqlite.c.v b/vlib/db/sqlite/sqlite.c.v index a1f832691e..ebf9556187 100644 --- a/vlib/db/sqlite/sqlite.c.v +++ b/vlib/db/sqlite/sqlite.c.v @@ -42,6 +42,12 @@ pub enum SyncMode { full } +pub enum Sqlite3TransactionLevel { + deferred + immediate + exclusive +} + pub enum JournalMode { off delete @@ -402,7 +408,7 @@ pub fn (db &DB) exec_param(query string, param string) ![]Row { // create_table issues a "create table if not exists" command to the db. // It creates the table named 'table_name', with columns generated from 'columns' array. // The default columns type will be TEXT. -pub fn (db &DB) create_table(table_name string, columns []string) ! { +pub fn (mut db DB) create_table(table_name string, columns []string) ! { db.exec('create table if not exists ${table_name} (' + columns.join(',\n') + ')')! } @@ -452,3 +458,45 @@ pub fn (db &DB) journal_mode(journal_mode JournalMode) ! { db.exec('pragma journal_mode = MEMORY;')! } } + +@[params] +pub struct Sqlite3TransactionParam { + transaction_level Sqlite3TransactionLevel = .deferred +} + +// begin begins a new transaction. +pub fn (mut db DB) begin(param Sqlite3TransactionParam) ! { + mut sql_stmt := 'BEGIN ' + match param.transaction_level { + .deferred { sql_stmt += 'DEFERRED;' } + .immediate { sql_stmt += 'IMMEDIATE;' } + .exclusive { sql_stmt += 'EXCLUSIVE;' } + } + db.exec(sql_stmt)! +} + +// savepoint create a new savepoint. +pub fn (mut db DB) savepoint(savepoint string) ! { + if !savepoint.is_identifier() { + return error('savepoint should be a identifier string') + } + db.exec('SAVEPOINT ${savepoint};')! +} + +// commit commits the current transaction. +pub fn (mut db DB) commit() ! { + db.exec('COMMIT;')! +} + +// rollback rollbacks the current transaction. +pub fn (mut db DB) rollback() ! { + db.exec('ROLLBACK;')! +} + +// rollback_to rollbacks to a specified savepoint. +pub fn (mut db DB) rollback_to(savepoint string) ! { + if !savepoint.is_identifier() { + return error('savepoint should be a identifier string') + } + db.exec('ROLLBACK TO ${savepoint};')! +} diff --git a/vlib/db/sqlite/sqlite_test.v b/vlib/db/sqlite/sqlite_test.v index 827e3daa1d..1b8019db35 100644 --- a/vlib/db/sqlite/sqlite_test.v +++ b/vlib/db/sqlite/sqlite_test.v @@ -5,8 +5,9 @@ type Connection = sqlite.DB struct User { pub: - id int @[primary; sql: serial] - name string + id int @[primary; sql: serial] + name string + last_name ?string } type Content = []u8 | string @@ -37,7 +38,7 @@ fn test_sqlite() { mut db := sqlite.connect(':memory:') or { panic(err) } assert db.is_open db.exec('drop table if exists users')! - db.exec("create table users (id integer primary key, name text default '');")! + db.exec("create table users (id integer primary key, name text default '', last_name text null default null);")! db.exec("insert into users (name) values ('Sam')")! assert db.last_insert_rowid() == 1 assert db.get_affected_rows_count() == 1 @@ -55,16 +56,17 @@ fn test_sqlite() { assert username[0].vals[0] == 'Sam' // this insert will be rejected due to duplicated id - db.exec("insert into users (id,name) values (1,'Sam')")! + db.exec("insert into users (id,name) values (1,'Silly')")! assert db.get_affected_rows_count() == 0 - users := db.exec('select * from users')! + mut users := db.exec('select * from users')! + dump(users) assert users.len == 4 code := db.exec_none('vacuum') assert code == 101 user := db.exec_one('select * from users where id = 3') or { panic(err) } - println(user) - assert user.vals.len == 2 + dump(user) + assert user.vals.len == 3 db.exec("update users set name='zzzz' where name='qqqq'")! assert db.get_affected_rows_count() == 0 @@ -80,6 +82,21 @@ fn test_sqlite() { db.exec("delete from users where name='Sam'")! assert db.get_affected_rows_count() == 1 + // transaction test + db.begin()! + db.exec("insert into users (name) values ('John')")! + assert db.last_insert_rowid() == 5 + db.savepoint('new_savepoint')! + db.exec("insert into users (name) values ('Kitty')")! + assert db.last_insert_rowid() == 6 + db.rollback_to('new_savepoint')! + db.exec("insert into users (name) values ('Mars')")! + assert db.last_insert_rowid() == 6 + db.commit()! + users = db.exec('select * from users')! + dump(users) + assert users.len == 5 + db.close() or { panic(err) } assert !db.is_open }