Dubbo 自适应SPI
Dubbo 自适应SPI
1. 原理
在 Dubbo 中,很多拓展都是通过 SPI 机制进行加载的,比如 Protocol、Cluster、LoadBalance 等。有时,有些拓展并不想在框架启动阶段被加载,而是希望在拓展方法被调用时,根据运行时参数进行加载。这听起来有些矛盾。拓展未被加载,那么拓展方法就无法被调用(静态方法除外)。拓展方法未被调用,拓展就无法被加载。对于这个矛盾的问题,Dubbo 通过自适应拓展机制很好的解决了。自适应拓展机制的实现逻辑比较复杂,首先 Dubbo 会为拓展接口生成具有代理功能的代码。然后通过 javassist 或 jdk 编译这段代码,得到 Class 类。最后再通过反射创建代理类。那么在调用接口的方法的时候,就是通过代理类来进行调用。这就是Dubbo 自适应SPI的原理。
注意:本次的源码分析是基于2.7.8.
2. 原理
Adaptive 接口
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
String[] value() default {};
}
根据 value()方法的注释,可以简单的理解为:根据URL中的某个参数的值,来决定注入哪个拓展对象。而参数的名称就由value()这个方法来决定。 如果根据value()方法给定的参数名称,无法从URL中获取到值,那么就会使用默认拓展。
获得自适应拓展
getAdaptiveExtension 方法是获取自适应拓展的入口方法。
public T getAdaptiveExtension() {
// 从缓存中获取自适应拓展
Object instance = cachedAdaptiveInstance.get();
// 缓存未命中
if (instance == null) {
if (createAdaptiveInstanceError != null) {
throw new IllegalStateException("Failed to create adaptive instance: " +
createAdaptiveInstanceError.toString(),
createAdaptiveInstanceError);
}
// 根据双重检测锁创建自适应拓展,并且加载到缓存中
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
// 创建自适应拓展
instance = createAdaptiveExtension();
// 设置自适应拓展到缓存中
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
}
return (T) instance;
}ersion>
</dependency>
getAdaptiveExtension 方法首先会检查缓存,缓存未命中,则调用 createAdaptiveExtension 方法创建自适应拓展。下面,我们看一下 createAdaptiveExtension 方法的代码。
private T createAdaptiveExtension() {
try {
// 获取自适应拓展类,并通过反射实例化
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
createAdaptiveExtension 方法的代码比较少,但却包含了三个逻辑,分别如下:
调用 getAdaptiveExtensionClass 方法获取自适应拓展 Class 对象
通过反射进行实例化
调用 injectExtension 方法向拓展实例中注入依赖
前两个逻辑比较好理解,第三个逻辑用于向自适应拓展对象中注入依赖。这个逻辑看似多余,但有存在的必要,这里简单说明一下。前面说过,Dubbo 中有两种类型的自适应拓展,一种是手工编码的,一种是自动生成的。手工编码的自适应拓展中可能存在着一些依赖,而自动生成的 Adaptive 拓展则不会依赖其他类。这里调用 injectExtension 方法的目的是为手工编码的自适应拓展注入依赖,这一点需要大家注意一下。关于 injectExtension 方法,前文已经分析过了,这里不再赘述。接下来,分析 getAdaptiveExtensionClass 方法的逻辑。
private Class<?> getAdaptiveExtensionClass() {
// 通过 SPI 获取所有的拓展类
getExtensionClasses();
// 检查缓存,若缓存不为空,则直接返回缓存
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
// 创建自适应拓展类
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
这三个逻辑看起来平淡无奇,似乎没有多讲的必要。但是这些平淡无奇的代码中隐藏了着一些细节,需要说明一下。首先从第一个逻辑说起,getExtensionClasses 这个方法用于获取某个接口的所有实现类。比如该方法可以获取 Protocol 接口的 DubboProtocol、HttpProtocol、InjvmProtocol 等实现类。在获取实现类的过程中,如果某个实现类被 Adaptive 注解修饰了,那么该类就会被赋值给 cachedAdaptiveClass 变量。此时,上面步骤中的第二步条件成立(缓存不为空),直接返回 cachedAdaptiveClass 即可。如果所有的实现类均未被 Adaptive 注解修饰,那么执行第三步逻辑,创建自适应拓展类。相关代码如下:
private Class<?> createAdaptiveExtensionClass() {
// 构建自适应拓展代码
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
ClassLoader classLoader = findClassLoader();
// 获取编译器实现类
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
// 编译代码,生成 Class
return compiler.compile(code, classLoader);
}
createAdaptiveExtensionClass 方法用于生成自适应拓展类,该方法首先会生成自适应拓展类的源码,然后通过 Compiler 实例(Dubbo 默认使用 javassist 作为编译器)编译源码,得到代理类 Class 实例。接下来,我们把重点放在代理类代码生成的逻辑上,其他逻辑大家自行分析。
自适应拓展代码生成
自适应拓展代码是在如下一行代码生成的。
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
来看一下 generate 的主要逻辑。过 Adaptive 注解检测,即可开始生成代码。代码生成的顺序与 Java 文件内容顺序一致,首先会生成 package 语句,然后生成 import 语句,紧接着生成类名,生成方法等代码。如下:
public String generate() {
// no need to generate adaptive class since there's no adaptive method found.
// 检测接口的所有方法是否至少有一个接口标注了 @Adaptive注解
if (!hasAdaptiveMethod()) {
throw new IllegalStateException("No adaptive method exist on extension " + type.getName() + ", refuse to create the adaptive class!");
}
StringBuilder code = new StringBuilder();
// 生成 package 代码:package + type 所在包,例如 package org.apache.dubbo.rpc;
code.append(generatePackageInfo());
// 生成 import 代码:import + ExtensionLoader 全限定名,例如 import org.apache.dubbo.common.extension.ExtensionLoader;
code.append(generateImports());
// 生成类代码:public class + type简单名称 + $Adaptive + implements + type全限定名 + {,
// 例如: public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
code.append(generateClassDeclaration());
// 生成方法
Method[] methods = type.getMethods();
for (Method method : methods) {
code.append(generateMethod(method));
}
code.append("}");
if (logger.isDebugEnabled()) {
logger.debug(code.toString());
}
// logger.info("generate code = {}", code.toString());
System.out.println("generate code = " + JSONObject.toJSONString(code));
return code.toString();
}
来看一下生成方法 (generate Method) 的主要逻辑。首先会生成方法返回值,然后生成方法名称,接着生成方法内容,方法参数,异常信息,最后进行拼接。如下:
/**
* generate method declaration
*/
private String generateMethod(Method method) {
//方法的返回值: 例如 org.apache.dubbo.rpc.Exporter
String methodReturnType = method.getReturnType().getCanonicalName();
//方法名称, 例如export
String methodName = method.getName();
//方法内容
String methodContent = generateMethodContent(method);
//方法参数 org.apache.dubbo.rpc.Invoker arg0
String methodArgs = generateMethodArguments(method);
//异常信息 throws org.apache.dubbo.rpc.RpcException
String methodThrows = generateMethodThrows(method);
return String.format(CODE_METHOD_DECLARATION, methodReturnType, methodName, methodArgs, methodThrows, methodContent);
}
来看一下 生成方法内容 (generateMethodContent) 的主要逻辑。如下:
/**
* generate method content
*/
private String generateMethodContent(Method method) {
// 检测方法是否有 Adaptive注解
Adaptive adaptiveAnnotation = method.getAnnotation(Adaptive.class);
StringBuilder code = new StringBuilder(512);
if (adaptiveAnnotation == null) {
// dubbo 不会为没有标注 Adaptive 注解的方法生成代理逻辑,对于该种类型的方法,仅会生成一句抛出异常的代码。生成逻辑如下:
return generateUnsupported(method);
} else {
// 因为在自适应拓展中,必须从URL中提取目标拓展的名称。因此代码生成逻辑的一个重要的任务是从方法的参数列表或者其他参数中获取 URL 数据。
int urlTypeIndex = getUrlTypeIndex(method);
// found parameter in URL type
if (urlTypeIndex != -1) {
// Null Point check
// 如果有URL参数,为 URL 类型参数生成判空代码,例如: 在Protocol 接口的 refer方法:
// if (arg1 == null) throw new IllegalArgumentException("url == null");
// org.apache.dubbo.common.URL url = arg1;
code.append(generateUrlNullCheck(urlTypeIndex));
} else {
// did not find parameter in URL type
// 如果不包含URL参数,那么需要从其它参数中获取得到URL参数,例如在Protocol 接口的 expore方法,
// 就是从 Invoker 这个参数中获取 URL参数的
// 生成的代码如下:
// if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
//if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
//org.apache.dubbo.common.URL url = arg0.getUrl();
code.append(generateUrlAssignmentIndirectly(method));
}
// 获取 拓展名称
String[] value = getMethodAdaptiveValue(adaptiveAnnotation);
// 此段逻辑是检测方法列表中是否存在 Invocation 类型的参数,若存在,则为其生成判空代码和其他一些代码
boolean hasInvocation = hasInvocationArgument(method);
code.append(generateInvocationArgumentNullCheck(method));
// 例如: String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
code.append(generateExtNameAssignment(value, hasInvocation));
// 例如: if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name
// from url (" + url.toString() + ") use keys([protocol])");
code.append(generateExtNameNullCheck(value));
// 例如: org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader
// (org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
code.append(generateExtensionAssignment());
// 例如:return extension.export(arg0);
code.append(generateReturnAndInvocation(method));
}
return code.toString();
}
来看一下最后构建自适应拓展代码的示例。此例子是生成 Protocol 接口的代码。
package org.apache.dubbo.rpc;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public java.util.List getServers() {
throw new UnsupportedOperationException("The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
采用装饰器模式进行功能增强
采用装饰器模式进行功能增强,自动包装实现,这种实现的类一般是自动激活的,常用于包装类,比如:Protocol的两个实现类:ProtocolFilterWrapper、ProtocolListenerWrapper。
例如:接口A的另一个实现者AWrapper1。大体内容如下:
private A a;
AWrapper1(A a){
this.a=a;
}
因此,当在获取某一个接口A的实现者A1的时候,已经自动被AWrapper1包装了。
接下来来看一下源码中是如何实现的。
dubbo中存在 一种对于扩展的封装类,其功能是将各扩展实例串联起来,形成扩展链,比如过滤器链,监听链。当调用ExtensionLoader的getExtension方法时,会做拦截处理,如果存在封装器,则返回封装器实现,而将真实实现通过构造方法注入到封装器中。
这里有个injectExtension方法,其作用是:
如果当前扩展实例存在其他的扩展属性,则通过反射调用其set方法设置扩展属性。若该扩展属性是适配器类型,也是通过ExtensionLoader获取的。
3. 总结
到此,关于自适应拓展的原理,实现就分析完了。总的来说自适应拓展整个逻辑还是很复杂的,并不是很容易弄懂。因此,大家在阅读该部分源码时,耐心一些。同时多进行调试,也可以通过生成好的代码思考代码的生成逻辑。
总结起来说,为啥 Dubbo 自适应SPI可以做到根据运行时参数进行加载呢?因为 在调用getAdaptiveExtension() 这个方法的时候,会根据 JavassistProxy 生成一个代理类。在调用接口的方法,会被代理到 代理类。代理类会根据 URL 中制定的参数名称的值来动态加载某个拓展实例,从而实现了动态加载的目标。
4. 鸣谢
核心源码-SPI扩展