#define _GNU_SOURCE #include "app.h" #include "reactor/pool.h" #include "reactor/epoll.h" #include #include #include #include #include #include #include #include #include #include #include #define DEFAULT_MAX_CONNS 10000 #define DEFAULT_MAX_EVENTS 4096 // Global app instance for simplified macro API app_t *_global_app = NULL; // Global route list for inline route definitions (type defined in app_macros.h) void *_route_list = NULL; // Default 404 handler static void default_not_found(context_t *ctx) { // send_text(ctx, 404, "404 Not Found"); } // Accept handler for new connections static void accept_handler(runtime_t *rt, int fd, uint32_t events, void *data) { app_t *a = (app_t*)data; // Accept multiple connections in a loop for edge-triggered mode // Limit to prevent starvation of other events int accepted = 0; const int max_accept = 64; while (accepted < max_accept) { int ret = conn_accept(a->pool, fd); if (ret < 0) break; accepted++; } } // Global app pointer (temporary solution) static app_t *g_app = NULL; // Wrapper to convert between handler types static void handler_wrapper(conn_t *c) { app_handler_t app_handler = (app_handler_t)c->user_data; if (app_handler) { context_t *ctx = context_new(c); if (ctx) { app_handler(ctx); context_free(ctx); } } } // Request router static void route_request(conn_t *c) { app_t *a = g_app; // Use global for now if (!a || !a->router) return; // Use the router to find handler char **params = NULL; char **names = NULL; int count = 0; handler_t handler = router_lookup(a->router, c->method, c->path, ¶ms, &names, &count); if (handler) { // The handler is actually an app_handler_t cast to handler_t // Create context and call it context_t *ctx = context_new(c); if (ctx) { // Set route parameters on the request if (count > 0 && params && names) { request_set_params(ctx->request, names, params, count); } ((app_handler_t)handler)(ctx); context_free(ctx); } } else { // Create a context and call not_found context_t *ctx = context_new(c); if 
(ctx) { a->not_found(ctx); context_free(ctx); } } } // app_new creates a new web application. app_t* app_new(void) { app_t *a = calloc(1, sizeof(app_t)); if (!a) return NULL; // Default configuration a->max_conns = DEFAULT_MAX_CONNS; a->read_buf_size = 8192; a->write_buf_size = 8192; a->arena_size = 4096; a->not_found = default_not_found; // Default to single process mode a->num_workers = 0; a->cpu_affinity = 1; a->worker_pids = NULL; a->worker_id = -1; // Create router a->router = router_new(); if (!a->router) { free(a); return NULL; } // Store global reference (temporary solution) g_app = a; return a; } // app_free destroys the application. void app_free(app_t *a) { if (!a) return; // Free router if (a->router) { router_free(a->router); } // Close listener if (a->listen_fd > 0) { runtime_del(a->rt, a->listen_fd); close(a->listen_fd); } // Free pool and runtime pool_free(a->pool); runtime_free(a->rt); free(a); } // app_listen sets up the listening socket. int app_listen(app_t *a, int port) { // Don't create runtime/pool yet if using workers // They need to be created after fork() for proper isolation if (a->num_workers == 0) { // Single process mode - create runtime and pool now if (!a->rt) { a->rt = runtime_new(DEFAULT_MAX_EVENTS); if (!a->rt) return -1; } if (!a->pool) { a->pool = pool_new(a->rt, a->max_conns); if (!a->pool) return -1; // Configure pool a->pool->read_buf_size = a->read_buf_size; a->pool->write_buf_size = a->write_buf_size; a->pool->arena_size = a->arena_size; // Set router as request handler a->pool->on_request = route_request; } } // Create socket int fd = socket(AF_INET, SOCK_STREAM, 0); if (fd < 0) return -1; // Allow reuse int opt = 1; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); // Enable SO_REUSEPORT for better load distribution across CPU cores setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)); // Disable Nagle's algorithm on listener setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)); // Bind struct 
sockaddr_in addr = { .sin_family = AF_INET, .sin_port = htons(port), .sin_addr.s_addr = INADDR_ANY }; if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { close(fd); return -1; } // Listen with larger backlog for better performance if (listen(fd, 4096) < 0) { close(fd); return -1; } // Make non-blocking set_nonblocking(fd); // Add to epoll only if single process mode if (a->num_workers == 0 && a->rt) { runtime_add(a->rt, fd, EPOLLIN | EPOLLET, accept_handler, a); } a->listen_fd = fd; a->port = port; if (a->num_workers == 0) { printf("Server listening on :%d\n", port); } return 0; } // run_worker runs a single worker process static int run_worker(app_t *a, int worker_id) { a->worker_id = worker_id; // Pin to CPU if requested if (a->cpu_affinity) { cpu_set_t cpuset; CPU_ZERO(&cpuset); CPU_SET(worker_id % sysconf(_SC_NPROCESSORS_ONLN), &cpuset); sched_setaffinity(0, sizeof(cpuset), &cpuset); } // Create runtime and pool for this worker a->rt = runtime_new(DEFAULT_MAX_EVENTS); if (!a->rt) return -1; a->pool = pool_new(a->rt, a->max_conns); if (!a->pool) return -1; // Configure pool a->pool->read_buf_size = a->read_buf_size; a->pool->write_buf_size = a->write_buf_size; a->pool->arena_size = a->arena_size; a->pool->on_request = route_request; // Add listen socket to this worker's epoll runtime_add(a->rt, a->listen_fd, EPOLLIN | EPOLLET, accept_handler, a); printf("Worker %d: Started on CPU %ld, listening on :%d\n", worker_id, worker_id % sysconf(_SC_NPROCESSORS_ONLN), a->port); return runtime_run(a->rt); } // app_run starts the event loop. 
// Forks one worker process.
// Returns the child's pid in the parent, 0 in the child (after releasing the
// child's copy of the pid table, which only the parent needs), or -1 if
// fork() failed.
static pid_t app_fork_worker(app_t *a) {
    // Flush stdio first so buffered output is not duplicated into the child.
    fflush(stdout);
    fflush(stderr);

    pid_t pid = fork();
    if (pid == 0) {
        free(a->worker_pids);
        a->worker_pids = NULL;
    }
    return pid;
}

// Runs the application until its event loop ends. app_listen() must have
// succeeded first.
//
// With num_workers > 0 the caller becomes a supervisor: it forks the workers,
// then waits on them and restarts any that exit. Each worker returns from
// this function, in the child process, with the result of run_worker().
// Otherwise the event loop runs in the calling process.
//
// Returns -1 on setup failure, 0 when the supervisor has no children left,
// or the result of the event loop.
int app_run(app_t *a) {
    if (!a || a->listen_fd <= 0) {
        fprintf(stderr, "Error: Must call app_listen() before app_run()\n");
        return -1;
    }

    // Multi-worker mode
    if (a->num_workers > 0) {
        a->worker_pids = calloc((size_t)a->num_workers, sizeof(pid_t));
        if (!a->worker_pids) {
            return -1;
        }

        printf("Starting %d worker processes\n", a->num_workers);

        for (int i = 0; i < a->num_workers; i++) {
            pid_t pid = app_fork_worker(a);
            if (pid == 0) {
                // Child process
                return run_worker(a, i);
            }
            if (pid < 0) {
                perror("fork");
                // Kill already started workers
                for (int j = 0; j < i; j++) {
                    if (a->worker_pids[j] > 0) {
                        kill(a->worker_pids[j], SIGTERM);
                    }
                }
                free(a->worker_pids);
                a->worker_pids = NULL;
                return -1;
            }
            // Parent process
            a->worker_pids[i] = pid;
        }

        // Parent process - wait for workers
        printf("All workers started. Running...\n");

        // Wait for any child to exit.
        // NOTE(review): a worker that dies immediately is restarted without
        // delay, so a persistent startup failure will respawn in a tight loop.
        int status = 0;
        pid_t pid;
        while ((pid = wait(&status)) > 0) {
            // WEXITSTATUS is only meaningful for a normal exit.
            if (WIFEXITED(status)) {
                printf("Worker process %d exited with status %d\n",
                       (int)pid, WEXITSTATUS(status));
            } else if (WIFSIGNALED(status)) {
                printf("Worker process %d killed by signal %d\n",
                       (int)pid, WTERMSIG(status));
            } else {
                printf("Worker process %d terminated (raw status %d)\n",
                       (int)pid, status);
            }

            // Find and restart the worker
            for (int i = 0; i < a->num_workers; i++) {
                if (a->worker_pids[i] != pid) {
                    continue;
                }

                printf("Restarting worker %d\n", i);
                pid_t new_pid = app_fork_worker(a);
                if (new_pid == 0) {
                    // New worker process
                    return run_worker(a, i);
                }
                if (new_pid > 0) {
                    a->worker_pids[i] = new_pid;
                } else {
                    perror("fork");
                    // Forget the dead pid so a recycled pid cannot be
                    // mistaken for this worker later.
                    a->worker_pids[i] = -1;
                }
                break;
            }
        }

        free(a->worker_pids);
        a->worker_pids = NULL;
        return 0;
    }

    // Single process mode. The runtime and pool only exist if app_listen()
    // ran while the worker count was zero.
    if (!a->rt || !a->pool) {
        fprintf(stderr, "Error: runtime not initialized; call app_listen() "
                        "after configuring workers\n");
        return -1;
    }

    printf("Running in single-process mode\n");
    a->pool->on_request = route_request;
    return runtime_run(a->rt);
}

