Web/reactor/conn.c
2025-09-15 12:57:54 -05:00

300 lines
6.9 KiB
C

#include "conn.h"
#include "pool.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <errno.h>
#include <stdio.h>
// status_text maps an HTTP status code to its reason phrase.
// Codes that are not in the table yield "Unknown".
static const char* status_text(int code) {
    static const struct {
        int code;
        const char *reason;
    } reasons[] = {
        { 200, "OK" },
        { 201, "Created" },
        { 204, "No Content" },
        { 301, "Moved Permanently" },
        { 302, "Found" },
        { 304, "Not Modified" },
        { 400, "Bad Request" },
        { 401, "Unauthorized" },
        { 403, "Forbidden" },
        { 404, "Not Found" },
        { 405, "Method Not Allowed" },
        { 500, "Internal Server Error" },
        { 502, "Bad Gateway" },
        { 503, "Service Unavailable" },
    };

    for (size_t i = 0; i < sizeof reasons / sizeof reasons[0]; i++) {
        if (reasons[i].code == code) {
            return reasons[i].reason;
        }
    }
    return "Unknown";
}
// conn_new allocates a zero-initialised connection bound to `pool`, together
// with a read buffer of rbuf_size bytes, a write buffer of wbuf_size bytes and
// an arena created with arena_size.
//
// Returns the new connection in state CONN_IDLE, or NULL if any allocation
// fails (whatever was already allocated is released through conn_free).
conn_t* conn_new(pool_t *pool, size_t rbuf_size, size_t wbuf_size, size_t arena_size) {
    conn_t *conn = calloc(1, sizeof(conn_t));
    if (conn == NULL) {
        return NULL;
    }

    conn->pool = pool;
    conn->state = CONN_IDLE;
    conn->rsize = rbuf_size;
    conn->wsize = wbuf_size;

    // Attempt every allocation first, then validate them together so there is
    // a single failure path.
    conn->rbuf = malloc(rbuf_size);
    conn->wbuf = malloc(wbuf_size);
    conn->arena = arena_new(arena_size);

    int complete = conn->rbuf != NULL && conn->wbuf != NULL && conn->arena != NULL;
    if (complete) {
        return conn;
    }

    conn_free(conn);
    return NULL;
}
// conn_free releases a connection and everything it owns: the socket (only
// when fd is greater than zero), both I/O buffers, the arena and the conn_t
// itself. Passing NULL is a no-op.
void conn_free(conn_t *c) {
    if (c != NULL) {
        const int sock = c->fd;
        if (sock > 0) {
            close(sock);
        }

        free(c->rbuf);
        free(c->wbuf);
        arena_free(c->arena);
        free(c);
    }
}
// conn_reset returns a connection to a clean per-request state so it can
// serve another request. The file descriptor, pool pointer, buffers and
// their capacities are left untouched.
void conn_reset(conn_t *c) {
    // Parsed request fields (these pointed into the arena that is reset below).
    c->method = c->path = c->version = NULL;
    c->headers = NULL;
    c->body = NULL;
    c->body_len = 0;

    // Response bookkeeping.
    c->status_code = 0;

    // Read and write cursors: both buffers become logically empty.
    c->rpos = 0;
    c->rlen = 0;
    c->wpos = 0;
    c->wlen = 0;

    arena_reset(c->arena);
}
// parse_request_line tries to parse one complete request head (the request
// line followed by the header section) starting at c->rbuf + c->rpos.
//
// Returns:
//    1  a complete head was found: c->method, c->path and c->version hold
//       arena copies of the three request-line tokens and c->rpos points
//       just past the blank line that ends the header section.
//    0  more data is needed. No connection state is modified in this case,
//       so the caller can call again after reading more bytes.
//   -1  the request line is malformed (fewer than two spaces), or an arena
//       copy failed.
//
// Header fields are skipped rather than parsed, and no body is assumed.
static int parse_request_line(conn_t *c) {
    char *start = c->rbuf + c->rpos;
    char *end = c->rbuf + c->rlen;

    char *line_end = memchr(start, '\n', (size_t)(end - start));
    if (!line_end) return 0; // request line not fully received yet

    // Request line layout: METHOD SP PATH SP VERSION [CR] LF.
    // Only token boundaries are located here; nothing is copied or committed
    // until the whole head is known to be present.
    char *method_end = memchr(start, ' ', (size_t)(line_end - start));
    if (!method_end) return -1;

    char *path = method_end + 1;
    char *path_end = memchr(path, ' ', (size_t)(line_end - path));
    if (!path_end) return -1;

    char *version = path_end + 1;
    char *version_end = line_end;
    if (version_end > version && version_end[-1] == '\r') version_end--;

    // Walk the header section line by line looking for the empty line
    // ("\r\n" or a bare "\n") that terminates it. A '\n' that merely ends a
    // non-empty header line must not be mistaken for the terminator.
    char *head_end = NULL;
    char *line = line_end + 1;
    while (line < end) {
        char *nl = memchr(line, '\n', (size_t)(end - line));
        if (!nl) break; // last line is still incomplete
        if (nl == line || (nl == line + 1 && *line == '\r')) {
            head_end = nl + 1;
            break;
        }
        line = nl + 1;
    }
    if (!head_end) return 0; // header section not complete yet

    c->method = arena_strdup(c->arena, start, method_end - start);
    c->path = arena_strdup(c->arena, path, path_end - path);
    c->version = arena_strdup(c->arena, version, version_end - version);
    // NOTE(review): assumes arena_strdup returns NULL only on allocation
    // failure - confirm against the arena implementation.
    if (!c->method || !c->path || !c->version) return -1;

    c->rpos = head_end - c->rbuf;
    return 1;
}
// conn_read_handler is the event callback registered for readable events.
//
// `data` is the conn_t registered with the runtime. On EPOLLHUP/EPOLLERR the
// connection is closed. Otherwise the socket is read until it would block or
// the read buffer is full, and one request head is parsed from the buffer:
//   - complete request  -> pool_handle_request() is called and the fd is
//                          switched to conn_write_handler / EPOLLOUT;
//   - malformed request -> a 400 response is queued and the fd is switched to
//                          conn_write_handler / EPOLLOUT;
//   - incomplete request-> nothing happens; the handler waits for more data.
// A request head that does not fit in the read buffer is answered with 400.
void conn_read_handler(runtime_t *rt, int fd, uint32_t events, void *data) {
    conn_t *c = (conn_t*)data;
    pool_t *p = c->pool;

    if (events & (EPOLLHUP | EPOLLERR)) {
        conn_close(c);
        return;
    }

    // The fd is registered edge-triggered, so read until the socket would
    // block. Never issue a zero-length read(): its 0 return would be
    // indistinguishable from EOF.
    while (c->rlen < c->rsize) {
        ssize_t n = read(fd, c->rbuf + c->rlen, c->rsize - c->rlen);
        if (n > 0) {
            c->rlen += n;
            continue;
        }
        if (n < 0 && errno == EINTR) continue; // interrupted: retry
        if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) break;

        // n == 0 (peer closed) or an unrecoverable error.
        conn_close(c);
        return;
    }

    // Nothing unparsed in the buffer.
    if (c->rpos >= c->rlen) return;

    int status = parse_request_line(c);

    // A full buffer without a complete head can never make progress: there is
    // no room to read more, and with edge-triggering no further event is
    // guaranteed. Reject the request instead of stalling the connection.
    if (status == 0 && c->rlen >= c->rsize) status = -1;

    if (status == 0) return; // need more data; wait for the next event

    if (status < 0) {
        conn_write_status(c, 400);
        conn_end_response(c);
    } else {
        // Request complete - let the pool's handler build the response.
        pool_handle_request(p, c);
    }

    c->state = CONN_WRITING;
    runtime_mod(rt, fd, EPOLLOUT | EPOLLET, conn_write_handler, c);
}
// conn_write_handler is the event callback registered for writable events.
//
// `data` is the conn_t registered with the runtime. On EPOLLHUP/EPOLLERR the
// connection is closed. Otherwise the pending bytes wbuf[wpos..wlen) are
// written; if the socket would block, the handler returns and resumes on the
// next writable event. Once everything has been sent the connection is reset
// and switched back to conn_read_handler / EPOLLIN for the next request.
void conn_write_handler(runtime_t *rt, int fd, uint32_t events, void *data) {
    conn_t *c = (conn_t*)data;

    if (events & (EPOLLHUP | EPOLLERR)) {
        conn_close(c);
        return;
    }

    // Flush buffered response data.
    while (c->wpos < c->wlen) {
        ssize_t n = write(fd, c->wbuf + c->wpos, c->wlen - c->wpos);
        if (n < 0) {
            if (errno == EINTR) continue; // interrupted: retry
            if (errno == EAGAIN || errno == EWOULDBLOCK) return; // resume later
            conn_close(c);
            return;
        }
        c->wpos += n;
    }

    // Response complete - reset for next request (keep-alive).
    conn_reset(c);
    c->state = CONN_READING;
    runtime_mod(rt, fd, EPOLLIN | EPOLLET, conn_read_handler, c);
}
// conn_accept accepts one pending connection on listen_fd, binds it to a
// conn_t taken from pool `p`, makes the socket non-blocking with TCP_NODELAY
// set, and registers it with the pool's runtime for edge-triggered reads.
//
// Returns 0 on success, or -1 if accept() fails or the pool has no connection
// available (in which case the accepted socket is closed again).
int conn_accept(pool_t *p, int listen_fd) {
    struct sockaddr_in addr;
    socklen_t addrlen = sizeof(addr);
    int fd = accept(listen_fd, (struct sockaddr*)&addr, &addrlen);
    if (fd < 0) return -1;

    // Get connection from pool.
    conn_t *c = pool_get(p);
    if (!c) {
        close(fd);
        return -1;
    }

    // conn_close() hands connections back to the pool without clearing their
    // buffer offsets or parsed request fields, so start from a clean state
    // here rather than relying on the pool to have done it.
    conn_reset(c);
    c->fd = fd;
    c->state = CONN_READING;
    set_nonblocking(fd);

    // Disable Nagle's algorithm for lower latency. Best effort: a failure
    // here only affects latency, so the result is intentionally ignored.
    int flag = 1;
    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));

    // NOTE(review): the results of set_nonblocking() and runtime_add() are
    // not checked; their return types are not visible from this file.
    runtime_add(pool_runtime(p), fd, EPOLLIN | EPOLLET, conn_read_handler, c);
    return 0;
}
// conn_close tears down a connection's socket and hands the conn_t back to
// its pool. If fd is greater than zero it is removed from the pool's runtime,
// closed, and cleared to 0. The pool's close hook then runs and the
// connection is returned with pool_put().
void conn_close(conn_t *c) {
    const int sock = c->fd;

    if (sock > 0) {
        // Deregister before closing so the runtime never holds a closed fd.
        runtime_del(pool_runtime(c->pool), sock);
        close(sock);
        c->fd = 0;
    }

    pool_handle_close(c->pool, c);
    pool_put(c->pool, c);
}
// conn_write_status records `code` in c->status_code and appends the status
// line "HTTP/1.1 <code> <reason>\r\n" to the write buffer.
//
// Returns the number of bytes appended, or -1 if the line does not fit in the
// remaining buffer space (c->wlen is then left unchanged).
int conn_write_status(conn_t *c, int code) {
    c->status_code = code;

    if (c->wlen >= c->wsize) return -1; // buffer already full
    size_t avail = c->wsize - c->wlen;

    int n = snprintf(c->wbuf + c->wlen, avail,
                     "HTTP/1.1 %d %s\r\n", code, status_text(code));

    // On truncation snprintf returns the length it *would* have written.
    // Adding that to wlen would move it past wsize and make every later
    // "wsize - wlen" computation wrap around, so treat it as an error.
    if (n < 0 || (size_t)n >= avail) return -1;

    c->wlen += n;
    return n;
}
// conn_write_header appends one header line "<name>: <value>\r\n" to the
// write buffer.
//
// Returns the number of bytes appended, or -1 if name or value is NULL or the
// line does not fit in the remaining buffer space (c->wlen is then left
// unchanged).
int conn_write_header(conn_t *c, const char *name, const char *value) {
    if (!name || !value) return -1; // "%s" with NULL is undefined behaviour

    if (c->wlen >= c->wsize) return -1; // buffer already full
    size_t avail = c->wsize - c->wlen;

    int n = snprintf(c->wbuf + c->wlen, avail, "%s: %s\r\n", name, value);

    // On truncation snprintf returns the length it *would* have written.
    // Adding that to wlen would move it past wsize and make every later
    // "wsize - wlen" computation wrap around, so treat it as an error.
    if (n < 0 || (size_t)n >= avail) return -1;

    c->wlen += n;
    return n;
}
// conn_write_body appends a Content-Length header, the blank line that ends
// the header section, and then `len` bytes of `data` to the write buffer.
//
// If the body does not fit in the remaining space it is truncated, and the
// Content-Length value reflects the truncated size. Returns the number of
// body bytes written, or -1 if there is no room for the header plus at least
// one body byte (an empty body is accepted when the header itself fits).
int conn_write_body(conn_t *c, const char *data, size_t len) {
    // Space reserved for "Content-Length: <n>\r\n\r\n" plus a trailing CRLF.
    const size_t header_space = 64;
    const size_t overhead = header_space + 2;

    // Compute the free space without letting unsigned subtraction wrap.
    size_t avail = c->wlen < c->wsize ? c->wsize - c->wlen : 0;
    if (avail < overhead) return -1;

    size_t room = avail - overhead;
    if (len > room) {
        // Not enough space - truncate body.
        len = room;
        if (len == 0) return -1;
    }

    // Write Content-Length header and terminate the header section.
    int n = snprintf(c->wbuf + c->wlen, header_space,
                     "Content-Length: %zu\r\n\r\n", len);
    if (n < 0 || (size_t)n >= header_space) return -1;
    c->wlen += n;

    // Copy the body; skip memcpy for an empty body so that a NULL `data`
    // pointer with len == 0 is harmless.
    if (len > 0) {
        memcpy(c->wbuf + c->wlen, data, len);
        c->wlen += len;
    }
    return (int)len;
}
// conn_end_response finalizes the response by appending a CRLF to the write
// buffer when a status line has been written (c->status_code is set and the
// buffer is non-empty).
//
// Returns 0 on success or when there is nothing to finalize, and -1 if the
// CRLF does not fit in the write buffer.
//
// NOTE(review): the CRLF is appended whether or not a body was written. When
// conn_write_body() has run, the header section is already terminated and
// this adds two bytes after the body - confirm whether that is intended.
int conn_end_response(conn_t *c) {
    if (c->status_code && c->wlen > 0) {
        // Report the failure instead of silently leaving the response
        // unterminated.
        if (c->wlen + 2 >= c->wsize) return -1;

        c->wbuf[c->wlen++] = '\r';
        c->wbuf[c->wlen++] = '\n';
    }
    return 0;
}