package dev.coph.nextusweb.server.websocket; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.CharsetUtil; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.Executors; final class WebSocketFrameHandler extends SimpleChannelInboundHandler { private static final Executor VT_EXECUTOR = Executors.newVirtualThreadPerTaskExecutor(); private final WebSocketHandler handler; private final String path; private final Map pathParams; WebSocketFrameHandler(WebSocketHandler handler, String path, Map pathParams) { this.handler = handler; this.path = path; this.pathParams = pathParams; } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { WebSocketSession session = new WebSocketSession(ctx.channel(), path, pathParams); ctx.channel().attr(WebSocketSession.SESSION_KEY).set(session); VT_EXECUTOR.execute(() -> { try { handler.onOpen(session); } catch (Throwable t) { safeError(session, t); } }); return; } if (evt instanceof IdleStateEvent) { ctx.close(); return; } super.userEventTriggered(ctx, evt); } @Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) { WebSocketSession session = ctx.channel().attr(WebSocketSession.SESSION_KEY).get(); if (session == null) return; if (frame instanceof TextWebSocketFrame text) { String content = text.text(); VT_EXECUTOR.execute(() -> { try { handler.onMessage(session, content); } catch (Throwable t) { safeError(session, t); } }); } else if (frame instanceof BinaryWebSocketFrame bin) { int readable = bin.content().readableBytes(); byte[] data = new byte[readable]; bin.content().getBytes(bin.content().readerIndex(), data); VT_EXECUTOR.execute(() -> { try { handler.onBinary(session, data); } catch (Throwable t) { safeError(session, t); } }); } else if (frame instanceof CloseWebSocketFrame close) { int code = close.statusCode(); String reason = close.reasonText() == null ? "" : close.reasonText(); VT_EXECUTOR.execute(() -> { try { handler.onClose(session, code, reason); } catch (Throwable t) { safeError(session, t); } }); } } @Override public void channelInactive(ChannelHandlerContext ctx) { WebSocketSession session = ctx.channel().attr(WebSocketSession.SESSION_KEY).getAndSet(null); if (session == null) return; VT_EXECUTOR.execute(() -> { try { handler.onClose(session, 1006, "Connection closed"); } catch (Throwable t) { safeError(session, t); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { WebSocketSession session = ctx.channel().attr(WebSocketSession.SESSION_KEY).get(); if (session != null) safeError(session, cause); ctx.close(); } private void safeError(WebSocketSession session, Throwable cause) { try { handler.onError(session, cause); } catch (Throwable ignored) { } } }