// app_route registers a route.
void app_route(app_t *a, const char *method, const char *path, app_handler_t handler) {
    if (!a || !a->router || !method || !path || !handler) {
        return;
    }

    // For now, we need to bridge between app_handler_t and handler_t.
    // The router expects handler_t (conn_t*) but we have app_handler_t
    // (context_t*), so the pointer is stored through a cast (not ideal).
    // It must be cast back to app_handler_t before it is called.
    router_add(a->router, method, path, (handler_t)handler);
}

// Convenience route methods: each registers `handler` for `path` under the
// HTTP method named by the function.
void app_get(app_t *a, const char *path, app_handler_t handler) {
    app_route(a, "GET", path, handler);
}

void app_post(app_t *a, const char *path, app_handler_t handler) {
    app_route(a, "POST", path, handler);
}

void app_put(app_t *a, const char *path, app_handler_t handler) {
    app_route(a, "PUT", path, handler);
}

void app_delete(app_t *a, const char *path, app_handler_t handler) {
    app_route(a, "DELETE", path, handler);
}

// Configuration setters. All of them ignore a NULL app.

// Sets the connection limit handed to the pool when it is created.
void app_set_max_conns(app_t *a, int max) {
    if (!a) {
        return;
    }
    a->max_conns = max;
}

// Sets the read/write buffer sizes copied into the pool when it is created.
void app_set_buffer_sizes(app_t *a, size_t read_size, size_t write_size) {
    if (!a) {
        return;
    }
    a->read_buf_size = read_size;
    a->write_buf_size = write_size;
}

// Sets the arena size copied into the pool when it is created.
void app_set_arena_size(app_t *a, size_t size) {
    if (!a) {
        return;
    }
    a->arena_size = size;
}

// Sets the handler used when no route matches. Passing NULL restores the
// default handler so the router never calls through a NULL pointer.
void app_set_not_found(app_t *a, app_handler_t handler) {
    if (!a) {
        return;
    }
    a->not_found = handler ? handler : default_not_found;
}

// Worker configuration

// Sets the number of worker processes; 0 means single-process mode.
// Negative values are treated as 0: they would otherwise skip both the
// single-process setup in app_listen() and the worker path in app_run().
void app_set_workers(app_t *a, int num_workers) {
    if (!a) {
        return;
    }
    a->num_workers = num_workers > 0 ? num_workers : 0;
}

