/*
 * Web/app.c
 *
 * 443 lines
 * 11 KiB
 * C
 */

#define _GNU_SOURCE
#include "app.h"
#include "reactor/pool.h"
#include "reactor/epoll.h"
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <sched.h>
#include <signal.h>
// Default value for app_t.max_conns; app_new() stores it and it is later
// handed to pool_new() when the connection pool is created.
#define DEFAULT_MAX_CONNS 10000
// Argument passed to runtime_new() whenever this file creates a runtime.
#define DEFAULT_MAX_EVENTS 4096
// Default 404 handler, installed by app_new() and invoked by the request
// router when no route matches.
// NOTE(review): this is a stub. The intended reply (status 404 with the text
// "404 Not Found") is not sent, because send_text() takes a conn_t while this
// handler receives a context_t. An unmatched request therefore gets no
// response written from here.
static void default_not_found(context_t *ctx) {
    (void)ctx;  // unused until a context-based reply is wired in
}
// Event callback for the listening socket (registered with
// EPOLLIN | EPOLLET). Accepts pending connections into the app's pool.
//
//   rt, events - unused
//   fd         - the listening socket
//   data       - the owning app_t
//
// Accepting stops at the first negative return from conn_accept(), or after a
// fixed batch of 64 so that one busy listener cannot starve other events.
// NOTE(review): with edge-triggered notification, connections still queued
// when the batch limit is hit presumably wait for the next readiness edge --
// confirm this is acceptable.
static void accept_handler(runtime_t *rt, int fd, uint32_t events, void *data) {
    enum { ACCEPT_BATCH = 64 };
    app_t *app = data;

    (void)rt;
    (void)events;

    for (int taken = 0; taken < ACCEPT_BATCH; taken++) {
        if (conn_accept(app->pool, fd) < 0) {
            break;
        }
    }
}
// Global app pointer (temporary solution).
// app_new() stores the most recently created app here, and route_request()
// reads it because that callback is handed only a conn_t. Creating a second
// app replaces this pointer.
static app_t *g_app = NULL;
// Adapter from the connection-level callback shape (conn_t *) to an
// application handler (context_t *). The application handler is read from
// c->user_data; a temporary context is built around the connection, passed to
// the handler and released afterwards. Does nothing when no handler is stored
// or the context cannot be created.
static void handler_wrapper(conn_t *c) {
    app_handler_t fn = (app_handler_t)c->user_data;
    if (!fn) {
        return;
    }

    context_t *ctx = context_new(c);
    if (!ctx) {
        return;
    }

    fn(ctx);
    context_free(ctx);
}
// Request router: the pool's on_request callback.
//
// Looks up c->method / c->path in the global app's router and runs the
// matching handler, or the app's not_found handler when nothing matches.
// The handler runs with a context created for this call only and freed as
// soon as the handler returns.
//
// Returns silently when there is no global app or router, when neither a
// route nor a not_found handler is available, or when the context cannot be
// allocated.
static void route_request(conn_t *c) {
    app_t *a = g_app;  // Use global for now
    if (!a || !a->router) return;

    // Out-parameters required by router_lookup(); not forwarded to handlers
    // yet because context_t has no slot for them here.
    // NOTE(review): ownership of params/names is not visible in this file --
    // confirm whether the caller is expected to release them.
    char **params = NULL;
    char **names = NULL;
    int count = 0;
    handler_t matched = router_lookup(a->router, c->method, c->path,
                                      &params, &names, &count);

    // Routes are stored as app_handler_t cast to handler_t (see app_route),
    // so casting back recovers the original function pointer type.
    app_handler_t target = matched ? (app_handler_t)matched : a->not_found;
    if (!target) return;  // not_found may have been cleared by the application

    context_t *ctx = context_new(c);
    if (!ctx) return;

    target(ctx);
    context_free(ctx);
}
// app_new creates a new web application.
app_t* app_new(void) {
app_t *a = calloc(1, sizeof(app_t));
if (!a) return NULL;
// Default configuration
a->max_conns = DEFAULT_MAX_CONNS;
a->read_buf_size = 8192;
a->write_buf_size = 8192;
a->arena_size = 4096;
a->not_found = default_not_found;
// Default to single process mode
a->num_workers = 0;
a->cpu_affinity = 1;
a->worker_pids = NULL;
a->worker_id = -1;
// Create router
a->router = router_new();
if (!a->router) {
free(a);
return NULL;
}
// Store global reference (temporary solution)
g_app = a;
return a;
}
// app_free destroys the application: router, listening socket, connection
// pool and runtime, then the app_t itself. Passing NULL is a no-op.
//
// The runtime and pool may legitimately be NULL (app_listen() does not create
// them when workers are configured), so each is only torn down when present.
void app_free(app_t *a) {
    if (!a) return;

    // Drop the global reference so route_request() cannot reach freed memory.
    if (g_app == a) {
        g_app = NULL;
    }

    // Free router
    if (a->router) {
        router_free(a->router);
    }

    // Close listener; it is only registered with a runtime if one exists.
    if (a->listen_fd > 0) {
        if (a->rt) {
            runtime_del(a->rt, a->listen_fd);
        }
        close(a->listen_fd);
    }

    // Free pool before the runtime it was created on.
    if (a->pool) {
        pool_free(a->pool);
    }
    if (a->rt) {
        runtime_free(a->rt);
    }

    free(a);
}
// app_listen sets up the listening socket on all IPv4 interfaces.
//
//   a    - application (must not be NULL)
//   port - TCP port, 0..65535
//
// Returns 0 on success, -1 on invalid arguments or when the runtime, pool or
// socket cannot be set up.
//
// In single-process mode (num_workers == 0) the runtime and pool are created
// here and the socket is registered for edge-triggered accept events. With
// workers configured, only the socket is created; each worker builds its own
// runtime and pool after fork().
// NOTE(review): with port 0, a->port keeps 0 rather than whatever port the
// bind actually ends up on.
int app_listen(app_t *a, int port) {
    enum { LISTEN_BACKLOG = 4096 };

    if (!a) return -1;
    // htons() takes 16 bits; an out-of-range value would silently bind a
    // different port than the one reported.
    if (port < 0 || port > 65535) return -1;

    // Don't create runtime/pool yet if using workers:
    // they need to be created after fork() for proper isolation.
    if (a->num_workers == 0) {
        if (!a->rt) {
            a->rt = runtime_new(DEFAULT_MAX_EVENTS);
            if (!a->rt) return -1;
        }
        if (!a->pool) {
            a->pool = pool_new(a->rt, a->max_conns);
            if (!a->pool) return -1;
            // Configure pool
            a->pool->read_buf_size = a->read_buf_size;
            a->pool->write_buf_size = a->write_buf_size;
            a->pool->arena_size = a->arena_size;
            // Set router as request handler
            a->pool->on_request = route_request;
        }
    }

    // Create socket
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) return -1;

    // Socket options are best-effort: a failure here does not stop the
    // server from listening.
    int opt = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));

    // Bind
    struct sockaddr_in addr = {
        .sin_family = AF_INET,
        .sin_port = htons((uint16_t)port),
        .sin_addr.s_addr = htonl(INADDR_ANY)
    };
    if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        close(fd);
        return -1;
    }

    // Listen with larger backlog for better performance
    if (listen(fd, LISTEN_BACKLOG) < 0) {
        close(fd);
        return -1;
    }

    // Make non-blocking (required by the accept loop in accept_handler)
    set_nonblocking(fd);

    // Add to epoll only if single process mode
    if (a->num_workers == 0 && a->rt) {
        runtime_add(a->rt, fd, EPOLLIN | EPOLLET, accept_handler, a);
    }

    a->listen_fd = fd;
    a->port = port;
    if (a->num_workers == 0) {
        printf("Server listening on :%d\n", port);
    }
    return 0;
}
// run_worker runs a single worker process (called in the child after fork()).
//
//   a         - application; its listening socket must already be open
//   worker_id - index of this worker, recorded in a->worker_id
//
// Optionally pins the process to CPU (worker_id mod online CPUs), creates a
// runtime and pool private to this process, registers the listening socket
// and enters the event loop. Returns -1 if the runtime or pool cannot be
// created, otherwise whatever runtime_run() returns.
static int run_worker(app_t *a, int worker_id) {
    a->worker_id = worker_id;

    // sysconf() can return -1 (or 0); fall back to one CPU so the modulo
    // below is always well-defined.
    long ncpu = sysconf(_SC_NPROCESSORS_ONLN);
    if (ncpu < 1) {
        ncpu = 1;
    }
    long cpu = worker_id % ncpu;

    // Pin to CPU if requested; pinning is best-effort, so a failure is
    // reported but does not stop the worker.
    if (a->cpu_affinity) {
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(cpu, &cpuset);
        if (sched_setaffinity(0, sizeof(cpuset), &cpuset) != 0) {
            perror("sched_setaffinity");
        }
    }

    // Create runtime and pool for this worker
    a->rt = runtime_new(DEFAULT_MAX_EVENTS);
    if (!a->rt) return -1;
    a->pool = pool_new(a->rt, a->max_conns);
    if (!a->pool) {
        // Don't leave a half-initialised worker holding a runtime.
        runtime_free(a->rt);
        a->rt = NULL;
        return -1;
    }

    // Configure pool
    a->pool->read_buf_size = a->read_buf_size;
    a->pool->write_buf_size = a->write_buf_size;
    a->pool->arena_size = a->arena_size;
    a->pool->on_request = route_request;

    // Add listen socket to this worker's epoll
    runtime_add(a->rt, a->listen_fd, EPOLLIN | EPOLLET, accept_handler, a);

    printf("Worker %d: Started on CPU %ld, listening on :%d\n", worker_id,
           cpu, a->port);
    return runtime_run(a->rt);
}
// app_run starts the event loop. app_listen() must have succeeded first.
//
// Single-process mode (num_workers == 0): runs the runtime created by
// app_listen() in the calling process and returns runtime_run()'s result.
//
// Multi-worker mode (num_workers > 0): forks num_workers children, each of
// which returns from this function with run_worker()'s result. The parent
// stays here, restarting any worker that terminates, and returns 0 once
// wait() reports no more children. Returns -1 on setup or fork failure.
int app_run(app_t *a) {
    if (!a) return -1;
    if (a->listen_fd <= 0) {
        fprintf(stderr, "Error: Must call app_listen() before app_run()\n");
        return -1;
    }

    // Multi-worker mode
    if (a->num_workers > 0) {
        a->worker_pids = calloc((size_t)a->num_workers, sizeof *a->worker_pids);
        if (!a->worker_pids) return -1;

        printf("Starting %d worker processes\n", a->num_workers);
        for (int i = 0; i < a->num_workers; i++) {
            // stdout may be fully buffered (e.g. redirected to a file);
            // unflushed bytes would be copied into the child and printed twice.
            fflush(stdout);
            pid_t pid = fork();
            if (pid == 0) {
                // Child process: the pid table belongs to the parent only
                free(a->worker_pids);
                a->worker_pids = NULL;
                return run_worker(a, i);
            }
            if (pid < 0) {
                perror("fork");
                // Kill already started workers
                for (int j = 0; j < i; j++) {
                    if (a->worker_pids[j] > 0) {
                        kill(a->worker_pids[j], SIGTERM);
                    }
                }
                free(a->worker_pids);
                a->worker_pids = NULL;
                return -1;
            }
            // Parent process
            a->worker_pids[i] = pid;
        }

        // Parent process - wait for workers
        printf("All workers started. Running...\n");

        int status;
        pid_t pid;
        while ((pid = wait(&status)) > 0) {
            // WEXITSTATUS is only meaningful for a normal exit.
            if (WIFEXITED(status)) {
                printf("Worker process %d exited with status %d\n", pid, WEXITSTATUS(status));
            } else if (WIFSIGNALED(status)) {
                printf("Worker process %d killed by signal %d\n", pid, WTERMSIG(status));
            }

            // Find and restart the worker
            for (int i = 0; i < a->num_workers; i++) {
                if (a->worker_pids[i] != pid) continue;

                printf("Restarting worker %d\n", i);
                fflush(stdout);
                pid_t new_pid = fork();
                if (new_pid == 0) {
                    // New worker process
                    free(a->worker_pids);
                    a->worker_pids = NULL;
                    return run_worker(a, i);
                }
                if (new_pid > 0) {
                    a->worker_pids[i] = new_pid;
                } else {
                    perror("fork");
                    // Forget the dead pid so the slot is not matched again
                    a->worker_pids[i] = 0;
                }
                break;
            }
        }

        free(a->worker_pids);
        a->worker_pids = NULL;
        return 0;
    }

    // Single process mode. The runtime and pool only exist if app_listen()
    // ran while no workers were configured.
    if (!a->rt || !a->pool) {
        fprintf(stderr, "Error: runtime not initialised; configure workers before app_listen()\n");
        return -1;
    }
    printf("Running in single-process mode\n");
    a->pool->on_request = route_request;
    return runtime_run(a->rt);
}
// app_route registers `handler` for requests matching `method` and `path`.
// Silently ignored when the app or its router is missing.
//
// The router stores handler_t (taking conn_t *), while applications supply
// app_handler_t (taking context_t *). The pointer is stored through a cast
// and cast back to app_handler_t by route_request() before being called.
void app_route(app_t *a, const char *method, const char *path, app_handler_t handler) {
    if (a && a->router) {
        router_add(a->router, method, path, (handler_t)handler);
    }
}
// Convenience route methods
// app_get registers `handler` for GET requests on `path` via app_route().
void app_get(app_t *a, const char *path, app_handler_t handler) {
app_route(a, "GET", path, handler);
}
// app_post registers `handler` for POST requests on `path` via app_route().
void app_post(app_t *a, const char *path, app_handler_t handler) {
app_route(a, "POST", path, handler);
}
// app_put registers `handler` for PUT requests on `path` via app_route().
void app_put(app_t *a, const char *path, app_handler_t handler) {
app_route(a, "PUT", path, handler);
}
// app_delete registers `handler` for DELETE requests on `path` via app_route().
void app_delete(app_t *a, const char *path, app_handler_t handler) {
app_route(a, "DELETE", path, handler);
}
// Configuration setters
// app_set_max_conns stores the value later passed to pool_new(). It is read
// when the pool is created, so it must be set before that point.
// `a` must not be NULL.
void app_set_max_conns(app_t *a, int max) {
a->max_conns = max;
}
// app_set_buffer_sizes stores the read/write buffer sizes that are copied
// into the pool when it is created. `a` must not be NULL.
void app_set_buffer_sizes(app_t *a, size_t read_size, size_t write_size) {
a->read_buf_size = read_size;
a->write_buf_size = write_size;
}
// app_set_arena_size stores the arena size that is copied into the pool when
// it is created. `a` must not be NULL.
void app_set_arena_size(app_t *a, size_t size) {
a->arena_size = size;
}
// app_set_not_found installs the handler run when no route matches.
// Passing a NULL handler restores the built-in default instead of storing
// NULL, because the router calls this pointer for every unmatched request.
// A NULL app is ignored.
void app_set_not_found(app_t *a, app_handler_t handler) {
    if (!a) return;
    a->not_found = handler ? handler : default_not_found;
}
// Worker configuration
// app_set_workers sets how many worker processes app_run() forks; 0 selects
// single-process mode. Negative values are treated as 0, since a negative
// count would skip runtime creation in app_listen() yet still take the
// single-process path in app_run().
// Call this before app_listen(): that function decides whether to create the
// runtime and pool based on this value. A NULL app is ignored.
void app_set_workers(app_t *a, int num_workers) {
    if (!a) return;
    a->num_workers = num_workers > 0 ? num_workers : 0;
}
// app_set_cpu_affinity enables (non-zero) or disables (0) CPU pinning of
// worker processes; the flag is read by run_worker(). `a` must not be NULL.
void app_set_cpu_affinity(app_t *a, int enabled) {
a->cpu_affinity = enabled;
}
// Helper response functions
// send_text writes a complete response on `c`: the given status, a
// text/plain Content-Type header, and `text` (NUL-terminated) as the body.
static void send_text(conn_t *c, int status, const char *text) {
    static const char mime[] = "text/plain";

    conn_write_status(c, status);
    conn_write_header(c, "Content-Type", mime);

    size_t body_len = strlen(text);
    conn_write_body(c, text, body_len);
}
// send_json writes a complete response on `c`: the given status, an
// application/json Content-Type header, and `json` (NUL-terminated) as the
// body. The text is sent as-is; it is not validated or re-encoded.
void send_json(conn_t *c, int status, const char *json) {
    static const char mime[] = "application/json";

    conn_write_status(c, status);
    conn_write_header(c, "Content-Type", mime);

    size_t body_len = strlen(json);
    conn_write_body(c, json, body_len);
}
// send_html writes a complete response on `c`: the given status, a
// text/html Content-Type header, and `html` (NUL-terminated) as the body.
void send_html(conn_t *c, int status, const char *html) {
    static const char mime[] = "text/html";

    conn_write_status(c, status);
    conn_write_header(c, "Content-Type", mime);

    size_t body_len = strlen(html);
    conn_write_body(c, html, body_len);
}
// Maps the extension after the last '.' in `path` to a Content-Type value.
// Unknown or missing extensions yield application/octet-stream.
static const char *send_file_content_type(const char *path) {
    static const struct {
        const char *ext;
        const char *type;
    } known[] = {
        { ".html", "text/html" },
        { ".css",  "text/css" },
        { ".js",   "application/javascript" },
        { ".json", "application/json" },
        { ".png",  "image/png" },
        { ".jpg",  "image/jpeg" },
        { ".jpeg", "image/jpeg" },
        { ".gif",  "image/gif" },
        { ".svg",  "image/svg+xml" },
    };

    const char *ext = strrchr(path, '.');
    if (ext) {
        for (size_t i = 0; i < sizeof known / sizeof known[0]; i++) {
            if (strcmp(ext, known[i].ext) == 0) {
                return known[i].type;
            }
        }
    }
    return "application/octet-stream";
}

