451 lines
11 KiB
C
451 lines
11 KiB
C
#define _GNU_SOURCE
|
|
#include "app.h"
|
|
#include "reactor/pool.h"
|
|
#include "reactor/epoll.h"
|
|
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <stdio.h>
|
|
#include <unistd.h>
|
|
#include <sys/socket.h>
|
|
#include <netinet/in.h>
|
|
#include <netinet/tcp.h>
|
|
#include <fcntl.h>
|
|
#include <sys/wait.h>
|
|
#include <sched.h>
|
|
#include <signal.h>
|
|
|
|
#define DEFAULT_MAX_CONNS 10000
|
|
#define DEFAULT_MAX_EVENTS 4096
|
|
|
|
// Global app instance for simplified macro API
|
|
app_t *_global_app = NULL;
|
|
|
|
// Global route list for inline route definitions (type defined in app_macros.h)
|
|
void *_route_list = NULL;
|
|
|
|
// Default 404 handler
|
|
static void default_not_found(context_t *ctx) {
|
|
// send_text(ctx, 404, "404 Not Found");
|
|
}
|
|
|
|
// Accept handler for new connections
|
|
static void accept_handler(runtime_t *rt, int fd, uint32_t events, void *data) {
|
|
app_t *a = (app_t*)data;
|
|
|
|
// Accept multiple connections in a loop for edge-triggered mode
|
|
// Limit to prevent starvation of other events
|
|
int accepted = 0;
|
|
const int max_accept = 64;
|
|
|
|
while (accepted < max_accept) {
|
|
int ret = conn_accept(a->pool, fd);
|
|
if (ret < 0) break;
|
|
accepted++;
|
|
}
|
|
}
|
|
|
|
// Global app pointer (temporary solution)
|
|
static app_t *g_app = NULL;
|
|
|
|
// Wrapper to convert between handler types
|
|
static void handler_wrapper(conn_t *c) {
|
|
app_handler_t app_handler = (app_handler_t)c->user_data;
|
|
if (app_handler) {
|
|
context_t *ctx = context_new(c);
|
|
if (ctx) {
|
|
app_handler(ctx);
|
|
context_free(ctx);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Request router
|
|
static void route_request(conn_t *c) {
|
|
app_t *a = g_app; // Use global for now
|
|
if (!a || !a->router) return;
|
|
|
|
// Use the router to find handler
|
|
char **params = NULL;
|
|
char **names = NULL;
|
|
int count = 0;
|
|
handler_t handler = router_lookup(a->router, c->method, c->path, ¶ms, &names, &count);
|
|
|
|
if (handler) {
|
|
// The handler is actually an app_handler_t cast to handler_t
|
|
// Create context and call it
|
|
context_t *ctx = context_new(c);
|
|
if (ctx) {
|
|
// Set route parameters on the request
|
|
if (count > 0 && params && names) {
|
|
request_set_params(ctx->request, names, params, count);
|
|
}
|
|
((app_handler_t)handler)(ctx);
|
|
context_free(ctx);
|
|
}
|
|
} else {
|
|
// Create a context and call not_found
|
|
context_t *ctx = context_new(c);
|
|
if (ctx) {
|
|
a->not_found(ctx);
|
|
context_free(ctx);
|
|
}
|
|
}
|
|
}
|
|
|
|
// app_new creates a new web application.
|
|
app_t* app_new(void) {
|
|
app_t *a = calloc(1, sizeof(app_t));
|
|
if (!a) return NULL;
|
|
|
|
// Default configuration
|
|
a->max_conns = DEFAULT_MAX_CONNS;
|
|
a->read_buf_size = 8192;
|
|
a->write_buf_size = 8192;
|
|
a->arena_size = 4096;
|
|
a->not_found = default_not_found;
|
|
|
|
// Default to single process mode
|
|
a->num_workers = 0;
|
|
a->cpu_affinity = 1;
|
|
a->worker_pids = NULL;
|
|
a->worker_id = -1;
|
|
|
|
// Create router
|
|
a->router = router_new();
|
|
if (!a->router) {
|
|
free(a);
|
|
return NULL;
|
|
}
|
|
|
|
// Store global reference (temporary solution)
|
|
g_app = a;
|
|
|
|
return a;
|
|
}
|
|
|
|
// app_free destroys the application.
|
|
void app_free(app_t *a) {
|
|
if (!a) return;
|
|
|
|
// Free router
|
|
if (a->router) {
|
|
router_free(a->router);
|
|
}
|
|
|
|
// Close listener
|
|
if (a->listen_fd > 0) {
|
|
runtime_del(a->rt, a->listen_fd);
|
|
close(a->listen_fd);
|
|
}
|
|
|
|
// Free pool and runtime
|
|
pool_free(a->pool);
|
|
runtime_free(a->rt);
|
|
free(a);
|
|
}
|
|
|
|
// app_listen sets up the listening socket.
|
|
int app_listen(app_t *a, int port) {
|
|
// Don't create runtime/pool yet if using workers
|
|
// They need to be created after fork() for proper isolation
|
|
if (a->num_workers == 0) {
|
|
// Single process mode - create runtime and pool now
|
|
if (!a->rt) {
|
|
a->rt = runtime_new(DEFAULT_MAX_EVENTS);
|
|
if (!a->rt) return -1;
|
|
}
|
|
|
|
if (!a->pool) {
|
|
a->pool = pool_new(a->rt, a->max_conns);
|
|
if (!a->pool) return -1;
|
|
|
|
// Configure pool
|
|
a->pool->read_buf_size = a->read_buf_size;
|
|
a->pool->write_buf_size = a->write_buf_size;
|
|
a->pool->arena_size = a->arena_size;
|
|
|
|
// Set router as request handler
|
|
a->pool->on_request = route_request;
|
|
}
|
|
}
|
|
|
|
// Create socket
|
|
int fd = socket(AF_INET, SOCK_STREAM, 0);
|
|
if (fd < 0) return -1;
|
|
|
|
// Allow reuse
|
|
int opt = 1;
|
|
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
|
|
|
|
// Enable SO_REUSEPORT for better load distribution across CPU cores
|
|
setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
|
|
|
|
// Disable Nagle's algorithm on listener
|
|
setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
|
|
|
|
// Bind
|
|
struct sockaddr_in addr = {
|
|
.sin_family = AF_INET,
|
|
.sin_port = htons(port),
|
|
.sin_addr.s_addr = INADDR_ANY
|
|
};
|
|
|
|
if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
|
|
close(fd);
|
|
return -1;
|
|
}
|
|
|
|
// Listen with larger backlog for better performance
|
|
if (listen(fd, 4096) < 0) {
|
|
close(fd);
|
|
return -1;
|
|
}
|
|
|
|
// Make non-blocking
|
|
set_nonblocking(fd);
|
|
|
|
// Add to epoll only if single process mode
|
|
if (a->num_workers == 0 && a->rt) {
|
|
runtime_add(a->rt, fd, EPOLLIN | EPOLLET, accept_handler, a);
|
|
}
|
|
|
|
a->listen_fd = fd;
|
|
a->port = port;
|
|
|
|
if (a->num_workers == 0) {
|
|
printf("Server listening on :%d\n", port);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
// run_worker runs a single worker process
|
|
static int run_worker(app_t *a, int worker_id) {
|
|
a->worker_id = worker_id;
|
|
|
|
// Pin to CPU if requested
|
|
if (a->cpu_affinity) {
|
|
cpu_set_t cpuset;
|
|
CPU_ZERO(&cpuset);
|
|
CPU_SET(worker_id % sysconf(_SC_NPROCESSORS_ONLN), &cpuset);
|
|
sched_setaffinity(0, sizeof(cpuset), &cpuset);
|
|
}
|
|
|
|
// Create runtime and pool for this worker
|
|
a->rt = runtime_new(DEFAULT_MAX_EVENTS);
|
|
if (!a->rt) return -1;
|
|
|
|
a->pool = pool_new(a->rt, a->max_conns);
|
|
if (!a->pool) return -1;
|
|
|
|
// Configure pool
|
|
a->pool->read_buf_size = a->read_buf_size;
|
|
a->pool->write_buf_size = a->write_buf_size;
|
|
a->pool->arena_size = a->arena_size;
|
|
a->pool->on_request = route_request;
|
|
|
|
// Add listen socket to this worker's epoll
|
|
runtime_add(a->rt, a->listen_fd, EPOLLIN | EPOLLET, accept_handler, a);
|
|
|
|
printf("Worker %d: Started on CPU %ld, listening on :%d\n", worker_id,
|
|
worker_id % sysconf(_SC_NPROCESSORS_ONLN), a->port);
|
|
|
|
return runtime_run(a->rt);
|
|
}
|
|
|
|
// app_run starts the event loop.
|
|
int app_run(app_t *a) {
|
|
if (a->listen_fd <= 0) {
|
|
fprintf(stderr, "Error: Must call app_listen() before app_run()\n");
|
|
return -1;
|
|
}
|
|
|
|
// Multi-worker mode
|
|
if (a->num_workers > 0) {
|
|
a->worker_pids = calloc(a->num_workers, sizeof(pid_t));
|
|
if (!a->worker_pids) return -1;
|
|
|
|
printf("Starting %d worker processes\n", a->num_workers);
|
|
|
|
for (int i = 0; i < a->num_workers; i++) {
|
|
pid_t pid = fork();
|
|
|
|
if (pid == 0) {
|
|
// Child process
|
|
free(a->worker_pids); // Don't need this in child
|
|
a->worker_pids = NULL;
|
|
return run_worker(a, i);
|
|
} else if (pid > 0) {
|
|
// Parent process
|
|
a->worker_pids[i] = pid;
|
|
} else {
|
|
perror("fork");
|
|
// Kill already started workers
|
|
for (int j = 0; j < i; j++) {
|
|
if (a->worker_pids[j] > 0) {
|
|
kill(a->worker_pids[j], SIGTERM);
|
|
}
|
|
}
|
|
free(a->worker_pids);
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
// Parent process - wait for workers
|
|
printf("All workers started. Running...\n");
|
|
|
|
// Wait for any child to exit
|
|
int status;
|
|
pid_t pid;
|
|
while ((pid = wait(&status)) > 0) {
|
|
printf("Worker process %d exited with status %d\n", pid, WEXITSTATUS(status));
|
|
|
|
// Find and restart the worker
|
|
for (int i = 0; i < a->num_workers; i++) {
|
|
if (a->worker_pids[i] == pid) {
|
|
printf("Restarting worker %d\n", i);
|
|
pid_t new_pid = fork();
|
|
if (new_pid == 0) {
|
|
// New worker process
|
|
free(a->worker_pids);
|
|
a->worker_pids = NULL;
|
|
return run_worker(a, i);
|
|
} else if (new_pid > 0) {
|
|
a->worker_pids[i] = new_pid;
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
free(a->worker_pids);
|
|
return 0;
|
|
}
|
|
|
|
// Single process mode
|
|
printf("Running in single-process mode\n");
|
|
a->pool->on_request = route_request;
|
|
return runtime_run(a->rt);
|
|
}
|
|
|
|
// app_route registers a route.
|
|
void app_route(app_t *a, const char *method, const char *path, app_handler_t handler) {
|
|
if (!a || !a->router) return;
|
|
|
|
// For now, we need to bridge between app_handler_t and handler_t
|
|
// The router expects handler_t (conn_t*) but we have app_handler_t (context_t*)
|
|
// We'll need to wrap it somehow - for now just cast (not ideal)
|
|
router_add(a->router, method, path, (handler_t)handler);
|
|
}
|
|
|
|
// Convenience route methods
|
|
void app_get(app_t *a, const char *path, app_handler_t handler) {
|
|
app_route(a, "GET", path, handler);
|
|
}
|
|
|
|
void app_post(app_t *a, const char *path, app_handler_t handler) {
|
|
app_route(a, "POST", path, handler);
|
|
}
|
|
|
|
void app_put(app_t *a, const char *path, app_handler_t handler) {
|
|
app_route(a, "PUT", path, handler);
|
|
}
|
|
|
|
void app_delete(app_t *a, const char *path, app_handler_t handler) {
|
|
app_route(a, "DELETE", path, handler);
|
|
}
|
|
|
|
// Configuration setters
|
|
void app_set_max_conns(app_t *a, int max) {
|
|
a->max_conns = max;
|
|
}
|
|
|
|
void app_set_buffer_sizes(app_t *a, size_t read_size, size_t write_size) {
|
|
a->read_buf_size = read_size;
|
|
a->write_buf_size = write_size;
|
|
}
|
|
|
|
void app_set_arena_size(app_t *a, size_t size) {
|
|
a->arena_size = size;
|
|
}
|
|
|
|
void app_set_not_found(app_t *a, app_handler_t handler) {
|
|
a->not_found = handler;
|
|
}
|
|
|
|
// Worker configuration
|
|
void app_set_workers(app_t *a, int num_workers) {
|
|
a->num_workers = num_workers;
|
|
}
|
|
|
|
void app_set_cpu_affinity(app_t *a, int enabled) {
|
|
a->cpu_affinity = enabled;
|
|
}
|
|
|
|
// Helper response functions
|
|
static void send_text(conn_t *c, int status, const char *text) {
|
|
conn_write_status(c, status);
|
|
conn_write_header(c, "Content-Type", "text/plain");
|
|
conn_write_body(c, text, strlen(text));
|
|
}
|
|
|
|
void send_json(conn_t *c, int status, const char *json) {
|
|
conn_write_status(c, status);
|
|
conn_write_header(c, "Content-Type", "application/json");
|
|
conn_write_body(c, json, strlen(json));
|
|
}
|
|
|
|
void send_html(conn_t *c, int status, const char *html) {
|
|
conn_write_status(c, status);
|
|
conn_write_header(c, "Content-Type", "text/html");
|
|
conn_write_body(c, html, strlen(html));
|
|
}
|
|
|
|
void send_file(conn_t *c, const char *path) {
|
|
// Simple file serving (no caching, no range support)
|
|
FILE *f = fopen(path, "rb");
|
|
if (!f) {
|
|
send_text(c, 404, "File not found");
|
|
return;
|
|
}
|
|
|
|
// Get file size
|
|
fseek(f, 0, SEEK_END);
|
|
size_t size = ftell(f);
|
|
fseek(f, 0, SEEK_SET);
|
|
|
|
// Determine content type from extension
|
|
const char *content_type = "application/octet-stream";
|
|
const char *ext = strrchr(path, '.');
|
|
if (ext) {
|
|
if (strcmp(ext, ".html") == 0) content_type = "text/html";
|
|
else if (strcmp(ext, ".css") == 0) content_type = "text/css";
|
|
else if (strcmp(ext, ".js") == 0) content_type = "application/javascript";
|
|
else if (strcmp(ext, ".json") == 0) content_type = "application/json";
|
|
else if (strcmp(ext, ".png") == 0) content_type = "image/png";
|
|
else if (strcmp(ext, ".jpg") == 0 || strcmp(ext, ".jpeg") == 0) content_type = "image/jpeg";
|
|
else if (strcmp(ext, ".gif") == 0) content_type = "image/gif";
|
|
else if (strcmp(ext, ".svg") == 0) content_type = "image/svg+xml";
|
|
}
|
|
|
|
// Send headers
|
|
conn_write_status(c, 200);
|
|
conn_write_header(c, "Content-Type", content_type);
|
|
|
|
// For small files, read and send directly
|
|
if (size < c->wsize - c->wlen - 100) {
|
|
char *buf = malloc(size);
|
|
if (buf) {
|
|
fread(buf, 1, size, f);
|
|
conn_write_body(c, buf, size);
|
|
free(buf);
|
|
}
|
|
} else {
|
|
// File too large for buffer
|
|
conn_write_status(c, 500);
|
|
conn_end_response(c);
|
|
}
|
|
|
|
fclose(f);
|
|
}
|