package dev.coph.nextusweb.server; import dev.coph.nextusweb.server.cores.CorsHandler; import dev.coph.nextusweb.server.ratelimit.RateLimitGate; import dev.coph.nextusweb.server.ratelimit.RateLimiter; import dev.coph.nextusweb.server.router.Request; import dev.coph.nextusweb.server.router.Response; import dev.coph.nextusweb.server.router.Router; import dev.coph.nextusweb.server.router.exception.BadRequestException; import dev.coph.nextusweb.server.websocket.WebSocketConfig; import dev.coph.nextusweb.server.websocket.WebSocketFrameHandlerFactory; import dev.coph.nextusweb.server.websocket.WebSocketRouter; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolConfig; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import io.netty.handler.timeout.IdleStateHandler; import java.net.InetSocketAddress; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * The core inbound channel handler that processes every aggregated HTTP request. * *

For each request it, in order: detects and performs WebSocket upgrades (when a WebSocket * router is configured), answers CORS preflight requests, enforces rate limits, resolves the * route via the {@link Router}, runs middlewares and the matched handler, and finally writes the * response with CORS and rate-limit headers applied.

* *

Blocking handler logic runs on a virtual-thread executor rather than on the Netty event * loop, so handlers may perform blocking work without stalling I/O. WebSocket upgrades, by * contrast, mutate the pipeline and are handled inline on the event loop.

