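How to write synchronous send logic with Netty
While reading the TCC part of the Seata source code, I looked at how the TC invokes the commit logic of each branch transaction, and came across its way of making a synchronous call on top of Netty. I thought it was rather nice.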
AbstractNettyRemoting # sendSync
    protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
        if (timeoutMillis <= 0) {
            throw new FrameworkException("timeout should more than 0ms");
        }
        if (channel == null) {
            LOGGER.warn("sendSync nothing, caused by null channel.");
            return null;
        }

        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        futures.put(rpcMessage.getId(), messageFuture);

        channelWritableCheck(channel, rpcMessage.getBody());

        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
                if (messageFuture1 != null) {
                    messageFuture1.setResultMessage(future.cause());
                }
                destroyChannel(future.channel());
            }
        });

        try {
            return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception exx) {
MessageFuture is Seata's own future class, a wrapper around CompletableFuture.
    public class MessageFuture {
        private RpcMessage requestMessage;
        private long timeout;
        private long start = System.currentTimeMillis();
        private transient CompletableFuture
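Since the excerpt above is cut off, here is a minimal sketch of what such a wrapper could look like. It is not Seata's actual class: the name MessageFutureSketch is made up for illustration, and the way get rethrows a Throwable result is an assumption, motivated by sendSync passing future.cause() into setResultMessage.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a MessageFuture-style wrapper; not Seata's code.
public class MessageFutureSketch {
    // The wrapped future that the sending thread blocks on.
    private final CompletableFuture<Object> origin = new CompletableFuture<>();

    // Called by whoever receives the response (or the write failure).
    public void setResultMessage(Object obj) {
        origin.complete(obj);
    }

    // Blocks until a result is set or the timeout elapses.
    public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException {
        Object result;
        try {
            result = origin.get(timeout, unit);
        } catch (ExecutionException e) {
            // Not expected here: the future is only ever completed normally.
            throw new IllegalStateException(e);
        }
        // Assumption: a Throwable stored as the result is surfaced as an exception.
        if (result instanceof RuntimeException) {
            throw (RuntimeException) result;
        }
        if (result instanceof Throwable) {
            throw new RuntimeException((Throwable) result);
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        MessageFutureSketch done = new MessageFutureSketch();
        done.setResultMessage("branch committed");
        System.out.println(done.get(100, TimeUnit.MILLISECONDS)); // prints "branch committed"

        MessageFutureSketch pending = new MessageFutureSketch();
        try {
            pending.get(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("timed out"); // nobody called setResultMessage
        }
    }
}
```

The point of the wrapper is that the caller of get never sees the CompletableFuture directly; it only sees a result, an exception, or a timeout.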
The key question is when setResultMessage actually gets called.
AbstractNettyRemotingServer # ServerHandler
    class ServerHandler extends ChannelDuplexHandler {

        /**
         * Channel read.
         *
         * @param ctx the ctx
         * @param msg the msg
         * @throws Exception the exception
         */
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            processMessage(ctx, (RpcMessage) msg);
        }
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                pair.getFirst().process(ctx, rpcMessage);
ServerOnResponseProcessor # process
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        MessageFuture messageFuture = futures.remove(rpcMessage.getId());
        if (messageFuture != null) {
            messageFuture.setResultMessage(rpcMessage.getBody());
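The principle is this: before sending, the server constructs a Future and caches it in a map, keyed by the id of the outgoing message. The handler registered on the Netty server implements channelRead, which takes the id back out of the message returned by the client and uses it to look up (and remove) the cached future. It then calls setResultMessage with the body of that message, which ends up in CompletableFuture.complete. At that point the synchronous send logic

    try {
        return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (Exception exx) {

can return from blocking.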
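The whole pattern can be sketched without Netty. In this minimal, self-contained version everything is hypothetical: the class SyncSendSketch, the fakePeer executor that stands in for channel.writeAndFlush plus the remote side, and the "ack:" reply format. Only the idea of a map keyed by message id, with the response path completing the future, comes from the Seata code above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of request/response correlation by message id.
public class SyncSendSketch {
    // Pending requests keyed by message id (the role of the futures map).
    private final ConcurrentHashMap<Integer, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();
    private final AtomicInteger idGenerator = new AtomicInteger();
    // Stand-in for the network and the remote peer: replies on another thread.
    private final ExecutorService fakePeer = Executors.newSingleThreadExecutor();

    public Object sendSync(Object body, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        int id = idGenerator.incrementAndGet();
        CompletableFuture<Object> future = new CompletableFuture<>();
        // Register the future BEFORE sending, so a fast reply cannot be missed.
        futures.put(id, future);
        // Asynchronous "send"; the reply arrives later via onResponse.
        fakePeer.execute(() -> onResponse(id, "ack:" + body));
        try {
            // The calling thread blocks here until onResponse completes the future.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            // Clean up on timeout so the map does not leak entries.
            futures.remove(id);
        }
    }

    // The role of channelRead plus the response processor: look up by id and complete.
    void onResponse(int id, Object responseBody) {
        CompletableFuture<Object> future = futures.remove(id);
        if (future != null) {
            future.complete(responseBody);
        }
    }

    public void shutdown() {
        fakePeer.shutdown();
    }

    public static void main(String[] args) throws Exception {
        SyncSendSketch sender = new SyncSendSketch();
        try {
            System.out.println(sender.sendSync("commit", 1000)); // prints "ack:commit"
        } finally {
            sender.shutdown();
        }
    }
}
```

Registering the future before the write is what makes this safe: if the reply were processed before the put, the lookup in onResponse would find nothing and the caller would block until the timeout.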
Original: https://www.cnblogs.com/juniorMa/p/15200807.html