WebFlux之Websocket
标签:return inf load data require remote bin sas pool har
目录
协议
参考 https://www.cnblogs.com/nuccch/p/10947256.html
WebSocket是基于TCP的应用层协议,用于在C/S架构的应用中实现双向通信,关于WebSocket协议的详细规范和定义参见rfc6455。
需要特别注意的是:虽然WebSocket协议在建立连接时会使用HTTP协议,但这并意味着WebSocket协议是基于HTTP协议实现的
WebSocket与Http的区别
websocket应用层的连接协议有两种
Handshake
Message
连接(Handshake)
标准的websocket连接请求头需要包含
GET /xxx HTTP/1.1 # 主机。 Host: server.example.com # 协议升级。 Upgrade: websocket # 连接状态。 Connection: Upgrade # websocket客户端生成的随机字符。 Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ== # websocket协议的版本是13。 Sec-WebSocket-Version: 13
数据帧(Message)
WebSocket协议格式:
0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-------+-+-------------+-------------------------------+ |F|R|R|R| opcode|M| Payload len | Extended payload length | |I|S|S|S| (4) |A| (7) | (16/64) | |N|V|V|V| |S| | (if payload len==126/127) | | |1|2|3| |K| | | +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + | Extended payload length continued, if payload len == 127 | + - - - - - - - - - - - - - - - +-------------------------------+ | |Masking-key, if MASK set to 1 | +-------------------------------+-------------------------------+ | Masking-key (continued) | Payload Data | +-------------------------------- - - - - - - - - - - - - - - - + : Payload Data continued ... : + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | Payload Data continued ... | +---------------------------------------------------------------+
webflux集成
继承WebSocketHandler
EchoWebSocketHandler继承WebSocketHandler
public class EchoWebSocketHandler implements WebSocketHandler { public EchoWebSocketHandler() { } @Override public Mono<Void> handle(WebSocketSession session) { // Use retain() for Reactor Netty return session.send(session.receive().doOnNext(WebSocketMessage::retain)); } }
注册HandlerMapping
注册了 /echo 到 EchoWebSocketHandler 的映射
@Bean public HandlerMapping handlerMapping() { Map<String, WebSocketHandler> map = new HashMap<>(); map.put("/echo", new EchoWebSocketHandler()); SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping(); mapping.setUrlMap(map); return mapping; }
注入WebSocketHandlerAdapter
@Bean WebSocketHandlerAdapter webSocketHandlerAdapter(){ return new WebSocketHandlerAdapter(); }
原理解析
1.DispatcherHandler
注入了SimpleUrlHandlerMapping和WebSocketHandlerAdapter,看下如何处理来自浏览器的请求
public Mono<Void> handle(ServerWebExchange exchange) { if (this.handlerMappings == null) { return createNotFoundError(); } return Flux.fromIterable(this.handlerMappings) .concatMap(mapping -> mapping.getHandler(exchange)) .next() .switchIfEmpty(createNotFoundError()) .flatMap(handler -> invokeHandler(exchange, handler)) .flatMap(result -> handleResult(exchange, result)); }
mapping.getHandler(exchange)返回EchoWebSocketHandler
invokeHandler调用WebSocketHandlerAdapter处理EchoWebSocketHandler
2.WebSocketHandlerAdapter
supports方法判断EchoWebSocketHandler的返回结果为true
handle方法调用WebSocketService的handleRequest方法
@Override public boolean supports(Object handler) { return WebSocketHandler.class.isAssignableFrom(handler.getClass()); } @Override public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) { WebSocketHandler webSocketHandler = (WebSocketHandler) handler; return getWebSocketService().handleRequest(exchange, webSocketHandler).then(Mono.empty()); }
3.WebSocketService
实现类为HandshakeWebSocketService
验证HTTP请求头
GET 方法
Upgrade: websocket
Connection: Upgrade
Sec-Websocket-Key: S7SmUANSlEG47sjuY9C2sg==
Sec-Websocket-Protocol: (可选)
RequestUpgradeStrategy处理websocket请求升级,即将HTTP协议转为Websocket协议,后面的数据都通过WebSocket数据帧进行通信
public Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler handler) { ServerHttpRequest request = exchange.getRequest(); HttpMethod method = request.getMethod(); HttpHeaders headers = request.getHeaders(); if (HttpMethod.GET != method) { return Mono.error(new MethodNotAllowedException( request.getMethodValue(), Collections.singleton(HttpMethod.GET))); } if (!"WebSocket".equalsIgnoreCase(headers.getUpgrade())) { return handleBadRequest(exchange, "Invalid ‘Upgrade‘ header: " + headers); } List<String> connectionValue = headers.getConnection(); if (!connectionValue.contains("Upgrade") && !connectionValue.contains("upgrade")) { return handleBadRequest(exchange, "Invalid ‘Connection‘ header: " + headers); } String key = headers.getFirst(SEC_WEBSOCKET_KEY); if (key == null) { return handleBadRequest(exchange, "Missing \"Sec-WebSocket-Key\" header"); } String protocol = selectProtocol(headers, handler); return initAttributes(exchange).flatMap(attributes -> this.upgradeStrategy.upgrade(exchange, handler, protocol, () -> createHandshakeInfo(exchange, request, protocol, attributes)) ); }
4.RequestUpgradeStrategy
Reactor Netty的实现类ReactorNettyRequestUpgradeStrategy
调用HttpServerOperations.sendWebsocket进一步处理
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler, @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) { ServerHttpResponse response = exchange.getResponse(); HttpServerResponse reactorResponse = ((AbstractServerHttpResponse) response).getNativeResponse(); HandshakeInfo handshakeInfo = handshakeInfoFactory.get(); NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory(); return reactorResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength, (in, out) -> { ReactorNettyWebSocketSession session = new ReactorNettyWebSocketSession( in, out, handshakeInfo, bufferFactory, this.maxFramePayloadLength); return handler.handle(session); }); }
握手信息HandshakeInfo
private HandshakeInfo createHandshakeInfo(ServerWebExchange exchange, ServerHttpRequest request, @Nullable String protocol, Map<String, Object> attributes) { URI uri = request.getURI(); // Copy request headers, as they might be pooled and recycled by // the server implementation once the handshake HTTP exchange is done. HttpHeaders headers = new HttpHeaders(); headers.addAll(request.getHeaders()); Mono<Principal> principal = exchange.getPrincipal(); String logPrefix = exchange.getLogPrefix(); InetSocketAddress remoteAddress = request.getRemoteAddress(); return new HandshakeInfo(uri, headers, principal, protocol, remoteAddress, attributes, logPrefix); }
5.HttpServerOperations
WebSocketServerOperations负责建立握手协议
rebind 设置channel的ReactorNetty.CONNECTION属性为WebsocketServerOperations
成功后WebsocketHandler绑定WebsocketServerOperations
public Mono<Void> sendWebsocket(@Nullable String protocols, int maxFramePayloadLength, BiFunction<? super WebsocketInbound, ? super WebsocketOutbound, ? extends Publisher<Void>> websocketHandler) { return this.withWebsocketSupport(this.uri(), protocols, maxFramePayloadLength, websocketHandler); } final Mono<Void> withWebsocketSupport(String url, @Nullable String protocols, int maxFramePayloadLength, BiFunction<? super WebsocketInbound, ? super WebsocketOutbound, ? extends Publisher<Void>> websocketHandler) { Objects.requireNonNull(websocketHandler, "websocketHandler"); if (this.markSentHeaders()) { WebsocketServerOperations ops = new WebsocketServerOperations(url, protocols, maxFramePayloadLength, this); if (this.rebind(ops)) { return FutureMono.from(ops.handshakerResult).doOnEach((signal) -> { if (!signal.hasError() && (protocols == null || ops.selectedSubprotocol() != null)) { ((Publisher)websocketHandler.apply(ops, ops)).subscribe(new HttpServerOperations.WebsocketSubscriber(ops, signal.getContext())); } }); } } else { log.error(ReactorNetty.format(this.channel(), "Cannot enable websocket if headers have already been sent")); } return Mono.error(new IllegalStateException("Failed to upgrade to websocket")); }
握手协议的建立
创建WebSocketServerHandshakerFactory
创建WebSocketServerHandshaker = WebSocketServerHandshakerFactory.newHandshaker
这里会根据sec-websocket-protocol加载相应的协议
public WebSocketServerHandshaker newHandshaker(HttpRequest req) { CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION); if (version != null) { if (version.equals(WebSocketVersion.V13.toHttpHeaderValue())) { return new WebSocketServerHandshaker13(this.webSocketURL, this.subprotocols, this.allowExtensions, this.maxFramePayloadLength, this.allowMaskMismatch); } else if (version.equals(WebSocketVersion.V08.toHttpHeaderValue())) { return new WebSocketServerHandshaker08(this.webSocketURL, this.subprotocols, this.allowExtensions, this.maxFramePayloadLength, this.allowMaskMismatch); } else { return version.equals(WebSocketVersion.V07.toHttpHeaderValue()) ? new WebSocketServerHandshaker07(this.webSocketURL, this.subprotocols, this.allowExtensions, this.maxFramePayloadLength, this.allowMaskMismatch) : null; } } else { return new WebSocketServerHandshaker00(this.webSocketURL, this.subprotocols, this.maxFramePayloadLength); } }
移除reactor.left.httpTrafficHandler
WebSocketServerHandshaker.handshake实现握手
WebsocketServerOperations(String wsUrl, @Nullable String protocols, int maxFramePayloadLength, HttpServerOperations replaced) { super(replaced); Channel channel = replaced.channel(); WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(wsUrl, protocols, true, maxFramePayloadLength); this.handshaker = wsFactory.newHandshaker(replaced.nettyRequest); if (this.handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(channel); this.handshakerResult = null; } else { this.removeHandler("reactor.left.httpTrafficHandler"); this.handshakerResult = channel.newPromise(); HttpRequest request = new DefaultFullHttpRequest(replaced.version(), replaced.method(), replaced.uri()); request.headers().set(replaced.nettyRequest.headers()); if (this.channel().pipeline().get("reactor.left.compressionHandler") != null) { this.removeHandler("reactor.left.compressionHandler"); WebSocketServerCompressionHandler wsServerCompressionHandler = new WebSocketServerCompressionHandler(); try { wsServerCompressionHandler.channelRead(channel.pipeline().context("reactor.right.reactiveBridge"), request); this.addHandlerFirst("reactor.left.wsCompressionHandler", wsServerCompressionHandler); } catch (Throwable var10) { log.error(ReactorNetty.format(this.channel(), ""), var10); } } this.handshaker.handshake(channel, request, replaced.responseHeaders.remove(HttpHeaderNames.TRANSFER_ENCODING), this.handshakerResult).addListener((f) -> { this.markPersistent(false); }); } }
6.WebSocketServerHandshaker
实现类有WebSocketServerHandshaker13、WebSocketServerHandshaker08、WebSocketServerHandshaker07、WebSocketServerHandshaker00
新建HandshakeResponse
主要设置了三个Header
移除HttpObjectAggregator pipeline
移除HttpContentCompressor pipeline
在HttpServerCodec pipeline之前添加WebsocketDecoder和WebsocketEncoder
写HandshakeResponse到输出流
public final ChannelFuture handshake(Channel channel, FullHttpRequest req, HttpHeaders responseHeaders, final ChannelPromise promise) { if (logger.isDebugEnabled()) { logger.debug("{} WebSocket version {} server handshake", channel, this.version()); } FullHttpResponse response = this.newHandshakeResponse(req, responseHeaders); ChannelPipeline p = channel.pipeline(); if (p.get(HttpObjectAggregator.class) != null) { p.remove(HttpObjectAggregator.class); } if (p.get(HttpContentCompressor.class) != null) { p.remove(HttpContentCompressor.class); } ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class); final String encoderName; if (ctx == null) { ctx = p.context(HttpServerCodec.class); if (ctx == null) { promise.setFailure(new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline")); return promise; } p.addBefore(ctx.name(), "wsdecoder", this.newWebsocketDecoder()); p.addBefore(ctx.name(), "wsencoder", this.newWebSocketEncoder()); encoderName = ctx.name(); } else { p.replace(ctx.name(), "wsdecoder", this.newWebsocketDecoder()); encoderName = p.context(HttpResponseEncoder.class).name(); p.addBefore(encoderName, "wsencoder", this.newWebSocketEncoder()); } channel.writeAndFlush(response).addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { ChannelPipeline p = future.channel().pipeline(); p.remove(encoderName); promise.setSuccess(); } else { promise.setFailure(future.cause()); } } }); return promise; }
握手前后的Netty Pipeline对比如下
注:WebSocketFrameAggregator是在org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession#receive方法中添加的
@Override public Flux<WebSocketMessage> receive() { return getDelegate().getInbound() .aggregateFrames(this.maxFramePayloadLength) .receiveFrames() .map(super::toMessage) .doOnNext(message -> { if (logger.isTraceEnabled()) { logger.trace(getLogPrefix() + "Received " + message); } }); } default WebsocketInbound aggregateFrames(int maxContentLength) { this.withConnection((c) -> { c.addHandlerLast(new WebSocketFrameAggregator(maxContentLength)); }); return this; }
7.WebsocketHandler绑定
reactor.netty.http.server.HttpServerOperations#withWebsocketSupport
FutureMono.from(ops.handshakerResult) .doOnEach(signal -> { if(!signal.hasError() && (protocols == null || ops.selectedSubprotocol() != null)) { websocketHandler.apply(ops, ops) .subscribe(new WebsocketSubscriber(ops, signal.getContext())); } });
websocketHandler.apply实际调用的是org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy#upgrade
(in, out) -> { ReactorNettyWebSocketSession session = new ReactorNettyWebSocketSession( in, out, handshakeInfo, bufferFactory, this.maxFramePayloadLength); return handler.handle(session); })
handler的实际类是EchoWebSocketHandler
所以最终调用了
public Mono<Void> handle(WebSocketSession session) { // Use retain() for Reactor Netty return session.send(session.receive().doOnNext(WebSocketMessage::retain)); }
8.websocket解码
核心解码在WebSocket08FrameDecoder类
再来看下WebSocket协议格式:
0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-------+-+-------------+-------------------------------+ |F|R|R|R| opcode|M| Payload len | Extended payload length | |I|S|S|S| (4) |A| (7) | (16/64) | |N|V|V|V| |S| | (if payload len==126/127) | | |1|2|3| |K| | | +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - + | Extended payload length continued, if payload len == 127 | + - - - - - - - - - - - - - - - +-------------------------------+ | |Masking-key, if MASK set to 1 | +-------------------------------+-------------------------------+ | Masking-key (continued) | Payload Data | +-------------------------------- - - - - - - - - - - - - - - - + : Payload Data continued ... : + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + | Payload Data continued ... | +---------------------------------------------------------------+
READING_FIRST 对应第一个字节的解析
READING_SECOND 对应第二个字节的解析
READING_SIZE 读取data payload长度
MASKING_KEY masking-key 4个字节(需要READING_SECOND 的mask位为1)
PAYLOAD 按长度去读payload到buffer
封装成TextWebSocketFrame,添加到out数组
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // Discard all data received if closing handshake was received before. if (receivedClosingHandshake) { in.skipBytes(actualReadableBytes()); return; } switch (state) { case READING_FIRST: if (!in.isReadable()) { return; } framePayloadLength = 0; // FIN, RSV, OPCODE byte b = in.readByte(); frameFinalFlag = (b & 0x80) != 0; frameRsv = (b & 0x70) >> 4; frameOpcode = b & 0x0F; if (logger.isDebugEnabled()) { logger.debug("Decoding WebSocket Frame opCode={}", frameOpcode); } state = State.READING_SECOND; case READING_SECOND: if (!in.isReadable()) { return; } // MASK, PAYLOAD LEN 1 b = in.readByte(); frameMasked = (b & 0x80) != 0; framePayloadLen1 = b & 0x7F; if (frameRsv != 0 && !allowExtensions) { protocolViolation(ctx, "RSV != 0 and no extension negotiated, RSV:" + frameRsv); return; } if (!allowMaskMismatch && expectMaskedFrames != frameMasked) { protocolViolation(ctx, "received a frame that is not masked as expected"); return; } if (frameOpcode > 7) { // control frame (have MSB in opcode set) // control frames MUST NOT be fragmented if (!frameFinalFlag) { protocolViolation(ctx, "fragmented control frame"); return; } // control frames MUST have payload 125 octets or less if (framePayloadLen1 > 125) { protocolViolation(ctx, "control frame with payload length > 125 octets"); return; } // check for reserved control frame opcodes if (!(frameOpcode == OPCODE_CLOSE || frameOpcode == OPCODE_PING || frameOpcode == OPCODE_PONG)) { protocolViolation(ctx, "control frame using reserved opcode " + frameOpcode); return; } // close frame : if there is a body, the first two bytes of the // body MUST be a 2-byte unsigned integer (in network byte // order) representing a getStatus code if (frameOpcode == 8 && framePayloadLen1 == 1) { protocolViolation(ctx, "received close control frame with payload len 1"); return; } } else { // data frame // check for reserved data frame opcodes if (!(frameOpcode == OPCODE_CONT || frameOpcode == OPCODE_TEXT || frameOpcode == OPCODE_BINARY)) { protocolViolation(ctx, "data frame using reserved opcode " + frameOpcode); return; } // check opcode vs message fragmentation state 1/2 if (fragmentedFramesCount == 0 && frameOpcode == OPCODE_CONT) { protocolViolation(ctx, "received continuation data frame outside fragmented message"); return; } // check opcode vs message fragmentation state 2/2 if (fragmentedFramesCount != 0 && frameOpcode != OPCODE_CONT && frameOpcode != OPCODE_PING) { protocolViolation(ctx, "received non-continuation data frame while inside fragmented message"); return; } } state = State.READING_SIZE; case READING_SIZE: // Read frame payload length if (framePayloadLen1 == 126) { if (in.readableBytes() < 2) { return; } framePayloadLength = in.readUnsignedShort(); if (framePayloadLength < 126) { protocolViolation(ctx, "invalid data frame length (not using minimal length encoding)"); return; } } else if (framePayloadLen1 == 127) { if (in.readableBytes() < 8) { return; } framePayloadLength = in.readLong(); // TODO: check if it‘s bigger than 0x7FFFFFFFFFFFFFFF, Maybe // just check if it‘s negative? if (framePayloadLength < 65536) { protocolViolation(ctx, "invalid data frame length (not using minimal length encoding)"); return; } } else { framePayloadLength = framePayloadLen1; } if (framePayloadLength > maxFramePayloadLength) { protocolViolation(ctx, "Max frame length of " + maxFramePayloadLength + " has been exceeded."); return; } if (logger.isDebugEnabled()) { logger.debug("Decoding WebSocket Frame length={}", framePayloadLength); } state = State.MASKING_KEY; case MASKING_KEY: if (frameMasked) { if (in.readableBytes() < 4) { return; } if (maskingKey == null) { maskingKey = new byte[4]; } in.readBytes(maskingKey); } state = State.PAYLOAD; case PAYLOAD: if (in.readableBytes() < framePayloadLength) { return; } ByteBuf payloadBuffer = null; try { payloadBuffer = readBytes(ctx.alloc(), in, toFrameLength(framePayloadLength)); // Now we have all the data, the next checkpoint must be the next // frame state = State.READING_FIRST; // Unmask data if needed if (frameMasked) { unmask(payloadBuffer); } // Processing ping/pong/close frames because they cannot be // fragmented if (frameOpcode == OPCODE_PING) { out.add(new PingWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); payloadBuffer = null; return; } if (frameOpcode == OPCODE_PONG) { out.add(new PongWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); payloadBuffer = null; return; } if (frameOpcode == OPCODE_CLOSE) { receivedClosingHandshake = true; checkCloseFrameBody(ctx, payloadBuffer); out.add(new CloseWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); payloadBuffer = null; return; } // Processing for possible fragmented messages for text and binary // frames if (frameFinalFlag) { // Final frame of the sequence. Apparently ping frames are // allowed in the middle of a fragmented message if (frameOpcode != OPCODE_PING) { fragmentedFramesCount = 0; } } else { // Increment counter fragmentedFramesCount++; } // Return the frame if (frameOpcode == OPCODE_TEXT) { out.add(new TextWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); payloadBuffer = null; return; } else if (frameOpcode == OPCODE_BINARY) { out.add(new BinaryWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); payloadBuffer = null; return; } else if (frameOpcode == OPCODE_CONT) { out.add(new ContinuationWebSocketFrame(frameFinalFlag, frameRsv, payloadBuffer)); payloadBuffer = null; return; } else { throw new UnsupportedOperationException("Cannot decode web socket frame with opcode: " + frameOpcode); } } finally { if (payloadBuffer != null) { payloadBuffer.release(); } } case CORRUPT: if (in.isReadable()) { // If we don‘t keep reading Netty will throw an exception saying // we can‘t return null if no bytes read and state not changed. in.readByte(); } return; default: throw new Error("Shouldn‘t reach here."); } }
9.数据帧处理
接上面
wsdecoder处理后数据封装为TextWebSocketFrame
WebSocketFrameAggregator遍历TextWebSocketFrame数据,逐个发送到下一个pipeline处理
reactiveBridge的handler为ChannelOperationsHandler,调用ReactorNetty.CONNECTION绑定的Connection进行处理
reactor.netty.channel.ChannelOperationsHandler#channelRead
ChannelOperations<?, ?> ops = ChannelOperations.get(ctx.channel()); if (ops != null) { ops.onInboundNext(ctx, msg); }
实际调用为FluxReceive.onInboundNext
现在分析下FluxReceive对象是如何生成处理链的
com.example.demo.EchoWebSocketHandler#handle
@Override public Mono<Void> handle(WebSocketSession session) { // Use retain() for Reactor Netty return session.send(session.receive().doOnNext(WebSocketMessage::retain)); }
org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession#receive
public Flux<WebSocketMessage> receive() { return getDelegate().getInbound() .aggregateFrames(this.maxFramePayloadLength) .receiveFrames() // 1 FluxFilter 2 FluxMap .map(super::toMessage) //3 FluxMap .doOnNext(message -> { // 4 FluxPeek if (logger.isTraceEnabled()) { logger.trace(getLogPrefix() + "Received " + message); } }); }
com.example.demo.EchoWebSocketHandler#handle
session.receive().doOnNext(WebSocketMessage::retain) // doOnNext 5 FluxPeek
org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession#send
public Mono<Void> send(Publisher<WebSocketMessage> messages) { Flux<WebSocketFrame> frames = Flux.from(messages) .doOnNext(message -> { //6 FluxPeek if (logger.isTraceEnabled()) { logger.trace(getLogPrefix() + "Sending " + message); } }) .map(this::toFrame); //7 FluxMap return getDelegate().getOutbound() .options(NettyPipeline.SendOptions::flushOnEach) .sendObject(frames) //8 MonoSendMany .then(); }
reactor.netty.http.server.HttpServerOperations#withWebsocketSupport
websocketHandler.apply(ops, ops).subscribe(new WebsocketSubscriber(ops, signal.getContext()) //9 WebsocketSubscriber
标签:return inf load data require remote bin sas pool har
原文地址:https://www.cnblogs.com/huiyao/p/14454510.html