*/ public final class HttpRequestHandler extends SimpleChannelInboundHandler { /** Executor running one virtual thread per task, used to offload blocking handler work. */ private static final Executor VT_EXECUTOR = Executors.newVirtualThreadPerTaskExecutor(); /** Router resolving requests to handlers. */ private final Router router; /** CORS handler, or {@code null} if CORS is disabled. */ private final CorsHandler cors; /** Rate-limit gate, or {@code null} if rate limiting is disabled. */ private final RateLimitGate rateLimit; /** WebSocket router, or {@code null} if WebSocket support is disabled. */ private final WebSocketRouter wsRouter; /** WebSocket configuration; only consulted when {@link #wsRouter} is non-null. */ private final WebSocketConfig wsConfig; /** * Creates a handler without WebSocket support. * * @param router the router resolving requests * @param cors the CORS handler, or {@code null} to disable CORS * @param rateLimit the rate-limit gate, or {@code null} to disable rate limiting */ public HttpRequestHandler(Router router, CorsHandler cors, RateLimitGate rateLimit) { this(router, cors, rateLimit, null, null); } /** * Creates a handler, optionally with WebSocket support. * * @param router the router resolving requests * @param cors the CORS handler, or {@code null} to disable CORS * @param rateLimit the rate-limit gate, or {@code null} to disable rate limiting * @param wsRouter the WebSocket router, or {@code null} to disable WebSocket support * @param wsConfig the WebSocket configuration, used only when {@code wsRouter} is non-null */ public HttpRequestHandler(Router router, CorsHandler cors, RateLimitGate rateLimit, WebSocketRouter wsRouter, WebSocketConfig wsConfig) { this.router = router; this.cors = cors; this.rateLimit = rateLimit; this.wsRouter = wsRouter; this.wsConfig = wsConfig; } /** * Entry point invoked by Netty for each fully aggregated request. WebSocket upgrade requests * are handled inline; all other requests are retained and dispatched to a virtual thread for * processing, with the request released once handling completes. * * @param ctx the channel context * @param req the aggregated HTTP request */ @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) { if (wsRouter != null && isWebSocketUpgrade(req)) { if (handleWebSocketUpgrade(ctx, req)) return; } req.retain(); VT_EXECUTOR.execute(() -> { try { handle(ctx, req); } finally { req.release(); } }); } /** * Determines whether a request is a WebSocket upgrade handshake, i.e. a {@code GET} carrying * {@code Upgrade: websocket} and a {@code Connection} header that includes the * {@code upgrade} token. * * @param req the request to inspect * @return {@code true} if the request is a WebSocket upgrade */ private static boolean isWebSocketUpgrade(FullHttpRequest req) { if (req.method() != HttpMethod.GET) return false; String upgrade = req.headers().get(HttpHeaderNames.UPGRADE); if (upgrade == null || !"websocket".equalsIgnoreCase(upgrade)) return false; String connection = req.headers().get(HttpHeaderNames.CONNECTION); if (connection == null) return false; for (String token : connection.split(",")) { if ("upgrade".equalsIgnoreCase(token.trim())) return true; } return false; } /** * Attempts to upgrade the connection to WebSocket for the request's path. * *

Resolves the path against the WebSocket router; if no handler matches the upgrade is * declined. Otherwise the origin is validated, the WebSocket protocol/compression/idle * handlers and the application frame handler are inserted into the pipeline, and the request * is re-fired so Netty performs the handshake.

* * @param ctx the channel context * @param req the upgrade request * @return {@code true} if the request was consumed (handshake started or rejected), * {@code false} if no WebSocket route matched and normal HTTP handling should * continue */ private boolean handleWebSocketUpgrade(ChannelHandlerContext ctx, FullHttpRequest req) { String path = new QueryStringDecoder(req.uri()).path(); WebSocketRouter.Resolution resolution = wsRouter.resolve(path); if (resolution == null) return false; String origin = req.headers().get(HttpHeaderNames.ORIGIN); if (!wsConfig.isOriginAllowed(origin)) { FullHttpResponse forbidden = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN); forbidden.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, 0); ctx.writeAndFlush(forbidden).addListener(ChannelFutureListener.CLOSE); return true; } WebSocketServerProtocolConfig protoCfg = WebSocketServerProtocolConfig.newBuilder() .websocketPath(path) .checkStartsWith(false) .subprotocols(wsConfig.subprotocolsCsv()) .maxFramePayloadLength(wsConfig.maxFramePayloadLength()) .allowExtensions(wsConfig.compression()) .build(); ChannelPipeline pipeline = ctx.pipeline(); String myName = ctx.name(); if (wsConfig.idleTimeout() != null) { long secs = Math.max(1, wsConfig.idleTimeout().toSeconds()); pipeline.addBefore(myName, "ws-idle", new IdleStateHandler(0, 0, secs, TimeUnit.SECONDS)); } if (wsConfig.compression()) { pipeline.addBefore(myName, "ws-deflate", new WebSocketServerCompressionHandler()); } pipeline.addBefore(myName, "ws-proto", new WebSocketServerProtocolHandler(protoCfg)); pipeline.addBefore(myName, "ws-frames", WebSocketFrameHandlerFactory.create(resolution.handler(), path, resolution.pathParams())); ChannelHandlerContext anchor = pipeline.context(HttpObjectAggregator.class); if (anchor == null) anchor = pipeline.firstContext(); anchor.fireChannelRead(req.retain()); return true; } /** * Processes a normal (non-WebSocket) HTTP request: applies CORS preflight handling and rate * limiting, resolves the route, runs middlewares and the handler, and sends the response. * *

Exceptions from the handler are mapped to responses: a {@link BadRequestException} * becomes a {@code 400}, any other exception a {@code 500}. Routing misses become * {@code 404}, and method mismatches a {@code 405} with an {@code Allow} header. CORS and * rate-limit headers are applied to the final response in all cases.

* * @param ctx the channel context * @param raw the aggregated request being handled */ private void handle(ChannelHandlerContext ctx, FullHttpRequest raw) { String origin = raw.headers().get("Origin"); if (cors != null && cors.isPreflight(raw.method(), raw.headers())) { send(ctx, cors.handlePreflight(origin, raw.headers())); return; } String path = new QueryStringDecoder(raw.uri()).path(); RateLimiter.Result rlResult = null; if (rateLimit != null) { String remote = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress(); rlResult = rateLimit.check(raw, path, remote); if (rlResult != null && !rlResult.allowed()) { Response res = new Response().status(429).json("{\"error\":\"Too Many Requests\"}"); RateLimitGate.applyHeaders(rlResult, res); if (cors != null) cors.applyHeaders(origin, res); send(ctx, res); return; } } Router.Resolution resolution = router.resolve(raw.method(), path); Response res = new Response(); switch (resolution) { case Router.Resolution.Match m -> { Request request = new Request(raw, m.pathParams()); try { for (var mw : router.middlewares()) mw.accept(request, res); m.handler().handle(request, res); } catch (BadRequestException e) { res.status(400).json("{\"error\":\"" + e.getMessage() + "\"}"); } catch (Exception e) { res.status(500).text("Internal Server Error: " + e.getMessage()); } } case Router.Resolution.MethodNotAllowed mna -> { String allow = mna.allowedMethods().stream() .map(HttpMethod::name) .sorted() .collect(Collectors.joining(", ")); res.status(405) .header(HttpHeaderNames.ALLOW.toString(), allow) .json("{\"error\":\"Method Not Allowed\",\"allowed\":\"" + allow + "\"}"); } case Router.Resolution.NotFound nf -> res.status(404).json("{\"error\":\"Not Found\"}"); } RateLimitGate.applyHeaders(rlResult, res); if (cors != null) cors.applyHeaders(origin, res); send(ctx, res); } /** * Converts the framework {@link Response} into a Netty {@link FullHttpResponse}, sets the * {@code Content-Length}, writes it and closes the connection afterwards. * * @param ctx the channel context * @param res the response to send */ private void send(ChannelHandlerContext ctx, Response res) { var nettyRes = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(res.status()), Unpooled.wrappedBuffer(res.body()) ); nettyRes.headers().add(res.headers()); nettyRes.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, res.body().length); ctx.writeAndFlush(nettyRes).addListener(ChannelFutureListener.CLOSE); } /** * Closes the channel on any unhandled pipeline exception. * * @param ctx the channel context * @param cause the exception that propagated up the pipeline */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } }