Reformat code comments for consistency and clarity across all classes
CI - Test, Publish and Release / run-tests (push) Failing after 15s
CI - Test, Publish and Release / create-release (push) Has been skipped
CI - Test, Publish and Release / check-and-publish (push) Has been skipped

This commit is contained in:
2026-06-15 07:27:07 +02:00
parent 6de7e26f33
commit 893bb0b7bd
32 changed files with 849 additions and 544 deletions
@@ -55,30 +55,52 @@ import java.util.stream.Collectors;
*/
public final class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
/** Logger used for server-side error diagnostics (never leaked to clients). */
/**
* Logger used for server-side error diagnostics (never leaked to clients).
*/
private static final Logger LOG = System.getLogger(HttpRequestHandler.class.getName());
/** Executor running one virtual thread per task, used to offload blocking handler work. */
/**
* Executor running one virtual thread per task, used to offload blocking handler work.
*/
private static final Executor VT_EXECUTOR =
Executors.newVirtualThreadPerTaskExecutor();
/** Router resolving requests to handlers. */
/**
* Router resolving requests to handlers.
*/
private final Router router;
/** CORS handler, or {@code null} if CORS is disabled. */
/**
* CORS handler, or {@code null} if CORS is disabled.
*/
private final CorsHandler cors;
/** Rate-limit gate, or {@code null} if rate limiting is disabled. */
/**
* Rate-limit gate, or {@code null} if rate limiting is disabled.
*/
private final RateLimitGate rateLimit;
/** Authentication gate, or {@code null} if the auth layer is disabled. */
/**
* Authentication gate, or {@code null} if the auth layer is disabled.
*/
private final AuthGate authGate;
/** Trusted-proxy policy used to resolve the client IP; never {@code null}. */
/**
* Trusted-proxy policy used to resolve the client IP; never {@code null}.
*/
private final TrustedProxies trustedProxies;
/** WebSocket router, or {@code null} if WebSocket support is disabled. */
/**
* WebSocket router, or {@code null} if WebSocket support is disabled.
*/
private final WebSocketRouter wsRouter;
/** WebSocket configuration; only consulted when {@link #wsRouter} is non-null. */
/**
* WebSocket configuration; only consulted when {@link #wsRouter} is non-null.
*/
private final WebSocketConfig wsConfig;
/** Security-header policy applied to every response, or {@code null} if disabled. */
/**
* Security-header policy applied to every response, or {@code null} if disabled.
*/
private final SecurityHeaders securityHeaders;
/** Whether this server's connections are secured by TLS (gates HSTS emission). */
/**
* Whether this server's connections are secured by TLS (gates HSTS emission).
*/
private final boolean secure;
/**
@@ -95,23 +117,6 @@ public final class HttpRequestHandler extends SimpleChannelInboundHandler<FullHt
this(router, cors, rateLimit, authGate, trustedProxies, null, null, null, false);
}
/**
* Creates a handler, optionally with WebSocket support.
*
* @param router the router resolving requests
* @param cors the CORS handler, or {@code null} to disable CORS
* @param rateLimit the rate-limit gate, or {@code null} to disable rate limiting
* @param authGate the auth gate, or {@code null} to disable the auth layer
* @param trustedProxies the trusted-proxy policy, or {@code null} for {@link TrustedProxies#none()}
* @param wsRouter the WebSocket router, or {@code null} to disable WebSocket support
* @param wsConfig the WebSocket configuration, used only when {@code wsRouter} is non-null
*/
public HttpRequestHandler(Router router, CorsHandler cors, RateLimitGate rateLimit,
AuthGate authGate, TrustedProxies trustedProxies,
WebSocketRouter wsRouter, WebSocketConfig wsConfig) {
this(router, cors, rateLimit, authGate, trustedProxies, wsRouter, wsConfig, null, false);
}
/**
* Creates a handler with WebSocket support and a security-header policy.
*
@@ -140,6 +145,23 @@ public final class HttpRequestHandler extends SimpleChannelInboundHandler<FullHt
this.secure = secure;
}
/**
* Creates a handler, optionally with WebSocket support.
*
* @param router the router resolving requests
* @param cors the CORS handler, or {@code null} to disable CORS
* @param rateLimit the rate-limit gate, or {@code null} to disable rate limiting
* @param authGate the auth gate, or {@code null} to disable the auth layer
* @param trustedProxies the trusted-proxy policy, or {@code null} for {@link TrustedProxies#none()}
* @param wsRouter the WebSocket router, or {@code null} to disable WebSocket support
* @param wsConfig the WebSocket configuration, used only when {@code wsRouter} is non-null
*/
public HttpRequestHandler(Router router, CorsHandler cors, RateLimitGate rateLimit,
AuthGate authGate, TrustedProxies trustedProxies,
WebSocketRouter wsRouter, WebSocketConfig wsConfig) {
this(router, cors, rateLimit, authGate, trustedProxies, wsRouter, wsConfig, null, false);
}
/**
* Entry point invoked by Netty for each fully aggregated request. WebSocket upgrade requests
* are handled inline; all other requests disable the connection's auto-read (one in-flight
@@ -178,7 +200,7 @@ public final class HttpRequestHandler extends SimpleChannelInboundHandler<FullHt
private static boolean isWebSocketUpgrade(FullHttpRequest req) {
if (req.method() != HttpMethod.GET) return false;
String upgrade = req.headers().get(HttpHeaderNames.UPGRADE);
if (upgrade == null || !"websocket".equalsIgnoreCase(upgrade)) return false;
if (!"websocket".equalsIgnoreCase(upgrade)) return false;
String connection = req.headers().get(HttpHeaderNames.CONNECTION);
if (connection == null) return false;
for (String token : connection.split(",")) {
@@ -199,7 +221,7 @@ public final class HttpRequestHandler extends SimpleChannelInboundHandler<FullHt
* @param ctx the channel context
* @param req the upgrade request
* @return {@code true} if the request was consumed (handshake started or rejected),
* {@code false} if no WebSocket route matched and normal HTTP handling should continue
* {@code false} if no WebSocket route matched and normal HTTP handling should continue
*/
private boolean handleWebSocketUpgrade(ChannelHandlerContext ctx, FullHttpRequest req) {
String path = new QueryStringDecoder(req.uri()).path();
@@ -359,6 +381,19 @@ public final class HttpRequestHandler extends SimpleChannelInboundHandler<FullHt
}
}
/**
* Writes an empty-bodied response with the given status and closes the connection, used for
* rejected WebSocket upgrades.
*
* @param ctx the channel context
* @param status the HTTP status to send
*/
private static void sendStatusAndClose(ChannelHandlerContext ctx, HttpResponseStatus status) {
FullHttpResponse res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
res.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, 0);
ctx.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE);
}
/**
* Resolves the effective client IP for a request, honouring the configured trusted proxies.
*
@@ -418,19 +453,6 @@ public final class HttpRequestHandler extends SimpleChannelInboundHandler<FullHt
}
}
/**
* Writes an empty-bodied response with the given status and closes the connection, used for
* rejected WebSocket upgrades.
*
* @param ctx the channel context
* @param status the HTTP status to send
*/
private static void sendStatusAndClose(ChannelHandlerContext ctx, HttpResponseStatus status) {
FullHttpResponse res = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
res.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, 0);
ctx.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE);
}
/**
* Closes the channel on any unhandled pipeline exception (including read timeouts).
*
@@ -44,34 +44,62 @@ import java.util.concurrent.TimeUnit;
*/
public final class HttpServer {
/** Default cap on aggregated HTTP request bodies: 1&nbsp;MiB. */
/**
* Default cap on aggregated HTTP request bodies: 1&nbsp;MiB.
*/
private static final int DEFAULT_MAX_HTTP_CONTENT_LENGTH = 1_048_576;
/** Default per-connection read timeout that reaps slow/idle clients. */
/**
* Default per-connection read timeout that reaps slow/idle clients.
*/
private static final Duration DEFAULT_HTTP_READ_TIMEOUT = Duration.ofSeconds(30);
/** TCP port the server binds to. */
/**
* TCP port the server binds to.
*/
private final int port;
/** Router resolving requests to handlers. */
/**
* Router resolving requests to handlers.
*/
private final Router router;
/** Optional TLS configuration; {@code null} serves plain HTTP. */
/**
* Optional TLS configuration; {@code null} serves plain HTTP.
*/
private TlsConfig tls;
/** Optional CORS handler; {@code null} disables CORS handling. */
/**
* Optional CORS handler; {@code null} disables CORS handling.
*/
private CorsHandler cors;
/** Optional rate-limit gate; {@code null} disables rate limiting. */
/**
* Optional rate-limit gate; {@code null} disables rate limiting.
*/
private RateLimitGate gate;
/** Optional authentication gate; {@code null} disables the auth layer. */
/**
* Optional authentication gate; {@code null} disables the auth layer.
*/
private AuthGate authGate;
/** Optional security-header policy; {@code null} adds no security headers. */
/**
* Optional security-header policy; {@code null} adds no security headers.
*/
private SecurityHeaders securityHeaders;
/** Optional WebSocket router; {@code null} disables WebSocket support. */
/**
* Optional WebSocket router; {@code null} disables WebSocket support.
*/
private WebSocketRouter wsRouter;
/** WebSocket configuration; only used when {@link #wsRouter} is set. */
/**
* WebSocket configuration; only used when {@link #wsRouter} is set.
*/
private WebSocketConfig wsConfig;
/** Trusted-proxy policy for resolving the client IP; never {@code null}. */
/**
* Trusted-proxy policy for resolving the client IP; never {@code null}.
*/
private TrustedProxies trustedProxies = TrustedProxies.none();
/** Maximum aggregated HTTP request body size in bytes. */
/**
* Maximum aggregated HTTP request body size in bytes.
*/
private int maxHttpContentLength = DEFAULT_MAX_HTTP_CONTENT_LENGTH;
/** Per-connection HTTP read timeout; {@code null} or non-positive disables it. */
/**
* Per-connection HTTP read timeout; {@code null} or non-positive disables it.
*/
private Duration httpReadTimeout = DEFAULT_HTTP_READ_TIMEOUT;
private HttpServer(int port, Router router) {
@@ -4,6 +4,7 @@ import dev.coph.nextusweb.server.router.Request;
import dev.coph.nextusweb.server.router.Response;
import dev.coph.nextusweb.server.router.Router;
import io.netty.handler.codec.http.HttpMethod;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
@@ -122,7 +123,7 @@ public final class AnnotationScanner {
*
* @param m the method to inspect
* @return a {@link RouteInfo} describing the route, or {@code null} if the method carries
* no recognised route annotation
* no recognised route annotation
*/
private static RouteInfo extractRoute(Method m) {
Route r = m.getAnnotation(Route.class);
@@ -1,10 +1,6 @@
package dev.coph.nextusweb.server.auth;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
/**
* Immutable mapping from request paths to the authentication requirement that applies to them,
@@ -25,23 +21,22 @@ import java.util.Objects;
*/
public final class AuthConfig {
/** Whether a matched rule rejects unauthenticated requests or merely annotates them. */
public enum Mode {
/** Authentication is mandatory; failure yields {@code 401 Unauthorized}. */
REQUIRED,
/** Authentication is best-effort; the principal is attached if present, never rejected. */
OPTIONAL
}
/** Rule applied to every path with no more specific match, or {@code null} if none. */
/**
* Rule applied to every path with no more specific match, or {@code null} if none.
*/
private final Rule globalRule;
/** Rules matched by exact path equality. */
/**
* Rules matched by exact path equality.
*/
private final Map<String, Rule> exactPathRules;
/** Prefix rules, pre-sorted longest-prefix-first. */
/**
* Prefix rules, pre-sorted longest-prefix-first.
*/
private final List<PrefixRule> prefixRules;
/** Optional {@code WWW-Authenticate} challenge sent with {@code 401} responses. */
/**
* Optional {@code WWW-Authenticate} challenge sent with {@code 401} responses.
*/
private final String challenge;
private AuthConfig(Builder b) {
this.globalRule = b.globalRule;
this.exactPathRules = Map.copyOf(b.exactPathRules);
@@ -87,6 +82,20 @@ public final class AuthConfig {
return challenge;
}
/**
* Whether a matched rule rejects unauthenticated requests or merely annotates them.
*/
public enum Mode {
/**
* Authentication is mandatory; failure yields {@code 401 Unauthorized}.
*/
REQUIRED,
/**
* Authentication is best-effort; the principal is attached if present, never rejected.
*/
OPTIONAL
}
/**
* An authentication rule: which authenticator to use and whether it is mandatory.
*
@@ -96,7 +105,9 @@ public final class AuthConfig {
public record Rule(Authenticator authenticator, Mode mode) {
}
/** Internal pairing of a path prefix with its rule. */
/**
* Internal pairing of a path prefix with its rule.
*/
private record PrefixRule(String prefix, Rule rule) {
}
@@ -14,7 +14,9 @@ import dev.coph.nextusweb.server.router.Response;
*/
public final class AuthGate {
/** The policy this gate enforces. */
/**
* The policy this gate enforces.
*/
private final AuthConfig config;
/**
@@ -23,16 +23,6 @@ import java.util.function.Function;
@FunctionalInterface
public interface Authenticator {
/**
* Attempts to authenticate a request.
*
* @param request the incoming request
* @return the authenticated principal, or {@code null} if the request is unauthenticated
* @throws Exception if an unexpected error occurs while validating the credential (treated as
* an internal error, not an authentication failure)
*/
Principal authenticate(Request request) throws Exception;
/**
* Authenticates via an API key carried in a request header (for example {@code X-API-Key}).
* The validator maps a presented key to a {@link Principal}, or to {@code null} if the key is
@@ -129,6 +119,16 @@ public interface Authenticator {
};
}
/**
* Attempts to authenticate a request.
*
* @param request the incoming request
* @return the authenticated principal, or {@code null} if the request is unauthenticated
* @throws Exception if an unexpected error occurs while validating the credential (treated as
* an internal error, not an authentication failure)
*/
Principal authenticate(Request request) throws Exception;
/**
* Compares two secrets (API keys, tokens, passwords, ...) in length-constant time, so the
* time taken does not reveal how many leading characters matched. Use this inside a validator
@@ -19,44 +19,6 @@ import java.util.Set;
*/
public interface Principal {
/**
* Returns the stable, unique identifier of this principal (for example a user id, an account
* name or an API-key id). Used wherever the identity must be reduced to a single string, such
* as principal-based rate limiting.
*
* @return the principal identifier; never {@code null}
*/
String id();
/**
* Returns the roles granted to this principal, for coarse-grained authorization checks.
*
* @return the (possibly empty) set of roles; never {@code null}
*/
default Set<String> roles() {
return Set.of();
}
/**
* Indicates whether this principal holds the given role.
*
* @param role the role to test for
* @return {@code true} if {@link #roles()} contains {@code role}
*/
default boolean hasRole(String role) {
return roles().contains(role);
}
/**
* Returns arbitrary additional attributes describing this principal (for example token
* scopes, an email address or tenant information).
*
* @return the (possibly empty) claim map; never {@code null}
*/
default Map<String, Object> claims() {
return Map.of();
}
/**
* Creates a simple immutable principal with no roles.
*
@@ -93,4 +55,42 @@ public interface Principal {
}
};
}
/**
* Returns the stable, unique identifier of this principal (for example a user id, an account
* name or an API-key id). Used wherever the identity must be reduced to a single string, such
* as principal-based rate limiting.
*
* @return the principal identifier; never {@code null}
*/
String id();
/**
* Indicates whether this principal holds the given role.
*
* @param role the role to test for
* @return {@code true} if {@link #roles()} contains {@code role}
*/
default boolean hasRole(String role) {
return roles().contains(role);
}
/**
* Returns the roles granted to this principal, for coarse-grained authorization checks.
*
* @return the (possibly empty) set of roles; never {@code null}
*/
default Set<String> roles() {
return Set.of();
}
/**
* Returns arbitrary additional attributes describing this principal (for example token
* scopes, an email address or tenant information).
*
* @return the (possibly empty) claim map; never {@code null}
*/
default Map<String, Object> claims() {
return Map.of();
}
}
@@ -19,19 +19,33 @@ import java.util.Set;
*/
public final class CorsConfig {
/** Explicit set of allowed origins; ignored when {@link #allowAnyOrigin} is {@code true}. */
/**
* Explicit set of allowed origins; ignored when {@link #allowAnyOrigin} is {@code true}.
*/
private final Set<String> allowedOrigins;
/** HTTP methods advertised as allowed in preflight responses. */
/**
* HTTP methods advertised as allowed in preflight responses.
*/
private final Set<HttpMethod> allowedMethods;
/** Request headers advertised as allowed in preflight responses. */
/**
* Request headers advertised as allowed in preflight responses.
*/
private final Set<String> allowedHeaders;
/** Response headers exposed to the browser via {@code Access-Control-Expose-Headers}. */
/**
* Response headers exposed to the browser via {@code Access-Control-Expose-Headers}.
*/
private final Set<String> exposedHeaders;
/** Whether credentialed (cookie/authorization) requests are permitted. */
/**
* Whether credentialed (cookie/authorization) requests are permitted.
*/
private final boolean allowCredentials;
/** How long (in seconds) a preflight response may be cached by the browser. */
/**
* How long (in seconds) a preflight response may be cached by the browser.
*/
private final long maxAgeSeconds;
/** Whether any origin is allowed (the {@code *} wildcard). */
/**
* Whether any origin is allowed (the {@code *} wildcard).
*/
private final boolean allowAnyOrigin;
/**
@@ -90,7 +104,7 @@ public final class CorsConfig {
*
* @param origin the {@code Origin} header value, may be {@code null}
* @return {@code true} if the origin is allowed; {@code false} for a {@code null} origin or
* one not in the allow-list (unless any origin is permitted)
* one not in the allow-list (unless any origin is permitted)
*/
public boolean isOriginAllowed(String origin) {
if (origin == null) return false;
@@ -157,19 +171,33 @@ public final class CorsConfig {
* be called multiple times to accumulate values.
*/
public static final class Builder {
/** Accumulated explicit origins. */
/**
* Accumulated explicit origins.
*/
private final Set<String> allowedOrigins = new HashSet<>();
/** Accumulated allowed methods. */
/**
* Accumulated allowed methods.
*/
private final Set<HttpMethod> allowedMethods = new HashSet<>();
/** Accumulated allowed request headers. */
/**
* Accumulated allowed request headers.
*/
private final Set<String> allowedHeaders = new HashSet<>();
/** Accumulated exposed response headers. */
/**
* Accumulated exposed response headers.
*/
private final Set<String> exposedHeaders = new HashSet<>();
/** Whether credentialed requests are permitted; defaults to {@code false}. */
/**
* Whether credentialed requests are permitted; defaults to {@code false}.
*/
private boolean allowCredentials = false;
/** Preflight cache lifetime in seconds; defaults to {@code 0} (disabled). */
/**
* Preflight cache lifetime in seconds; defaults to {@code 0} (disabled).
*/
private long maxAgeSeconds = 0;
/** Whether any origin is permitted; defaults to {@code false}. */
/**
* Whether any origin is permitted; defaults to {@code false}.
*/
private boolean allowAnyOrigin = false;
/**
@@ -1,7 +1,8 @@
package dev.coph.nextusweb.server.cores;
import dev.coph.nextusweb.server.router.Response;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import java.util.stream.Collectors;
@@ -22,13 +23,21 @@ import java.util.stream.Collectors;
*/
public final class CorsHandler {
/** The policy this handler enforces. */
/**
* The policy this handler enforces.
*/
private final CorsConfig config;
/** Pre-joined {@code Access-Control-Allow-Methods} value. */
/**
* Pre-joined {@code Access-Control-Allow-Methods} value.
*/
private final String allowedMethodsHeader;
/** Pre-joined {@code Access-Control-Allow-Headers} value. */
/**
* Pre-joined {@code Access-Control-Allow-Headers} value.
*/
private final String allowedHeadersHeader;
/** Pre-joined {@code Access-Control-Expose-Headers} value. */
/**
* Pre-joined {@code Access-Control-Expose-Headers} value.
*/
private final String exposedHeadersHeader;
/**
@@ -44,6 +53,55 @@ public final class CorsHandler {
this.exposedHeadersHeader = String.join(", ", config.exposedHeaders());
}
/**
* Determines whether a request is a CORS preflight request, i.e. an {@code OPTIONS}
* request carrying an {@code Access-Control-Request-Method} header.
*
* @param method the request method
* @param headers the request headers
* @return {@code true} if the request is a preflight request
*/
public boolean isPreflight(HttpMethod method, HttpHeaders headers) {
return method.equals(HttpMethod.OPTIONS)
&& headers.contains("Access-Control-Request-Method");
}
/**
* Builds the response to a CORS preflight request.
*
* <p>If the origin is missing or disallowed the response is a {@code 403 Forbidden};
* otherwise it is a {@code 204 No Content} carrying the allowed methods and headers, the
* requested headers echoed back when no explicit allow-list is configured, and the
* {@code Access-Control-Max-Age} cache hint when configured.</p>
*
* @param origin the request's {@code Origin} header, may be {@code null}
* @param requestHeaders the request's headers (used to read
* {@code Access-Control-Request-Headers})
* @return the fully populated preflight response
*/
public Response handlePreflight(String origin, HttpHeaders requestHeaders) {
Response res = new Response().status(204);
if (!config.isOriginAllowed(origin)) {
return res.status(403);
}
applyHeaders(origin, res);
res.header("Access-Control-Allow-Methods", allowedMethodsHeader);
String requestedHeaders = requestHeaders.get("Access-Control-Request-Headers");
if (!allowedHeadersHeader.isEmpty()) {
res.header("Access-Control-Allow-Headers", allowedHeadersHeader);
} else if (requestedHeaders != null) {
res.header("Access-Control-Allow-Headers", requestedHeaders);
}
if (config.maxAgeSeconds() > 0) {
res.header("Access-Control-Max-Age", String.valueOf(config.maxAgeSeconds()));
}
return res;
}
/**
* Adds the {@code Access-Control-Allow-Origin} (and related) headers to a response, if and
@@ -77,54 +135,4 @@ public final class CorsHandler {
res.header("Access-Control-Expose-Headers", exposedHeadersHeader);
}
}
/**
* Determines whether a request is a CORS preflight request, i.e. an {@code OPTIONS}
* request carrying an {@code Access-Control-Request-Method} header.
*
* @param method the request method
* @param headers the request headers
* @return {@code true} if the request is a preflight request
*/
public boolean isPreflight(HttpMethod method, HttpHeaders headers) {
return method.equals(HttpMethod.OPTIONS)
&& headers.contains("Access-Control-Request-Method");
}
/**
* Builds the response to a CORS preflight request.
*
* <p>If the origin is missing or disallowed the response is a {@code 403 Forbidden};
* otherwise it is a {@code 204 No Content} carrying the allowed methods and headers, the
* requested headers echoed back when no explicit allow-list is configured, and the
* {@code Access-Control-Max-Age} cache hint when configured.</p>
*
* @param origin the request's {@code Origin} header, may be {@code null}
* @param requestHeaders the request's headers (used to read
* {@code Access-Control-Request-Headers})
* @return the fully populated preflight response
*/
public Response handlePreflight(String origin, HttpHeaders requestHeaders) {
Response res = new Response().status(204);
if (origin == null || !config.isOriginAllowed(origin)) {
return res.status(403);
}
applyHeaders(origin, res);
res.header("Access-Control-Allow-Methods", allowedMethodsHeader);
String requestedHeaders = requestHeaders.get("Access-Control-Request-Headers");
if (!allowedHeadersHeader.isEmpty()) {
res.header("Access-Control-Allow-Headers", allowedHeadersHeader);
} else if (requestedHeaders != null) {
res.header("Access-Control-Allow-Headers", requestedHeaders);
}
if (config.maxAgeSeconds() > 0) {
res.header("Access-Control-Max-Age", String.valueOf(config.maxAgeSeconds()));
}
return res;
}
}
@@ -26,5 +26,6 @@ public final class JsonMapper {
/**
* Private constructor preventing instantiation of this static holder class.
*/
private JsonMapper() {}
private JsonMapper() {
}
}
@@ -9,7 +9,9 @@ package dev.coph.nextusweb.server.net;
*/
public final class ClientIp {
/** The de-facto standard header proxies use to record the originating client chain. */
/**
* The de-facto standard header proxies use to record the originating client chain.
*/
public static final String FORWARDED_FOR_HEADER = "X-Forwarded-For";
private ClientIp() {
@@ -22,14 +22,22 @@ import java.util.List;
*/
public final class TrustedProxies {
/** Shared instance that trusts no peer; forwarded headers are always ignored. */
/**
* Shared instance that trusts no peer; forwarded headers are always ignored.
*/
private static final TrustedProxies NONE = new TrustedProxies(List.of(), false);
/** Shared instance that trusts every peer; forwarded headers are always honoured. */
/**
* Shared instance that trusts every peer; forwarded headers are always honoured.
*/
private static final TrustedProxies ALL = new TrustedProxies(List.of(), true);
/** Parsed CIDR ranges of trusted proxies. */
/**
* Parsed CIDR ranges of trusted proxies.
*/
private final List<Cidr> cidrs;
/** When {@code true}, every peer is trusted regardless of {@link #cidrs}. */
/**
* When {@code true}, every peer is trusted regardless of {@link #cidrs}.
*/
private final boolean trustAll;
private TrustedProxies(List<Cidr> cidrs, boolean trustAll) {
@@ -144,7 +152,7 @@ public final class TrustedProxies {
@Override
public boolean equals(Object o) {
return o instanceof Cidr c && prefixBits == c.prefixBits && Arrays.equals(base, c.base);
return o instanceof Cidr(byte[] base1, int bits) && prefixBits == bits && Arrays.equals(base, base1);
}
@Override
@@ -17,11 +17,17 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public final class FixedWindowLimiter implements RateLimiter {
/** Maximum number of requests permitted per window. */
/**
* Maximum number of requests permitted per window.
*/
private final long limit;
/** Window length in nanoseconds. */
/**
* Window length in nanoseconds.
*/
private final long windowNanos;
/** Per-key windows, created on demand. */
/**
* Per-key windows, created on demand.
*/
private final ConcurrentHashMap<String, Window> windows = new ConcurrentHashMap<>();
/**
@@ -62,9 +68,13 @@ public final class FixedWindowLimiter implements RateLimiter {
* within it.
*/
private static final class Window {
/** Start timestamp of the current window, in nanoseconds. */
/**
* Start timestamp of the current window, in nanoseconds.
*/
final AtomicLong windowStart;
/** Number of requests counted in the current window. */
/**
* Number of requests counted in the current window.
*/
final AtomicLong count;
/**
@@ -92,7 +102,7 @@ public final class FixedWindowLimiter implements RateLimiter {
* @param limit the per-window request limit
* @param windowNanos the window length in nanoseconds
* @return an allow result with the remaining quota, or a deny result with the time until
* the window resets
* the window resets
*/
synchronized Result tryAcquire(long now, long limit, long windowNanos) {
long start = windowStart.get();
@@ -21,15 +21,6 @@ import dev.coph.nextusweb.server.router.Request;
@FunctionalInterface
public interface KeyResolver {
/**
* Resolves the rate-limit key for a request.
*
* @param request the incoming request (headers, cookies, attached principal, ...)
* @param clientIp the resolved client IP, honouring trusted proxies
* @return the key the request should be counted against; never {@code null}
*/
String resolve(Request request, String clientIp);
/**
* Returns a resolver that keys purely on the resolved client IP. This is the spoofing-safe
* replacement for the old header-trusting behaviour: the IP has already been derived through
@@ -85,4 +76,13 @@ public interface KeyResolver {
return p != null ? "p:" + p.id() : "ip:" + clientIp;
};
}
/**
* Resolves the rate-limit key for a request.
*
* @param request the incoming request (headers, cookies, attached principal, ...)
* @param clientIp the resolved client IP, honouring trusted proxies
* @return the key the request should be counted against; never {@code null}
*/
String resolve(Request request, String clientIp);
}
@@ -19,11 +19,17 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public final class LeakyBucketLimiter implements RateLimiter {
/** Maximum water level (number of queued units) the bucket tolerates. */
/**
* Maximum water level (number of queued units) the bucket tolerates.
*/
private final long capacity;
/** Nanoseconds it takes for exactly one unit to leak out. */
/**
* Nanoseconds it takes for exactly one unit to leak out.
*/
private final long leakIntervalNanos;
/** Per-key buckets, created on demand. */
/**
* Per-key buckets, created on demand.
*/
private final ConcurrentHashMap<String, LeakyBucket> buckets = new ConcurrentHashMap<>();
/**
@@ -62,70 +68,69 @@ public final class LeakyBucketLimiter implements RateLimiter {
/**
* A single client's leaky bucket, tracking the current water level and the timestamp up to
* which leakage has been accounted for as one atomic unit.
*
* @param state Holds the current {@code (waterLevel, lastLeakNanos)} pair as one atomic unit.
*/
private static final class LeakyBucket {
/** Holds the current {@code (waterLevel, lastLeakNanos)} pair as one atomic unit. */
private final AtomicReference<State> state;
private record LeakyBucket(AtomicReference<State> state) {
/**
* Creates an empty bucket.
*
* @param state the creation timestamp in nanoseconds
*/
private LeakyBucket(long state) {
this.state = new AtomicReference<>(new State(0, state));
}
/**
* Creates an empty bucket.
*
* @param now the creation timestamp in nanoseconds
*/
LeakyBucket(long now) {
this.state = new AtomicReference<>(new State(0, now));
}
/**
* Returns the timestamp leakage was last accounted to, used by {@link #cleanup(long)}.
*
* @return the last-leak timestamp in nanoseconds
*/
long lastLeak() {
return state.get().lastLeakNanos();
}
/**
* Returns the timestamp leakage was last accounted to, used by {@link #cleanup(long)}.
*
* @return the last-leak timestamp in nanoseconds
*/
long lastLeak() {
return state.get().lastLeakNanos();
}
/**
* Applies elapsed leakage and, if there is room, adds one unit of water. The new level and
* the timestamp it was leaked to are swapped in together, so the previous race where the
* level advanced but the timestamp update was lost (drifting the leak accounting) can no
* longer occur.
*
* @param now the current time in nanoseconds
* @param capacity the bucket capacity
* @param leakIntervalNanos the nanoseconds per leaked unit
* @return an allow result with the remaining headroom, or a deny result with a retry
* hint when the bucket is full
*/
Result tryAcquire(long now, long capacity, long leakIntervalNanos) {
while (true) {
State current = state.get();
/**
* Applies elapsed leakage and, if there is room, adds one unit of water. The new level and
* the timestamp it was leaked to are swapped in together, so the previous race where the
* level advanced but the timestamp update was lost (drifting the leak accounting) can no
* longer occur.
*
* @param now the current time in nanoseconds
* @param capacity the bucket capacity
* @param leakIntervalNanos the nanoseconds per leaked unit
* @return an allow result with the remaining headroom, or a deny result with a retry
* hint when the bucket is full
*/
Result tryAcquire(long now, long capacity, long leakIntervalNanos) {
while (true) {
State current = state.get();
long leaked = (now - current.lastLeakNanos()) / leakIntervalNanos;
long newLevel = Math.max(0, current.waterLevel() - leaked);
long leaked = (now - current.lastLeakNanos()) / leakIntervalNanos;
long newLevel = Math.max(0, current.waterLevel() - leaked);
if (newLevel >= capacity) {
long retryMs = leakIntervalNanos / 1_000_000L;
return Result.deny(capacity, retryMs);
}
if (newLevel >= capacity) {
long retryMs = leakIntervalNanos / 1_000_000L;
return Result.deny(capacity, retryMs);
}
long newLastLeak = leaked > 0
? current.lastLeakNanos() + leaked * leakIntervalNanos
: current.lastLeakNanos();
long newLastLeak = leaked > 0
? current.lastLeakNanos() + leaked * leakIntervalNanos
: current.lastLeakNanos();
if (state.compareAndSet(current, new State(newLevel + 1, newLastLeak))) {
return Result.allow(capacity - newLevel - 1, capacity);
if (state.compareAndSet(current, new State(newLevel + 1, newLastLeak))) {
return Result.allow(capacity - newLevel - 1, capacity);
}
}
}
}
/**
* Immutable snapshot of a bucket's mutable state.
*
* @param waterLevel current water level (number of units in the bucket)
* @param lastLeakNanos timestamp leakage has been applied up to, in nanoseconds
*/
private record State(long waterLevel, long lastLeakNanos) {
/**
* Immutable snapshot of a bucket's mutable state.
*
* @param waterLevel current water level (number of units in the bucket)
* @param lastLeakNanos timestamp leakage has been applied up to, in nanoseconds
*/
private record State(long waterLevel, long lastLeakNanos) {
}
}
}
}
@@ -1,12 +1,6 @@
package dev.coph.nextusweb.server.ratelimit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
/**
* Immutable mapping from request paths to the {@link Rule rate-limit rules} that apply to them.
@@ -24,13 +18,21 @@ import java.util.Set;
*/
public final class RateLimitConfig {
/** Rule applied to every request, or {@code null} if no global rule is configured. */
/**
* Rule applied to every request, or {@code null} if no global rule is configured.
*/
private final Rule globalRule;
/** Rules matched by exact path equality, keyed by path. */
/**
* Rules matched by exact path equality, keyed by path.
*/
private final Map<String, Rule> exactPathRules;
/** Prefix rules, pre-sorted longest-prefix-first so the most specific match wins. */
/**
* Prefix rules, pre-sorted longest-prefix-first so the most specific match wins.
*/
private final List<PrefixRule> prefixRules;
/** Every distinct limiter referenced by any rule, by identity; used for periodic cleanup. */
/**
* Every distinct limiter referenced by any rule, by identity; used for periodic cleanup.
*/
private final Set<RateLimiter> allLimiters;
/**
@@ -124,11 +126,17 @@ public final class RateLimitConfig {
* Fluent builder for {@link RateLimitConfig}.
*/
public static final class Builder {
/** Accumulated exact-path rules, keyed by path. */
/**
* Accumulated exact-path rules, keyed by path.
*/
private final Map<String, Rule> exactPathRules = new HashMap<>();
/** Accumulated prefix rules. */
/**
* Accumulated prefix rules.
*/
private final List<PrefixRule> prefixRules = new ArrayList<>();
/** The global rule, if configured. */
/**
* The global rule, if configured.
*/
private Rule globalRule;
/**
@@ -22,14 +22,22 @@ import java.util.concurrent.TimeUnit;
*/
public final class RateLimitGate {
/** Default idle age after which per-key limiter state is eligible for eviction. */
/**
* Default idle age after which per-key limiter state is eligible for eviction.
*/
private static final long DEFAULT_STALE_AFTER_NANOS = 10L * 60 * 1_000_000_000L;
/** The rule set this gate enforces. */
/**
* The rule set this gate enforces.
*/
private final RateLimitConfig config;
/** Idle age (nanoseconds) after which a limiter's per-key state may be evicted. */
/**
* Idle age (nanoseconds) after which a limiter's per-key state may be evicted.
*/
private final long staleAfterNanos;
/** Single-threaded scheduler driving periodic cleanup of stale buckets. */
/**
* Single-threaded scheduler driving periodic cleanup of stale buckets.
*/
private final ScheduledExecutorService cleanup;
/**
@@ -62,6 +70,38 @@ public final class RateLimitGate {
cleanup.scheduleAtFixedRate(this::doCleanup, 5, 5, TimeUnit.MINUTES);
}
/**
* Periodic cleanup hook invoked by the background scheduler. Asks every configured limiter to
* evict per-key state idle for longer than {@link #staleAfterNanos}. A failure cleaning one
* limiter must not abort the others or kill the scheduler, so each call is guarded.
*/
private void doCleanup() {
for (RateLimiter limiter : config.allLimiters()) {
try {
limiter.cleanup(staleAfterNanos);
} catch (RuntimeException ignored) {
}
}
}
/**
* Writes the standard rate-limit headers ({@code X-RateLimit-Limit},
* {@code X-RateLimit-Remaining}, and {@code Retry-After} when denied) onto a response.
*
* <p>Does nothing when {@code result} is {@code null} (no rule applied). The retry hint is
* rounded up to whole seconds as required by the {@code Retry-After} header.</p>
*
* @param result the limiting result, may be {@code null}
* @param res the response to decorate
*/
public static void applyHeaders(RateLimiter.Result result, Response res) {
if (result == null) return;
res.header("X-RateLimit-Limit", String.valueOf(result.limit()));
res.header("X-RateLimit-Remaining", String.valueOf(Math.max(0, result.remaining())));
if (!result.allowed()) {
res.header("Retry-After", String.valueOf((result.retryAfterMillis() + 999) / 1000));
}
}
/**
* Evaluates all rules applicable to the given path and decides whether the request may
@@ -97,41 +137,10 @@ public final class RateLimitGate {
return strictest;
}
/**
* Writes the standard rate-limit headers ({@code X-RateLimit-Limit},
* {@code X-RateLimit-Remaining}, and {@code Retry-After} when denied) onto a response.
*
* <p>Does nothing when {@code result} is {@code null} (no rule applied). The retry hint is
* rounded up to whole seconds as required by the {@code Retry-After} header.</p>
*
* @param result the limiting result, may be {@code null}
* @param res the response to decorate
*/
public static void applyHeaders(RateLimiter.Result result, Response res) {
if (result == null) return;
res.header("X-RateLimit-Limit", String.valueOf(result.limit()));
res.header("X-RateLimit-Remaining", String.valueOf(Math.max(0, result.remaining())));
if (!result.allowed()) {
res.header("Retry-After", String.valueOf((result.retryAfterMillis() + 999) / 1000));
}
}
/**
* Periodic cleanup hook invoked by the background scheduler. Asks every configured limiter to
* evict per-key state idle for longer than {@link #staleAfterNanos}. A failure cleaning one
* limiter must not abort the others or kill the scheduler, so each call is guarded.
*/
private void doCleanup() {
for (RateLimiter limiter : config.allLimiters()) {
try {
limiter.cleanup(staleAfterNanos);
} catch (RuntimeException ignored) {
}
}
}
/**
* Stops the background cleanup scheduler. Should be called when the server shuts down.
*/
public void shutdown() { cleanup.shutdown(); }
public void shutdown() {
cleanup.shutdown();
}
}
@@ -21,7 +21,7 @@ public interface RateLimiter {
* @param key the logical bucket key (for example a client IP or user identifier)
* @param nowNanos the current time in nanoseconds, typically {@link System#nanoTime()}
* @return a {@link Result} describing whether the request was allowed and the remaining
* quota
* quota
*/
Result tryAcquire(String key, long nowNanos);
@@ -18,11 +18,17 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public final class SlidingWindowLimiter implements RateLimiter {
/** Maximum effective (weighted) number of requests per window. */
/**
* Maximum effective (weighted) number of requests per window.
*/
private final long limit;
/** Window length in nanoseconds. */
/**
* Window length in nanoseconds.
*/
private final long windowNanos;
/** Per-key sliding windows, created on demand. */
/**
* Per-key sliding windows, created on demand.
*/
private final ConcurrentHashMap<String, SlidingWindow> windows = new ConcurrentHashMap<>();
/**
@@ -64,11 +70,17 @@ public final class SlidingWindowLimiter implements RateLimiter {
* previous window counts.
*/
private static final class SlidingWindow {
/** Start timestamp of the current window, in nanoseconds. */
/**
* Start timestamp of the current window, in nanoseconds.
*/
final AtomicLong windowStart;
/** Request count accumulated in the current window. */
/**
* Request count accumulated in the current window.
*/
final AtomicLong currentCount;
/** Request count carried over from the immediately preceding window. */
/**
* Request count carried over from the immediately preceding window.
*/
final AtomicLong previousCount;
/**
@@ -95,7 +107,7 @@ public final class SlidingWindowLimiter implements RateLimiter {
* @param limit the per-window effective limit
* @param windowNanos the window length in nanoseconds
* @return an allow result with the remaining quota, or a deny result with the time until
* the window slides far enough to admit the request
* the window slides far enough to admit the request
*/
synchronized Result tryAcquire(long now, long limit, long windowNanos) {
long start = windowStart.get();
@@ -20,13 +20,21 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public final class TokenBucketLimiter implements RateLimiter {
/** Maximum number of tokens a bucket can hold (the burst allowance). */
/**
* Maximum number of tokens a bucket can hold (the burst allowance).
*/
private final long capacity;
/** Refill rate expressed as tokens added per nanosecond. */
/**
* Refill rate expressed as tokens added per nanosecond.
*/
private final double tokensPerNano;
/** Approximate nanoseconds between single-token refills, used for retry hints. */
/**
* Approximate nanoseconds between single-token refills, used for retry hints.
*/
private final long refillIntervalNs;
/** Per-key buckets, created on demand. */
/**
* Per-key buckets, created on demand.
*/
private final ConcurrentHashMap<String, Bucket> buckets = new ConcurrentHashMap<>();
/**
@@ -71,7 +79,9 @@ public final class TokenBucketLimiter implements RateLimiter {
* single {@link AtomicReference} so updates are atomic as a unit.
*/
private static final class Bucket {
/** Holds the current {@code (tokensFixed, lastRefillNanos)} pair as one atomic unit. */
/**
* Holds the current {@code (tokensFixed, lastRefillNanos)} pair as one atomic unit.
*/
private final AtomicReference<State> state;
/**
@@ -104,7 +114,7 @@ public final class TokenBucketLimiter implements RateLimiter {
* @param tokensPerNano the refill rate in tokens per nanosecond
* @param refillIntervalNs the nominal nanoseconds per token (kept for retry computation)
* @return an allow result with the remaining tokens, or a deny result with a retry
* hint when fewer than one token is available
* hint when fewer than one token is available
*/
Result tryAcquire(long now, long capacity, double tokensPerNano, long refillIntervalNs) {
long oneTokenFixed = 1_000_000_000L;
@@ -3,14 +3,19 @@ package dev.coph.nextusweb.server.router;
import dev.coph.nextusweb.server.auth.Principal;
import dev.coph.nextusweb.server.json.JsonMapper;
import dev.coph.nextusweb.server.router.exception.BadRequestException;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
import io.netty.util.CharsetUtil;
import tools.jackson.core.JacksonException;
import tools.jackson.databind.JsonNode;
import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* A convenience wrapper around a Netty {@link FullHttpRequest} that exposes the parts of an
@@ -23,28 +28,44 @@ import java.util.*;
*/
public final class Request {
/** The underlying Netty request this wrapper delegates to. */
/**
* The underlying Netty request this wrapper delegates to.
*/
private final FullHttpRequest raw;
/** Path parameters captured by the router while matching, keyed by name. */
/**
* Path parameters captured by the router while matching, keyed by name.
*/
private final Map<String, String> pathParams;
/** Lazily decoded query-string parameters; {@code null} until first accessed. */
/**
* Lazily decoded query-string parameters; {@code null} until first accessed.
*/
private Map<String, List<String>> queryParams;
/** Lazily parsed JSON body; {@code null} until {@link #json()} is first called. */
/**
* Lazily parsed JSON body; {@code null} until {@link #json()} is first called.
*/
private JsonNode jsonCache;
/** Lazily decoded request cookies, keyed by name; {@code null} until first accessed. */
/**
* Lazily decoded request cookies, keyed by name; {@code null} until first accessed.
*/
private Map<String, String> cookies;
/** Lazily created bag of per-request attributes set by middlewares/handlers. */
/**
* Lazily created bag of per-request attributes set by middlewares/handlers.
*/
private Map<String, Object> attributes;
/** Resolved client IP (honouring trusted proxies); {@code null} until set by the pipeline. */
/**
* Resolved client IP (honouring trusted proxies); {@code null} until set by the pipeline.
*/
private String clientIp;
/** Authenticated principal attached by the auth layer, or {@code null} if unauthenticated. */
/**
* Authenticated principal attached by the auth layer, or {@code null} if unauthenticated.
*/
private Principal principal;
/**
@@ -1,7 +1,9 @@
package dev.coph.nextusweb.server.router;
import dev.coph.nextusweb.server.json.JsonMapper;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.util.CharsetUtil;
import tools.jackson.core.JacksonException;
@@ -16,13 +18,17 @@ import tools.jackson.core.JacksonException;
*/
public final class Response {
/** HTTP status code; defaults to {@code 200}. */
private int status = 200;
/** Response headers accumulated by the handler. */
/**
* Response headers accumulated by the handler.
*/
private final HttpHeaders headers = new DefaultHttpHeaders();
/** Response body bytes; defaults to an empty array. */
/**
* HTTP status code; defaults to {@code 200}.
*/
private int status = 200;
/**
* Response body bytes; defaults to an empty array.
*/
private byte[] body = new byte[0];
/**
@@ -38,7 +44,10 @@ public final class Response {
* @param s the status code
* @return this response, for fluent chaining
*/
public Response status(int s) { this.status = s; return this; }
public Response status(int s) {
this.status = s;
return this;
}
/**
* Sets a response header, replacing any existing value for the same name.
@@ -101,19 +110,25 @@ public final class Response {
*
* @return the status code
*/
public int status() { return status; }
public int status() {
return status;
}
/**
* Returns the accumulated response headers.
*
* @return the headers
*/
public HttpHeaders headers() { return headers; }
public HttpHeaders headers() {
return headers;
}
/**
* Returns the response body bytes.
*
* @return the body bytes
*/
public byte[] body() { return body; }
public byte[] body() {
return body;
}
}
@@ -28,10 +28,14 @@ import java.util.function.BiConsumer;
*/
public final class Router {
/** Root of the routing trie; every registered path descends from here. */
/**
* Root of the routing trie; every registered path descends from here.
*/
private final Node root = new Node();
/** Middlewares executed in insertion order for every matched request. */
/**
* Middlewares executed in insertion order for every matched request.
*/
private final List<BiConsumer<Request, Response>> middlewares = new ArrayList<>();
/**
@@ -248,15 +252,25 @@ public final class Router {
* registered at this node, and optional parameter/wildcard children.
*/
private static final class Node {
/** Static child nodes keyed by their literal path segment. */
/**
* Static child nodes keyed by their literal path segment.
*/
final Map<String, Node> children = new ConcurrentHashMap<>();
/** Handlers registered directly at this node, keyed by HTTP method. */
/**
* Handlers registered directly at this node, keyed by HTTP method.
*/
final Map<HttpMethod, Handler> handlers = new ConcurrentHashMap<>();
/** Child matching any single segment as a path parameter, or {@code null} if none. */
/**
* Child matching any single segment as a path parameter, or {@code null} if none.
*/
Node paramChild;
/** Name under which {@link #paramChild} captures the matched segment. */
/**
* Name under which {@link #paramChild} captures the matched segment.
*/
String paramName;
/** Child matching any single segment as a wildcard, or {@code null} if none. */
/**
* Child matching any single segment as a wildcard, or {@code null} if none.
*/
Node wildcardChild;
}
}
@@ -16,5 +16,7 @@ public final class BadRequestException extends RuntimeException {
*
* @param message the detail message describing why the request is invalid
*/
public BadRequestException(String message) { super(message); }
public BadRequestException(String message) {
super(message);
}
}
@@ -4,11 +4,7 @@ import dev.coph.nextusweb.server.router.Response;
import io.netty.handler.codec.http.HttpHeaders;
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
/**
* A small, immutable policy of standard HTTP security response headers that the server adds to
@@ -35,12 +31,18 @@ import java.util.Objects;
*/
public final class SecurityHeaders {
/** The {@code Strict-Transport-Security} header name, gated on a secure connection. */
/**
* The {@code Strict-Transport-Security} header name, gated on a secure connection.
*/
private static final String HSTS = "Strict-Transport-Security";
/** Headers added to every response (subject to not already being present). */
/**
* Headers added to every response (subject to not already being present).
*/
private final List<Map.Entry<String, String>> always;
/** Pre-rendered HSTS header value, or {@code null} if HSTS is disabled. */
/**
* Pre-rendered HSTS header value, or {@code null} if HSTS is disabled.
*/
private final String hstsValue;
private SecurityHeaders(Builder b) {
@@ -128,6 +130,7 @@ public final class SecurityHeaders {
* particular header.
*/
public static final class Builder {
private final Map<String, String> custom = new LinkedHashMap<>();
private boolean contentTypeOptions = true;
private String frameOptions = "DENY";
private String referrerPolicy = "no-referrer";
@@ -135,7 +138,6 @@ public final class SecurityHeaders {
private Duration hstsMaxAge;
private boolean hstsIncludeSubDomains;
private boolean hstsPreload;
private final Map<String, String> custom = new LinkedHashMap<>();
private Builder() {
}
@@ -196,10 +198,10 @@ public final class SecurityHeaders {
* hard to roll back, so enable them only once every subdomain is reliably served over
* HTTPS.
*
* @param maxAge how long browsers should pin HTTPS; {@code null}/zero/negative
* disables HSTS
* @param maxAge how long browsers should pin HTTPS; {@code null}/zero/negative
* disables HSTS
* @param includeSubDomains whether the policy also covers every subdomain
* @param preload whether to request inclusion in browser preload lists
* @param preload whether to request inclusion in browser preload lists
* @return this builder, for fluent chaining
*/
public Builder hsts(Duration maxAge, boolean includeSubDomains, boolean preload) {
@@ -30,7 +30,9 @@ import java.util.Objects;
*/
public final class TlsConfig {
/** The pre-built, shareable server SSL context. */
/**
* The pre-built, shareable server SSL context.
*/
private final SslContext sslContext;
private TlsConfig(SslContext sslContext) {
@@ -15,23 +15,41 @@ import java.util.Set;
*/
public final class WebSocketConfig {
/** Maximum size, in bytes, of a single WebSocket frame payload. */
/**
* Maximum size, in bytes, of a single WebSocket frame payload.
*/
private final int maxFramePayloadLength;
/** Maximum size, in bytes, of an aggregated (multi-frame) message. */
/**
* Maximum size, in bytes, of an aggregated (multi-frame) message.
*/
private final int maxAggregatedMessageSize;
/** Idle timeout after which an inactive connection is closed; {@code null} disables it. */
/**
* Idle timeout after which an inactive connection is closed; {@code null} disables it.
*/
private final Duration idleTimeout;
/** Explicit set of allowed origins; ignored when {@link #allowAnyOrigin} is {@code true}. */
/**
* Explicit set of allowed origins; ignored when {@link #allowAnyOrigin} is {@code true}.
*/
private final Set<String> allowedOrigins;
/** Whether connections from any origin are accepted. */
/**
* Whether connections from any origin are accepted.
*/
private final boolean allowAnyOrigin;
/** Subprotocols offered during negotiation. */
/**
* Subprotocols offered during negotiation.
*/
private final Set<String> subprotocols;
/** Whether per-message deflate compression is enabled. */
/**
* Whether per-message deflate compression is enabled.
*/
private final boolean compression;
/** Whether the protocol handler matches the path by prefix rather than exact equality. */
/**
* Whether the protocol handler matches the path by prefix rather than exact equality.
*/
private final boolean checkStartsWith;
/** Max in-flight callbacks queued per connection before read backpressure kicks in. */
/**
* Max in-flight callbacks queued per connection before read backpressure kicks in.
*/
private final int maxQueuedMessages;
/**
@@ -74,7 +92,7 @@ public final class WebSocketConfig {
*
* @param origin the request's {@code Origin} header, may be {@code null}
* @return {@code true} if any origin is allowed, or if the origin is in the allow-list;
* {@code false} for a {@code null} or disallowed origin
* {@code false} for a {@code null} or disallowed origin
*/
public boolean isOriginAllowed(String origin) {
if (allowAnyOrigin) return true;
@@ -173,23 +191,41 @@ public final class WebSocketConfig {
* list, compression enabled, and exact path matching.
*/
public static final class Builder {
/** Maximum single-frame payload size in bytes; defaults to 64&nbsp;KiB. */
private int maxFramePayloadLength = 65_536;
/** Maximum aggregated message size in bytes; defaults to 1&nbsp;MiB. */
private int maxAggregatedMessageSize = 1_048_576;
/** Idle timeout; defaults to 60 seconds. */
private Duration idleTimeout = Duration.ofSeconds(60);
/** Accumulated allowed origins (insertion-ordered). */
/**
* Accumulated allowed origins (insertion-ordered).
*/
private final Set<String> allowedOrigins = new LinkedHashSet<>();
/** Whether any origin is allowed; defaults to {@code false}. */
private boolean allowAnyOrigin = false;
/** Accumulated subprotocols (insertion-ordered). */
/**
* Accumulated subprotocols (insertion-ordered).
*/
private final Set<String> subprotocols = new LinkedHashSet<>();
/** Whether compression is enabled; defaults to {@code true}. */
/**
* Maximum single-frame payload size in bytes; defaults to 64&nbsp;KiB.
*/
private int maxFramePayloadLength = 65_536;
/**
* Maximum aggregated message size in bytes; defaults to 1&nbsp;MiB.
*/
private int maxAggregatedMessageSize = 1_048_576;
/**
* Idle timeout; defaults to 60 seconds.
*/
private Duration idleTimeout = Duration.ofSeconds(60);
/**
* Whether any origin is allowed; defaults to {@code false}.
*/
private boolean allowAnyOrigin = false;
/**
* Whether compression is enabled; defaults to {@code true}.
*/
private boolean compression = true;
/** Whether path matching uses a prefix check; defaults to {@code false}. */
/**
* Whether path matching uses a prefix check; defaults to {@code false}.
*/
private boolean checkStartsWith = false;
/** Per-connection queued-message high-watermark; defaults to 1024. */
/**
* Per-connection queued-message high-watermark; defaults to 1024.
*/
private int maxQueuedMessages = 1024;
/**
@@ -3,11 +3,7 @@ package dev.coph.nextusweb.server.websocket;
import dev.coph.nextusweb.server.auth.Principal;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleStateEvent;
import java.util.Map;
@@ -40,29 +36,51 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
final class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
/** Executor running one virtual thread per drain task. */
/**
* Executor running one virtual thread per drain task.
*/
private static final Executor VT_EXECUTOR = Executors.newVirtualThreadPerTaskExecutor();
/** The application handler receiving lifecycle callbacks. */
/**
* The application handler receiving lifecycle callbacks.
*/
private final WebSocketHandler handler;
/** The path the connection was established on. */
/**
* The path the connection was established on.
*/
private final String path;
/** Path parameters captured during routing, keyed by name. */
/**
* Path parameters captured during routing, keyed by name.
*/
private final Map<String, String> pathParams;
/** Authenticated principal for the connection, or {@code null} if anonymous. */
/**
* Authenticated principal for the connection, or {@code null} if anonymous.
*/
private final Principal principal;
/** Queued-callback high-watermark at which reads are paused. */
/**
* Queued-callback high-watermark at which reads are paused.
*/
private final int maxQueued;
/** Watermark at which reads resume after having been paused. */
/**
* Watermark at which reads resume after having been paused.
*/
private final int resumeQueued;
/** FIFO of pending callbacks for this connection; drained by a single virtual thread. */
/**
* FIFO of pending callbacks for this connection; drained by a single virtual thread.
*/
private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
/** Number of callbacks currently queued (drives the backpressure watermarks). */
/**
* Number of callbacks currently queued (drives the backpressure watermarks).
*/
private final AtomicInteger queued = new AtomicInteger();
/** Guards that at most one drainer runs at a time, preserving ordering. */
/**
* Guards that at most one drainer runs at a time, preserving ordering.
*/
private final AtomicBoolean draining = new AtomicBoolean(false);
/** Whether reads are currently paused for backpressure. */
/**
* Whether reads are currently paused for backpressure.
*/
private volatile boolean readsPaused = false;
/**
@@ -84,36 +102,6 @@ final class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketF
this.resumeQueued = Math.max(1, this.maxQueued / 4);
}
/**
* Handles pipeline user events. On handshake completion it creates and stores the
* {@link WebSocketSession} and dispatches {@link WebSocketHandler#onOpen}; on an idle-state
* event it closes the channel; other events are passed up the pipeline.
*
* @param ctx the channel context
* @param evt the user event
* @throws Exception if the superclass handling of an unrecognized event fails
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
WebSocketSession session = new WebSocketSession(ctx.channel(), path, pathParams, principal);
ctx.channel().attr(WebSocketSession.SESSION_KEY).set(session);
submit(ctx, () -> {
try {
handler.onOpen(session);
} catch (Throwable t) {
safeError(session, t);
}
});
return;
}
if (evt instanceof IdleStateEvent) {
ctx.close();
return;
}
super.userEventTriggered(ctx, evt);
}
/**
* Dispatches an incoming frame to the appropriate handler callback. Text, binary and close
* frames are forwarded to {@code onMessage}, {@code onBinary} and {@code onClose}
@@ -233,6 +221,36 @@ final class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketF
});
}
/**
* Handles pipeline user events. On handshake completion it creates and stores the
* {@link WebSocketSession} and dispatches {@link WebSocketHandler#onOpen}; on an idle-state
* event it closes the channel; other events are passed up the pipeline.
*
* @param ctx the channel context
* @param evt the user event
* @throws Exception if the superclass handling of an unrecognized event fails
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
WebSocketSession session = new WebSocketSession(ctx.channel(), path, pathParams, principal);
ctx.channel().attr(WebSocketSession.SESSION_KEY).set(session);
submit(ctx, () -> {
try {
handler.onOpen(session);
} catch (Throwable t) {
safeError(session, t);
}
});
return;
}
if (evt instanceof IdleStateEvent) {
ctx.close();
return;
}
super.userEventTriggered(ctx, evt);
}
/**
* Routes a pipeline exception to {@link WebSocketHandler#onError} (when a session exists)
* and then closes the channel.
@@ -14,7 +14,9 @@ import java.util.Map;
*/
public final class WebSocketFrameHandlerFactory {
/** Default per-connection queued-message high-watermark when none is supplied. */
/**
* Default per-connection queued-message high-watermark when none is supplied.
*/
private static final int DEFAULT_MAX_QUEUED = 1024;
/**
@@ -37,21 +39,6 @@ public final class WebSocketFrameHandlerFactory {
return create(handler, path, pathParams, null, DEFAULT_MAX_QUEUED);
}
/**
* Creates a channel handler with an authenticated principal and the default backpressure
* watermark.
*
* @param handler the application handler to dispatch lifecycle events to
* @param path the path the connection was established on
* @param pathParams the path parameters captured during routing
* @param principal the authenticated principal, or {@code null} if the connection is anonymous
* @return a new channel handler ready to be inserted into the pipeline
*/
public static ChannelHandler create(WebSocketHandler handler, String path,
Map<String, String> pathParams, Principal principal) {
return create(handler, path, pathParams, principal, DEFAULT_MAX_QUEUED);
}
/**
* Creates a channel handler with an authenticated principal and an explicit backpressure
* watermark.
@@ -68,4 +55,19 @@ public final class WebSocketFrameHandlerFactory {
int maxQueued) {
return new WebSocketFrameHandler(handler, path, pathParams, principal, maxQueued);
}
/**
* Creates a channel handler with an authenticated principal and the default backpressure
* watermark.
*
* @param handler the application handler to dispatch lifecycle events to
* @param path the path the connection was established on
* @param pathParams the path parameters captured during routing
* @param principal the authenticated principal, or {@code null} if the connection is anonymous
* @return a new channel handler ready to be inserted into the pipeline
*/
public static ChannelHandler create(WebSocketHandler handler, String path,
Map<String, String> pathParams, Principal principal) {
return create(handler, path, pathParams, principal, DEFAULT_MAX_QUEUED);
}
}
@@ -20,9 +20,13 @@ import tools.jackson.core.JacksonException;
*/
public final class WebSocketGroup {
/** Underlying Netty channel group holding the member connections. */
/**
* Underlying Netty channel group holding the member connections.
*/
private final ChannelGroup channels;
/** Human-readable name of this group. */
/**
* Human-readable name of this group.
*/
private final String name;
/**
@@ -16,7 +16,9 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public final class WebSocketRouter {
/** Root of the routing trie. */
/**
* Root of the routing trie.
*/
private final Node root = new Node();
/**
@@ -50,31 +52,6 @@ public final class WebSocketRouter {
return this;
}
/**
* Resolves a path to its handler, capturing any path parameters along the way.
*
* @param path the request path
* @return a {@link Resolution} carrying the handler and captured parameters, or {@code null}
* if no handler is registered for the path
*/
public Resolution resolve(String path) {
Map<String, String> params = new HashMap<>(4);
Node node = root;
for (String segment : split(path)) {
Node next = node.children.get(segment);
if (next != null) {
node = next;
} else if (node.paramChild != null) {
params.put(node.paramName, segment);
node = node.paramChild;
} else {
return null;
}
}
if (node.handler == null) return null;
return new Resolution(node.handler, params);
}
/**
* Splits a path into its non-empty segments, ignoring leading and collapsing internal
* slashes.
@@ -95,6 +72,31 @@ public final class WebSocketRouter {
return out;
}
/**
* Resolves a path to its handler, capturing any path parameters along the way.
*
* @param path the request path
* @return a {@link Resolution} carrying the handler and captured parameters, or {@code null}
* if no handler is registered for the path
*/
public Resolution resolve(String path) {
Map<String, String> params = new HashMap<>(4);
Node node = root;
for (String segment : split(path)) {
Node next = node.children.get(segment);
if (next != null) {
node = next;
} else if (node.paramChild != null) {
params.put(node.paramName, segment);
node = node.paramChild;
} else {
return null;
}
}
if (node.handler == null) return null;
return new Resolution(node.handler, params);
}
/**
* A successful path resolution.
*
@@ -109,13 +111,21 @@ public final class WebSocketRouter {
* optional path-parameter child, and the handler (if any) registered at this node.
*/
private static final class Node {
/** Static child nodes keyed by their literal path segment. */
/**
* Static child nodes keyed by their literal path segment.
*/
final Map<String, Node> children = new ConcurrentHashMap<>();
/** Child matching any single segment as a path parameter, or {@code null} if none. */
/**
* Child matching any single segment as a path parameter, or {@code null} if none.
*/
Node paramChild;
/** Name under which {@link #paramChild} captures the matched segment. */
/**
* Name under which {@link #paramChild} captures the matched segment.
*/
String paramName;
/** Handler registered at this node, or {@code null} if the path is only a prefix. */
/**
* Handler registered at this node, or {@code null} if the path is only a prefix.
*/
WebSocketHandler handler;
}
}
@@ -36,21 +36,35 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public final class WebSocketSession {
/** Channel attribute key under which the session is stored on its Netty channel. */
/**
* Channel attribute key under which the session is stored on its Netty channel.
*/
static final AttributeKey<WebSocketSession> SESSION_KEY =
AttributeKey.valueOf("nexusweb.ws.session");
/** The underlying Netty channel for this connection. */
/**
* The underlying Netty channel for this connection.
*/
private final Channel channel;
/** Unique identifier generated for this session. */
/**
* Unique identifier generated for this session.
*/
private final String id;
/** The path the connection was established on. */
/**
* The path the connection was established on.
*/
private final String path;
/** Path parameters captured during routing, keyed by name. */
/**
* Path parameters captured during routing, keyed by name.
*/
private final Map<String, String> pathParams;
/** Authenticated principal for this connection, or {@code null} if anonymous. */
/**
* Authenticated principal for this connection, or {@code null} if anonymous.
*/
private final Principal principal;
/** Thread-safe bag of user-defined attributes attached to the session. */
/**
* Thread-safe bag of user-defined attributes attached to the session.
*/
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
/**
@@ -70,6 +84,36 @@ public final class WebSocketSession {
this.principal = principal;
}
/**
* Low-level helper that writes a text payload directly to a channel, allocating the buffer
* from the channel's allocator. Used by collaborators that hold a channel but not a session.
*
* @param channel the channel to write to
* @param text the text to send
* @return a future completing when the write finishes; an already-succeeded future if the
* channel is no longer active
*/
static ChannelFuture sendRaw(Channel channel, String text) {
if (!channel.isActive()) return channel.newSucceededFuture();
ByteBuf buf = channel.alloc().buffer();
buf.writeCharSequence(text, CharsetUtil.UTF_8);
return channel.writeAndFlush(new TextWebSocketFrame(true, 0, buf));
}
/**
* Low-level helper that writes a binary payload directly to a channel.
*
* @param channel the channel to write to
* @param data the bytes to send
* @return a future completing when the write finishes; an already-succeeded future if the
* channel is no longer active
*/
static ChannelFuture sendRawBinary(Channel channel, byte[] data) {
if (!channel.isActive()) return channel.newSucceededFuture();
ByteBuf buf = channel.alloc().buffer(data.length).writeBytes(Unpooled.wrappedBuffer(data));
return channel.writeAndFlush(new BinaryWebSocketFrame(buf));
}
/**
* Returns the unique identifier generated for this session.
*
@@ -121,7 +165,7 @@ public final class WebSocketSession {
* Returns the peer's remote IP address.
*
* @return the remote host address, or a string form of the address if it is not an
* {@link InetSocketAddress}; {@code null} if unavailable
* {@link InetSocketAddress}; {@code null} if unavailable
*/
public String remoteAddress() {
SocketAddress addr = channel.remoteAddress();
@@ -171,7 +215,7 @@ public final class WebSocketSession {
*
* @param text the text to send
* @return a future completing when the write finishes; an already-succeeded future if the
* channel is no longer active
* channel is no longer active
*/
public ChannelFuture send(String text) {
if (!channel.isActive()) return channel.newSucceededFuture();
@@ -183,7 +227,7 @@ public final class WebSocketSession {
*
* @param value the object to serialize and send
* @return a future completing when the write finishes; an already-succeeded future if the
* channel is no longer active
* channel is no longer active
* @throws RuntimeException if JSON serialization fails
*/
public ChannelFuture sendJson(Object value) {
@@ -202,7 +246,7 @@ public final class WebSocketSession {
*
* @param data the bytes to send
* @return a future completing when the write finishes; an already-succeeded future if the
* channel is no longer active
* channel is no longer active
*/
public ChannelFuture sendBinary(byte[] data) {
if (!channel.isActive()) return channel.newSucceededFuture();
@@ -214,7 +258,7 @@ public final class WebSocketSession {
* Sends a WebSocket ping frame to the peer (e.g. as a keep-alive).
*
* @return a future completing when the write finishes; an already-succeeded future if the
* channel is no longer active
* channel is no longer active
*/
public ChannelFuture ping() {
if (!channel.isActive()) return channel.newSucceededFuture();
@@ -237,41 +281,11 @@ public final class WebSocketSession {
* @param code the WebSocket close status code
* @param reason the human-readable close reason
* @return a future completing when the close frame has been written; an already-succeeded
* future if the channel is no longer active
* future if the channel is no longer active
*/
public ChannelFuture close(int code, String reason) {
if (!channel.isActive()) return channel.newSucceededFuture();
return channel.writeAndFlush(new CloseWebSocketFrame(code, reason))
.addListener(ChannelFutureListener.CLOSE);
}
/**
* Low-level helper that writes a text payload directly to a channel, allocating the buffer
* from the channel's allocator. Used by collaborators that hold a channel but not a session.
*
* @param channel the channel to write to
* @param text the text to send
* @return a future completing when the write finishes; an already-succeeded future if the
* channel is no longer active
*/
static ChannelFuture sendRaw(Channel channel, String text) {
if (!channel.isActive()) return channel.newSucceededFuture();
ByteBuf buf = channel.alloc().buffer();
buf.writeCharSequence(text, CharsetUtil.UTF_8);
return channel.writeAndFlush(new TextWebSocketFrame(true, 0, buf));
}
/**
* Low-level helper that writes a binary payload directly to a channel.
*
* @param channel the channel to write to
* @param data the bytes to send
* @return a future completing when the write finishes; an already-succeeded future if the
* channel is no longer active
*/
static ChannelFuture sendRawBinary(Channel channel, byte[] data) {
if (!channel.isActive()) return channel.newSucceededFuture();
ByteBuf buf = channel.alloc().buffer(data.length).writeBytes(Unpooled.wrappedBuffer(data));
return channel.writeAndFlush(new BinaryWebSocketFrame(buf));
}
}