// Enables (non-zero) or disables (zero) pinning each worker to a CPU.
void app_set_cpu_affinity(app_t *a, int enabled) {
    if (!a) {
        return;
    }
    a->cpu_affinity = enabled;
}

// Helper response functions

// Writes a complete response: status line, Content-Type header and body.
// A NULL body is sent as an empty body.
static void send_with_type(conn_t *c, int status, const char *content_type,
                           const char *body) {
    const char *payload = body ? body : "";

    conn_write_status(c, status);
    conn_write_header(c, "Content-Type", content_type);
    conn_write_body(c, payload, strlen(payload));
}

// Sends `text` as a text/plain response with the given status.
static void send_text(conn_t *c, int status, const char *text) {
    send_with_type(c, status, "text/plain", text);
}

// Sends `json` as an application/json response with the given status.
void send_json(conn_t *c, int status, const char *json) {
    send_with_type(c, status, "application/json", json);
}

// Sends `html` as a text/html response with the given status.
void send_html(conn_t *c, int status, const char *html) {
    send_with_type(c, status, "text/html", html);
}

// Maps a path's extension to a Content-Type, defaulting to
// application/octet-stream. The match is exact and case-sensitive.
static const char *content_type_for_path(const char *path) {
    static const struct {
        const char *ext;
        const char *type;
    } known[] = {
        { ".html", "text/html" },
        { ".css",  "text/css" },
        { ".js",   "application/javascript" },
        { ".json", "application/json" },
        { ".png",  "image/png" },
        { ".jpg",  "image/jpeg" },
        { ".jpeg", "image/jpeg" },
        { ".gif",  "image/gif" },
        { ".svg",  "image/svg+xml" },
    };

    const char *ext = strrchr(path, '.');
    if (ext) {
        for (size_t i = 0; i < sizeof known / sizeof known[0]; i++) {
            if (strcmp(ext, known[i].ext) == 0) {
                return known[i].type;
            }
        }
    }
    return "application/octet-stream";
}

// Serves the file at `path` on connection `c`.
// Simple file serving (no caching, no range support). Responds 404 if the
// file cannot be opened and 500 if it cannot be read or does not fit in the
// connection's remaining write buffer. The 200 status and headers are written
// only once the whole file is in memory, so every response carries exactly
// one status line.
//
// NOTE(review): `path` is opened as given. Callers that build it from request
// data must reject traversal ("..") and absolute paths themselves.
void send_file(conn_t *c, const char *path) {
    FILE *f = path ? fopen(path, "rb") : NULL;
    if (!f) {
        send_text(c, 404, "File not found");
        return;
    }

    char *buf = NULL;
    int ok = 0;
    size_t size = 0;

    // Get file size; ftell() returns -1 on failure.
    if (fseek(f, 0, SEEK_END) != 0) {
        goto done;
    }
    long end = ftell(f);
    if (end < 0 || fseek(f, 0, SEEK_SET) != 0) {
        goto done;
    }
    size = (size_t)end;

    // Only small files are served: the body must fit in the free part of the
    // write buffer, keeping 100 bytes back for the status line and headers.
    // The subtraction is guarded so a nearly full buffer cannot wrap around.
    const size_t reserve = 100;
    size_t room = 0;
    if (c->wsize > c->wlen) {
        size_t free_bytes = (size_t)(c->wsize - c->wlen);
        if (free_bytes > reserve) {
            room = free_bytes - reserve;
        }
    }
    if (size >= room) {
        goto done;  // File too large for buffer
    }

    buf = malloc(size ? size : 1);
    if (!buf) {
        goto done;
    }
    if (fread(buf, 1, size, f) != size) {
        goto done;  // Short read (I/O error, or not a regular file)
    }
    ok = 1;

done:
    fclose(f);

    if (ok) {
        conn_write_status(c, 200);
        conn_write_header(c, "Content-Type", content_type_for_path(path));
        conn_write_body(c, buf, size);
    } else {
        conn_write_status(c, 500);
        conn_end_response(c);
    }

    free(buf);
}