diff --git a/app.c b/app.c index ef51c7b..6627536 100644 --- a/app.c +++ b/app.c @@ -1,3 +1,4 @@ +#define _GNU_SOURCE #include "app.h" #include "reactor/pool.h" #include "reactor/epoll.h" @@ -10,6 +11,9 @@ #include #include #include +#include +#include +#include #define DEFAULT_MAX_CONNS 10000 #define DEFAULT_MAX_EVENTS 4096 @@ -93,6 +97,12 @@ app_t* app_new(void) { a->arena_size = 4096; a->not_found = default_not_found; + // Default to single process mode + a->num_workers = 0; + a->cpu_affinity = 1; + a->worker_pids = NULL; + a->worker_id = -1; + // Create router a->router = router_new(); if (!a->router) { @@ -129,23 +139,27 @@ void app_free(app_t *a) { // app_listen sets up the listening socket. int app_listen(app_t *a, int port) { - // Create runtime and pool if not already done - if (!a->rt) { - a->rt = runtime_new(DEFAULT_MAX_EVENTS); - if (!a->rt) return -1; - } + // Don't create runtime/pool yet if using workers + // They need to be created after fork() for proper isolation + if (a->num_workers == 0) { + // Single process mode - create runtime and pool now + if (!a->rt) { + a->rt = runtime_new(DEFAULT_MAX_EVENTS); + if (!a->rt) return -1; + } - if (!a->pool) { - a->pool = pool_new(a->rt, a->max_conns); - if (!a->pool) return -1; + if (!a->pool) { + a->pool = pool_new(a->rt, a->max_conns); + if (!a->pool) return -1; - // Configure pool - a->pool->read_buf_size = a->read_buf_size; - a->pool->write_buf_size = a->write_buf_size; - a->pool->arena_size = a->arena_size; + // Configure pool + a->pool->read_buf_size = a->read_buf_size; + a->pool->write_buf_size = a->write_buf_size; + a->pool->arena_size = a->arena_size; - // Set router as request handler - a->pool->on_request = route_request; + // Set router as request handler + a->pool->on_request = route_request; + } } // Create socket @@ -183,25 +197,126 @@ int app_listen(app_t *a, int port) { // Make non-blocking set_nonblocking(fd); - // Add to epoll - runtime_add(a->rt, fd, EPOLLIN | EPOLLET, accept_handler, a); + // Add to epoll only if single process mode + if (a->num_workers == 0 && a->rt) { + runtime_add(a->rt, fd, EPOLLIN | EPOLLET, accept_handler, a); + } a->listen_fd = fd; a->port = port; - printf("Server listening on :%d\n", port); + if (a->num_workers == 0) { + printf("Server listening on :%d\n", port); + } return 0; } +// run_worker runs a single worker process +static int run_worker(app_t *a, int worker_id) { + a->worker_id = worker_id; + + // Pin to CPU if requested + if (a->cpu_affinity) { + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(worker_id % sysconf(_SC_NPROCESSORS_ONLN), &cpuset); + sched_setaffinity(0, sizeof(cpuset), &cpuset); + } + + // Create runtime and pool for this worker + a->rt = runtime_new(DEFAULT_MAX_EVENTS); + if (!a->rt) return -1; + + a->pool = pool_new(a->rt, a->max_conns); + if (!a->pool) return -1; + + // Configure pool + a->pool->read_buf_size = a->read_buf_size; + a->pool->write_buf_size = a->write_buf_size; + a->pool->arena_size = a->arena_size; + a->pool->on_request = route_request; + + // Add listen socket to this worker's epoll + runtime_add(a->rt, a->listen_fd, EPOLLIN | EPOLLET, accept_handler, a); + + printf("Worker %d: Started on CPU %ld, listening on :%d\n", worker_id, + worker_id % sysconf(_SC_NPROCESSORS_ONLN), a->port); + + return runtime_run(a->rt); +} + // app_run starts the event loop. int app_run(app_t *a) { - if (!a->rt || a->listen_fd <= 0) { + if (a->listen_fd <= 0) { + fprintf(stderr, "Error: Must call app_listen() before app_run()\n"); return -1; } - // Override pool callback with router + // Multi-worker mode + if (a->num_workers > 0) { + a->worker_pids = calloc(a->num_workers, sizeof(pid_t)); + if (!a->worker_pids) return -1; + + printf("Starting %d worker processes\n", a->num_workers); + + for (int i = 0; i < a->num_workers; i++) { + pid_t pid = fork(); + + if (pid == 0) { + // Child process + free(a->worker_pids); // Don't need this in child + a->worker_pids = NULL; + return run_worker(a, i); + } else if (pid > 0) { + // Parent process + a->worker_pids[i] = pid; + } else { + perror("fork"); + // Kill already started workers + for (int j = 0; j < i; j++) { + if (a->worker_pids[j] > 0) { + kill(a->worker_pids[j], SIGTERM); + } + } + free(a->worker_pids); + return -1; + } + } + + // Parent process - wait for workers + printf("All workers started. Running...\n"); + + // Wait for any child to exit + int status; + pid_t pid; + while ((pid = wait(&status)) > 0) { + printf("Worker process %d exited with status %d\n", pid, WEXITSTATUS(status)); + + // Find and restart the worker + for (int i = 0; i < a->num_workers; i++) { + if (a->worker_pids[i] == pid) { + printf("Restarting worker %d\n", i); + pid_t new_pid = fork(); + if (new_pid == 0) { + // New worker process + free(a->worker_pids); + a->worker_pids = NULL; + return run_worker(a, i); + } else if (new_pid > 0) { + a->worker_pids[i] = new_pid; + } + break; + } + } + } + + free(a->worker_pids); + return 0; + } + + // Single process mode + printf("Running in single-process mode\n"); a->pool->on_request = route_request; - return runtime_run(a->rt); } @@ -250,6 +365,15 @@ void app_set_not_found(app_t *a, app_handler_t handler) { a->not_found = handler; } +// Worker configuration +void app_set_workers(app_t *a, int num_workers) { + a->num_workers = num_workers; +} + +void app_set_cpu_affinity(app_t *a, int enabled) { + a->cpu_affinity = enabled; +} + // Helper response functions static void send_text(conn_t *c, int status, const char *text) { conn_write_status(c, status); diff --git a/app.h b/app.h index 2f93eb8..03f35b3 100644 --- a/app.h +++ b/app.h @@ -5,6 +5,7 @@ #include "router.h" #include "reactor/pool.h" #include "reactor/epoll.h" +#include typedef struct app app_t; typedef void (*app_handler_t)(context_t *ctx); @@ -22,6 +23,14 @@ struct app { size_t read_buf_size; size_t write_buf_size; size_t arena_size; + + // Worker configuration + int num_workers; // Number of worker processes (0 = single process) + int cpu_affinity; // Pin workers to CPU cores + + // Worker management + pid_t *worker_pids; // Array of worker process IDs + int worker_id; // Current worker ID (-1 for parent) // Default handlers app_handler_t not_found; @@ -48,4 +57,8 @@ void app_set_arena_size(app_t *a, size_t size); void app_set_not_found(app_t *a, app_handler_t handler); void app_set_error(app_t *a, app_handler_t handler); +// Worker configuration +void app_set_workers(app_t *a, int num_workers); +void app_set_cpu_affinity(app_t *a, int enabled); + #endif // APP_H