package dev.coph.nextusweb.server.websocket; import dev.coph.nextusweb.server.auth.Principal; import dev.coph.nextusweb.server.json.JsonMapper; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.AttributeKey; import io.netty.util.CharsetUtil; import tools.jackson.core.JacksonException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** * Represents a single, live WebSocket connection and is the primary object application handlers * interact with. * *

It wraps the underlying Netty {@link Channel} and offers convenient methods to send text, * JSON and binary payloads, to ping the peer, and to close the connection. It also carries * read-only connection metadata (a generated id, the path, and captured path parameters) and a * thread-safe bag of arbitrary {@link #attribute(String, Object) attributes} that handlers can * use to associate state with the connection.

* *

Each connection's session is stored on its channel under {@link #SESSION_KEY} so the frame * handler can retrieve it for every incoming frame.

*/ public final class WebSocketSession { /** * Channel attribute key under which the session is stored on its Netty channel. */ static final AttributeKey SESSION_KEY = AttributeKey.valueOf("nexusweb.ws.session"); /** * The underlying Netty channel for this connection. */ private final Channel channel; /** * Unique identifier generated for this session. */ private final String id; /** * The path the connection was established on. */ private final String path; /** * Path parameters captured during routing, keyed by name. */ private final Map pathParams; /** * Authenticated principal for this connection, or {@code null} if anonymous. */ private final Principal principal; /** * Thread-safe bag of user-defined attributes attached to the session. */ private final Map attributes = new ConcurrentHashMap<>(); /** * Creates a session for a freshly upgraded channel. Package-private; created by the frame * handler once the handshake completes. * * @param channel the underlying Netty channel * @param path the connection path * @param pathParams the path parameters captured during routing * @param principal the authenticated principal, or {@code null} if the connection is anonymous */ WebSocketSession(Channel channel, String path, Map pathParams, Principal principal) { this.channel = channel; this.id = UUID.randomUUID().toString(); this.path = path; this.pathParams = pathParams; this.principal = principal; } /** * Low-level helper that writes a text payload directly to a channel, allocating the buffer * from the channel's allocator. Used by collaborators that hold a channel but not a session. * * @param channel the channel to write to * @param text the text to send * @return a future completing when the write finishes; an already-succeeded future if the * channel is no longer active */ static ChannelFuture sendRaw(Channel channel, String text) { if (!channel.isActive()) return channel.newSucceededFuture(); ByteBuf buf = channel.alloc().buffer(); buf.writeCharSequence(text, CharsetUtil.UTF_8); return channel.writeAndFlush(new TextWebSocketFrame(true, 0, buf)); } /** * Low-level helper that writes a binary payload directly to a channel. * * @param channel the channel to write to * @param data the bytes to send * @return a future completing when the write finishes; an already-succeeded future if the * channel is no longer active */ static ChannelFuture sendRawBinary(Channel channel, byte[] data) { if (!channel.isActive()) return channel.newSucceededFuture(); ByteBuf buf = channel.alloc().buffer(data.length).writeBytes(Unpooled.wrappedBuffer(data)); return channel.writeAndFlush(new BinaryWebSocketFrame(buf)); } /** * Returns the unique identifier generated for this session. * * @return the unique session id */ public String id() { return id; } /** * Returns the path the connection was established on. * * @return the path the connection was established on */ public String path() { return path; } /** * Returns the value of a path parameter captured during routing. * * @param name the parameter name (without braces) * @return the captured value, or {@code null} if there is no such parameter */ public String pathParam(String name) { return pathParams.get(name); } /** * Returns the authenticated principal associated with this connection, established by the auth * layer during the upgrade handshake. * * @return the principal, or {@code null} if the connection is anonymous */ public Principal principal() { return principal; } /** * Indicates whether the connection is still open. * * @return {@code true} if the underlying channel is still active (open) */ public boolean isOpen() { return channel.isActive(); } /** * Returns the peer's remote IP address. * * @return the remote host address, or a string form of the address if it is not an * {@link InetSocketAddress}; {@code null} if unavailable */ public String remoteAddress() { SocketAddress addr = channel.remoteAddress(); if (addr instanceof InetSocketAddress inet) { return inet.getAddress().getHostAddress(); } return addr == null ? null : addr.toString(); } /** * Returns the underlying Netty channel for advanced, low-level use. * * @return the underlying Netty channel, for advanced use */ public Channel channel() { return channel; } /** * Associates a user-defined attribute with this session, or removes it when {@code value} is * {@code null}. * * @param name the attribute name * @param value the value to store, or {@code null} to remove the attribute * @return this session, for fluent chaining */ public WebSocketSession attribute(String name, Object value) { if (value == null) attributes.remove(name); else attributes.put(name, value); return this; } /** * Retrieves a previously stored attribute, cast to the caller's expected type. * * @param name the attribute name * @param the expected attribute type * @return the stored value, or {@code null} if absent */ @SuppressWarnings("unchecked") public T attribute(String name) { return (T) attributes.get(name); } /** * Sends a text message to the peer. * * @param text the text to send * @return a future completing when the write finishes; an already-succeeded future if the * channel is no longer active */ public ChannelFuture send(String text) { if (!channel.isActive()) return channel.newSucceededFuture(); return channel.writeAndFlush(new TextWebSocketFrame(text)); } /** * Serializes the given value to JSON and sends it as a text message. * * @param value the object to serialize and send * @return a future completing when the write finishes; an already-succeeded future if the * channel is no longer active * @throws RuntimeException if JSON serialization fails */ public ChannelFuture sendJson(Object value) { try { byte[] bytes = JsonMapper.MAPPER.writeValueAsBytes(value); if (!channel.isActive()) return channel.newSucceededFuture(); ByteBuf buf = channel.alloc().buffer(bytes.length).writeBytes(bytes); return channel.writeAndFlush(new TextWebSocketFrame(true, 0, buf)); } catch (JacksonException e) { throw new RuntimeException("JSON serialization failed", e); } } /** * Sends a binary message to the peer. * * @param data the bytes to send * @return a future completing when the write finishes; an already-succeeded future if the * channel is no longer active */ public ChannelFuture sendBinary(byte[] data) { if (!channel.isActive()) return channel.newSucceededFuture(); ByteBuf buf = channel.alloc().buffer(data.length).writeBytes(data); return channel.writeAndFlush(new BinaryWebSocketFrame(buf)); } /** * Sends a WebSocket ping frame to the peer (e.g. as a keep-alive). * * @return a future completing when the write finishes; an already-succeeded future if the * channel is no longer active */ public ChannelFuture ping() { if (!channel.isActive()) return channel.newSucceededFuture(); return channel.writeAndFlush(new PingWebSocketFrame()); } /** * Closes the connection with the normal-closure status code {@code 1000} and no reason. * * @return a future completing when the close frame has been written */ public ChannelFuture close() { return close(1000, ""); } /** * Closes the connection with an explicit status code and reason, closing the channel once * the close frame has been written. * * @param code the WebSocket close status code * @param reason the human-readable close reason * @return a future completing when the close frame has been written; an already-succeeded * future if the channel is no longer active */ public ChannelFuture close(int code, String reason) { if (!channel.isActive()) return channel.newSucceededFuture(); return channel.writeAndFlush(new CloseWebSocketFrame(code, reason)) .addListener(ChannelFutureListener.CLOSE); } }