企业宣传,产品推广,广告招商,广告投放联系seowdb

单向调用 异步 间接上代码! 自己手写RPC如何成功同步

很多好用的RPC框架都允许服务消费者以同步、异步和单向调用的模式与服务提供者启动交互,冰河你开发的这个RPC框架也可以吗?

在前面的章节中,成功了服务消费者屏蔽掉基于Netty衔接服务提供者的成功细节的前提下,以异步转同步的模式调用服务提供者。在外部服务调用服务消费者向服务提供者发送数据的方法时,能够间接失掉到服务提供者调用实在方法前往的结果数据。

那RPC框架只允许同步伐用的话,在高并发环境下必需会产生性能疑问,我想让RPC框架允许同步、异步和单向调用,这也是很多低劣的RPC框架都允许的配置,这个有方法成功吗?

我:布置。。。

在服务提供者一端成功了依照自定义网络传输协定和数据编解码对接纳到的数据启动解析,并且能够将解析到的数据作为参数调用实在方法,并接纳实在方法前往的结果数据,经过自定义网络协定和数据编解码,将数据编码成二进制字节流,传输给服务消费者。

在服务消费者一端成功了依照自定义的网络传输协定和数据编解码,将数据编码成二进制字节流发送给服务提供者,能够接纳到服务提供者照应回来的二进制字节流数据,并且能够依据自定义网络传输协定和数据编解码,将接纳到的二进制字节流数据解码成对应的明文数据,接上去,进后退一步处置。

同时,服务消费者允许在屏蔽掉基于Netty衔接服务提供者的成功细节的前提下,使得外部服务调用服务消费者向服务提供者发送数据的方法时,能够间接失掉到服务提供者调用实在方法前往的结果数据。

做到这里,曾经初步成功了RPC框架最基本的配置。这还远远不够,服务消费者除了能够以同步的模式调用服务提供者,也要允许异步伐用和单向调用,看看人家Dubbo,做的是真特么牛逼。

好了,不艳羡人家,咱们自己踏虚浮实手撸吧,接上去,咱们就成功服务消费者以同步、异步、单向调用的模式与服务提供者启动交互。

服务消费者与服务提供者之间基于同步、异步和单向调用的设计图区分如下图所示

经过上图可以看出:

(1)同步伐用的模式,服务消费者动员数据恳求后,会同步期待前往结果。

(2)异步伐用的模式,服务消费者动员数据恳求后,会立刻前往,后续会经过异步的模式失掉数据。

(3)单向调用的模式,服务消费者动员数据恳求后,会立刻前往,不用关注后续数据的处置结果。

可以看到,从设计上还是比拟繁难的,接上去,咱们就一同成功它。

服务消费者与服务提供者之间基于同步、异步和单向调用的成功类相关如下图所示。

可以看到,**类之间的成功相关还是比拟明晰的。

RpcContext类位于bhrpc-consumer-common工程下的io.binghe.rpc.consumer.common.context.RpcContext,源码如下所示。

public class RpcContext {private RpcContext(){}/*** RpcContext实例*/private static final RpcContext AGENT = new RpcContext();/*** 寄存RPCFuture的InheritableThreadLocal*/private static final InheritableThreadLocal<RPCFuture> RPC_FUTURE_INHERITABLE_THREAD_LOCAL = new InheritableThreadLocal<>();/*** 失掉高低文* @return RPC服务的高低文信息*/public static RpcContext getContext(){return AGENT;}/*** 将RPCFuture保留到线程的高低文* @param rpcFuture*/public void setRPCFuture(RPCFuture rpcFuture){RPC_FUTURE_INHERITABLE_THREAD_LOCAL.set(rpcFuture);}/*** 失掉RPCFuture*/public RPCFuture getRPCFuture(){return RPC_FUTURE_INHERITABLE_THREAD_LOCAL.get();}/*** 移除RPCFuture*/public void removeRPCFuture(){RPC_FUTURE_INHERITABLE_THREAD_LOCAL.remove();}}

可以看到,在RpcContext类中重要是经过InheritableThreadLocal在保养RPCFuture,并且每个线程保养RPCFuture时,都是相互隔离的。RpcContext类中保养的RPCFuture会在RPC框架全局有效。

RpcConsumerHandler类位于bhrpc-consumer-common工程下的io.binghe.rpc.consumer.common.handler.RpcConsumerHandler,详细修正步骤如下所示。

(1)新增sendRequestSync()方法

sendRequestSync()方法示意同步伐用的方法,源码如下所示。

private RPCFuture sendRequestSync(RpcProtocol<RpcRequest> protocol) {RPCFuture rpcFuture = this.getRpcFuture(protocol);channel.writeAndFlush(protocol);return rpcFuture;}

可以看到,在sendRequestSync()方法中,调用channel的writeAndFlush()方法发送数据后,会前往RPCFuture对象。

(2)新增sendRequestAsync()方法

sendRequestAsync()方法示意异步伐用的方法,源码如下所示。

