db: mysql,pg,sqlite add transaction support (fix #24290) (#24352)

This commit is contained in:
kbkpbot 2025-04-29 14:10:13 +08:00 committed by GitHub
parent 8fc7aeca38
commit af947f1af5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 358 additions and 28 deletions

View file

@ -41,23 +41,59 @@ more user friendly errors for that situation.
import db.mysql import db.mysql
// Create connection // Create connection
mut connection := mysql.Connection{ config := mysql.Config{
host: '127.0.0.1'
port: 3306
username: 'root' username: 'root'
password: '12345678'
dbname: 'mysql' dbname: 'mysql'
} }
// Connect to server // Connect to server
connection.connect()? mut db := mysql.connect(config)!
// Change the default database
connection.select_db('db_users')?
// Do a query // Do a query
get_users_query_result := connection.query('SELECT * FROM users')? res := db.query('select * from users')!
// Get the result as maps rows := res.rows()
for user in get_users_query_result.maps() { for row in rows {
// Access the name of user println(row.vals)
println(user['name'])
} }
// Free the query result
get_users_query_result.free()
// Close the connection if needed // Close the connection if needed
connection.close() db.close()
```
## Transaction
```v oksyntax
import db.mysql
// Create connection
config := mysql.Config{
host: '127.0.0.1'
port: 3306
username: 'root'
password: '12345678'
dbname: 'mysql'
}
mut db := mysql.connect(config)!
// turn off `autocommit` first
db.autocommit(false)!
// begin a new transaction
db.begin()!
mut result_code := db.exec_none('insert into users (username) values ("tom")')
assert result_code == 0
// make a savepoint
db.savepoint('savepoint1')!
result_code = db.exec_none('insert into users (username) values ("kitty")')
assert result_code == 0
// rollback to `savepoint1`
db.rollback_to('savepoint1')!
result_code = db.exec_none('insert into users (username) values ("mars")')
assert result_code == 0
db.commit()!
res := db.query('select * from users')!
rows := res.rows()
dump(rows)
// Close the connection if needed
db.close()
``` ```

View file

@ -86,6 +86,9 @@ fn C.mysql_autocommit(mysql &C.MYSQL, mode bool) int
// C.mysql_commit commits the current transaction. // C.mysql_commit commits the current transaction.
fn C.mysql_commit(mysql &C.MYSQL) int fn C.mysql_commit(mysql &C.MYSQL) int
// C.mysql_rollback rollback the current transaction.
fn C.mysql_rollback(mysql &C.MYSQL) int
// C.mysql_refresh flush tables or caches, or resets replication server information. // C.mysql_refresh flush tables or caches, or resets replication server information.
fn C.mysql_refresh(mysql &C.MYSQL, options u32) int fn C.mysql_refresh(mysql &C.MYSQL, options u32) int

View file

@ -38,6 +38,13 @@ pub enum ConnectionFlag {
client_remember_options // (1 << 31) Don't reset the options after an unsuccessful connect client_remember_options // (1 << 31) Don't reset the options after an unsuccessful connect
} }
pub enum MySQLTransactionLevel {
read_uncommitted
read_committed
repeatable_read
serializable
}
struct SQLError { struct SQLError {
MessageError MessageError
} }
@ -188,7 +195,7 @@ pub fn (mut db DB) autocommit(mode bool) ! {
} }
// commit commits the current transaction. // commit commits the current transaction.
pub fn (db &DB) commit() ! { pub fn (mut db DB) commit() ! {
db.check_connection_is_established()! db.check_connection_is_established()!
result := C.mysql_commit(db.conn) result := C.mysql_commit(db.conn)
@ -197,6 +204,71 @@ pub fn (db &DB) commit() ! {
} }
} }
@[params]
pub struct MySQLTransactionParam {
transaction_level MySQLTransactionLevel = .repeatable_read
}
// begin begins a new transaction.
pub fn (mut db DB) begin(param MySQLTransactionParam) ! {
db.check_connection_is_established()!
db.set_transaction_level(param.transaction_level)!
result := db.exec_none('START TRANSACTION')
if result != 0 {
db.throw_mysql_error()!
}
}
// set_transaction_level set level for the transaction
pub fn (mut db DB) set_transaction_level(level MySQLTransactionLevel) ! {
db.check_connection_is_established()!
mut sql_stmt := 'SET TRANSACTION ISOLATION LEVEL '
match level {
.read_uncommitted { sql_stmt += 'READ UNCOMMITTED' }
.read_committed { sql_stmt += 'READ COMMITTED' }
.repeatable_read { sql_stmt += 'REPEATABLE READ' }
.serializable { sql_stmt += 'SERIALIZABLE' }
}
result := db.exec_none(sql_stmt)
if result != 0 {
db.throw_mysql_error()!
}
}
// rollback rollbacks the current transaction.
pub fn (mut db DB) rollback() ! {
db.check_connection_is_established()!
result := C.mysql_rollback(db.conn)
if result != 0 {
db.throw_mysql_error()!
}
}
// rollback_to rollbacks to a specified savepoint.
pub fn (mut db DB) rollback_to(savepoint string) ! {
if !savepoint.is_identifier() {
return error('savepoint should be a identifier string')
}
db.check_connection_is_established()!
result := db.exec_none('ROLLBACK TO SAVEPOINT ${savepoint}')
if result != 0 {
db.throw_mysql_error()!
}
}
// savepoint create a new savepoint.
pub fn (mut db DB) savepoint(savepoint string) ! {
if !savepoint.is_identifier() {
return error('savepoint should be a identifier string')
}
db.check_connection_is_established()!
result := db.exec_none('SAVEPOINT ${savepoint}')
if result != 0 {
db.throw_mysql_error()!
}
}
// tables returns a list of the names of the tables in the current database, // tables returns a list of the names of the tables in the current database,
// that match the simple regular expression specified by the `wildcard` parameter. // that match the simple regular expression specified by the `wildcard` parameter.
// The `wildcard` parameter may contain the wildcard characters `%` or `_`. // The `wildcard` parameter may contain the wildcard characters `%` or `_`.
@ -500,9 +572,12 @@ pub fn (stmt &StmtHandle) execute(params []string) ![]Row {
binds[i].buffer = data binds[i].buffer = data
binds[i].buffer_length = l binds[i].buffer_length = l
code = C.mysql_stmt_fetch_column(stmt.stmt, unsafe { &binds[i] }, i, 0) code = C.mysql_stmt_fetch_column(stmt.stmt, unsafe { &binds[i] }, i, 0)
if *(binds[i].is_null) {
row.vals << ''
} else {
row.vals << unsafe { data.vstring() } row.vals << unsafe { data.vstring() }
} }
}
rows << row rows << row
} }
return rows return rows

