Add comprehensive Javadoc documentation to server components, including annotations, request/response handling, routing, and WebSocket support.

This commit is contained in:
CodingPhoenixx
2026-05-29 08:50:05 +02:00
parent f00a1098b4
commit 5d6e8622bf
33 changed files with 1938 additions and 53 deletions
@@ -5,17 +5,38 @@ import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
/**
* Immutable configuration for the WebSocket subsystem: frame and message size limits, idle
* timeout, allowed origins, negotiated subprotocols, and compression. Instances are created
* through the nested {@link Builder}.
*
* <p>The values configured here govern how {@code HttpRequestHandler} sets up the WebSocket
* portion of the Netty pipeline during the upgrade handshake.</p>
*/
public final class WebSocketConfig {
/** Maximum size, in bytes, of a single WebSocket frame payload. */
private final int maxFramePayloadLength;
/** Maximum size, in bytes, of an aggregated (multi-frame) message. */
private final int maxAggregatedMessageSize;
/** Idle timeout after which an inactive connection is closed; {@code null} disables it. */
private final Duration idleTimeout;
/** Explicit set of allowed origins; ignored when {@link #allowAnyOrigin} is {@code true}. */
private final Set<String> allowedOrigins;
/** Whether connections from any origin are accepted. */
private final boolean allowAnyOrigin;
/** Subprotocols offered during negotiation. */
private final Set<String> subprotocols;
/** Whether per-message deflate compression is enabled. */
private final boolean compression;
/** Whether the protocol handler matches the path by prefix rather than exact equality. */
private final boolean checkStartsWith;
/**
* Builds an immutable configuration from a {@link Builder}, defensively copying its sets.
*
* @param b the builder carrying the configured values
*/
private WebSocketConfig(Builder b) {
this.maxFramePayloadLength = b.maxFramePayloadLength;
this.maxAggregatedMessageSize = b.maxAggregatedMessageSize;
@@ -27,110 +48,226 @@ public final class WebSocketConfig {
this.checkStartsWith = b.checkStartsWith;
}
/**
* Creates a configuration with all default values.
*
* @return a default configuration
*/
public static WebSocketConfig defaults() {
return builder().build();
}
/**
* Creates a new, empty {@link Builder}.
*
* @return a fresh builder
*/
public static Builder builder() {
return new Builder();
}
/**
* Tests whether a WebSocket upgrade from the given origin is permitted.
*
* @param origin the request's {@code Origin} header, may be {@code null}
* @return {@code true} if any origin is allowed, or if the origin is in the allow-list;
* {@code false} for a {@code null} or disallowed origin
*/
public boolean isOriginAllowed(String origin) {
if (allowAnyOrigin) return true;
if (origin == null) return false;
return allowedOrigins.contains(origin);
}
/**
* @return the maximum single-frame payload size in bytes
*/
public int maxFramePayloadLength() {
return maxFramePayloadLength;
}
/**
* @return the maximum aggregated message size in bytes
*/
public int maxAggregatedMessageSize() {
return maxAggregatedMessageSize;
}
/**
* @return the idle timeout, or {@code null} if idle connections are never closed
*/
public Duration idleTimeout() {
return idleTimeout;
}
/**
* @return {@code true} if connections from any origin are accepted
*/
public boolean allowAnyOrigin() {
return allowAnyOrigin;
}
/**
* @return the immutable set of explicitly allowed origins
*/
public Set<String> allowedOrigins() {
return allowedOrigins;
}
/**
* Returns the configured subprotocols as a comma-separated string suitable for Netty's
* protocol config.
*
* @return the comma-separated subprotocol list, or {@code null} if none are configured
*/
public String subprotocolsCsv() {
if (subprotocols.isEmpty()) return null;
return String.join(",", subprotocols);
}
/**
* @return {@code true} if per-message compression is enabled
*/
public boolean compression() {
return compression;
}
/**
* @return {@code true} if the WebSocket path is matched by prefix rather than exactly
*/
public boolean checkStartsWith() {
return checkStartsWith;
}
/**
* Fluent builder for {@link WebSocketConfig}, pre-populated with sensible defaults: 64&nbsp;KiB
* frames, 1&nbsp;MiB aggregated messages, a 60-second idle timeout, no origin restriction
* list, compression enabled, and exact path matching.
*/
public static final class Builder {
/** Maximum single-frame payload size in bytes; defaults to 64&nbsp;KiB. */
private int maxFramePayloadLength = 65_536;
/** Maximum aggregated message size in bytes; defaults to 1&nbsp;MiB. */
private int maxAggregatedMessageSize = 1_048_576;
/** Idle timeout; defaults to 60 seconds. */
private Duration idleTimeout = Duration.ofSeconds(60);
/** Accumulated allowed origins (insertion-ordered). */
private final Set<String> allowedOrigins = new LinkedHashSet<>();
/** Whether any origin is allowed; defaults to {@code false}. */
private boolean allowAnyOrigin = false;
/** Accumulated subprotocols (insertion-ordered). */
private final Set<String> subprotocols = new LinkedHashSet<>();
/** Whether compression is enabled; defaults to {@code true}. */
private boolean compression = true;
/** Whether path matching uses a prefix check; defaults to {@code false}. */
private boolean checkStartsWith = false;
/**
* Sets the maximum single-frame payload size.
*
* @param bytes the limit in bytes; must be positive
* @return this builder, for fluent chaining
* @throws IllegalArgumentException if {@code bytes <= 0}
*/
public Builder maxFramePayloadLength(int bytes) {
if (bytes <= 0) throw new IllegalArgumentException("maxFramePayloadLength must be > 0");
this.maxFramePayloadLength = bytes;
return this;
}
/**
* Sets the maximum aggregated message size.
*
* @param bytes the limit in bytes; must be positive
* @return this builder, for fluent chaining
* @throws IllegalArgumentException if {@code bytes <= 0}
*/
public Builder maxAggregatedMessageSize(int bytes) {
if (bytes <= 0) throw new IllegalArgumentException("maxAggregatedMessageSize must be > 0");
this.maxAggregatedMessageSize = bytes;
return this;
}
/**
* Sets the idle timeout after which inactive connections are closed.
*
* @param timeout the idle timeout
* @return this builder, for fluent chaining
*/
public Builder idleTimeout(Duration timeout) {
this.idleTimeout = timeout;
return this;
}
/**
* Disables the idle timeout, so connections are never closed for inactivity.
*
* @return this builder, for fluent chaining
*/
public Builder noIdleTimeout() {
this.idleTimeout = null;
return this;
}
/**
* Adds one or more origins to the allow-list.
*
* @param origins the origins to allow
* @return this builder, for fluent chaining
*/
public Builder allowedOrigins(String... origins) {
Collections.addAll(this.allowedOrigins, origins);
return this;
}
/**
* Allows WebSocket connections from any origin.
*
* @return this builder, for fluent chaining
*/
public Builder anyOrigin() {
this.allowAnyOrigin = true;
return this;
}
/**
* Adds one or more subprotocols to offer during negotiation.
*
* @param protocols the subprotocol names
* @return this builder, for fluent chaining
*/
public Builder subprotocols(String... protocols) {
Collections.addAll(this.subprotocols, protocols);
return this;
}
/**
* Enables or disables per-message compression.
*
* @param enabled {@code true} to enable compression
* @return this builder, for fluent chaining
*/
public Builder compression(boolean enabled) {
this.compression = enabled;
return this;
}
/**
* Sets whether the WebSocket path is matched by prefix rather than exact equality.
*
* @param v {@code true} to match by prefix
* @return this builder, for fluent chaining
*/
public Builder checkStartsWith(boolean v) {
this.checkStartsWith = v;
return this;
}
/**
* Builds the immutable {@link WebSocketConfig}.
*
* @return the configured instance
*/
public WebSocketConfig build() {
return new WebSocketConfig(this);
}
@@ -14,21 +14,53 @@ import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
* Netty channel handler that bridges low-level WebSocket frames to the high-level
* {@link WebSocketHandler} callbacks.
*
* <p>It creates a {@link WebSocketSession} when the handshake completes, then translates each
* incoming frame into the matching callback ({@code onMessage}, {@code onBinary},
* {@code onClose}). Callbacks are dispatched on a virtual-thread executor so application code
* may block without stalling the Netty event loop, and any exception they throw is funneled to
* {@link WebSocketHandler#onError}. Idle-timeout events close the channel.</p>
*
* <p>This class is package-private; instances are created via
* {@link WebSocketFrameHandlerFactory}.</p>
*/
final class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
private static final Executor VT_EXECUTOR =
Executors.newVirtualThreadPerTaskExecutor();
/** Executor running one virtual thread per task, used to dispatch handler callbacks. */
private static final Executor VT_EXECUTOR = Executors.newVirtualThreadPerTaskExecutor();
/** The application handler receiving lifecycle callbacks. */
private final WebSocketHandler handler;
/** The path the connection was established on. */
private final String path;
/** Path parameters captured during routing, keyed by name. */
private final Map<String, String> pathParams;
/**
* Creates a frame handler bound to an application handler and connection metadata.
*
* @param handler the application handler to dispatch to
* @param path the connection path
* @param pathParams the captured path parameters
*/
WebSocketFrameHandler(WebSocketHandler handler, String path, Map<String, String> pathParams) {
this.handler = handler;
this.path = path;
this.pathParams = pathParams;
}
/**
* Handles pipeline user events. On handshake completion it creates and stores the
* {@link WebSocketSession} and dispatches {@link WebSocketHandler#onOpen}; on an idle-state
* event it closes the channel; other events are passed up the pipeline.
*
* @param ctx the channel context
* @param evt the user event
* @throws Exception if the superclass handling of an unrecognized event fails
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
@@ -50,6 +82,15 @@ final class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketF
super.userEventTriggered(ctx, evt);
}
/**
* Dispatches an incoming frame to the appropriate handler callback. Text, binary and close
* frames are forwarded to {@code onMessage}, {@code onBinary} and {@code onClose}
* respectively, each on a virtual thread. Frames arriving before the session exists are
* ignored.
*
* @param ctx the channel context
* @param frame the received WebSocket frame
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
WebSocketSession session = ctx.channel().attr(WebSocketSession.SESSION_KEY).get();
@@ -88,6 +129,13 @@ final class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketF
}
}
/**
* Invoked when the channel goes inactive (the connection dropped without a clean close
* handshake). Clears the stored session and dispatches {@link WebSocketHandler#onClose} with
* the abnormal-closure code {@code 1006}.
*
* @param ctx the channel context
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
WebSocketSession session = ctx.channel().attr(WebSocketSession.SESSION_KEY).getAndSet(null);
@@ -101,6 +149,13 @@ final class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketF
});
}
/**
* Routes a pipeline exception to {@link WebSocketHandler#onError} (when a session exists)
* and then closes the channel.
*
* @param ctx the channel context
* @param cause the exception that propagated up the pipeline
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
WebSocketSession session = ctx.channel().attr(WebSocketSession.SESSION_KEY).get();
@@ -108,6 +163,13 @@ final class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketF
ctx.close();
}
/**
* Invokes {@link WebSocketHandler#onError} while swallowing any secondary exception the
* error callback itself might throw, so error handling can never cascade.
*
* @param session the affected session
* @param cause the original error to report
*/
private void safeError(WebSocketSession session, Throwable cause) {
try {
handler.onError(session, cause);
@@ -4,11 +4,30 @@ import io.netty.channel.ChannelHandler;
import java.util.Map;
/**
* Small factory that creates the package-private {@code WebSocketFrameHandler} channel handler.
*
* <p>It exists so that other packages (notably {@code HttpRequestHandler} during the upgrade
* handshake) can insert a frame handler into the pipeline without the handler class itself
* having to be public. The class is a stateless utility and cannot be instantiated.</p>
*/
public final class WebSocketFrameHandlerFactory {
/**
* Private constructor preventing instantiation of this stateless utility class.
*/
private WebSocketFrameHandlerFactory() {
}
/**
* Creates a channel handler that bridges Netty WebSocket frames to the given application
* {@link WebSocketHandler}.
*
* @param handler the application handler to dispatch lifecycle events to
* @param path the path the connection was established on
* @param pathParams the path parameters captured during routing
* @return a new channel handler ready to be inserted into the pipeline
*/
public static ChannelHandler create(WebSocketHandler handler, String path,
Map<String, String> pathParams) {
return new WebSocketFrameHandler(handler, path, pathParams);
@@ -8,43 +8,93 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import tools.jackson.core.JacksonException;
/**
* A named collection of WebSocket connections that supports broadcasting to all members at
* once &mdash; useful for chat rooms, pub/sub topics, presence channels and similar fan-out
* scenarios.
*
* <p>It is backed by a Netty {@link ChannelGroup}, which automatically removes channels as they
* close, so callers do not need to prune disconnected sessions manually. The group is
* thread-safe.</p>
*/
public final class WebSocketGroup {
/** Underlying Netty channel group holding the member connections. */
private final ChannelGroup channels;
/** Human-readable name of this group. */
private final String name;
/**
* Creates an unnamed group (named {@code "anonymous"}).
*/
public WebSocketGroup() {
this("anonymous");
}
/**
* Creates a named group.
*
* @param name the group name
*/
public WebSocketGroup(String name) {
this.name = name;
this.channels = new DefaultChannelGroup(name, GlobalEventExecutor.INSTANCE);
}
/**
* @return the group name
*/
public String name() {
return name;
}
/**
* Adds a session to the group.
*
* @param session the session to add
* @return this group, for fluent chaining
*/
public WebSocketGroup add(WebSocketSession session) {
channels.add(session.channel());
return this;
}
/**
* Removes a session from the group.
*
* @param session the session to remove
* @return this group, for fluent chaining
*/
public WebSocketGroup remove(WebSocketSession session) {
channels.remove(session.channel());
return this;
}
/**
* @return the current number of member connections
*/
public int size() {
return channels.size();
}
/**
* Broadcasts a text message to every member of the group.
*
* @param text the text to send
* @return this group, for fluent chaining
*/
public WebSocketGroup broadcast(String text) {
channels.writeAndFlush(new TextWebSocketFrame(text));
return this;
}
/**
* Serializes the given value to JSON and broadcasts it as a text message to every member.
*
* @param value the object to serialize and broadcast
* @return this group, for fluent chaining
* @throws RuntimeException if JSON serialization fails
*/
public WebSocketGroup broadcastJson(Object value) {
try {
byte[] bytes = JsonMapper.MAPPER.writeValueAsBytes(value);
@@ -56,6 +106,12 @@ public final class WebSocketGroup {
return this;
}
/**
* Broadcasts a binary message to every active member, allocating a fresh buffer per channel.
*
* @param data the bytes to broadcast
* @return this group, for fluent chaining
*/
public WebSocketGroup broadcastBinary(byte[] data) {
for (var ch : channels) {
if (ch.isActive()) {
@@ -66,6 +122,14 @@ public final class WebSocketGroup {
return this;
}
/**
* Broadcasts a text message to every active member except one &mdash; typically the sender,
* so a client does not receive its own message echoed back.
*
* @param exclude the session to skip, or {@code null} to broadcast to everyone
* @param text the text to send
* @return this group, for fluent chaining
*/
public WebSocketGroup broadcastExcept(WebSocketSession exclude, String text) {
var excludeCh = exclude == null ? null : exclude.channel();
for (var ch : channels) {
@@ -76,6 +140,11 @@ public final class WebSocketGroup {
return this;
}
/**
* Closes every connection in the group.
*
* @return this group, for fluent chaining
*/
public WebSocketGroup closeAll() {
channels.close();
return this;
@@ -1,19 +1,67 @@
package dev.coph.nextusweb.server.websocket;
/**
* Application-facing callback interface for a WebSocket endpoint. Implementations react to the
* lifecycle events of a single connection: opening, incoming text and binary messages, closing,
* and errors.
*
* <p>Every method has an empty default implementation, so handlers need only override the
* events they care about. Callbacks are dispatched on virtual threads by the framework, so they
* may perform blocking work, and they are allowed to throw &mdash; any thrown exception is
* routed to {@link #onError(WebSocketSession, Throwable)}.</p>
*
* @see WebSocketSession
* @see WebSocketRouter
*/
public interface WebSocketHandler {
/**
* Invoked once the WebSocket handshake has completed and the session is ready for use.
*
* @param session the newly opened session
* @throws Exception if the handler fails; routed to {@link #onError}
*/
default void onOpen(WebSocketSession session) throws Exception {
}
/**
* Invoked when a text message is received.
*
* @param session the session the message arrived on
* @param message the decoded text payload
* @throws Exception if the handler fails; routed to {@link #onError}
*/
default void onMessage(WebSocketSession session, String message) throws Exception {
}
/**
* Invoked when a binary message is received.
*
* @param session the session the message arrived on
* @param data the raw binary payload
* @throws Exception if the handler fails; routed to {@link #onError}
*/
default void onBinary(WebSocketSession session, byte[] data) throws Exception {
}
/**
* Invoked when the connection closes, whether initiated by the peer or the server.
*
* @param session the session being closed
* @param code the WebSocket close status code
* @param reason the close reason text (empty if none was provided)
* @throws Exception if the handler fails; routed to {@link #onError}
*/
default void onClose(WebSocketSession session, int code, String reason) throws Exception {
}
/**
* Invoked when an error occurs on the connection or when another callback throws.
*
* @param session the affected session
* @param cause the error that occurred
* @throws Exception if the error handler itself fails (such failures are swallowed)
*/
default void onError(WebSocketSession session, Throwable cause) throws Exception {
}
}
@@ -6,10 +6,27 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* A trie-based router that maps WebSocket upgrade paths to {@link WebSocketHandler}s.
*
* <p>It mirrors the HTTP {@link dev.coph.nextusweb.server.router.Router Router} but is simpler:
* a path resolves to a single handler (there is no HTTP method dimension) and only static and
* {@code {param}} path-parameter segments are supported (no wildcards). Registration mutates the
* shared trie at start-up; {@link #resolve(String)} is safe to call concurrently afterwards.</p>
*/
public final class WebSocketRouter {
/** Root of the routing trie. */
private final Node root = new Node();
/**
* Registers a handler at the given path, creating any missing trie nodes. Segments wrapped
* in braces (e.g. {@code /chat/{room}}) are treated as path parameters.
*
* @param path the WebSocket path to mount the handler at
* @param handler the handler to invoke for connections on that path
* @return this router, for fluent chaining
*/
public WebSocketRouter on(String path, WebSocketHandler handler) {
Node node = root;
for (String segment : split(path)) {
@@ -27,6 +44,13 @@ public final class WebSocketRouter {
return this;
}
/**
* Resolves a path to its handler, capturing any path parameters along the way.
*
* @param path the request path
* @return a {@link Resolution} carrying the handler and captured parameters, or {@code null}
* if no handler is registered for the path
*/
public Resolution resolve(String path) {
Map<String, String> params = new HashMap<>(4);
Node node = root;
@@ -45,6 +69,13 @@ public final class WebSocketRouter {
return new Resolution(node.handler, params);
}
/**
* Splits a path into its non-empty segments, ignoring leading and collapsing internal
* slashes.
*
* @param path the raw path
* @return the ordered list of path segments
*/
private static List<String> split(String path) {
List<String> out = new ArrayList<>();
int start = path.startsWith("/") ? 1 : 0;
@@ -58,13 +89,27 @@ public final class WebSocketRouter {
return out;
}
/**
* A successful path resolution.
*
* @param handler the handler bound to the matched path
* @param pathParams the path parameters captured while matching, keyed by name
*/
public record Resolution(WebSocketHandler handler, Map<String, String> pathParams) {
}
/**
* A single node in the WebSocket routing trie. Holds static children keyed by segment, an
* optional path-parameter child, and the handler (if any) registered at this node.
*/
private static final class Node {
/** Static child nodes keyed by their literal path segment. */
final Map<String, Node> children = new ConcurrentHashMap<>();
/** Child matching any single segment as a path parameter, or {@code null} if none. */
Node paramChild;
/** Name under which {@link #paramChild} captures the matched segment. */
String paramName;
/** Handler registered at this node, or {@code null} if the path is only a prefix. */
WebSocketHandler handler;
}
}
@@ -20,17 +20,44 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* Represents a single, live WebSocket connection and is the primary object application handlers
* interact with.
*
* <p>It wraps the underlying Netty {@link Channel} and offers convenient methods to send text,
* JSON and binary payloads, to ping the peer, and to close the connection. It also carries
* read-only connection metadata (a generated id, the path, and captured path parameters) and a
* thread-safe bag of arbitrary {@link #attribute(String, Object) attributes} that handlers can
* use to associate state with the connection.</p>
*
* <p>Each connection's session is stored on its channel under {@link #SESSION_KEY} so the frame
* handler can retrieve it for every incoming frame.</p>
*/
public final class WebSocketSession {
/** Channel attribute key under which the session is stored on its Netty channel. */
static final AttributeKey<WebSocketSession> SESSION_KEY =
AttributeKey.valueOf("nexusweb.ws.session");
/** The underlying Netty channel for this connection. */
private final Channel channel;
/** Unique identifier generated for this session. */
private final String id;
/** The path the connection was established on. */
private final String path;
/** Path parameters captured during routing, keyed by name. */
private final Map<String, String> pathParams;
/** Thread-safe bag of user-defined attributes attached to the session. */
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
/**
* Creates a session for a freshly upgraded channel. Package-private; created by the frame
* handler once the handshake completes.
*
* @param channel the underlying Netty channel
* @param path the connection path
* @param pathParams the path parameters captured during routing
*/
WebSocketSession(Channel channel, String path, Map<String, String> pathParams) {
this.channel = channel;
this.id = UUID.randomUUID().toString();
@@ -38,22 +65,43 @@ public final class WebSocketSession {
this.pathParams = pathParams;
}
/**
* @return the unique session id
*/
public String id() {
return id;
}
/**
* @return the path the connection was established on
*/
public String path() {
return path;
}
/**
* Returns the value of a path parameter captured during routing.
*
* @param name the parameter name (without braces)
* @return the captured value, or {@code null} if there is no such parameter
*/
public String pathParam(String name) {
return pathParams.get(name);
}
/**
* @return {@code true} if the underlying channel is still active (open)
*/
public boolean isOpen() {
return channel.isActive();
}
/**
* Returns the peer's remote IP address.
*
* @return the remote host address, or a string form of the address if it is not an
* {@link InetSocketAddress}; {@code null} if unavailable
*/
public String remoteAddress() {
SocketAddress addr = channel.remoteAddress();
if (addr instanceof InetSocketAddress inet) {
@@ -62,26 +110,59 @@ public final class WebSocketSession {
return addr == null ? null : addr.toString();
}
/**
* @return the underlying Netty channel, for advanced use
*/
public Channel channel() {
return channel;
}
/**
* Associates a user-defined attribute with this session, or removes it when {@code value} is
* {@code null}.
*
* @param name the attribute name
* @param value the value to store, or {@code null} to remove the attribute
* @return this session, for fluent chaining
*/
public WebSocketSession attribute(String name, Object value) {
if (value == null) attributes.remove(name);
else attributes.put(name, value);
return this;
}
/**
* Retrieves a previously stored attribute, cast to the caller's expected type.
*
* @param name the attribute name
* @param <T> the expected attribute type
* @return the stored value, or {@code null} if absent
*/
@SuppressWarnings("unchecked")
public <T> T attribute(String name) {
return (T) attributes.get(name);
}
/**
* Sends a text message to the peer.
*
* @param text the text to send
* @return a future completing when the write finishes; an already-succeeded future if the
* channel is no longer active
*/
public ChannelFuture send(String text) {
if (!channel.isActive()) return channel.newSucceededFuture();
return channel.writeAndFlush(new TextWebSocketFrame(text));
}
/**
* Serializes the given value to JSON and sends it as a text message.
*
* @param value the object to serialize and send
* @return a future completing when the write finishes; an already-succeeded future if the
* channel is no longer active
* @throws RuntimeException if JSON serialization fails
*/
public ChannelFuture sendJson(Object value) {
try {
byte[] bytes = JsonMapper.MAPPER.writeValueAsBytes(value);
@@ -93,27 +174,63 @@ public final class WebSocketSession {
}
}
/**
* Sends a binary message to the peer.
*
* @param data the bytes to send
* @return a future completing when the write finishes; an already-succeeded future if the
* channel is no longer active
*/
public ChannelFuture sendBinary(byte[] data) {
if (!channel.isActive()) return channel.newSucceededFuture();
ByteBuf buf = channel.alloc().buffer(data.length).writeBytes(data);
return channel.writeAndFlush(new BinaryWebSocketFrame(buf));
}
/**
* Sends a WebSocket ping frame to the peer (e.g. as a keep-alive).
*
* @return a future completing when the write finishes; an already-succeeded future if the
* channel is no longer active
*/
public ChannelFuture ping() {
if (!channel.isActive()) return channel.newSucceededFuture();
return channel.writeAndFlush(new PingWebSocketFrame());
}
/**
* Closes the connection with the normal-closure status code {@code 1000} and no reason.
*
* @return a future completing when the close frame has been written
*/
public ChannelFuture close() {
return close(1000, "");
}
/**
* Closes the connection with an explicit status code and reason, closing the channel once
* the close frame has been written.
*
* @param code the WebSocket close status code
* @param reason the human-readable close reason
* @return a future completing when the close frame has been written; an already-succeeded
* future if the channel is no longer active
*/
public ChannelFuture close(int code, String reason) {
if (!channel.isActive()) return channel.newSucceededFuture();
return channel.writeAndFlush(new CloseWebSocketFrame(code, reason))
.addListener(ChannelFutureListener.CLOSE);
}
/**
* Low-level helper that writes a text payload directly to a channel, allocating the buffer
* from the channel's allocator. Used by collaborators that hold a channel but not a session.
*
* @param channel the channel to write to
* @param text the text to send
* @return a future completing when the write finishes; an already-succeeded future if the
* channel is no longer active
*/
static ChannelFuture sendRaw(Channel channel, String text) {
if (!channel.isActive()) return channel.newSucceededFuture();
ByteBuf buf = channel.alloc().buffer();
@@ -121,6 +238,14 @@ public final class WebSocketSession {
return channel.writeAndFlush(new TextWebSocketFrame(true, 0, buf));
}
/**
* Low-level helper that writes a binary payload directly to a channel.
*
* @param channel the channel to write to
* @param data the bytes to send
* @return a future completing when the write finishes; an already-succeeded future if the
* channel is no longer active
*/
static ChannelFuture sendRawBinary(Channel channel, byte[] data) {
if (!channel.isActive()) return channel.newSucceededFuture();
ByteBuf buf = channel.alloc().buffer(data.length).writeBytes(Unpooled.wrappedBuffer(data));