Dubbo泛化调用
简介
泛化接口调用方式主要用于客户端没有 API 接口及模型类元的情况,参数及返回值中的所有 POJO 均用 Map表示。使用泛化调用时,服务提供者应用没有特殊操作,服务消费者应用不再需要引入服务提供者的SDK二方包。
适用于API网关服务、框架集成等场景,提供一个Dubbo服务的统一管理平台,让各个消费方调用在统一管理平台注册了的服务,而不需要引入SDK二方库。
用法示例
public interface GreetingService { String sayHello(String name); } /** * 通过API方式使用泛化调用 */ @Test public void test() { // 应用配置 ApplicationConfig applicationConfig = new ApplicationConfig(); applicationConfig.setName("dubbo-consumer"); // 注册中心配置 RegistryConfig registryConfig = new RegistryConfig(); registryConfig.setAddress("zookeeper://127.0.0.1:2181"); // 引用远程服务 ReferenceConfig<GenericService> reference = new ReferenceConfig<>(); reference.setApplication(applicationConfig); reference.setRegistry(registryConfig); reference.setInterface("com.xxx.GreetingService"); reference.setGeneric(true); // 获取GenericService,代替 ReferenceConfigCache cache = ReferenceConfigCache.getCache(); GenericService genericService = cache.get(reference); // 调用服务 String[] parameterTypes = new String[] { "java.lang.String" }; Object[] args = Stream.of("xiaoming").toArray(); Object result = genericService.$invoke("sayHello", parameterTypes, args); System.out.println("result = " + result); }复制代码
源码分析
服务消费方
在dubbo的责任链中,GenericImplFilter会拦截泛化调用,进行参数校验,并发起RPC调用。
org.apache.dubbo.rpc.filter.GenericImplFilter#invoke
@Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY); // 判断是否是泛化调用 if (invocation.getMethodName().equals(Constants.$INVOKE) && invocation.getArguments() != null && invocation.getArguments().length == 3 && ProtocolUtils.isGeneric(generic)) { // 调用参数 Object[] args = (Object[]) invocation.getArguments()[2]; if (ProtocolUtils.isJavaGenericSerialization(generic)) { for (Object arg : args) { if (!(byte[].class == arg.getClass())) { error(generic, byte[].class.getName(), arg.getClass().getName()); } } } else if (ProtocolUtils.isBeanGenericSerialization(generic)) { for (Object arg : args) { if (!(arg instanceof JavaBeanDescriptor)) { error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName()); } } } // 传递泛化调用方式参数,以便于服务提供方解析请求参数等数据 ((RpcInvocation) invocation).setAttachment( Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY)); } // 发起RPC调用 return invoker.invoke(invocation); }复制代码
服务提供方
服务提供方使用GenericFilter拦截请求,把泛化参数进行反序列化解析,将请求转发给具体的服务提供实现类对象进行业务执行。
org.apache.dubbo.rpc.filter.GenericFilter#invoke
@Override public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException { // 泛化调用 if (inv.getMethodName().equals(Constants.$INVOKE) && inv.getArguments() != null && inv.getArguments().length == 3 && !GenericService.class.isAssignableFrom(invoker.getInterface())) { // 参数信息 String name = ((String) inv.getArguments()[0]).trim(); String[] types = (String[]) inv.getArguments()[1]; Object[] args = (Object[]) inv.getArguments()[2]; try { // 获取调用方法 Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types); Class<?>[] params = method.getParameterTypes(); if (args == null) { args = new Object[params.length]; } String generic = inv.getAttachment(Constants.GENERIC_KEY); if (StringUtils.isBlank(generic)) { generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY); } // 泛化类型为空,使用默认反序列化方式 if (StringUtils.isEmpty(generic) || ProtocolUtils.isDefaultGenericSerialization(generic)) { args = PojoUtils.realize(args, params, method.getGenericParameterTypes()); // nativejava反序列化方式 } else if (ProtocolUtils.isJavaGenericSerialization(generic)) { for (int i = 0; i < args.length; i++) { if (byte[].class == args[i].getClass()) { try(UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) { args[i] = ExtensionLoader.getExtensionLoader(Serialization.class) .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA) .deserialize(null, is).readObject(); } catch (Exception e) { throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e); } } else { throw new RpcException(...); } } // bean反序列化方式 } else if (ProtocolUtils.isBeanGenericSerialization(generic)) { for (int i = 0; i < args.length; i++) { if (args[i] instanceof JavaBeanDescriptor) { args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]); } else { throw new RpcException(...)); } } } // 执行具体服务 Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments())); if (result.hasException() && !(result.getException() instanceof GenericException)) { return new RpcResult(new GenericException(result.getException())); } // 对结果序列化处理并返回 if (ProtocolUtils.isJavaGenericSerialization(generic)) { try { UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512); ExtensionLoader.getExtensionLoader(Serialization.class) .getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA) .serialize(null, os).writeObject(result.getValue()); return new RpcResult(os.toByteArray()); } catch (IOException e) { throw new RpcException("Serialize result failed.", e); } } else if (ProtocolUtils.isBeanGenericSerialization(generic)) { return new RpcResult(JavaBeanSerializeUtil.serialize(result.getValue(), JavaBeanAccessor.METHOD)); } else { return new RpcResult(PojoUtils.generalize(result.getValue())); } } catch (NoSuchMethodException e) { throw new RpcException(e.getMessage(), e); } catch (ClassNotFoundException e) { throw new RpcException(e.getMessage(), e); } } // 非泛化调用,把请求传递给下一个过滤器 return invoker.invoke(inv); }
作者:逍遥道长
链接:https://juejin.cn/post/7021527389880451085