diff --git a/examples/vweb/vweb_example.v b/examples/vweb/vweb_example.v index fb220d14a3..dc42da315c 100644 --- a/examples/vweb/vweb_example.v +++ b/examples/vweb/vweb_example.v @@ -3,9 +3,12 @@ module main import vweb import rand -const ( - port = 8082 -) +const port = 8082 + +struct State { +mut: + cnt int +} struct App { vweb.Context @@ -13,18 +16,10 @@ mut: state shared State } -struct State { -mut: - cnt int -} - -pub fn (app App) before_request() { - println('[vweb] before_request: ${app.req.method} ${app.req.url}') -} - -fn main() { - println('vweb example') - vweb.run(&App{}, port) +pub fn (app &App) before_request() { + $if trace_before_request ? { + eprintln('[vweb] before_request: ${app.req.method} ${app.req.url}') + } } ['/users/:user'] @@ -36,11 +31,17 @@ pub fn (mut app App) user_endpoint(user string) vweb.Result { } pub fn (mut app App) index() vweb.Result { + mut c := 0 lock app.state { app.state.cnt++ + c = app.state.cnt + // + $if trace_address_of_app_state_cnt ? { + dump(ptr_str(app.state.cnt)) + } } show := true - hello := 'Hello world from vweb' + hello := 'Hello world from vweb, request number: ${c}' numbers := [1, 2, 3] return $vweb.html() } @@ -58,3 +59,8 @@ pub fn (mut app App) cookie() vweb.Result { pub fn (mut app App) post() vweb.Result { return app.text('Post body: ${app.req.data}') } + +fn main() { + println('vweb example') + vweb.run(&App{}, port) +} diff --git a/vlib/net/tcp.v b/vlib/net/tcp.v index 92404877ef..cb20037a64 100644 --- a/vlib/net/tcp.v +++ b/vlib/net/tcp.v @@ -14,6 +14,7 @@ pub struct TcpConn { pub mut: sock TcpSocket mut: + handle int write_deadline time.Time read_deadline time.Time read_timeout time.Duration @@ -242,6 +243,15 @@ pub fn (mut c TcpConn) wait_for_write() ! { return wait_for_write(c.sock.handle, c.write_deadline, c.write_timeout) } +// set_sock initialises the c.sock field. It should be called after `.accept_only()!`. +// Note: just use `.accept()!`. In most cases it is simpler, and calls `.set_sock()!` for you. +pub fn (mut c TcpConn) set_sock() ! { + c.sock = tcp_socket_from_handle(c.handle)! + $if trace_tcp ? { + eprintln(' TcpListener.accept | << new_sock.handle: ${c.handle:6}') + } +} + pub fn (c &TcpConn) peer_addr() !Addr { mut addr := Addr{ addr: AddrData{ @@ -295,10 +305,32 @@ pub fn listen_tcp(family AddrFamily, saddr string) !&TcpListener { } } +// accept a tcp connection from an external source to the listener `l`. pub fn (mut l TcpListener) accept() !&TcpConn { + mut res := l.accept_only()! + res.set_sock()! + return res +} + +// accept_only accepts a tcp connection from an external source to the listener `l`. +// Unlike `accept`, `accept_only` *will not call* `.set_sock()!` on the result, +// and is thus faster. +// +// Note: you *need* to call `.set_sock()!` manually, before using the +// connection after calling `.accept_only()!`, but that does not have to happen +// in the same thread that called `.accept_only()!`. +// The intention of this API, is to have a more efficient way to accept +// connections, that are later processed by a thread pool, while the main +// thread remains active, so that it can accept other connections. +// See also vlib/vweb/vweb.v . +// +// If you do not need that, just call `.accept()!` instead, which will call +// `.set_sock()!` for you. +pub fn (mut l TcpListener) accept_only() !&TcpConn { $if trace_tcp ? { eprintln(' TcpListener.accept | l.sock.handle: ${l.sock.handle:6}') } + mut new_handle := C.accept(l.sock.handle, 0, 0) if new_handle <= 0 { l.wait_for_accept()! @@ -307,12 +339,9 @@ pub fn (mut l TcpListener) accept() !&TcpConn { return error('accept failed') } } - new_sock := tcp_socket_from_handle(new_handle)! - $if trace_tcp ? { - eprintln(' TcpListener.accept | << new_sock.handle: ${new_sock.handle:6}') - } + return &TcpConn{ - sock: new_sock + handle: new_handle read_timeout: net.tcp_default_read_timeout write_timeout: net.tcp_default_write_timeout } diff --git a/vlib/vweb/vweb.v b/vlib/vweb/vweb.v index 101fcc988d..0176a1553f 100644 --- a/vlib/vweb/vweb.v +++ b/vlib/vweb/vweb.v @@ -5,6 +5,7 @@ module vweb import os import io +import runtime import net import net.http import net.urllib @@ -263,7 +264,7 @@ pub fn (mut ctx Context) file(f_path string) Result { } content_type := vweb.mime_types[ext] if content_type.len == 0 { - eprintln('no MIME type found for extension ${ext}') + eprintln('[vweb] no MIME type found for extension ${ext}') ctx.server_error(500) } else { ctx.send_response_to_client(content_type, data) @@ -318,7 +319,7 @@ pub fn (mut ctx Context) not_found() Result { pub fn (mut ctx Context) set_cookie(cookie http.Cookie) { cookie_raw := cookie.str() if cookie_raw == '' { - eprintln('error setting cookie: name of cookie is invalid') + eprintln('[vweb] error setting cookie: name of cookie is invalid') return } ctx.add_header('Set-Cookie', cookie_raw) @@ -387,9 +388,11 @@ pub fn run[T](global_app &T, port int) { [params] pub struct RunParams { - host string - port int = 8080 family net.AddrFamily = .ip6 // use `family: .ip, host: 'localhost'` when you want it to bind only to 127.0.0.1 + host string + port int = 8080 + nr_workers int = runtime.nr_jobs() + pool_channel_slots int = 1000 show_startup_message bool = true } @@ -400,59 +403,96 @@ pub fn run_at[T](global_app &T, params RunParams) ! { if params.port <= 0 || params.port > 65535 { return error('invalid port number `${params.port}`, it should be between 1 and 65535') } + if params.pool_channel_slots < 1 { + return error('invalid pool_channel_slots `${params.pool_channel_slots}`, it should be above 0, preferably higher than 10 x nr_workers') + } + if params.nr_workers < 1 { + return error('invalid nr_workers `${params.nr_workers}`, it should be above 0') + } + mut l := net.listen_tcp(params.family, '${params.host}:${params.port}') or { ecode := err.code() return error('failed to listen ${ecode} ${err}') } - // Parsing methods attributes + host := if params.host == '' { 'localhost' } else { params.host } + if params.show_startup_message { + println('[Vweb] Running app on http://${host}:${params.port}/') + } + + ch := chan &RequestParams{cap: params.pool_channel_slots} + mut ws := []thread{cap: params.nr_workers} + for worker_number in 0 .. params.nr_workers { + ws << new_worker[T](ch, worker_number) + } + if params.show_startup_message { + println('[Vweb] We have ${ws.len} workers') + } + flush_stdout() + + // Parse the attributes of vweb app methods: mut routes := map[string]Route{} $for method in T.methods { http_methods, route_path, middleware := parse_attrs(method.name, method.attrs) or { return error('error parsing method attributes: ${err}') } - routes[method.name] = Route{ methods: http_methods path: route_path middleware: middleware } } - host := if params.host == '' { 'localhost' } else { params.host } - if params.show_startup_message { - println('[Vweb] Running app on http://${host}:${params.port}/') - } - flush_stdout() + // Forever accept every connection that comes, and + // pass it through the channel, to the thread pool: for { - // Create a new app object for each connection, copy global data like db connections - mut request_app := &T{} - $if T is MiddlewareInterface { - request_app = &T{ - middlewares: global_app.middlewares.clone() - } - } - $if T is DbInterface { - request_app.db = global_app.db - } $else { - // println('vweb no db') - } - $for field in T.fields { - if 'vweb_global' in field.attrs || field.is_shared { - request_app.$(field.name) = global_app.$(field.name) - } - } - request_app.Context = global_app.Context // copy the context ref that contains static files map etc - mut conn := l.accept() or { + mut connection := l.accept_only() or { // failures should not panic - eprintln('accept() failed with error: ${err.msg()}') + eprintln('[vweb] accept() failed with error: ${err.msg()}') continue } - spawn handle_conn[T](mut conn, mut request_app, routes) + ch <- &RequestParams{ + connection: connection + global_app: unsafe { global_app } + routes: &routes + } } } +fn new_request_app[T](global_app &T) &T { + // Create a new app object for each connection, copy global data like db connections + mut request_app := &T{} + $if T is MiddlewareInterface { + request_app = &T{ + middlewares: global_app.middlewares.clone() + } + } + $if T is DbInterface { + request_app.db = global_app.db + } + $for field in T.fields { + if field.is_shared { + unsafe { + // TODO: remove this horrible hack, when copying a shared field at comptime works properly!!! + raptr := &voidptr(&request_app.$(field.name)) + gaptr := &voidptr(&global_app.$(field.name)) + *raptr = *gaptr + _ = raptr // TODO: v produces a warning that `raptr` is unused otherwise, even though it was on the previous line + } + } else { + if 'vweb_global' in field.attrs { + request_app.$(field.name) = global_app.$(field.name) + } + } + } + request_app.Context = global_app.Context // copy the context ref that contains static files map etc + return request_app +} + [manualfree] -fn handle_conn[T](mut conn net.TcpConn, mut app T, routes map[string]Route) { +fn handle_conn[T](mut conn net.TcpConn, global_app &T, routes &map[string]Route, tid int) { + // Create a new app object for each connection, copy global data like db connections + mut app := new_request_app[T](global_app) + conn.set_read_timeout(30 * time.second) conn.set_write_timeout(30 * time.second) defer { @@ -462,6 +502,11 @@ fn handle_conn[T](mut conn net.TcpConn, mut app T, routes map[string]Route) { } } + conn.set_sock() or { + eprintln('[vweb] tid: ${tid:03d}, error setting socket') + return + } + mut reader := io.new_buffered_reader(reader: conn) defer { unsafe { @@ -475,7 +520,7 @@ fn handle_conn[T](mut conn net.TcpConn, mut app T, routes map[string]Route) { req := http.parse_request(mut reader) or { // Prevents errors from being thrown when BufferedReader is empty if '${err}' != 'none' { - eprintln('error parsing request: ${err}') + eprintln('[vweb] tid: ${tid:03d}, error parsing request: ${err}') } return } @@ -487,7 +532,7 @@ fn handle_conn[T](mut conn net.TcpConn, mut app T, routes map[string]Route) { } // URL Parse url := urllib.parse(req.url) or { - eprintln('error parsing path: ${err}') + eprintln('[vweb] tid: ${tid:03d}, error parsing path: ${err}') return } @@ -538,8 +583,8 @@ fn handle_conn[T](mut conn net.TcpConn, mut app T, routes map[string]Route) { // Route matching $for method in T.methods { $if method.return_type is Result { - route := routes[method.name] or { - eprintln('parsed attributes for the `${method.name}` are not found, skipping...') + route := (*routes)[method.name] or { + eprintln('[vweb] tid: ${tid:03d}, parsed attributes for the `${method.name}` are not found, skipping...') Route{} } @@ -598,7 +643,7 @@ fn handle_conn[T](mut conn net.TcpConn, mut app T, routes map[string]Route) { if params := route_matches(url_words, route_words) { method_args := params.clone() if method_args.len != method.args.len { - eprintln('warning: uneven parameters count (${method.args.len}) in `${method.name}`, compared to the vweb route `${method.attrs}` (${method_args.len})') + eprintln('[vweb] tid: ${tid:03d}, warning: uneven parameters count (${method.args.len}) in `${method.name}`, compared to the vweb route `${method.attrs}` (${method_args.len})') } $if T is MiddlewareInterface { @@ -641,7 +686,7 @@ fn validate_middleware[T](mut app T, full_path string) bool { fn validate_app_middleware[T](mut app T, middleware string, method_name string) bool { // then the middleware that is defined for this route specifically valid := fire_app_middleware(mut app, middleware) or { - eprintln('warning: middleware `${middleware}` for the `${method_name}` are not found') + eprintln('[vweb] warning: middleware `${middleware}` for the `${method_name}` are not found') true } return valid @@ -654,7 +699,7 @@ fn fire_app_middleware[T](mut app T, method_name string) ?bool { $if method.return_type is bool { return app.$method() } $else { - eprintln('error in `${method.name}, middleware functions must return bool') + eprintln('[vweb] error in `${method.name}, middleware functions must return bool') return none } } @@ -809,7 +854,7 @@ pub fn (ctx &Context) ip() string { // Set s to the form error pub fn (mut ctx Context) error(s string) { - eprintln('vweb error: ${s}') + eprintln('[vweb] Context.error: ${s}') ctx.form_error = s } @@ -837,3 +882,38 @@ fn send_string(mut conn net.TcpConn, s string) ! { fn filter(s string) string { return html.escape(s) } + +// Worker functions for the thread pool: +struct RequestParams { + global_app voidptr + routes &map[string]Route +mut: + connection &net.TcpConn +} + +struct Worker[T] { + id int + ch chan &RequestParams +} + +fn new_worker[T](ch chan &RequestParams, id int) thread { + mut w := &Worker[T]{ + id: id + ch: ch + } + return spawn w.process_incomming_requests[T]() +} + +fn (mut w Worker[T]) process_incomming_requests() { + sid := '[vweb] tid: ${w.id:03d} received request' + for { + mut params := <-w.ch or { break } + $if vweb_trace_worker_scan ? { + eprintln(sid) + } + handle_conn[T](mut params.connection, params.global_app, params.routes, w.id) + } + $if vweb_trace_worker_scan ? { + eprintln('[vweb] closing worker ${w.id}.') + } +}