mirror of
https://github.com/vlang/v.git
synced 2025-09-13 22:42:26 +03:00
net.http: support passing on_running, on_stopped, on_closed callback functions to http.Server{}, as well as show_startup_message: false. (#19591)
This commit is contained in:
parent
69d62e458b
commit
0a89f3082d
3 changed files with 172 additions and 27 deletions
|
@ -8,14 +8,14 @@ import net
|
||||||
import time
|
import time
|
||||||
import runtime
|
import runtime
|
||||||
// ServerStatus is the current status of the server.
|
// ServerStatus is the current status of the server.
|
||||||
// .running means that the server is active and serving.
|
// .closed means that the server is completely inactive (the default on creation, and after calling .close()).
|
||||||
// .stopped means that the server is not active but still listening.
|
// .running means that the server is active and serving (after .listen_and_serve()).
|
||||||
// .closed means that the server is completely inactive.
|
// .stopped means that the server is not active but still listening (after .stop() ).
|
||||||
|
|
||||||
pub enum ServerStatus {
|
pub enum ServerStatus {
|
||||||
|
closed
|
||||||
running
|
running
|
||||||
stopped
|
stopped
|
||||||
closed
|
|
||||||
}
|
}
|
||||||
|
|
||||||
interface Handler {
|
interface Handler {
|
||||||
|
@ -36,6 +36,12 @@ pub mut:
|
||||||
pool_channel_slots int = 1024
|
pool_channel_slots int = 1024
|
||||||
worker_num int = runtime.nr_jobs()
|
worker_num int = runtime.nr_jobs()
|
||||||
listener net.TcpListener
|
listener net.TcpListener
|
||||||
|
//
|
||||||
|
on_running fn (mut s Server) = unsafe { nil } // Blocking cb. If set, ran by the web server on transitions to its .running state.
|
||||||
|
on_stopped fn (mut s Server) = unsafe { nil } // Blocking cb. If set, ran by the web server on transitions to its .stopped state.
|
||||||
|
on_closed fn (mut s Server) = unsafe { nil } // Blocking cb. If set, ran by the web server on transitions to its .closed state.
|
||||||
|
//
|
||||||
|
show_startup_message bool = true // set to false, to remove the default `Listening on ...` message.
|
||||||
}
|
}
|
||||||
|
|
||||||
// listen_and_serve listens on the server port `s.port` over TCP network and
|
// listen_and_serve listens on the server port `s.port` over TCP network and
|
||||||
|
@ -55,9 +61,16 @@ pub fn (mut s Server) listen_and_serve() {
|
||||||
eprintln('Failed getting listener address, err: ${err}')
|
eprintln('Failed getting listener address, err: ${err}')
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
listening_address := s.addr.clone()
|
mut listening_address := s.addr.clone()
|
||||||
if l.family() == net.AddrFamily.unspec {
|
if l.family() == net.AddrFamily.unspec {
|
||||||
s.listener = net.listen_tcp(.ip6, listening_address) or {
|
if listening_address == ':0' {
|
||||||
|
listening_address = 'localhost:0'
|
||||||
|
}
|
||||||
|
mut listen_family := net.AddrFamily.ip
|
||||||
|
// $if !windows {
|
||||||
|
// listen_family = net.AddrFamily.ip6
|
||||||
|
// }
|
||||||
|
s.listener = net.listen_tcp(listen_family, listening_address) or {
|
||||||
eprintln('Listening on ${s.addr} failed, err: ${err}')
|
eprintln('Listening on ${s.addr} failed, err: ${err}')
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -78,9 +91,16 @@ pub fn (mut s Server) listen_and_serve() {
|
||||||
ws << new_handler_worker(wid, ch, s.handler)
|
ws << new_handler_worker(wid, ch, s.handler)
|
||||||
}
|
}
|
||||||
|
|
||||||
println('Listening on http://${listening_address}/')
|
if s.show_startup_message {
|
||||||
flush_stdout()
|
println('Listening on http://${s.addr}/')
|
||||||
|
flush_stdout()
|
||||||
|
}
|
||||||
|
|
||||||
|
time.sleep(20 * time.millisecond)
|
||||||
s.state = .running
|
s.state = .running
|
||||||
|
if s.on_running != unsafe { nil } {
|
||||||
|
s.on_running(mut s)
|
||||||
|
}
|
||||||
for {
|
for {
|
||||||
// break if we have a stop signal
|
// break if we have a stop signal
|
||||||
if s.state != .running {
|
if s.state != .running {
|
||||||
|
@ -107,6 +127,9 @@ pub fn (mut s Server) listen_and_serve() {
|
||||||
[inline]
|
[inline]
|
||||||
pub fn (mut s Server) stop() {
|
pub fn (mut s Server) stop() {
|
||||||
s.state = .stopped
|
s.state = .stopped
|
||||||
|
if s.on_stopped != unsafe { nil } {
|
||||||
|
s.on_stopped(mut s)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// close immediately closes the port and signals the server that it has been closed.
|
// close immediately closes the port and signals the server that it has been closed.
|
||||||
|
@ -114,6 +137,9 @@ pub fn (mut s Server) stop() {
|
||||||
pub fn (mut s Server) close() {
|
pub fn (mut s Server) close() {
|
||||||
s.state = .closed
|
s.state = .closed
|
||||||
s.listener.close() or { return }
|
s.listener.close() or { return }
|
||||||
|
if s.on_closed != unsafe { nil } {
|
||||||
|
s.on_closed(mut s)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// status indicates whether the server is running, stopped, or closed.
|
// status indicates whether the server is running, stopped, or closed.
|
||||||
|
@ -122,6 +148,31 @@ pub fn (s &Server) status() ServerStatus {
|
||||||
return s.state
|
return s.state
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WaitTillRunningParams allows for parametrising the calls to s.wait_till_running()
|
||||||
|
[params]
|
||||||
|
pub struct WaitTillRunningParams {
|
||||||
|
pub:
|
||||||
|
max_retries int = 100 // how many times to check for the status, for each single s.wait_till_running() call
|
||||||
|
retry_period_ms int = 10 // how much time to wait between each check for the status, in milliseconds
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait_till_running allows you to synchronise your calling (main) thread, with the state of the server
|
||||||
|
// (when the server is running in another thread).
|
||||||
|
// It returns an error, after params.max_retries * params.retry_period_ms
|
||||||
|
// milliseconds have passed, without that expected server transition.
|
||||||
|
pub fn (mut s Server) wait_till_running(params WaitTillRunningParams) !int {
|
||||||
|
mut i := 0
|
||||||
|
for s.status() != .running && i < params.max_retries {
|
||||||
|
time.sleep(params.retry_period_ms * time.millisecond)
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
if i >= params.max_retries {
|
||||||
|
return error('maximum retries reached')
|
||||||
|
}
|
||||||
|
time.sleep(params.retry_period_ms)
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
|
||||||
struct HandlerWorker {
|
struct HandlerWorker {
|
||||||
id int
|
id int
|
||||||
ch chan &net.TcpConn
|
ch chan &net.TcpConn
|
||||||
|
|
|
@ -1,13 +1,28 @@
|
||||||
|
import log
|
||||||
import net
|
import net
|
||||||
import net.http
|
import net.http
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
const atimeout = 500 * time.millisecond
|
||||||
|
|
||||||
|
fn testsuite_begin() {
|
||||||
|
log.info(@FN)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn testsuite_end() {
|
||||||
|
log.info(@FN)
|
||||||
|
}
|
||||||
|
|
||||||
fn test_server_stop() {
|
fn test_server_stop() {
|
||||||
|
log.warn('${@FN} started')
|
||||||
|
defer {
|
||||||
|
log.warn('${@FN} finished')
|
||||||
|
}
|
||||||
mut server := &http.Server{
|
mut server := &http.Server{
|
||||||
accept_timeout: 1 * time.second
|
accept_timeout: atimeout
|
||||||
}
|
}
|
||||||
t := spawn server.listen_and_serve()
|
t := spawn server.listen_and_serve()
|
||||||
time.sleep(250 * time.millisecond)
|
server.wait_till_running()!
|
||||||
mut watch := time.new_stopwatch()
|
mut watch := time.new_stopwatch()
|
||||||
server.stop()
|
server.stop()
|
||||||
assert server.status() == .stopped
|
assert server.status() == .stopped
|
||||||
|
@ -17,12 +32,17 @@ fn test_server_stop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn test_server_close() {
|
fn test_server_close() {
|
||||||
|
log.warn('${@FN} started')
|
||||||
|
defer {
|
||||||
|
log.warn('${@FN} finished')
|
||||||
|
}
|
||||||
mut server := &http.Server{
|
mut server := &http.Server{
|
||||||
accept_timeout: 1 * time.second
|
accept_timeout: atimeout
|
||||||
handler: MyHttpHandler{}
|
handler: MyHttpHandler{}
|
||||||
|
show_startup_message: false
|
||||||
}
|
}
|
||||||
t := spawn server.listen_and_serve()
|
t := spawn server.listen_and_serve()
|
||||||
time.sleep(250 * time.millisecond)
|
server.wait_till_running()!
|
||||||
mut watch := time.new_stopwatch()
|
mut watch := time.new_stopwatch()
|
||||||
server.close()
|
server.close()
|
||||||
assert server.status() == .closed
|
assert server.status() == .closed
|
||||||
|
@ -32,13 +52,18 @@ fn test_server_close() {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn test_server_custom_listener() {
|
fn test_server_custom_listener() {
|
||||||
|
log.warn('${@FN} started')
|
||||||
|
defer {
|
||||||
|
log.warn('${@FN} finished')
|
||||||
|
}
|
||||||
listener := net.listen_tcp(.ip6, ':8081')!
|
listener := net.listen_tcp(.ip6, ':8081')!
|
||||||
mut server := &http.Server{
|
mut server := &http.Server{
|
||||||
accept_timeout: 1 * time.second
|
accept_timeout: atimeout
|
||||||
listener: listener
|
listener: listener
|
||||||
|
show_startup_message: false
|
||||||
}
|
}
|
||||||
t := spawn server.listen_and_serve()
|
t := spawn server.listen_and_serve()
|
||||||
time.sleep(250 * time.millisecond)
|
server.wait_till_running()!
|
||||||
mut watch := time.new_stopwatch()
|
mut watch := time.new_stopwatch()
|
||||||
server.close()
|
server.close()
|
||||||
assert server.status() == .closed
|
assert server.status() == .closed
|
||||||
|
@ -74,7 +99,7 @@ fn (mut handler MyHttpHandler) handle(req http.Request) http.Response {
|
||||||
handler.redirects++
|
handler.redirects++
|
||||||
}
|
}
|
||||||
'/big' {
|
'/big' {
|
||||||
r.body = 'xyz def '.repeat(10_000)
|
r.body = 'xyz def '.repeat(5_000)
|
||||||
r.set_status(.ok)
|
r.set_status(.ok)
|
||||||
handler.oks++
|
handler.oks++
|
||||||
}
|
}
|
||||||
|
@ -87,34 +112,36 @@ fn (mut handler MyHttpHandler) handle(req http.Request) http.Response {
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
const cport = 8198
|
const cport = 18197
|
||||||
|
|
||||||
fn test_server_custom_handler() {
|
fn test_server_custom_handler() {
|
||||||
|
log.warn('${@FN} started')
|
||||||
|
defer {
|
||||||
|
log.warn('${@FN} finished')
|
||||||
|
}
|
||||||
mut handler := MyHttpHandler{}
|
mut handler := MyHttpHandler{}
|
||||||
mut server := &http.Server{
|
mut server := &http.Server{
|
||||||
accept_timeout: 1 * time.second
|
accept_timeout: atimeout
|
||||||
handler: handler
|
handler: handler
|
||||||
port: cport
|
port: cport
|
||||||
}
|
}
|
||||||
t := spawn server.listen_and_serve()
|
t := spawn server.listen_and_serve()
|
||||||
for server.status() != .running {
|
server.wait_till_running()!
|
||||||
time.sleep(10 * time.millisecond)
|
x := http.fetch(url: 'http://${server.addr}/endpoint?abc=xyz', data: 'my data')!
|
||||||
}
|
|
||||||
x := http.fetch(url: 'http://localhost:${cport}/endpoint?abc=xyz', data: 'my data')!
|
|
||||||
assert x.body == 'my data, /endpoint?abc=xyz'
|
assert x.body == 'my data, /endpoint?abc=xyz'
|
||||||
assert x.status_code == 200
|
assert x.status_code == 200
|
||||||
assert x.status_msg == 'OK'
|
assert x.status_msg == 'OK'
|
||||||
assert x.http_version == '1.1'
|
assert x.http_version == '1.1'
|
||||||
y := http.fetch(url: 'http://localhost:${cport}/another/endpoint', data: 'abcde')!
|
y := http.fetch(url: 'http://${server.addr}/another/endpoint', data: 'abcde')!
|
||||||
assert y.body == 'abcde, /another/endpoint'
|
assert y.body == 'abcde, /another/endpoint'
|
||||||
assert y.status_code == 200
|
assert y.status_code == 200
|
||||||
assert x.status_msg == 'OK'
|
assert x.status_msg == 'OK'
|
||||||
assert y.status() == .ok
|
assert y.status() == .ok
|
||||||
assert y.http_version == '1.1'
|
assert y.http_version == '1.1'
|
||||||
//
|
//
|
||||||
http.fetch(url: 'http://localhost:${cport}/something/else')!
|
http.fetch(url: 'http://${server.addr}/something/else')!
|
||||||
//
|
//
|
||||||
big_url := 'http://localhost:${cport}/redirect_to_big'
|
big_url := 'http://${server.addr}/redirect_to_big'
|
||||||
mut progress_calls := &ProgressCalls{}
|
mut progress_calls := &ProgressCalls{}
|
||||||
z := http.fetch(
|
z := http.fetch(
|
||||||
url: big_url
|
url: big_url
|
||||||
|
@ -140,14 +167,14 @@ fn test_server_custom_handler() {
|
||||||
assert z.status_code == 200
|
assert z.status_code == 200
|
||||||
assert z.body.starts_with('xyz')
|
assert z.body.starts_with('xyz')
|
||||||
assert z.body.len > 10000
|
assert z.body.len > 10000
|
||||||
assert progress_calls.final_size > 80_000
|
assert progress_calls.final_size > 40_000
|
||||||
assert progress_calls.finished_was_called
|
assert progress_calls.finished_was_called
|
||||||
assert progress_calls.chunks.len > 1
|
assert progress_calls.chunks.len > 1
|
||||||
assert progress_calls.reads.len > 1
|
assert progress_calls.reads.len > 1
|
||||||
assert progress_calls.chunks[0].bytestr().starts_with('HTTP/1.1 301 Moved permanently')
|
assert progress_calls.chunks[0].bytestr().starts_with('HTTP/1.1 301 Moved permanently')
|
||||||
assert progress_calls.chunks[1].bytestr().starts_with('HTTP/1.1 200 OK')
|
assert progress_calls.chunks[1].bytestr().starts_with('HTTP/1.1 200 OK')
|
||||||
assert progress_calls.chunks.last().bytestr().contains('xyz def')
|
assert progress_calls.chunks.last().bytestr().contains('xyz def')
|
||||||
assert progress_calls.redirected_to == ['http://localhost:8198/big']
|
assert progress_calls.redirected_to == ['http://${server.addr}/big']
|
||||||
//
|
//
|
||||||
server.stop()
|
server.stop()
|
||||||
t.wait()
|
t.wait()
|
||||||
|
@ -166,3 +193,60 @@ mut:
|
||||||
redirected_to []string
|
redirected_to []string
|
||||||
final_size u64
|
final_size u64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
|
||||||
|
struct MyCountingHandler {
|
||||||
|
mut:
|
||||||
|
counter int
|
||||||
|
}
|
||||||
|
|
||||||
|
fn (mut handler MyCountingHandler) handle(req http.Request) http.Response {
|
||||||
|
handler.counter++
|
||||||
|
mut r := http.Response{
|
||||||
|
body: req.data + ', ${req.url}, counter: ${handler.counter}'
|
||||||
|
header: req.header
|
||||||
|
}
|
||||||
|
match req.url.all_before('?') {
|
||||||
|
'/count' {
|
||||||
|
r.set_status(.ok)
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
r.set_status(.not_found)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
r.set_version(req.version)
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
fn test_my_counting_handler_on_random_port() {
|
||||||
|
log.warn('${@FN} started')
|
||||||
|
defer {
|
||||||
|
log.warn('${@FN} finished')
|
||||||
|
}
|
||||||
|
mut server := &http.Server{
|
||||||
|
show_startup_message: false
|
||||||
|
port: 0
|
||||||
|
accept_timeout: atimeout
|
||||||
|
handler: MyCountingHandler{}
|
||||||
|
on_running: fn (mut server http.Server) {
|
||||||
|
spawn fn (mut server http.Server) {
|
||||||
|
log.warn('server started')
|
||||||
|
url := 'http://${server.addr}/count'
|
||||||
|
log.info('fetching from url: ${url}')
|
||||||
|
for _ in 0 .. 5 {
|
||||||
|
x := http.fetch(url: url, data: 'my data') or { panic(err) }
|
||||||
|
log.info(x.body)
|
||||||
|
}
|
||||||
|
server.stop()
|
||||||
|
log.warn('server stopped')
|
||||||
|
}(mut server)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
server.listen_and_serve()
|
||||||
|
if mut server.handler is MyCountingHandler {
|
||||||
|
dump(server.handler.counter)
|
||||||
|
assert server.handler.counter == 5
|
||||||
|
}
|
||||||
|
assert true
|
||||||
|
}
|
||||||
|
|
|
@ -22,7 +22,17 @@ mut:
|
||||||
is_blocking bool
|
is_blocking bool
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn dial_tcp(address string) !&TcpConn {
|
pub fn dial_tcp(oaddress string) !&TcpConn {
|
||||||
|
mut address := oaddress
|
||||||
|
$if windows {
|
||||||
|
// resolving 0.0.0.0 to localhost, works on linux and macos, but not on windows, so try to emulate it:
|
||||||
|
if address.starts_with(':::') {
|
||||||
|
address = address.replace_once(':::', 'localhost:')
|
||||||
|
}
|
||||||
|
if address.starts_with('0.0.0.0:') {
|
||||||
|
address = address.replace_once('0.0.0.0:', 'localhost:')
|
||||||
|
}
|
||||||
|
}
|
||||||
addrs := resolve_addrs_fuzzy(address, .tcp) or {
|
addrs := resolve_addrs_fuzzy(address, .tcp) or {
|
||||||
return error('${err.msg()}; could not resolve address ${address} in dial_tcp')
|
return error('${err.msg()}; could not resolve address ${address} in dial_tcp')
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue