Reformat code for improved readability and consistency across classes
This commit is contained in:
@@ -63,8 +63,7 @@ public final class HttpRequestHandler extends SimpleChannelInboundHandler<FullHt
|
||||
/**
|
||||
* Executor running one virtual thread per task, used to offload blocking handler work.
|
||||
*/
|
||||
private static final Executor VT_EXECUTOR =
|
||||
Executors.newVirtualThreadPerTaskExecutor();
|
||||
private static final Executor VT_EXECUTOR = Executors.newVirtualThreadPerTaskExecutor();
|
||||
|
||||
/**
|
||||
* Router resolving requests to handlers.
|
||||
@@ -112,8 +111,7 @@ public final class HttpRequestHandler extends SimpleChannelInboundHandler<FullHt
|
||||
* @param authGate the auth gate, or {@code null} to disable the auth layer
|
||||
* @param trustedProxies the trusted-proxy policy, or {@code null} for {@link TrustedProxies#none()}
|
||||
*/
|
||||
public HttpRequestHandler(Router router, CorsHandler cors, RateLimitGate rateLimit,
|
||||
AuthGate authGate, TrustedProxies trustedProxies) {
|
||||
public HttpRequestHandler(Router router, CorsHandler cors, RateLimitGate rateLimit, AuthGate authGate, TrustedProxies trustedProxies) {
|
||||
this(router, cors, rateLimit, authGate, trustedProxies, null, null, null, false);
|
||||
}
|
||||
|
||||
@@ -130,10 +128,7 @@ public final class HttpRequestHandler extends SimpleChannelInboundHandler<FullHt
|
||||
* @param securityHeaders the security-header policy, or {@code null} to add no security headers
|
||||
* @param secure whether the server's connections are secured by TLS (gates HSTS)
|
||||
*/
|
||||
public HttpRequestHandler(Router router, CorsHandler cors, RateLimitGate rateLimit,
|
||||
AuthGate authGate, TrustedProxies trustedProxies,
|
||||
WebSocketRouter wsRouter, WebSocketConfig wsConfig,
|
||||
SecurityHeaders securityHeaders, boolean secure) {
|
||||
public HttpRequestHandler(Router router, CorsHandler cors, RateLimitGate rateLimit, AuthGate authGate, TrustedProxies trustedProxies, WebSocketRouter wsRouter, WebSocketConfig wsConfig, SecurityHeaders securityHeaders, boolean secure) {
|
||||
this.router = router;
|
||||
this.cors = cors;
|
||||
this.rateLimit = rateLimit;
|
||||
@@ -156,9 +151,7 @@ public final class HttpRequestHandler extends SimpleChannelInboundHandler<FullHt
|
||||
* @param wsRouter the WebSocket router, or {@code null} to disable WebSocket support
|
||||
* @param wsConfig the WebSocket configuration, used only when {@code wsRouter} is non-null
|
||||
*/
|
||||
public HttpRequestHandler(Router router, CorsHandler cors, RateLimitGate rateLimit,
|
||||
AuthGate authGate, TrustedProxies trustedProxies,
|
||||
WebSocketRouter wsRouter, WebSocketConfig wsConfig) {
|
||||
public HttpRequestHandler(Router router, CorsHandler cors, RateLimitGate rateLimit, AuthGate authGate, TrustedProxies trustedProxies, WebSocketRouter wsRouter, WebSocketConfig wsConfig) {
|
||||
this(router, cors, rateLimit, authGate, trustedProxies, wsRouter, wsConfig, null, false);
|
||||
}
|
||||
|
||||
@@ -264,21 +257,15 @@ public final class HttpRequestHandler extends SimpleChannelInboundHandler<FullHt
|
||||
|
||||
if (wsConfig.idleTimeout() != null) {
|
||||
long secs = Math.max(1, wsConfig.idleTimeout().toSeconds());
|
||||
pipeline.addBefore(myName, "ws-idle",
|
||||
new IdleStateHandler(0, 0, secs, TimeUnit.SECONDS));
|
||||
pipeline.addBefore(myName, "ws-idle", new IdleStateHandler(0, 0, secs, TimeUnit.SECONDS));
|
||||
}
|
||||
if (wsConfig.compression()) {
|
||||
pipeline.addBefore(myName, "ws-deflate",
|
||||
new WebSocketServerCompressionHandler());
|
||||
pipeline.addBefore(myName, "ws-deflate", new WebSocketServerCompressionHandler());
|
||||
}
|
||||
pipeline.addBefore(myName, "ws-proto",
|
||||
new WebSocketServerProtocolHandler(protoCfg));
|
||||
pipeline.addBefore(myName, "ws-proto", new WebSocketServerProtocolHandler(protoCfg));
|
||||
|
||||
pipeline.addBefore(myName, "ws-aggregator",
|
||||
new WebSocketFrameAggregator(wsConfig.maxAggregatedMessageSize()));
|
||||
pipeline.addBefore(myName, "ws-frames",
|
||||
WebSocketFrameHandlerFactory.create(resolution.handler(), path, resolution.pathParams(),
|
||||
principal, wsConfig.maxQueuedMessages()));
|
||||
pipeline.addBefore(myName, "ws-aggregator", new WebSocketFrameAggregator(wsConfig.maxAggregatedMessageSize()));
|
||||
pipeline.addBefore(myName, "ws-frames", WebSocketFrameHandlerFactory.create(resolution.handler(), path, resolution.pathParams(), principal, wsConfig.maxQueuedMessages()));
|
||||
|
||||
ChannelHandlerContext anchor = pipeline.context(HttpObjectAggregator.class);
|
||||
if (anchor == null) anchor = pipeline.firstContext();
|
||||
@@ -315,8 +302,7 @@ public final class HttpRequestHandler extends SimpleChannelInboundHandler<FullHt
|
||||
String clientIp = resolveClientIp(ctx, raw);
|
||||
|
||||
Router.Resolution resolution = router.resolve(raw.method(), path);
|
||||
Map<String, String> params = resolution instanceof Router.Resolution.Match m
|
||||
? m.pathParams() : Map.of();
|
||||
Map<String, String> params = resolution instanceof Router.Resolution.Match m ? m.pathParams() : Map.of();
|
||||
Request request = new Request(raw, params);
|
||||
request.clientIp(clientIp);
|
||||
|
||||
@@ -346,11 +332,11 @@ public final class HttpRequestHandler extends SimpleChannelInboundHandler<FullHt
|
||||
switch (resolution) {
|
||||
case Router.Resolution.Match m -> {
|
||||
try {
|
||||
for (var mw : router.middlewares()) mw.accept(request, res);
|
||||
for (var mw : router.middlewares())
|
||||
mw.accept(request, res);
|
||||
m.handler().handle(request, res);
|
||||
} catch (BadRequestException e) {
|
||||
res.status(400).json(Map.of("error",
|
||||
e.getMessage() == null ? "Bad Request" : e.getMessage()));
|
||||
res.status(400).json(Map.of("error", e.getMessage() == null ? "Bad Request" : e.getMessage()));
|
||||
} catch (Exception e) {
|
||||
LOG.log(Level.ERROR, "Handler failed for " + raw.method() + " " + path, e);
|
||||
res.status(500).json("{\"error\":\"Internal Server Error\"}");
|
||||
@@ -403,8 +389,7 @@ public final class HttpRequestHandler extends SimpleChannelInboundHandler<FullHt
|
||||
*/
|
||||
private String resolveClientIp(ChannelHandlerContext ctx, FullHttpRequest raw) {
|
||||
SocketAddress addr = ctx.channel().remoteAddress();
|
||||
String socketIp = (addr instanceof InetSocketAddress isa && isa.getAddress() != null)
|
||||
? isa.getAddress().getHostAddress() : "unknown";
|
||||
String socketIp = (addr instanceof InetSocketAddress isa && isa.getAddress() != null) ? isa.getAddress().getHostAddress() : "unknown";
|
||||
String forwarded = raw.headers().get(ClientIp.FORWARDED_FOR_HEADER);
|
||||
return ClientIp.resolve(socketIp, forwarded, trustedProxies);
|
||||
}
|
||||
|
||||
@@ -199,7 +199,8 @@ public final class HttpServer {
|
||||
* @return this instance, for fluent chaining
|
||||
*/
|
||||
public HttpServer maxHttpContentLength(int bytes) {
|
||||
if (bytes <= 0) throw new IllegalArgumentException("maxHttpContentLength must be > 0");
|
||||
if (bytes <= 0)
|
||||
throw new IllegalArgumentException("maxHttpContentLength must be > 0");
|
||||
this.maxHttpContentLength = bytes;
|
||||
return this;
|
||||
}
|
||||
@@ -292,14 +293,11 @@ public final class HttpServer {
|
||||
pipeline.addLast("ssl", tlsCfg.newHandler(ch.alloc()));
|
||||
}
|
||||
if (readTimeoutSeconds > 0) {
|
||||
pipeline.addLast("read-timeout",
|
||||
new ReadTimeoutHandler(readTimeoutSeconds, TimeUnit.SECONDS));
|
||||
pipeline.addLast("read-timeout", new ReadTimeoutHandler(readTimeoutSeconds, TimeUnit.SECONDS));
|
||||
}
|
||||
pipeline.addLast(new HttpServerCodec())
|
||||
.addLast(new HttpObjectAggregator(maxContent))
|
||||
.addLast(new HttpRequestHandler(router, corsHandler, rateLimitGate,
|
||||
auth, proxies, websocketRouter, websocketConfig,
|
||||
secHeaders, tlsEnabled));
|
||||
.addLast(new HttpRequestHandler(router, corsHandler, rateLimitGate, auth, proxies, websocketRouter, websocketConfig, secHeaders, tlsEnabled));
|
||||
}
|
||||
})
|
||||
.bind(port).sync().channel().closeFuture().sync();
|
||||
|
||||
@@ -37,6 +37,7 @@ public final class AuthConfig {
|
||||
* Optional {@code WWW-Authenticate} challenge sent with {@code 401} responses.
|
||||
*/
|
||||
private final String challenge;
|
||||
|
||||
private AuthConfig(Builder b) {
|
||||
this.globalRule = b.globalRule;
|
||||
this.exactPathRules = Map.copyOf(b.exactPathRules);
|
||||
|
||||
@@ -71,66 +71,66 @@ public final class LeakyBucketLimiter implements RateLimiter {
|
||||
*
|
||||
* @param state Holds the current {@code (waterLevel, lastLeakNanos)} pair as one atomic unit.
|
||||
*/
|
||||
private record LeakyBucket(AtomicReference<State> state) {
|
||||
/**
|
||||
* Creates an empty bucket.
|
||||
*
|
||||
* @param state the creation timestamp in nanoseconds
|
||||
*/
|
||||
private LeakyBucket(long state) {
|
||||
this.state = new AtomicReference<>(new State(0, state));
|
||||
}
|
||||
private record LeakyBucket(AtomicReference<State> state) {
|
||||
/**
|
||||
* Creates an empty bucket.
|
||||
*
|
||||
* @param state the creation timestamp in nanoseconds
|
||||
*/
|
||||
private LeakyBucket(long state) {
|
||||
this.state = new AtomicReference<>(new State(0, state));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the timestamp leakage was last accounted to, used by {@link #cleanup(long)}.
|
||||
*
|
||||
* @return the last-leak timestamp in nanoseconds
|
||||
*/
|
||||
long lastLeak() {
|
||||
return state.get().lastLeakNanos();
|
||||
}
|
||||
/**
|
||||
* Returns the timestamp leakage was last accounted to, used by {@link #cleanup(long)}.
|
||||
*
|
||||
* @return the last-leak timestamp in nanoseconds
|
||||
*/
|
||||
long lastLeak() {
|
||||
return state.get().lastLeakNanos();
|
||||
}
|
||||
|
||||
/**
|
||||
* Applies elapsed leakage and, if there is room, adds one unit of water. The new level and
|
||||
* the timestamp it was leaked to are swapped in together, so the previous race where the
|
||||
* level advanced but the timestamp update was lost (drifting the leak accounting) can no
|
||||
* longer occur.
|
||||
*
|
||||
* @param now the current time in nanoseconds
|
||||
* @param capacity the bucket capacity
|
||||
* @param leakIntervalNanos the nanoseconds per leaked unit
|
||||
* @return an allow result with the remaining headroom, or a deny result with a retry
|
||||
* hint when the bucket is full
|
||||
*/
|
||||
Result tryAcquire(long now, long capacity, long leakIntervalNanos) {
|
||||
while (true) {
|
||||
State current = state.get();
|
||||
/**
|
||||
* Applies elapsed leakage and, if there is room, adds one unit of water. The new level and
|
||||
* the timestamp it was leaked to are swapped in together, so the previous race where the
|
||||
* level advanced but the timestamp update was lost (drifting the leak accounting) can no
|
||||
* longer occur.
|
||||
*
|
||||
* @param now the current time in nanoseconds
|
||||
* @param capacity the bucket capacity
|
||||
* @param leakIntervalNanos the nanoseconds per leaked unit
|
||||
* @return an allow result with the remaining headroom, or a deny result with a retry
|
||||
* hint when the bucket is full
|
||||
*/
|
||||
Result tryAcquire(long now, long capacity, long leakIntervalNanos) {
|
||||
while (true) {
|
||||
State current = state.get();
|
||||
|
||||
long leaked = (now - current.lastLeakNanos()) / leakIntervalNanos;
|
||||
long newLevel = Math.max(0, current.waterLevel() - leaked);
|
||||
long leaked = (now - current.lastLeakNanos()) / leakIntervalNanos;
|
||||
long newLevel = Math.max(0, current.waterLevel() - leaked);
|
||||
|
||||
if (newLevel >= capacity) {
|
||||
long retryMs = leakIntervalNanos / 1_000_000L;
|
||||
return Result.deny(capacity, retryMs);
|
||||
}
|
||||
if (newLevel >= capacity) {
|
||||
long retryMs = leakIntervalNanos / 1_000_000L;
|
||||
return Result.deny(capacity, retryMs);
|
||||
}
|
||||
|
||||
long newLastLeak = leaked > 0
|
||||
? current.lastLeakNanos() + leaked * leakIntervalNanos
|
||||
: current.lastLeakNanos();
|
||||
long newLastLeak = leaked > 0
|
||||
? current.lastLeakNanos() + leaked * leakIntervalNanos
|
||||
: current.lastLeakNanos();
|
||||
|
||||
if (state.compareAndSet(current, new State(newLevel + 1, newLastLeak))) {
|
||||
return Result.allow(capacity - newLevel - 1, capacity);
|
||||
}
|
||||
if (state.compareAndSet(current, new State(newLevel + 1, newLastLeak))) {
|
||||
return Result.allow(capacity - newLevel - 1, capacity);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Immutable snapshot of a bucket's mutable state.
|
||||
*
|
||||
* @param waterLevel current water level (number of units in the bucket)
|
||||
* @param lastLeakNanos timestamp leakage has been applied up to, in nanoseconds
|
||||
*/
|
||||
private record State(long waterLevel, long lastLeakNanos) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Immutable snapshot of a bucket's mutable state.
|
||||
*
|
||||
* @param waterLevel current water level (number of units in the bucket)
|
||||
* @param lastLeakNanos timestamp leakage has been applied up to, in nanoseconds
|
||||
*/
|
||||
private record State(long waterLevel, long lastLeakNanos) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user