private RPCFuture sendRequestAsync(RpcProtocol<RpcRequest> protocol) {RPCFuture rpcFuture = this.getRpcFuture(protocol);//假设是异步伐用,则将RPCFuture放入RpcContextRpcContext.getContext().setRPCFuture(rpcFuture);channel.writeAndFlush(protocol);return null;}

可以看到,sendRequestAsync()方法中,会将RPCFuture对象放入RpcContext高低文中,最终前往null。外部服务调用服务消费者向服务提供者发送数据后,会经过RpcContext失掉到RPCFuture对象,进而经过RPCFuture对象失掉最终结果数据。

(3)新增sendRequestOneway()方法

sendRequestOneway()方法示意单向调用的方法,源码如下所示。

private RPCFuture sendRequestOneway(RpcProtocol<RpcRequest> protocol) {channel.writeAndFlush(protocol);return null;}

可以看到,单向调用方法并不关心前往结果。sendRequestOneway()方法间接调用channel的writeAndFlush()方法,并前往null。

(4)修正sendRequest()方法

在sendRequest()方法的参数中新增能否是异步伐用的async参数和能否是单向调用的oneway参数,以这些参数来判别是口头同步伐用、异步伐用还是单向调用,源码如下所示。

public RPCFuture sendRequest(RpcProtocol<RpcRequest> protocol, boolean async, boolean oneway){logger.info("服务消费者发送的数据===>>>{}", JSONObject.toJSONString(protocol));return oneway ? this.sendRequestOneway(protocol) : async ?sendRequestAsync(protocol) : this.sendRequestSync(protocol);}

RpcConsumer类位于bhrpc-consumer-common工程下的io.binghe.rpc.consumer.common.RpcConsumer,重要是修正RpcConsumer类中的sendRequest()方法,调用RpcConsumerHandler处置器类的sendRequest()方法时,须要传递能否是异步伐用async的标识和能否是单向调用oneway的标识,源码如下所示。

public RPCFuture sendRequest(RpcProtocol<RpcRequest> protocol) throws Exception {//################省略其余代码################RpcRequest request = protocol.getBody();return handler.sendRequest(protocol, request.getAsync(), request.getOneway());}

至此,整个成功就终了了。成功起来是不是很繁难呢?

整个测试环节不须要修正服务提供者的代码,所以,先启动服务提供者,启动bhrpc-test-provider工程下的io.binghe.rpc.test.provider.single.RpcSingleServerTest,输入的结果信息如下所示。

INFO BaseServer:82 - Server started on 127.0.0.1:27880

可以看到,服务提供者启动成功。

(1)修正同步伐用的main()方法

修正bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest类的main()方法,源码如下所示。

public static void main(String[] args) throws Exception {RpcConsumer consumer = RpcConsumer.getInstance();RPCFuture future = consumer.sendRequest(getRpcRequestProtocol());LOGGER.info("从服务消费者失掉到的数据===>>>" + future.get());consumer.close();}

可以看到,同步伐用时,会间接回去方法调用的结果数据。

(2)启动服务消费者

启动bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest类的main()方法,输入的结果信息如下所示。

13:45:12,576INFO RpcConsumer:99 - connect rpc server 127.0.0.1 on port 27880 success.13:45:12,693INFO RpcConsumerHandler:90 - 服务消费者发送的数据===>>>{"body":{"async":false,"className":"io.binghe.rpc.test.api.DemoService","group":"binghe","methodName":"hello","oneway":false,"parameterTypes":["java.lang.String"],"parameters":["binghe"],"version":"1.0.0"},"header":{"magic":16,"msgLen":0,"msgType":1,"requestId":1,"serializationType":"jdk","status":1}}13:45:12,868INFO RpcConsumerHandler:77 - 服务消费者接纳到的数据===>>>{"body":{"async":false,"oneway":false,"result":"hello binghe"},"header":{"magic":16,"msgLen":211,"msgType":2,"requestId":1,"serializationType":"jdk","status":0}}13:45:12,869INFO RpcConsumerHandlerTest:38 - 从服务消费者失掉到的数据===>>>hello binghe

可以看到,在服务消费者输入的信息中,除了向服务提供者发送的数据与接纳服务提供者照应的数据外,还在RpcConsumerHandlerTest类的main()方法中打印出了经过自定义的RPCFuture对象失掉的最终结果数据为hello binghe。合乎预期的成果。

(3)再次检查服务提供者日志

再次检查服务提供者输入的日志信息,如下所示。

13:45:12,748INFO RpcProviderHandler:132 - use cglib reflect type invoke method...13:45:12,748INFO ProviderDemoServiceImpl:33 - 调用hello方法传入的参数为===>>>binghe

可以看到,服务提供者经常使用CGLib的模式调用了实在的方法。

(1)修正同步伐用的main()方法

修正bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest类的main()方法,源码如下所示。

public static void main(String[] args) throws Exception {RpcConsumer consumer = RpcConsumer.getInstance();consumer.sendRequest(getRpcRequestProtocol());RPCFuture future = RpcContext.getContext().getRPCFuture();LOGGER.info("从服务消费者失掉到的数据===>>>" + future.get());consumer.close();}

可以看到,口头异步伐用时,并没有从调用consumer的sendRequest()方法间接失掉前往的RPCFuture结果数据,而是经过RpcContext高低文失掉到RPCFuture对象,再由RPCFuture对象失掉结果数据。

(2)修正构建RpcProtocol对象的方法

修正getRpcRequestProtocol()方法中构建RpcRequest的方法参数,将能否是异步伐用的参数设置为true,源码如下所示。

private static RpcProtocol<RpcRequest> getRpcRequestProtocol(){//模拟发送数据RpcProtocol<RpcRequest> protocol = new RpcProtocol<RpcRequest>();//################省略其余代码##########################request.setAsync(true);request.setOneway(false);protocol.setBody(request);return protocol;}

(3)启动服务消费者

启动bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest类的main()方法,输入的结果信息如下所示

13:47:55,800INFO RpcConsumer:99 - connect rpc server 127.0.0.1 on port 27880 success.13:47:55,905INFO RpcConsumerHandler:90 - 服务消费者发送的数据===>>>{"body":{"async":true,"className":"io.binghe.rpc.test.api.DemoService","group":"binghe","methodName":"hello","oneway":false,"parameterTypes":["java.lang.String"],"parameters":["binghe"],"version":"1.0.0"},"header":{"magic":16,"msgLen":0,"msgType":1,"requestId":1,"serializationType":"jdk","status":1}}13:47:55,971INFO RpcConsumerHandler:77 - 服务消费者接纳到的数据===>>>{"body":{"async":true,"oneway":false,"result":"hello binghe"},"header":{"magic":16,"msgLen":211,"msgType":2,"requestId":1,"serializationType":"jdk","status":0}}13:47:55,971INFO RpcConsumerHandlerTest:40 - 从服务消费者失掉到的数据===>>>hello binghe

可以看到,在服务消费者输入的信息中,除了向服务提供者发送的数据与接纳服务提供者照应的数据外,还在RpcConsumerHandlerTest类的main()方法中打印出了经过自定义的RPCFuture对象失掉的最终结果数据为hello binghe。合乎预期的成果。

(4)再次检查服务提供者日志

再次检查服务提供者输入的日志信息,如下所示。

13:47:55,948INFO RpcProviderHandler:132 - use cglib reflect type invoke method...13:47:55,948INFO ProviderDemoServiceImpl:33 - 调用hello方法传入的参数为===>>>binghe

可以看到,服务提供者经常使用CGLib的模式调用了实在的方法。

(1)修正同步伐用的main()方法

修正bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest类的main()方法,源码如下所示。

public static void main(String[] args) throws Exception {RpcConsumer consumer = RpcConsumer.getInstance();consumer.sendRequest(getRpcRequestProtocol());LOGGER.info("无需失掉前往的结果数据");consumer.close();}

可以看到,在单向调用中,并没有失掉前往结果。

(2)修正构建RpcProtocol对象的方法

修正getRpcRequestProtocol()方法中构建RpcRequest的方法参数,将能否是单向调用的参数设置为true,源码如下所示。

private static RpcProtocol<RpcRequest> getRpcRequestProtocol(){//模拟发送数据 //#############省略其余代码#################request.setAsync(false);request.setOneway(true);protocol.setBody(request);return protocol;}

(3)启动服务消费者

启动bhrpc-test-consumer-handler工程下的io.binghe.rpc.test.consumer.handler.RpcConsumerHandlerTest类的main()方法,输入的结果信息如下所示。

13:58:26,417INFO RpcConsumer:99 - connect rpc server 127.0.0.1 on port 27880 success.13:58:26,524INFO RpcConsumerHandler:90 - 服务消费者发送的数据===>>>{"body":{"async":false,"className":"io.binghe.rpc.test.api.DemoService","group":"binghe","methodName":"hello","oneway":true,"parameterTypes":["java.lang.String"],"parameters":["binghe"],"version":"1.0.0"},"header":{"magic":16,"msgLen":0,"msgType":1,"requestId":1,"serializationType":"jdk","status":1}}13:58:26,531INFO RpcConsumerHandlerTest:39 - 无需失掉前往的结果数据

可以看到,服务消费者向服务提供者发送数据后,并没有失掉前往的结果数据。

(4)再次检查服务提供者日志

再次检查服务提供者输入的日志信息,如下所示。

13:58:26,565INFO RpcProviderHandler:132 - use cglib reflect type invoke method...13:58:26,566INFO ProviderDemoServiceImpl:33 - 调用hello方法传入的参数为===>>>binghe

可以看到,服务提供者经常使用CGLib的模式调用了实在的方法。

目前成功的RPC框架以Java原生进程的模式启动后,能够成功服务消费者以同步、异步和单向调用的模式与服务提供者之间启动数据交互。至此,咱们写的RPC框架的配置又进一步失掉了增强。

咱们写的RPC框架正在一步步成功它该有的配置。

© 版权声明
评论 抢沙发
加载中~
每日一言
不怕万人阻挡,只怕自己投降
Not afraid of people blocking, I'm afraid their surrender