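This time we look at the client side.
1: On the client we use the @GlobalTransactional annotation, and the proxy is created by GlobalTransactionScanner. Its initialization code initializes the TM and the RM, as shown below: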

    private void initClient() {
        if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
            throw new IllegalArgumentException(
                String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
        }
        // init TM
        TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]",
                applicationId, txServiceGroup);
        }
        // init RM
        RMClient.init(applicationId, txServiceGroup);
        registerSpringShutdownHook();
    }

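2: The init method of TMClient or RMClient creates a NettyClientBootstrap instance. While NettyClientBootstrap is being constructed, it creates a Bootstrap instance and a NioEventLoopGroup that serves as the client's event loop (selector) group. The code is as follows: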

    public class NettyClientBootstrap implements RemotingBootstrap {

        private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class);
        private final NettyClientConfig nettyClientConfig;
        private final Bootstrap bootstrap = new Bootstrap();
        private final EventLoopGroup eventLoopGroupWorker;
        private EventExecutorGroup defaultEventExecutorGroup;
        private final AtomicBoolean initialized = new AtomicBoolean(false);
        // Assigned in the constructor below; the declaration was missing from the excerpt.
        private final NettyPoolKey.TransactionRole transactionRole;

        public NettyClientBootstrap(NettyClientConfig nettyClientConfig,
                                    final EventExecutorGroup eventExecutorGroup,
                                    NettyPoolKey.TransactionRole transactionRole) {
            // Fall back to the default configuration when none is supplied.
            if (nettyClientConfig == null) {
                nettyClientConfig = new NettyClientConfig();
            }
            this.nettyClientConfig = nettyClientConfig;
            int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();
            this.transactionRole = transactionRole;
            // The event loop group that drives the client's selector threads.
            this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize,
                new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
                    selectorThreadSizeThreadSize));
            this.defaultEventExecutorGroup = eventExecutorGroup;
        }

        // ... remaining members omitted in this excerpt
    }

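The constructor above hands the event loop group a thread factory so that the selector threads get recognizable names. As a rough illustration of that idea only, here is a minimal sketch of a prefix-based thread factory built on the JDK alone. `PrefixThreadFactory` and its `prefix_N` naming scheme are hypothetical; this is not Seata's `NamedThreadFactory`, whose exact naming format is not shown in the excerpt.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a named thread factory; not Seata's implementation.
class PrefixThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    PrefixThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable task) {
        // Each thread gets "<prefix>_<sequence>" so it is easy to spot in a thread dump.
        Thread thread = new Thread(task, prefix + "_" + counter.incrementAndGet());
        // Daemon threads do not keep the JVM alive on shutdown (a choice made for this sketch).
        thread.setDaemon(true);
        return thread;
    }

    public static void main(String[] args) {
        PrefixThreadFactory factory = new PrefixThreadFactory("clientSelector");
        Thread worker = factory.newThread(() -> { });
        System.out.println(worker.getName());
    }
}
```

Naming threads by role mainly pays off when debugging: a thread dump then shows at a glance which threads belong to the client's selector group.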
3: Once the instance exists, the start method of NettyClientBootstrap is called to set up the Netty client. The code is as follows:

    public void start() {
        this.bootstrap.group(this.eventLoopGroupWorker) // bind the event loop group
            .channel(nettyClientConfig.getClientChannelClazz()) // channel type, NioSocketChannel by default
            .option(ChannelOption.TCP_NODELAY, true) // send immediately instead of buffering small writes
            .option(ChannelOption.SO_KEEPALIVE, true) // enable TCP keep-alive probing
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) // connect timeout
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) // send buffer size
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()); // receive buffer size

        bootstrap.handler(new ChannelInitializer<SocketChannel>() { // set up the channel handlers
            @Override
            public void initChannel(SocketChannel ch) {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(
                    // idle-state handler that drives the heartbeat
                    new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                        nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                        nettyClientConfig.getChannelMaxAllIdleSeconds()))
                    .addLast(new ProtocolV1Decoder()) // message decoder
                    .addLast(new ProtocolV1Encoder()); // message encoder
                if (channelHandlers != null) {
                    addChannelPipelineLast(ch, channelHandlers); // add the ClientHandler
                }
            }
        });

        if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
            LOGGER.info("NettyClientBootstrap has started");
        }
    }

4: The Seata client pools its Netty channels; the factory class is NettyPoolableFactory. Its makeObject method obtains the Netty channel. The code that obtains the channel is as follows:

    public Channel getNewChannel(InetSocketAddress address) {
        Channel channel;
        ChannelFuture f = this.bootstrap.connect(address); // connect to the Netty server
        try {
            // wait for the connection to complete, up to the configured timeout
            f.await(this.nettyClientConfig.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS);
            if (f.isCancelled()) {
                throw new FrameworkException(f.cause(), "connect cancelled, can not connect to services-server.");
            } else if (!f.isSuccess()) {
                throw new FrameworkException(f.cause(), "connect failed, can not connect to services-server.");
            } else {
                channel = f.channel(); // obtain the channel
            }
        } catch (Exception e) {
            throw new FrameworkException(e, "can not connect to services-server.");
        }
        return channel;
    }

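To make the pooling idea in step 4 more concrete, here is a heavily simplified sketch: a keyed cache that asks a factory callback for a new connection only when none exists for the given key. `KeyedConnectionCache`, `borrow` and `invalidate` are hypothetical names chosen for illustration, plain strings stand in for channels, and the address is only an example. A real pool such as the one behind NettyPoolableFactory also handles validation, idle eviction and capacity limits, all of which are left out here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical, simplified stand-in for a keyed connection pool (one connection per key).
class KeyedConnectionCache<K, C> {
    private final Map<K, C> connections = new ConcurrentHashMap<>();
    private final Function<K, C> factory; // plays the role that makeObject plays in a pool factory

    KeyedConnectionCache(Function<K, C> factory) {
        this.factory = factory;
    }

    // Returns the cached connection for the key, creating it through the factory if absent.
    C borrow(K key) {
        return connections.computeIfAbsent(key, factory);
    }

    // Drops a broken connection so that the next borrow creates a fresh one.
    void invalidate(K key) {
        connections.remove(key);
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        KeyedConnectionCache<String, String> cache = new KeyedConnectionCache<>(
            address -> "channel-" + created.incrementAndGet() + "@" + address);

        String address = "127.0.0.1:8091"; // example address, an assumption
        System.out.println(cache.borrow(address));
        System.out.println(cache.borrow(address)); // reuses the cached connection
        cache.invalidate(address);
        System.out.println(cache.borrow(address)); // the factory runs again
    }
}
```

The point of routing creation through a factory is that callers never connect directly: they ask the pool for a channel by key, and the connection logic stays in one place.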
5: Example code for sending a message (this is the case where a return value is needed; if no return value is needed, call channel.writeAndFlush() directly):

    protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
        // Register a future under the message id so the response can be matched to this request.
        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        futures.put(rpcMessage.getId(), messageFuture);

        channelWritableCheck(channel, rpcMessage.getBody());

        String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
        doBeforeRpcHooks(remoteAddr, rpcMessage);

        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            // If the write fails, fail the pending future and destroy the channel.
            if (!future.isSuccess()) {
                MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
                if (messageFuture1 != null) {
                    messageFuture1.setResultMessage(future.cause());
                }
                destroyChannel(future.channel());
            }
        });

        try {
            // Block until the response arrives or the timeout elapses.
            Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            doAfterRpcHooks(remoteAddr, rpcMessage, result);
            return result;
        } catch (Exception exx) {
            if (exx instanceof TimeoutException) {
                throw (TimeoutException) exx;
            } else {
                throw new RuntimeException(exx);
            }
        }
    }

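The synchronous send above relies on a common pattern: register a future under the request id, write the message, and let whichever thread receives the response complete that future. Below is a minimal sketch of the pattern using only the JDK, with `CompletableFuture` standing in for Seata's `MessageFuture`. The `PendingRequests` class and its method names are hypothetical, and removing the entry on timeout is a choice made for this sketch rather than something the excerpt shows.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of request/response correlation by message id.
class PendingRequests {
    private final Map<Integer, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();

    // Called by the sender before the message is written.
    CompletableFuture<Object> register(int requestId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(requestId, future);
        return future;
    }

    // Called by the I/O thread when a response carrying this id arrives.
    boolean complete(int requestId, Object response) {
        CompletableFuture<Object> future = futures.remove(requestId);
        return future != null && future.complete(response);
    }

    // Called when the write itself fails.
    boolean fail(int requestId, Throwable cause) {
        CompletableFuture<Object> future = futures.remove(requestId);
        return future != null && future.completeExceptionally(cause);
    }

    // Blocks the sender until the response arrives or the timeout elapses.
    Object await(int requestId, CompletableFuture<Object> future, long timeoutMillis) throws TimeoutException {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            futures.remove(requestId); // avoid leaking the entry (a choice made for this sketch)
            throw e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            futures.remove(requestId);
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    int pendingCount() {
        return futures.size();
    }

    public static void main(String[] args) throws Exception {
        PendingRequests pending = new PendingRequests();
        CompletableFuture<Object> future = pending.register(1);
        // Simulate the I/O thread delivering the response for request 1.
        new Thread(() -> pending.complete(1, "response-for-1")).start();
        System.out.println(pending.await(1, future, 1000));
    }
}
```

The sender keeps its own reference to the future, so it does not matter whether the response arrives before or after it starts waiting.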