package dev.coph.nextusweb.server.websocket; import dev.coph.nextusweb.server.json.JsonMapper; import io.netty.buffer.Unpooled; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; import tools.jackson.core.JacksonException; /** * A named collection of WebSocket connections that supports broadcasting to all members at * once — useful for chat rooms, pub/sub topics, presence channels and similar fan-out * scenarios. * *

It is backed by a Netty {@link ChannelGroup}, which automatically removes channels as they * close, so callers do not need to prune disconnected sessions manually. The group is * thread-safe.

*/ public final class WebSocketGroup { /** Underlying Netty channel group holding the member connections. */ private final ChannelGroup channels; /** Human-readable name of this group. */ private final String name; /** * Creates an unnamed group (named {@code "anonymous"}). */ public WebSocketGroup() { this("anonymous"); } /** * Creates a named group. * * @param name the group name */ public WebSocketGroup(String name) { this.name = name; this.channels = new DefaultChannelGroup(name, GlobalEventExecutor.INSTANCE); } /** * Returns the name of this group. * * @return the group name */ public String name() { return name; } /** * Adds a session to the group. * * @param session the session to add * @return this group, for fluent chaining */ public WebSocketGroup add(WebSocketSession session) { channels.add(session.channel()); return this; } /** * Removes a session from the group. * * @param session the session to remove * @return this group, for fluent chaining */ public WebSocketGroup remove(WebSocketSession session) { channels.remove(session.channel()); return this; } /** * Returns how many connections are currently in the group. * * @return the current number of member connections */ public int size() { return channels.size(); } /** * Broadcasts a text message to every member of the group. * * @param text the text to send * @return this group, for fluent chaining */ public WebSocketGroup broadcast(String text) { channels.writeAndFlush(new TextWebSocketFrame(text)); return this; } /** * Serializes the given value to JSON and broadcasts it as a text message to every member. * * @param value the object to serialize and broadcast * @return this group, for fluent chaining * @throws RuntimeException if JSON serialization fails */ public WebSocketGroup broadcastJson(Object value) { try { byte[] bytes = JsonMapper.MAPPER.writeValueAsBytes(value); // Build the text frame straight from the serialized UTF-8 bytes; the channel group // duplicates the payload per recipient, so no String round-trip re-encode is needed. channels.writeAndFlush(new TextWebSocketFrame(Unpooled.wrappedBuffer(bytes))); } catch (JacksonException e) { throw new RuntimeException("JSON serialization failed", e); } return this; } /** * Broadcasts a binary message to every active member, allocating a fresh buffer per channel. * * @param data the bytes to broadcast * @return this group, for fluent chaining */ public WebSocketGroup broadcastBinary(byte[] data) { for (var ch : channels) { if (ch.isActive()) { var buf = ch.alloc().buffer(data.length).writeBytes(data); ch.writeAndFlush(new BinaryWebSocketFrame(buf)); } } return this; } /** * Broadcasts a text message to every active member except one — typically the sender, * so a client does not receive its own message echoed back. * * @param exclude the session to skip, or {@code null} to broadcast to everyone * @param text the text to send * @return this group, for fluent chaining */ public WebSocketGroup broadcastExcept(WebSocketSession exclude, String text) { var excludeCh = exclude == null ? null : exclude.channel(); for (var ch : channels) { if (ch.isActive() && ch != excludeCh) { ch.writeAndFlush(new TextWebSocketFrame(text)); } } return this; } /** * Closes every connection in the group. * * @return this group, for fluent chaining */ public WebSocketGroup closeAll() { channels.close(); return this; } }