// Reads the whole of `f` into a new heap buffer, provided the file is
// strictly smaller than `limit` bytes. On success returns the buffer (caller
// frees) and stores its length in *out_size. Returns NULL if the size cannot
// be determined, the file is too large, allocation fails, or the read comes
// up short.
static char *send_file_read_all(FILE *f, size_t limit, size_t *out_size) {
    if (fseek(f, 0, SEEK_END) != 0) return NULL;
    long end = ftell(f);
    if (end < 0) return NULL;  // ftell reports failure as -1
    if (fseek(f, 0, SEEK_SET) != 0) return NULL;

    size_t size = (size_t)end;
    if (size >= limit) return NULL;

    // malloc(0) may legitimately return NULL; ask for at least one byte so
    // an empty file is not mistaken for an allocation failure.
    char *buf = malloc(size ? size : 1);
    if (!buf) return NULL;

    if (fread(buf, 1, size, f) != size) {
        free(buf);
        return NULL;
    }

    *out_size = size;
    return buf;
}

// send_file serves the file at `path` on connection `c`.
// Simple file serving: no caching, no range support, and the whole file must
// fit in the connection's remaining write buffer.
//
//   - file cannot be opened         -> 404 text response
//   - file too large / read failure -> 500 with no body
//   - otherwise                     -> 200, Content-Type chosen from the
//                                      extension, file contents as body
//
// The file is read completely BEFORE any status line is written, so exactly
// one status line is emitted per response.
// NOTE(review): `path` is opened as given; if it can be derived from request
// data, the caller must reject ".." and absolute paths.
void send_file(conn_t *c, const char *path) {
    // Space kept free for the status line, Content-Type header and whatever
    // conn_write_body() adds around the payload (assumed to fit -- confirm
    // against the conn implementation).
    enum { SEND_FILE_HEADER_RESERVE = 256 };

    FILE *f = fopen(path, "rb");
    if (!f) {
        send_text(c, 404, "File not found");
        return;
    }

    // Compute the payload limit without letting the subtraction wrap when
    // the write buffer is nearly full.
    size_t room = (c->wsize > c->wlen) ? (size_t)(c->wsize - c->wlen) : 0;
    size_t limit = (room > SEND_FILE_HEADER_RESERVE)
                       ? room - SEND_FILE_HEADER_RESERVE
                       : 0;

    size_t size = 0;
    char *buf = send_file_read_all(f, limit, &size);
    fclose(f);

    if (!buf) {
        // File too large for buffer, or it could not be read
        conn_write_status(c, 500);
        conn_end_response(c);
        return;
    }

    conn_write_status(c, 200);
    conn_write_header(c, "Content-Type", send_file_content_type(path));
    conn_write_body(c, buf, size);
    free(buf);
}