diff --git a/vlib/vanilla_http_server/CONTRIBUTING.md b/vlib/vanilla_http_server/CONTRIBUTING.md new file mode 100644 index 0000000000..7aa0c033b9 --- /dev/null +++ b/vlib/vanilla_http_server/CONTRIBUTING.md @@ -0,0 +1,32 @@ +# Contributing + +## Rules + +- Don't slow down performance +- Always try to keep abstraction to a minimum +- Don't complicate it + +## Benchmarking & Testing + +### CURL + +```sh +curl -X GET --verbose http://localhost:3000/ && +curl -X POST --verbose http://localhost:3000/user && +curl -X GET --verbose http://localhost:3000/user/1 + +``` + +### WRK + +```sh +wrk -H 'Connection: "keep-alive"' --connection 512 --threads 16 --duration 10s http://localhost:3000 +``` + +### Valgrind + +```sh +# Race condition check +v -prod -gc none . +valgrind --tool=helgrind ./vanilla +``` diff --git a/vlib/vanilla_http_server/README.md b/vlib/vanilla_http_server/README.md new file mode 100644 index 0000000000..7f439d892b --- /dev/null +++ b/vlib/vanilla_http_server/README.md @@ -0,0 +1,74 @@ +vanilla_http_server Logo + +# vanilla_http_server + +- **Fast**: Multi-threaded, non-blocking I/O, lock-free, copy-free, epoll, SO_REUSEPORT. +- **Thread Affinity**: Work in Progress (W.I.P.). +- **Modular**: Compatible with any HTTP parser. +- **Memory Safety**: No race conditions. +- **No Magic**: Transparent and straightforward. +- **E2E Testing**: Allows end-to-end testing and scripting without running the server. + Simply pass the raw request to `handle_request()`. +- **SSE Friendly**: Server-Sent Events support. +- **Graceful Shutdown**: Work in Progress (W.I.P.). + +## Installation + +### From Root Directory + +1. Create the required directories: + +```bash +mkdir -p ~/.vmodules/enghitalo/vanilla +``` + +2. Copy the `vanilla_http_server` directory to the target location: + +```bash +cp -r ./ ~/.vmodules/enghitalo/vanilla +``` + +3. Run the example: + +```bash +v -prod crun examples/simple +``` + +This sets up the module in your `~/.vmodules` directory for use. + +### From Repository + +Install directly from the repository: + +```bash +v install https://github.com/enghitalo/vanilla_http_server +``` + +## Benchmarking + +Run the following commands to benchmark the server: + +1. Test with `curl`: + +```bash +curl -v http://localhost:3001 +``` + +2. Test with `wrk`: + +```bash +wrk -H 'Connection: "keep-alive"' --connection 512 --threads 16 --duration 60s http://localhost:3001 +``` + +Example output: + +```plaintext +Running 1m test @ http://localhost:3001 + 16 threads and 512 connections + Thread Stats Avg Stdev Max +/- Stdev + Latency 1.25ms 1.46ms 35.70ms 84.67% + Req/Sec 32.08k 2.47k 57.85k 71.47% + 30662010 requests in 1.00m, 2.68GB read +Requests/sec: 510197.97 +Transfer/sec: 45.74MB +``` diff --git a/vlib/vanilla_http_server/http_server/http_server.c.v b/vlib/vanilla_http_server/http_server/http_server.c.v new file mode 100644 index 0000000000..c601606cd3 --- /dev/null +++ b/vlib/vanilla_http_server/http_server/http_server.c.v @@ -0,0 +1,319 @@ +module http_server + +import runtime + +const max_connection_size = 1024 +const max_thread_pool_size = runtime.nr_cpus() +pub const tiny_bad_request_response = 'HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\nConnection: close\r\n\r\n'.bytes() +const status_444_response = 'HTTP/1.1 444 No Response\r\nContent-Length: 0\r\nConnection: close\r\n\r\n'.bytes() +const status_499_response = 'HTTP/1.1 499 Client Closed Request\r\nContent-Length: 0\r\nConnection: close\r\n\r\n'.bytes() + +#include +#include + +$if !windows { + #include + #include +} + +fn C.socket(socket_family int, socket_type int, protocol int) int +fn C.bind(sockfd int, addr &C.sockaddr_in, addrlen u32) int +fn C.send(__fd int, __buf voidptr, __n usize, __flags int) int +fn C.recv(__fd int, __buf voidptr, __n usize, __flags int) int +fn C.setsockopt(__fd int, __level int, __optname int, __optval voidptr, __optlen u32) int +fn C.listen(__fd int, __n int) int +fn C.perror(s &u8) +fn C.close(fd int) int +fn C.accept(sockfd int, address &C.sockaddr_in, addrlen &u32) int +fn C.htons(__hostshort u16) u16 +fn C.epoll_create1(__flags int) int +fn C.epoll_ctl(__epfd int, __op int, __fd int, __event &C.epoll_event) int +fn C.epoll_wait(__epfd int, __events &C.epoll_event, __maxevents int, __timeout int) int +fn C.fcntl(fd int, cmd int, arg int) int + +struct C.in_addr { + s_addr u32 +} + +struct C.sockaddr_in { + sin_family u16 + sin_port u16 + sin_addr C.in_addr + sin_zero [8]u8 +} + +union C.epoll_data { + ptr voidptr + fd int + u32 u32 + u64 u64 +} + +struct C.epoll_event { + events u32 + data C.epoll_data +} + +pub struct Server { +pub: + port int = 3000 +pub mut: + socket_fd int + epoll_fds []int = []int{len: max_thread_pool_size, cap: max_thread_pool_size} + threads []thread = []thread{len: max_thread_pool_size, cap: max_thread_pool_size} + request_handler fn ([]u8, int) ![]u8 @[required] +} + +fn set_blocking(fd int, blocking bool) { + flags := C.fcntl(fd, C.F_GETFL, 0) + if flags == -1 { + eprintln(@LOCATION) + return + } + new_flags := if blocking { flags & ~C.O_NONBLOCK } else { flags | C.O_NONBLOCK } + C.fcntl(fd, C.F_SETFL, new_flags) +} + +fn close_socket(fd int) { + C.close(fd) +} + +fn create_server_socket(port int) int { + server_fd := C.socket(C.AF_INET, C.SOCK_STREAM, 0) + if server_fd < 0 { + eprintln(@LOCATION) + C.perror('Socket creation failed'.str) + exit(1) + } + + set_blocking(server_fd, false) + + opt := 1 + if C.setsockopt(server_fd, C.SOL_SOCKET, C.SO_REUSEPORT, &opt, sizeof(opt)) < 0 { + eprintln(@LOCATION) + C.perror('setsockopt SO_REUSEPORT failed'.str) + close_socket(server_fd) + exit(1) + } + + server_addr := C.sockaddr_in{ + sin_family: u16(C.AF_INET) + sin_port: C.htons(port) + sin_addr: C.in_addr{u32(C.INADDR_ANY)} + sin_zero: [8]u8{} + } + + if C.bind(server_fd, &server_addr, sizeof(server_addr)) < 0 { + eprintln(@LOCATION) + C.perror('Bind failed'.str) + close_socket(server_fd) + exit(1) + } + + if C.listen(server_fd, max_connection_size) < 0 { + eprintln(@LOCATION) + C.perror('Listen failed'.str) + close_socket(server_fd) + exit(1) + } + + return server_fd +} + +fn create_epoll_fd() int { + epoll_fd := C.epoll_create1(0) + if epoll_fd < 0 { + C.perror('epoll_create1'.str) + } + return epoll_fd +} + +fn add_fd_to_epoll(epoll_fd int, fd int, events u32) int { + mut ev := C.epoll_event{ + events: events + } + ev.data.fd = fd + if C.epoll_ctl(epoll_fd, C.EPOLL_CTL_ADD, fd, &ev) == -1 { + eprintln(@LOCATION) + C.perror('epoll_ctl'.str) + return -1 + } + return 0 +} + +fn remove_fd_from_epoll(epoll_fd int, fd int) { + C.epoll_ctl(epoll_fd, C.EPOLL_CTL_DEL, fd, C.NULL) +} + +fn handle_accept_loop(mut server Server, main_epoll_fd int) { + mut next_worker := 0 + mut event := C.epoll_event{} + + for { + num_events := C.epoll_wait(main_epoll_fd, &event, 1, -1) + if num_events < 0 { + if C.errno == C.EINTR { + continue + } + C.perror('epoll_wait'.str) + break + } + + if num_events > 1 { + eprintln('More than one event in epoll_wait, this should not happen.') + continue + } + + if event.events & u32(C.EPOLLIN) != 0 { + for { + client_conn_fd := C.accept(server.socket_fd, C.NULL, C.NULL) + if client_conn_fd < 0 { + // Check for EAGAIN or EWOULDBLOCK, usually represented by errno 11. + if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK { + break // No more incoming connections; exit loop. + } + eprintln(@LOCATION) + C.perror('Accept failed'.str) + continue + } + set_blocking(client_conn_fd, false) + // Load balance the client connection to the worker threads. + // this is a simple round-robin approach. + epoll_fd := server.epoll_fds[next_worker] + next_worker = (next_worker + 1) % max_thread_pool_size + if add_fd_to_epoll(epoll_fd, client_conn_fd, u32(C.EPOLLIN | C.EPOLLET)) < 0 { + close_socket(client_conn_fd) + continue + } + } + } + } +} + +@[direct_array_access; manualfree] +fn process_events(mut server Server, epoll_fd int) { + mut events := [max_connection_size]C.epoll_event{} + + for { + num_events := C.epoll_wait(epoll_fd, &events[0], max_connection_size, -1) + if num_events < 0 { + if C.errno == C.EINTR { + continue + } + eprintln(@LOCATION) + C.perror('epoll_wait'.str) + break + } + + for i in 0 .. num_events { + client_conn_fd := unsafe { events[i].data.fd } + if events[i].events & u32(C.EPOLLHUP | C.EPOLLERR) != 0 { + remove_fd_from_epoll(epoll_fd, client_conn_fd) + close_socket(client_conn_fd) + continue + } + + if events[i].events & u32(C.EPOLLIN) != 0 { + mut request_buffer := []u8{} + defer { + unsafe { + request_buffer.free() + } + } + mut temp_buffer := [140]u8{} + for { + bytes_read := C.recv(client_conn_fd, &temp_buffer[0], temp_buffer.len, + 0) + if bytes_read < 0 { + if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK { + break // No more data to read + } + eprintln(@LOCATION) + C.perror('recv'.str) + remove_fd_from_epoll(epoll_fd, client_conn_fd) + close_socket(client_conn_fd) + break + } + if bytes_read == 0 { + // Client closed the connection + remove_fd_from_epoll(epoll_fd, client_conn_fd) + close_socket(client_conn_fd) + break + } + unsafe { request_buffer.push_many(&temp_buffer[0], bytes_read) } + if bytes_read < temp_buffer.len { + break // Assume the request is complete + } + } + + if request_buffer.len == 0 { + C.send(client_conn_fd, status_444_response.data, status_444_response.len, + 0) + continue + } + + response_buffer := server.request_handler(request_buffer, client_conn_fd) or { + eprintln('Error handling request: ${err}') + C.send(client_conn_fd, tiny_bad_request_response.data, tiny_bad_request_response.len, + 0) + remove_fd_from_epoll(epoll_fd, client_conn_fd) + close_socket(client_conn_fd) + continue + } + + sent := C.send(client_conn_fd, response_buffer.data, response_buffer.len, + C.MSG_NOSIGNAL | C.MSG_ZEROCOPY) + if sent < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK { + eprintln(@LOCATION) + C.perror('send'.str) + remove_fd_from_epoll(epoll_fd, client_conn_fd) + close_socket(client_conn_fd) + } + } + } + } +} + +// run starts the HTTP server and handles incoming connections. +// This method uses epoll for efficient event-driven I/O handling. +pub fn (mut server Server) run() { + $if windows { + eprintln('Windows is not supported yet. Please, use WSL or Linux.') + exit(1) + } + + server.socket_fd = create_server_socket(server.port) + if server.socket_fd < 0 { + return + } + + main_epoll_fd := create_epoll_fd() + if main_epoll_fd < 0 { + close_socket(server.socket_fd) + exit(1) + } + + if add_fd_to_epoll(main_epoll_fd, server.socket_fd, u32(C.EPOLLIN)) < 0 { + close_socket(server.socket_fd) + close_socket(main_epoll_fd) + exit(1) + } + + for i in 0 .. max_thread_pool_size { + server.epoll_fds[i] = create_epoll_fd() + if server.epoll_fds[i] < 0 { + C.perror('epoll_create1'.str) + for j in 0 .. i { + close_socket(server.epoll_fds[j]) + } + close_socket(main_epoll_fd) + close_socket(server.socket_fd) + exit(1) + } + + server.threads[i] = spawn process_events(mut server, server.epoll_fds[i]) + } + + println('listening on http://localhost:${server.port}/') + handle_accept_loop(mut server, main_epoll_fd) +} diff --git a/vlib/vanilla_http_server/request_parser/request_parser.v b/vlib/vanilla_http_server/request_parser/request_parser.v new file mode 100644 index 0000000000..7c36f45de0 --- /dev/null +++ b/vlib/vanilla_http_server/request_parser/request_parser.v @@ -0,0 +1,120 @@ +module request_parser + +pub struct Slice { +pub: + start int + len int +} + +pub struct HttpRequest { +pub mut: + buffer []u8 + method Slice + path Slice + version Slice +} + +@[direct_array_access] +fn parse_http1_request_line(mut req HttpRequest) ! { + mut i := 0 + // Parse HTTP method + for i < req.buffer.len && req.buffer[i] != ` ` { + i++ + } + req.method = Slice{ + start: 0 + len: i + } + i++ + + // Parse path + mut path_start := i + for i < req.buffer.len && req.buffer[i] != ` ` { + i++ + } + req.path = Slice{ + start: path_start + len: i - path_start + } + i++ + + // Parse HTTP version + mut version_start := i + for i < req.buffer.len && req.buffer[i] != `\r` { + i++ + } + req.version = Slice{ + start: version_start + len: i - version_start + } + + // Move to the end of the request line + if i + 1 < req.buffer.len && req.buffer[i] == `\r` && req.buffer[i + 1] == `\n` { + i += 2 + } else { + return error('Invalid HTTP request line') + } +} + +// decode_http_request decodes an HTTP request from a byte buffer. +// It parses the request line and populates the HttpRequest struct with method, path, and version. +pub fn decode_http_request(buffer []u8) !HttpRequest { + mut req := HttpRequest{ + buffer: buffer + } + + parse_http1_request_line(mut req)! + + return req +} + +// Helper function to convert Slice to string for debugging +pub fn slice_to_string(buffer []u8, s Slice) string { + return buffer[s.start..s.start + s.len].bytestr() +} + +// get_header_value_slice retrieves the value of a header from the HTTP request buffer. +// It searches for the header name in the request buffer and returns its value as a Slice. +@[direct_array_access] +pub fn (req HttpRequest) get_header_value_slice(name string) ?Slice { + mut pos := req.version.start + req.version.len + 2 // Start after request line (CRLF) + if pos >= req.buffer.len { + return none + } + + for pos < req.buffer.len { + if unsafe { + vmemcmp(&req.buffer[pos], name.str, name.len) + } == 0 { + pos += name.len + if req.buffer[pos] != `:` { + return none + } + pos++ + for pos < req.buffer.len && (req.buffer[pos] == ` ` || req.buffer[pos] == `\t`) { + pos++ + } + if pos >= req.buffer.len { + return none + } + mut start := pos + for pos < req.buffer.len && req.buffer[pos] != `\r` { + pos++ + } + return Slice{ + start: start + len: pos - start + } + } + if req.buffer[pos] == `\r` { + pos++ + if pos < req.buffer.len && req.buffer[pos] == `\n` { + pos++ + } + } else { + pos++ + } + } + + return none +} diff --git a/vlib/vanilla_http_server/request_parser/request_parser_test.v b/vlib/vanilla_http_server/request_parser/request_parser_test.v new file mode 100644 index 0000000000..7a06f32fe6 --- /dev/null +++ b/vlib/vanilla_http_server/request_parser/request_parser_test.v @@ -0,0 +1,68 @@ +module request_parser + +fn test_parse_http1_request_line_valid_request() { + buffer := 'GET /path/to/resource HTTP/1.1\r\n'.bytes() + mut req := HttpRequest{ + buffer: buffer + } + + parse_http1_request_line(mut req) or { panic(err) } + + assert slice_to_string(req.buffer, req.method) == 'GET' + assert slice_to_string(req.buffer, req.path) == '/path/to/resource' + assert slice_to_string(req.buffer, req.version) == 'HTTP/1.1' +} + +fn test_parse_http1_request_line_invalid_request() { + buffer := 'INVALID REQUEST LINE'.bytes() + mut req := HttpRequest{ + buffer: buffer + } + + mut has_error := false + parse_http1_request_line(mut req) or { + has_error = true + assert err.msg() == 'Invalid HTTP request line' + } + assert has_error, 'Expected error for invalid request line' +} + +fn test_decode_http_request_valid_request() { + buffer := 'POST /api/resource HTTP/1.0\r\n'.bytes() + req := decode_http_request(buffer) or { panic(err) } + + assert slice_to_string(req.buffer, req.method) == 'POST' + assert slice_to_string(req.buffer, req.path) == '/api/resource' + assert slice_to_string(req.buffer, req.version) == 'HTTP/1.0' +} + +fn test_decode_http_request_invalid_request() { + buffer := 'INVALID REQUEST LINE'.bytes() + + mut has_error := false + decode_http_request(buffer) or { + has_error = true + assert err.msg() == 'Invalid HTTP request line' + } + assert has_error, 'Expected error for invalid request' +} + +fn test_get_header_value_slice_existing_header() { + buffer := 'GET / HTTP/1.1\r\nHost: example.com\r\nContent-Type: text/html\r\n\r\n'.bytes() + req := decode_http_request(buffer) or { panic(err) } + + host_slice := req.get_header_value_slice('Host') or { panic('Header not found') } + assert slice_to_string(req.buffer, host_slice) == 'example.com' + + content_type_slice := req.get_header_value_slice('Content-Type') or { + panic('Header not found') + } + assert slice_to_string(req.buffer, content_type_slice) == 'text/html' +} + +fn test_get_header_value_slice_non_existing_header() { + buffer := 'GET / HTTP/1.1\r\nHost: example.com\r\n\r\n'.bytes() + req := decode_http_request(buffer) or { panic(err) } + + assert req.get_header_value_slice('Content-Type') == none +} diff --git a/vlib/vanilla_http_server/x.v b/vlib/vanilla_http_server/x.v new file mode 100644 index 0000000000..d43dcd5942 --- /dev/null +++ b/vlib/vanilla_http_server/x.v @@ -0,0 +1,3 @@ +module vanilla + +pub const description = 'an empty module, used as a placeholder, for other modules'