View file

@ -11,11 +11,11 @@ fn test_mysql() {
host: '127.0.0.1' host: '127.0.0.1'
port: 3306 port: 3306
username: 'root' username: 'root'
password: '' password: '12345678'
dbname: 'mysql' dbname: 'mysql'
} }
db := mysql.connect(config)! mut db := mysql.connect(config)!
mut response := db.exec('drop table if exists users')! mut response := db.exec('drop table if exists users')!
assert response == []mysql.Row{} assert response == []mysql.Row{}
@ -87,4 +87,45 @@ fn test_mysql() {
assert response[0] == mysql.Row{ assert response[0] == mysql.Row{
vals: ['4', 'blaze', ''] vals: ['4', 'blaze', '']
} }
// transaction test
// turn off `autocommit` first
db.autocommit(false)!
// begin a new transaction
db.begin()!
result_code = db.exec_none('insert into users (username) values ("tom")')
assert result_code == 0
// make a savepoint
db.savepoint('savepoint1')!
result_code = db.exec_none('insert into users (username) values ("kitty")')
assert result_code == 0
// rollback to `savepoint1`
db.rollback_to('savepoint1')!
result_code = db.exec_none('insert into users (username) values ("mars")')
assert result_code == 0
db.commit()!
response = db.exec_param_many('select * from users', [''])!
assert response == [
mysql.Row{
vals: ['1', 'jackson', '']
},
mysql.Row{
vals: ['2', 'shannon', '']
},
mysql.Row{
vals: ['3', 'bailey', '']
},
mysql.Row{
vals: ['4', 'blaze', '']
},
mysql.Row{
vals: ['5', 'Hi', '']
},
mysql.Row{
vals: ['6', 'tom', '']
},
mysql.Row{
vals: ['8', 'mars', '']
},
]
} }

View file

@ -124,6 +124,8 @@ fn C.PQconnectdb(const_conninfo &char) &C.PGconn
fn C.PQstatus(const_conn &C.PGconn) int fn C.PQstatus(const_conn &C.PGconn) int
fn C.PQtransactionStatus(const_conn &C.PGconn) int
fn C.PQerrorMessage(const_conn &C.PGconn) &char fn C.PQerrorMessage(const_conn &C.PGconn) &char
fn C.PQexec(res &C.PGconn, const_query &char) &C.PGresult fn C.PQexec(res &C.PGconn, const_query &char) &C.PGresult
@ -436,3 +438,75 @@ fn pg_stmt_worker(db DB, query string, data orm.QueryData, where orm.QueryData)
param_vals.data, param_lens.data, param_formats.data, 0) // here, the last 0 means require text results, 1 - binary results param_vals.data, param_lens.data, param_formats.data, 0) // here, the last 0 means require text results, 1 - binary results
return db.handle_error_or_result(res, 'orm_stmt_worker') return db.handle_error_or_result(res, 'orm_stmt_worker')
} }
pub enum PQTransactionLevel {
read_uncommitted
read_committed
repeatable_read
serializable
}
@[params]
pub struct PQTransactionParam {
transaction_level PQTransactionLevel = .repeatable_read
}
// begin begins a new transaction.
pub fn (db DB) begin(param PQTransactionParam) ! {
mut sql_stmt := 'BEGIN TRANSACTION ISOLATION LEVEL '
match param.transaction_level {
.read_uncommitted { sql_stmt += 'READ UNCOMMITTED' }
.read_committed { sql_stmt += 'READ COMMITTED' }
.repeatable_read { sql_stmt += 'REPEATABLE READ' }
.serializable { sql_stmt += 'SERIALIZABLE' }
}
_ := C.PQexec(db.conn, &char(sql_stmt.str))
e := unsafe { C.PQerrorMessage(db.conn).vstring() }
if e != '' {
return error('pg exec error: "${e}"')
}
}
// commit commits the current transaction.
pub fn (db DB) commit() ! {
_ := C.PQexec(db.conn, c'COMMIT;')
e := unsafe { C.PQerrorMessage(db.conn).vstring() }
if e != '' {
return error('pg exec error: "${e}"')
}
}
// rollback rollbacks the current transaction.
pub fn (db DB) rollback() ! {
_ := C.PQexec(db.conn, c'ROLLBACK;')
e := unsafe { C.PQerrorMessage(db.conn).vstring() }
if e != '' {
return error('pg exec error: "${e}"')
}
}
// rollback_to rollbacks to a specified savepoint.
pub fn (db DB) rollback_to(savepoint string) ! {
if !savepoint.is_identifier() {
return error('savepoint should be a identifier string')
}
sql_stmt := 'ROLLBACK TO SAVEPOINT ${savepoint};'
_ := C.PQexec(db.conn, &char(sql_stmt.str))
e := unsafe { C.PQerrorMessage(db.conn).vstring() }
if e != '' {
return error('pg exec error: "${e}"')
}
}
// savepoint create a new savepoint.
pub fn (db DB) savepoint(savepoint string) ! {
if !savepoint.is_identifier() {
return error('savepoint should be a identifier string')
}
sql_stmt := 'SAVEPOINT ${savepoint};'
_ := C.PQexec(db.conn, &char(sql_stmt.str))
e := unsafe { C.PQerrorMessage(db.conn).vstring() }
if e != '' {
return error('pg exec error: "${e}"')
}
}

View file

@ -10,7 +10,7 @@ fn test_large_exec() {
return return
} }
db := pg.connect(pg.Config{ user: 'postgres', password: 'secret', dbname: 'postgres' })! db := pg.connect(pg.Config{ user: 'postgres', password: '12345678', dbname: 'postgres' })!
defer { defer {
db.close() db.close()
} }
@ -31,7 +31,12 @@ WHERE
} }
fn test_prepared() { fn test_prepared() {
db := pg.connect(pg.Config{ user: 'postgres', password: 'secret', dbname: 'postgres' })! $if !network ? {
eprintln('> Skipping test ${@FN}, since `-d network` is not passed.')
eprintln('> This test requires a working postgres server running on localhost.')
return
}
db := pg.connect(pg.Config{ user: 'postgres', password: '12345678', dbname: 'postgres' })!
defer { defer {
db.close() db.close()
} }
@ -42,3 +47,34 @@ fn test_prepared() {
assert result.len == 1 assert result.len == 1
} }
fn test_transaction() {
$if !network ? {
eprintln('> Skipping test ${@FN}, since `-d network` is not passed.')
eprintln('> This test requires a working postgres server running on localhost.')
return
}
db := pg.connect(pg.Config{ user: 'postgres', password: '12345678', dbname: 'postgres' })!
defer {
db.close()
}
db.exec('drop table if exists users')!
db.exec('create table if not exists users (
id SERIAL PRIMARY KEY,
username TEXT,
last_name TEXT NULL DEFAULT NULL
)')!
db.begin()!
db.exec("insert into users (username) values ('jackson')")!
db.savepoint('savepoint1')!
db.exec("insert into users (username) values ('kitty')")!
db.rollback_to('savepoint1')!
db.exec("insert into users (username) values ('mars')")!
db.commit()!
rows := db.exec('select * from users')!
for row in rows {
// We just need to access the memory to ensure it's properly allocated
dump(row.str())
}
}

View file

@ -42,6 +42,12 @@ pub enum SyncMode {
full full
} }
pub enum Sqlite3TransactionLevel {
deferred
immediate
exclusive
}
pub enum JournalMode { pub enum JournalMode {
off off
delete delete
@ -402,7 +408,7 @@ pub fn (db &DB) exec_param(query string, param string) ![]Row {
// create_table issues a "create table if not exists" command to the db. // create_table issues a "create table if not exists" command to the db.
// It creates the table named 'table_name', with columns generated from 'columns' array. // It creates the table named 'table_name', with columns generated from 'columns' array.
// The default columns type will be TEXT. // The default columns type will be TEXT.
pub fn (db &DB) create_table(table_name string, columns []string) ! { pub fn (mut db DB) create_table(table_name string, columns []string) ! {
db.exec('create table if not exists ${table_name} (' + columns.join(',\n') + ')')! db.exec('create table if not exists ${table_name} (' + columns.join(',\n') + ')')!
} }
@ -452,3 +458,45 @@ pub fn (db &DB) journal_mode(journal_mode JournalMode) ! {
db.exec('pragma journal_mode = MEMORY;')! db.exec('pragma journal_mode = MEMORY;')!
} }
} }
@[params]
pub struct Sqlite3TransactionParam {
transaction_level Sqlite3TransactionLevel = .deferred
}
// begin begins a new transaction.
pub fn (mut db DB) begin(param Sqlite3TransactionParam) ! {
mut sql_stmt := 'BEGIN '
match param.transaction_level {
.deferred { sql_stmt += 'DEFERRED;' }
.immediate { sql_stmt += 'IMMEDIATE;' }
.exclusive { sql_stmt += 'EXCLUSIVE;' }
}
db.exec(sql_stmt)!
}
// savepoint create a new savepoint.
pub fn (mut db DB) savepoint(savepoint string) ! {
if !savepoint.is_identifier() {
return error('savepoint should be a identifier string')
}
db.exec('SAVEPOINT ${savepoint};')!
}
// commit commits the current transaction.
pub fn (mut db DB) commit() ! {
db.exec('COMMIT;')!
}
// rollback rollbacks the current transaction.
pub fn (mut db DB) rollback() ! {
db.exec('ROLLBACK;')!
}
// rollback_to rollbacks to a specified savepoint.
pub fn (mut db DB) rollback_to(savepoint string) ! {
if !savepoint.is_identifier() {
return error('savepoint should be a identifier string')
}
db.exec('ROLLBACK TO ${savepoint};')!
}

View file

@ -7,6 +7,7 @@ struct User {
pub: pub:
id int @[primary; sql: serial] id int @[primary; sql: serial]
name string name string
last_name ?string
} }
type Content = []u8 | string type Content = []u8 | string
@ -37,7 +38,7 @@ fn test_sqlite() {
mut db := sqlite.connect(':memory:') or { panic(err) } mut db := sqlite.connect(':memory:') or { panic(err) }
assert db.is_open assert db.is_open
db.exec('drop table if exists users')! db.exec('drop table if exists users')!
db.exec("create table users (id integer primary key, name text default '');")! db.exec("create table users (id integer primary key, name text default '', last_name text null default null);")!
db.exec("insert into users (name) values ('Sam')")! db.exec("insert into users (name) values ('Sam')")!
assert db.last_insert_rowid() == 1 assert db.last_insert_rowid() == 1
assert db.get_affected_rows_count() == 1 assert db.get_affected_rows_count() == 1
@ -55,16 +56,17 @@ fn test_sqlite() {
assert username[0].vals[0] == 'Sam' assert username[0].vals[0] == 'Sam'
// this insert will be rejected due to duplicated id // this insert will be rejected due to duplicated id
db.exec("insert into users (id,name) values (1,'Sam')")! db.exec("insert into users (id,name) values (1,'Silly')")!
assert db.get_affected_rows_count() == 0 assert db.get_affected_rows_count() == 0
users := db.exec('select * from users')! mut users := db.exec('select * from users')!
dump(users)
assert users.len == 4 assert users.len == 4
code := db.exec_none('vacuum') code := db.exec_none('vacuum')
assert code == 101 assert code == 101
user := db.exec_one('select * from users where id = 3') or { panic(err) } user := db.exec_one('select * from users where id = 3') or { panic(err) }
println(user) dump(user)
assert user.vals.len == 2 assert user.vals.len == 3
db.exec("update users set name='zzzz' where name='qqqq'")! db.exec("update users set name='zzzz' where name='qqqq'")!
assert db.get_affected_rows_count() == 0 assert db.get_affected_rows_count() == 0
@ -80,6 +82,21 @@ fn test_sqlite() {
db.exec("delete from users where name='Sam'")! db.exec("delete from users where name='Sam'")!
assert db.get_affected_rows_count() == 1 assert db.get_affected_rows_count() == 1
// transaction test
db.begin()!
db.exec("insert into users (name) values ('John')")!
assert db.last_insert_rowid() == 5
db.savepoint('new_savepoint')!
db.exec("insert into users (name) values ('Kitty')")!
assert db.last_insert_rowid() == 6
db.rollback_to('new_savepoint')!
db.exec("insert into users (name) values ('Mars')")!
assert db.last_insert_rowid() == 6
db.commit()!
users = db.exec('select * from users')!
dump(users)
assert users.len == 5
db.close() or { panic(err) } db.close() or { panic(err) }
assert !db.is_open assert !db.is_open
} }