picoev, picohttparser: reimplement in V (#18506)

This commit is contained in:
Casper Kuethe 2023-07-12 08:40:16 +02:00 committed by GitHub
parent 045adb6600
commit a43064af07
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 1654 additions and 261 deletions

View file

@ -1,4 +1,4 @@
## Description:
`picoev` is a thin wrapper over [picoev](https://github.com/kazuho/picoev),
`picoev` is a V implementation of [picoev](https://github.com/kazuho/picoev),
which in turn is "A tiny, lightning fast event loop for network applications".

View file

@ -0,0 +1,95 @@
module picoev
$if windows {
#include <winsock2.h>
#include <ws2tcpip.h>
} $else {
#include <sys/select.h>
}
pub struct SelectLoop {
mut:
id int
now i64
}
type LoopType = SelectLoop
// create_select_loop creates a `SelectLoop` struct with `id`
pub fn create_select_loop(id int) !&SelectLoop {
return &SelectLoop{
id: id
}
}
[direct_array_access]
fn (mut pv Picoev) update_events(fd int, events int) int {
// check if fd is in range
assert fd < max_fds
pv.file_descriptors[fd].events = u32(events & picoev_readwrite)
return 0
}
[direct_array_access]
fn (mut pv Picoev) poll_once(max_wait int) int {
readfds, writefds, errorfds := C.fd_set{}, C.fd_set{}, C.fd_set{}
// setup
C.FD_ZERO(&readfds)
C.FD_ZERO(&writefds)
C.FD_ZERO(&errorfds)
mut maxfd := 0
// find the maximum socket for `select` and add sockets to the fd_sets
for target in pv.file_descriptors {
if target.loop_id == pv.loop.id {
if target.events & picoev_read != 0 {
C.FD_SET(target.fd, &readfds)
if maxfd < target.fd {
maxfd = target.fd
}
}
if target.events & picoev_write != 0 {
C.FD_SET(target.fd, &writefds)
if maxfd < target.fd {
maxfd = target.fd
}
}
}
}
// select and handle sockets if any
tv := C.timeval{
tv_sec: u64(max_wait)
tv_usec: 0
}
r := C.@select(maxfd + 1, &readfds, &writefds, &errorfds, &tv)
if r == -1 {
// timeout
return -1
} else if r > 0 {
for target in pv.file_descriptors {
if target.loop_id == pv.loop.id {
// vfmt off
read_events := (
(if C.FD_ISSET(target.fd, &readfds) { picoev_read } else { 0 })
|
(if C.FD_ISSET(target.fd, &writefds) { picoev_write } else { 0 })
)
// vfmt on
if read_events != 0 {
$if trace_fd ? {
eprintln('do callback ${target.fd}')
}
// do callback!
unsafe { target.cb(target.fd, read_events, &pv) }
}
}
}
}
return 0
}

View file

@ -0,0 +1,203 @@
module picoev
#include <errno.h>
#include <sys/types.h>
#include <sys/event.h>
fn C.kevent(int, changelist voidptr, nchanges int, eventlist voidptr, nevents int, timout &C.timespec) int
fn C.kqueue() int
fn C.EV_SET(kev voidptr, ident int, filter i16, flags u16, fflags u32, data voidptr, udata voidptr)
pub struct C.kevent {
pub mut:
ident int
// uintptr_t
filter i16
flags u16
fflags u32
data voidptr
// intptr_t
udata voidptr
}
[heap]
pub struct KqueueLoop {
mut:
id int
now i64
kq_id int
// -1 if not changed
changed_fds int
events [1024]C.kevent
changelist [256]C.kevent
}
type LoopType = KqueueLoop
// create_kqueue_loop creates a new kernel event queue with loop_id=`id`
pub fn create_kqueue_loop(id int) !&KqueueLoop {
mut loop := &KqueueLoop{
id: id
}
loop.kq_id = C.kqueue()
if loop.kq_id == -1 {
return error('could not create kqueue loop!')
}
loop.changed_fds = -1
return loop
}
// ev_set sets a new `kevent` with file descriptor `index`
[inline]
pub fn (mut pv Picoev) ev_set(index int, operation int, events int) {
// vfmt off
filter := i16(
(if events & picoev_read != 0 { C.EVFILT_READ } else { 0 })
|
(if events & picoev_write != 0 { C.EVFILT_WRITE } else { 0 })
)
// vfmt on
C.EV_SET(&pv.loop.changelist[index], pv.loop.changed_fds, filter, operation, 0, 0,
0)
}
// backend_build uses the lower 8 bits to store the old events and the higher 8
// bits to store the next file descriptor in `Target.backend`
[inline]
fn backend_build(next_fd int, events u32) int {
return int((u32(next_fd) << 8) | (events & 0xff))
}
// get the lower 8 bits
[inline]
fn backend_get_old_events(backend int) int {
return backend & 0xff
}
// get the higher 8 bits
[inline]
fn backend_get_next_fd(backend int) int {
return backend >> 8
}
// apply pending processes all changes for the file descriptors and updates `loop.changelist`
// if `aplly_all` is `true` the changes are immediately applied
fn (mut pv Picoev) apply_pending_changes(apply_all bool) int {
mut total, mut nevents := 0, 0
for pv.loop.changed_fds != -1 {
mut target := pv.file_descriptors[pv.loop.changed_fds]
old_events := backend_get_old_events(target.backend)
if target.events != old_events {
// events have been changed
if old_events != 0 {
pv.ev_set(total, C.EV_DISABLE, old_events)
total++
}
if target.events != 0 {
pv.ev_set(total, C.EV_ADD | C.EV_ENABLE, int(target.events))
total++
}
// Apply the changes if the total changes exceed the changelist size
if total + 1 >= pv.loop.changelist.len {
nevents = C.kevent(pv.loop.kq_id, &pv.loop.changelist, total, C.NULL,
0, C.NULL)
assert nevents == 0
total = 0
}
}
pv.loop.changed_fds = backend_get_next_fd(target.backend)
target.backend = -1
}
if apply_all && total != 0 {
nevents = C.kevent(pv.loop.kq_id, &pv.loop.changelist, total, C.NULL, 0, C.NULL)
assert nevents == 0
total = 0
}
return total
}
[direct_array_access]
fn (mut pv Picoev) update_events(fd int, events int) int {
// check if fd is in range
assert fd < max_fds
mut target := pv.file_descriptors[fd]
// initialize if adding the fd
if events & picoev_add != 0 {
target.backend = -1
}
// return if nothing to do
if (events == picoev_del && target.backend == -1)
|| (events != picoev_del && events & picoev_readwrite == target.events) {
return 0
}
// add to changed list if not yet being done
if target.backend == -1 {
target.backend = backend_build(pv.loop.changed_fds, target.events)
pv.loop.changed_fds = fd
}
// update events
target.events = u32(events & picoev_readwrite)
// apply immediately if is a DELETE
if events & picoev_del != 0 {
pv.apply_pending_changes(true)
}
return 0
}
[direct_array_access]
fn (mut pv Picoev) poll_once(max_wait int) int {
ts := C.timespec{
tv_sec: max_wait
tv_nsec: 0
}
mut total, mut nevents := 0, 0
// apply changes later when the callback is called.
total = pv.apply_pending_changes(false)
nevents = C.kevent(pv.loop.kq_id, &pv.loop.changelist, total, &pv.loop.events, pv.loop.events.len,
&ts)
if nevents == -1 {
// the errors we can only rescue
assert C.errno == C.EACCES || C.errno == C.EFAULT || C.errno == C.EINTR
return -1
}
for i := 0; i < nevents; i++ {
event := pv.loop.events[i]
target := pv.file_descriptors[event.ident]
// changelist errors are fatal
assert event.flags & C.EV_ERROR == 0
if pv.loop.id == target.loop_id && event.filter & (C.EVFILT_READ | C.EVFILT_WRITE) != 0 {
read_events := match int(event.filter) {
C.EVFILT_READ {
picoev_read
}
C.EVFILT_WRITE {
picoev_write
}
else {
0
}
}
// do callback!
unsafe { target.cb(target.fd, read_events, &pv) }
}
}
return 0
}

139
vlib/picoev/loop_linux.c.v Normal file
View file

@ -0,0 +1,139 @@
module picoev
#include <sys/epoll.h>
fn C.epoll_create(int) int
fn C.epoll_wait(int, voidptr, int, int) int
fn C.epoll_ctl(int, int, int, voidptr) int
[typedef]
pub union C.epoll_data_t {
mut:
ptr voidptr
fd int
u32 u32
u64 u64
}
[packed]
pub struct C.epoll_event {
mut:
events u32
data C.epoll_data_t
}
[heap]
pub struct EpollLoop {
mut:
id int
epoll_fd int
events [1024]C.epoll_event
now i64
}
type LoopType = EpollLoop
// create_epoll_loop creates a new epoll instance for and returns an
// `EpollLoop` struct with `id`
pub fn create_epoll_loop(id int) !&EpollLoop {
mut loop := &EpollLoop{
id: id
}
loop.epoll_fd = C.epoll_create(max_fds)
if loop.epoll_fd == -1 {
return error('could not create epoll loop!')
}
return loop
}
[direct_array_access]
fn (mut pv Picoev) update_events(fd int, events int) int {
// check if fd is in range
assert fd < max_fds
mut target := pv.file_descriptors[fd]
mut ev := C.epoll_event{}
// fd belongs to loop
if events & picoev_del != target.events && target.loop_id != pv.loop.id {
return -1
}
if events & picoev_readwrite == target.events {
return 0
}
// vfmt off
ev.events = u32(
(if events & picoev_read != 0 { C.EPOLLIN } else { 0 })
|
(if events & picoev_write != 0 { C.EPOLLOUT } else { 0 })
)
// vfmt on
ev.data.fd = fd
if events & picoev_del != 0 {
// nothing to do
} else if events & picoev_readwrite == 0 {
// delete the file if it exists
epoll_ret := C.epoll_ctl(pv.loop.epoll_fd, C.EPOLL_CTL_DEL, fd, &ev)
// check error
assert epoll_ret == 0
} else {
// change settings to 0
mut epoll_ret := C.epoll_ctl(pv.loop.epoll_fd, C.EPOLL_CTL_MOD, fd, &ev)
if epoll_ret != 0 {
// if the file is not present we want to add it
assert C.errno == C.ENOENT
epoll_ret = C.epoll_ctl(pv.loop.epoll_fd, C.EPOLL_CTL_ADD, fd, &ev)
// check error
assert epoll_ret == 0
}
}
// convert to u32?
target.events = u32(events)
return 0
}
[direct_array_access]
fn (mut pv Picoev) poll_once(max_wait int) int {
nevents := C.epoll_wait(pv.loop.epoll_fd, &pv.loop.events, max_fds, max_wait * 1000)
if nevents == -1 {
// timeout has occurred
return -1
}
for i := 0; i < nevents; i++ {
mut event := pv.loop.events[i]
target := unsafe { pv.file_descriptors[event.data.fd] }
unsafe {
assert event.data.fd < max_fds
}
if pv.loop.id == target.loop_id && target.events & picoev_readwrite != 0 {
// vfmt off
read_events := (
(if event.events & u32(C.EPOLLIN) != 0 { picoev_read } else { 0 })
|
(if event.events & u32(C.EPOLLOUT) != 0 { picoev_write } else { 0 })
)
// vfmt on
if read_events != 0 {
// do callback!
unsafe { target.cb(event.data.fd, read_events, &pv) }
}
} else {
// defer epoll delete
event.events = 0
unsafe {
C.epoll_ctl(pv.loop.epoll_fd, C.EPOLL_CTL_DEL, event.data.fd, &event)
}
}
}
return 0
}

203
vlib/picoev/loop_macos.c.v Normal file
View file

@ -0,0 +1,203 @@
module picoev
#include <errno.h>
#include <sys/types.h>
#include <sys/event.h>
fn C.kevent(int, changelist voidptr, nchanges int, eventlist voidptr, nevents int, timout &C.timespec) int
fn C.kqueue() int
fn C.EV_SET(kev voidptr, ident int, filter i16, flags u16, fflags u32, data voidptr, udata voidptr)
pub struct C.kevent {
pub mut:
ident int
// uintptr_t
filter i16
flags u16
fflags u32
data voidptr
// intptr_t
udata voidptr
}
[heap]
pub struct KqueueLoop {
mut:
id int
now i64
kq_id int
// -1 if not changed
changed_fds int
events [1024]C.kevent
changelist [256]C.kevent
}
type LoopType = KqueueLoop
// create_kqueue_loop creates a new kernel event queue with loop_id=`id`
pub fn create_kqueue_loop(id int) !&KqueueLoop {
mut loop := &KqueueLoop{
id: id
}
loop.kq_id = C.kqueue()
if loop.kq_id == -1 {
return error('could not create kqueue loop!')
}
loop.changed_fds = -1
return loop
}
// ev_set sets a new `kevent` with file descriptor `index`
[inline]
pub fn (mut pv Picoev) ev_set(index int, operation int, events int) {
// vfmt off
filter := i16(
(if events & picoev_read != 0 { C.EVFILT_READ } else { 0 })
|
(if events & picoev_write != 0 { C.EVFILT_WRITE } else { 0 })
)
// vfmt on
C.EV_SET(&pv.loop.changelist[index], pv.loop.changed_fds, filter, operation, 0, 0,
0)
}
// backend_build uses the lower 8 bits to store the old events and the higher 8
// bits to store the next file descriptor in `Target.backend`
[inline]
fn backend_build(next_fd int, events u32) int {
return int((u32(next_fd) << 8) | (events & 0xff))
}
// get the lower 8 bits
[inline]
fn backend_get_old_events(backend int) int {
return backend & 0xff
}
// get the higher 8 bits
[inline]
fn backend_get_next_fd(backend int) int {
return backend >> 8
}
// apply pending processes all changes for the file descriptors and updates `loop.changelist`
// if `aplly_all` is `true` the changes are immediately applied
fn (mut pv Picoev) apply_pending_changes(apply_all bool) int {
mut total, mut nevents := 0, 0
for pv.loop.changed_fds != -1 {
mut target := pv.file_descriptors[pv.loop.changed_fds]
old_events := backend_get_old_events(target.backend)
if target.events != old_events {
// events have been changed
if old_events != 0 {
pv.ev_set(total, C.EV_DISABLE, old_events)
total++
}
if target.events != 0 {
pv.ev_set(total, C.EV_ADD | C.EV_ENABLE, int(target.events))
total++
}
// Apply the changes if the total changes exceed the changelist size
if total + 1 >= pv.loop.changelist.len {
nevents = C.kevent(pv.loop.kq_id, &pv.loop.changelist, total, C.NULL,
0, C.NULL)
assert nevents == 0
total = 0
}
}
pv.loop.changed_fds = backend_get_next_fd(target.backend)
target.backend = -1
}
if apply_all && total != 0 {
nevents = C.kevent(pv.loop.kq_id, &pv.loop.changelist, total, C.NULL, 0, C.NULL)
assert nevents == 0
total = 0
}
return total
}
[direct_array_access]
fn (mut pv Picoev) update_events(fd int, events int) int {
// check if fd is in range
assert fd < max_fds
mut target := pv.file_descriptors[fd]
// initialize if adding the fd
if events & picoev_add != 0 {
target.backend = -1
}
// return if nothing to do
if (events == picoev_del && target.backend == -1)
|| (events != picoev_del && events & picoev_readwrite == target.events) {
return 0
}
// add to changed list if not yet being done
if target.backend == -1 {
target.backend = backend_build(pv.loop.changed_fds, target.events)
pv.loop.changed_fds = fd
}
// update events
target.events = u32(events & picoev_readwrite)
// apply immediately if is a DELETE
if events & picoev_del != 0 {
pv.apply_pending_changes(true)
}
return 0
}
[direct_array_access]
fn (mut pv Picoev) poll_once(max_wait int) int {
ts := C.timespec{
tv_sec: max_wait
tv_nsec: 0
}
mut total, mut nevents := 0, 0
// apply changes later when the callback is called.
total = pv.apply_pending_changes(false)
nevents = C.kevent(pv.loop.kq_id, &pv.loop.changelist, total, &pv.loop.events, pv.loop.events.len,
&ts)
if nevents == -1 {
// the errors we can only rescue
assert C.errno == C.EACCES || C.errno == C.EFAULT || C.errno == C.EINTR
return -1
}
for i := 0; i < nevents; i++ {
event := pv.loop.events[i]
target := pv.file_descriptors[event.ident]
// changelist errors are fatal
assert event.flags & C.EV_ERROR == 0
if pv.loop.id == target.loop_id && event.filter & (C.EVFILT_READ | C.EVFILT_WRITE) != 0 {
read_events := match int(event.filter) {
C.EVFILT_READ {
picoev_read
}
C.EVFILT_WRITE {
picoev_write
}
else {
0
}
}
// do callback!
unsafe { target.cb(target.fd, read_events, &pv) }
}
}
return 0
}

View file

@ -1,53 +1,31 @@
// Copyright (c) 2019-2023 Alexander Medvednikov. All rights reserved.
// Use of this source code is governed by an MIT license
// that can be found in the LICENSE file.
module picoev
import net
import picohttpparser
import time
#include <errno.h>
#include <netinet/tcp.h>
#include <signal.h>
#flag -I @VEXEROOT/thirdparty/picoev
#flag @VEXEROOT/thirdparty/picoev/picoev.o
#include "src/picoev.h"
pub const (
max_fds = 1024
max_queue = 4096
[typedef]
struct C.picoev_loop {}
fn C.picoev_del(&C.picoev_loop, int) int
fn C.picoev_set_timeout(&C.picoev_loop, int, int)
// fn C.picoev_handler(loop &C.picoev_loop, fd int, revents int, cb_arg voidptr)
// TODO: (sponge) update to C.picoev_handler with C type def update
type PicoevHandler = fn (loop &C.picoev_loop, fd int, revents int, context voidptr)
fn C.picoev_add(&C.picoev_loop, int, int, int, &PicoevHandler, voidptr) int
fn C.picoev_init(int) int
fn C.picoev_create_loop(int) &C.picoev_loop
fn C.picoev_loop_once(&C.picoev_loop, int) int
fn C.picoev_destroy_loop(&C.picoev_loop) int
fn C.picoev_deinit() int
const (
max_fds = 1024
max_timeout = 10
// events
picoev_read = 1
picoev_write = 2
picoev_timeout = 4
picoev_add = 0x40000000
picoev_del = 0x20000000
picoev_readwrite = 3 // 1 xor 2
)
enum Event {
read = C.PICOEV_READ
write = C.PICOEV_WRITE
timeout = C.PICOEV_TIMEOUT
add = C.PICOEV_ADD
del = C.PICOEV_DEL
readwrite = C.PICOEV_READWRITE
// Target is a data representation of everything that needs to be associated with a single
// file descriptor (connection)
pub struct Target {
pub mut:
fd int
loop_id int = -1
events u32
cb fn (int, int, voidptr)
// used internally by the kqueue implementation
backend int
}
pub struct Config {
@ -62,137 +40,230 @@ pub:
max_write int = 8192
}
struct Picoev {
loop &C.picoev_loop = unsafe { nil }
cb fn (voidptr, picohttpparser.Request, mut picohttpparser.Response)
err_cb fn (voidptr, picohttpparser.Request, mut picohttpparser.Response, IError)
user_data voidptr
[heap]
pub struct Picoev {
cb fn (voidptr, picohttpparser.Request, mut picohttpparser.Response)
err_cb fn (voidptr, picohttpparser.Request, mut picohttpparser.Response, IError) = default_err_cb
user_data voidptr = unsafe { nil }
timeout_secs int
max_headers int
max_read int
max_write int
max_headers int = 100
max_read int = 4096
max_write int = 8192
mut:
date &u8 = unsafe { nil }
buf &u8 = unsafe { nil }
idx [1024]int
out &u8 = unsafe { nil }
loop &LoopType = unsafe { nil }
file_descriptors [max_fds]&Target
timeouts map[int]i64
num_loops int
buf &u8 = unsafe { nil }
idx [1024]int
out &u8 = unsafe { nil }
date string
}
[inline]
fn setup_sock(fd int) ! {
flag := 1
if C.setsockopt(fd, C.IPPROTO_TCP, C.TCP_NODELAY, &flag, sizeof(int)) < 0 {
return error('setup_sock.setup_sock failed')
// init fills the `file_descriptors` array
pub fn (mut pv Picoev) init() {
assert picoev.max_fds > 0
pv.num_loops = 0
for i in 0 .. picoev.max_fds {
pv.file_descriptors[i] = &Target{}
}
$if freebsd {
if C.fcntl(fd, C.F_SETFL, C.SOCK_NONBLOCK) != 0 {
return error('fcntl failed')
}
} $else {
if C.fcntl(fd, C.F_SETFL, C.O_NONBLOCK) != 0 {
return error('fcntl failed')
}
// add adds a file descriptor to the loop
[direct_array_access]
pub fn (mut pv Picoev) add(fd int, events int, timeout int, cb voidptr) int {
assert fd < picoev.max_fds
mut target := pv.file_descriptors[fd]
target.fd = fd
target.cb = cb
target.loop_id = pv.loop.id
target.events = 0
if pv.update_events(fd, events | picoev.picoev_add) != 0 {
pv.del(fd)
return -1
}
// update timeout
pv.set_timeout(fd, timeout)
return 0
}
// del removes a file descriptor from the loop
[direct_array_access]
fn (mut pv Picoev) del(fd int) int {
assert fd < picoev.max_fds
mut target := pv.file_descriptors[fd]
$if trace_fd ? {
eprintln('delete ${fd}')
}
if pv.update_events(fd, picoev.picoev_del) != 0 {
target.loop_id = -1
target.fd = 0
return -1
}
pv.set_timeout(fd, 0)
target.loop_id = -1
target.fd = 0
return 0
}
fn (mut pv Picoev) loop_once(max_wait int) int {
pv.loop.now = get_time()
if pv.poll_once(max_wait) != 0 {
return -1
}
if max_wait != 0 {
pv.loop.now = get_time()
}
pv.handle_timeout()
return 0
}
// set_timeout sets the timeout in seconds for a file descriptor. If a timeout occurs
// the file descriptors target callback is called with a timeout event
[direct_array_access; inline]
fn (mut pv Picoev) set_timeout(fd int, secs int) {
assert fd < picoev.max_fds
if secs != 0 {
pv.timeouts[fd] = pv.loop.now + secs
} else {
pv.timeouts.delete(fd)
}
}
// handle_timeout loops over all file descriptors and removes them from the loop
// if they are timed out. Also the file descriptors target callback is called with a
// timeout event
[direct_array_access; inline]
fn (mut pv Picoev) handle_timeout() {
for fd, timeout in pv.timeouts {
if timeout <= pv.loop.now {
target := pv.file_descriptors[fd]
assert target.loop_id == pv.loop.id
pv.timeouts.delete(fd)
unsafe { target.cb(fd, picoev.picoev_timeout, &pv) }
}
}
}
[inline]
fn close_conn(loop &C.picoev_loop, fd int) {
C.picoev_del(voidptr(loop), fd)
C.close(fd)
}
[inline]
fn req_read(fd int, b &u8, max_len int, idx int) int {
unsafe {
return C.read(fd, b + idx, max_len - idx)
}
}
fn rw_callback(loop &C.picoev_loop, fd int, events int, context voidptr) {
mut p := unsafe { &Picoev(context) }
defer {
p.idx[fd] = 0
}
if (events & int(Event.timeout)) != 0 {
close_conn(loop, fd)
// accept_callback accepts a new connection from `listen_fd` and adds it to the loop
fn accept_callback(listen_fd int, events int, cb_arg voidptr) {
mut pv := unsafe { &Picoev(cb_arg) }
newfd := accept(listen_fd)
if newfd >= picoev.max_fds {
// should never happen
close_socket(newfd)
return
} else if (events & int(Event.read)) != 0 {
C.picoev_set_timeout(voidptr(loop), fd, p.timeout_secs)
}
// Request init
mut buf := p.buf
$if trace_fd ? {
eprintln('accept ${newfd}')
}
if newfd != -1 {
setup_sock(newfd) or {
eprintln('setup_sock failed, fd: ${newfd}, listen_fd: ${listen_fd}, err: ${err.code()}')
pv.err_cb(pv.user_data, picohttpparser.Request{}, mut &picohttpparser.Response{},
err)
return
}
pv.add(newfd, picoev.picoev_read, pv.timeout_secs, raw_callback)
}
}
// close_conn closes the socket `fd` and removes it from the loop
[inline]
pub fn (mut pv Picoev) close_conn(fd int) {
pv.del(fd)
close_socket(fd)
}
[direct_array_access]
fn raw_callback(fd int, events int, context voidptr) {
mut pv := unsafe { &Picoev(context) }
defer {
pv.idx[fd] = 0
}
if events & picoev.picoev_timeout != 0 {
$if trace_fd ? {
eprintln('timeout ${fd}')
}
pv.close_conn(fd)
return
} else if events & picoev.picoev_read != 0 {
pv.set_timeout(fd, pv.timeout_secs)
mut buf := pv.buf
unsafe {
buf += fd * p.max_read // pointer magic
buf += fd * pv.max_read // pointer magic
}
mut req := picohttpparser.Request{}
// Response init
mut out := p.out
mut out := pv.out
unsafe {
out += fd * p.max_write // pointer magic
out += fd * pv.max_write // pointer magic
}
mut res := picohttpparser.Response{
fd: fd
date: p.date
buf_start: out
buf: out
date: pv.date.str
}
for {
// Request parsing loop
r := req_read(fd, buf, p.max_read, p.idx[fd]) // Get data from socket
r := req_read(fd, buf, pv.max_read, pv.idx[fd]) // Get data from socket
if r == 0 {
// connection closed by peer
close_conn(loop, fd)
pv.close_conn(fd)
return
} else if r == -1 {
// error
if C.errno == C.EAGAIN {
// try again later
return
}
if C.errno == C.EWOULDBLOCK {
// try again later
if fatal_socket_error(fd) == false {
return
}
// fatal error
close_conn(loop, fd)
pv.close_conn(fd)
return
}
p.idx[fd] += r
pv.idx[fd] += r
mut s := unsafe { tos(buf, p.idx[fd]) }
pret := req.parse_request(s, p.max_headers) // Parse request via picohttpparser
mut s := unsafe { tos(buf, pv.idx[fd]) }
pret := req.parse_request(s) or {
// Parse error
pv.err_cb(pv.user_data, req, mut &res, err)
return
}
if pret > 0 { // Success
break
} else if pret == -1 { // Parse error
p.err_cb(p.user_data, req, mut &res, error('ParseError'))
return
}
assert pret == -2
// request is incomplete, continue the loop
if p.idx[fd] == sizeof(buf) {
p.err_cb(p.user_data, req, mut &res, error('RequestIsTooLongError'))
if pv.idx[fd] == sizeof(buf) {
pv.err_cb(pv.user_data, req, mut &res, error('RequestIsTooLongError'))
return
}
}
// Callback (should call .end() itself)
p.cb(p.user_data, req, mut &res)
}
}
fn accept_callback(loop &C.picoev_loop, fd int, events int, cb_arg voidptr) {
mut p := unsafe { &Picoev(cb_arg) }
newfd := C.accept(fd, 0, 0)
if newfd != -1 {
setup_sock(newfd) or {
p.err_cb(p.user_data, picohttpparser.Request{}, mut &picohttpparser.Response{},
err)
}
C.picoev_add(voidptr(loop), newfd, int(Event.read), p.timeout_secs, rw_callback,
cb_arg)
pv.cb(pv.user_data, req, mut &res)
}
}
@ -201,42 +272,12 @@ fn default_err_cb(data voidptr, req picohttpparser.Request, mut res picohttppars
res.end()
}
// new creates a `Picoev` struct and initializes the main loop
pub fn new(config Config) &Picoev {
fd := C.socket(net.AddrFamily.ip, net.SocketType.tcp, 0)
assert fd != -1
listen_fd := listen(config)
// Setting flags for socket
flag := 1
assert C.setsockopt(fd, C.SOL_SOCKET, C.SO_REUSEADDR, &flag, sizeof(int)) == 0
$if linux {
assert C.setsockopt(fd, C.SOL_SOCKET, C.SO_REUSEPORT, &flag, sizeof(int)) == 0
assert C.setsockopt(fd, C.IPPROTO_TCP, C.TCP_QUICKACK, &flag, sizeof(int)) == 0
timeout := 10
assert C.setsockopt(fd, C.IPPROTO_TCP, C.TCP_DEFER_ACCEPT, &timeout, sizeof(int)) == 0
queue_len := 4096
assert C.setsockopt(fd, C.IPPROTO_TCP, C.TCP_FASTOPEN, &queue_len, sizeof(int)) == 0
}
// Setting addr
mut addr := C.sockaddr_in{
sin_family: u8(C.AF_INET)
sin_port: C.htons(config.port)
sin_addr: C.htonl(C.INADDR_ANY)
}
size := sizeof(C.sockaddr_in)
bind_res := C.bind(fd, voidptr(unsafe { &net.Addr(&addr) }), size)
assert bind_res == 0
listen_res := C.listen(fd, C.SOMAXCONN)
assert listen_res == 0
setup_sock(fd) or {
config.err_cb(config.user_data, picohttpparser.Request{}, mut &picohttpparser.Response{},
err)
}
C.picoev_init(picoev.max_fds)
loop := C.picoev_create_loop(picoev.max_timeout)
mut pv := &Picoev{
loop: loop
num_loops: 1
cb: config.cb
err_cb: config.err_cb
user_data: config.user_data
@ -244,25 +285,45 @@ pub fn new(config Config) &Picoev {
max_headers: config.max_headers
max_read: config.max_read
max_write: config.max_write
date: &u8(C.get_date())
buf: unsafe { malloc_noscan(picoev.max_fds * config.max_read + 1) }
out: unsafe { malloc_noscan(picoev.max_fds * config.max_write + 1) }
}
C.picoev_add(voidptr(loop), fd, int(Event.read), 0, accept_callback, pv)
spawn update_date(mut pv)
// epoll for linux
// kqueue for macos and bsd
// select for windows and others
$if linux {
pv.loop = create_epoll_loop(0) or { panic(err) }
} $else $if freebsd || macos {
pv.loop = create_kqueue_loop(0) or { panic(err) }
} $else {
pv.loop = create_select_loop(0) or { panic(err) }
}
pv.init()
pv.add(listen_fd, picoev.picoev_read, 0, accept_callback)
return pv
}
pub fn (p Picoev) serve() {
// serve starts the Picoev server
pub fn (mut pv Picoev) serve() {
spawn update_date(mut pv)
for {
C.picoev_loop_once(p.loop, 1)
pv.loop_once(1)
}
}
fn update_date(mut p Picoev) {
// update_date updates `date` on `pv` every second.
fn update_date(mut pv Picoev) {
for {
p.date = &u8(C.get_date())
C.usleep(1000000)
// get GMT (UTC) time for the HTTP Date header
gmt := time.utc()
mut date := gmt.strftime('---, %d --- %Y %H:%M:%S GMT')
date = date.replace_once('---', gmt.weekday_str())
date = date.replace_once('---', gmt.smonth())
pv.date = date
time.sleep(time.second)
}
}

143
vlib/picoev/socket_util.c.v Normal file
View file

@ -0,0 +1,143 @@
module picoev
import net
import picohttpparser
#include <errno.h>
$if windows {
#include <winsock2.h>
#include <ws2tcpip.h>
} $else $if freebsd || macos {
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
} $else {
#include <netinet/tcp.h>
#include <sys/resource.h>
}
[inline]
fn get_time() i64 {
// time.now() is slow
return i64(C.time(C.NULL))
}
[inline]
fn accept(fd int) int {
return C.accept(fd, 0, 0)
}
[inline]
fn close_socket(fd int) {
$if trace_fd ? {
eprintln('close ${fd}')
}
$if windows {
C.closesocket(fd)
} $else {
C.close(fd)
}
}
[inline]
fn setup_sock(fd int) ! {
flag := 1
if C.setsockopt(fd, C.IPPROTO_TCP, C.TCP_NODELAY, &flag, sizeof(int)) < 0 {
return error('setup_sock.setup_sock failed')
}
$if freebsd {
if C.fcntl(fd, C.F_SETFL, C.SOCK_NONBLOCK) != 0 {
return error('fcntl failed')
}
} $else $if windows {
non_blocking_mode := u32(1)
if C.ioctlsocket(fd, C.FIONBIO, &non_blocking_mode) == C.SOCKET_ERROR {
return error('icotlsocket failed')
}
} $else {
// linux and macos
if C.fcntl(fd, C.F_SETFL, C.O_NONBLOCK) != 0 {
return error('fcntl failed')
}
}
}
[inline]
fn req_read(fd int, b &u8, max_len int, idx int) int {
// use `recv` instead of `read` for windows compatibility
unsafe {
return C.recv(fd, b + idx, max_len - idx, 0)
}
}
fn fatal_socket_error(fd int) bool {
if C.errno == C.EAGAIN {
// try again later
return false
}
$if windows {
if C.errno == C.WSAEWOULDBLOCK {
// try again later
return false
}
} $else {
if C.errno == C.EWOULDBLOCK {
// try again later
return false
}
}
$if trace_fd ? {
eprintln('fatal error ${fd}: ${C.errno}')
}
return true
}
// listen creates a listening tcp socket and returns its file decriptor
fn listen(config Config) int {
// not using the `net` modules sockets, because not all socket options are defined
fd := C.socket(net.AddrFamily.ip, net.SocketType.tcp, 0)
assert fd != -1
$if trace_fd ? {
eprintln('listen: ${fd}')
}
// Setting flags for socket
flag := 1
assert C.setsockopt(fd, C.SOL_SOCKET, C.SO_REUSEADDR, &flag, sizeof(int)) == 0
$if linux {
// epoll socket options
assert C.setsockopt(fd, C.SOL_SOCKET, C.SO_REUSEPORT, &flag, sizeof(int)) == 0
assert C.setsockopt(fd, C.IPPROTO_TCP, C.TCP_QUICKACK, &flag, sizeof(int)) == 0
assert C.setsockopt(fd, C.IPPROTO_TCP, C.TCP_DEFER_ACCEPT, &config.timeout_secs,
sizeof(int)) == 0
queue_len := max_queue
assert C.setsockopt(fd, C.IPPROTO_TCP, C.TCP_FASTOPEN, &queue_len, sizeof(int)) == 0
}
// addr settings
mut addr := C.sockaddr_in{
sin_family: u8(C.AF_INET)
sin_port: C.htons(config.port)
sin_addr: C.htonl(C.INADDR_ANY)
}
size := sizeof(C.sockaddr_in)
bind_res := C.bind(fd, voidptr(unsafe { &net.Addr(&addr) }), size)
assert bind_res == 0
listen_res := C.listen(fd, C.SOMAXCONN)
assert listen_res == 0
setup_sock(fd) or {
config.err_cb(config.user_data, picohttpparser.Request{}, mut &picohttpparser.Response{},
err)
}
return fd
}