Dubbo 3.0 为了能够更好的适配云原生,将原来的接口级服务发现机制演进为应用级服务发现机制。这样能大幅降低框架带来的额外资源消耗,大幅提升资源利用率,那dubbo怎么暴露接口的?下面来我们就来给大家讲解一下。
在dubbo中默认采用javassistProxyFactory来获取由javassist代理的invoker。
public < T > Invoker < T > getInvoker(T proxy, Class < T > type, URL url) { // TODO Wrapper类不能正确处理带$的类名 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass() .getName() .indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker < T > (proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName , Class < ? > [] parameterTypes , Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
首先会根据所要代理的类名产生相应的wrapper来对所要代理的类进行包装。
在getWrapper中,会先根据所要包装的类是否已经被包装过。如果已经包装过,则会直接在来保存已经生成的包装类的map里去寻找,否则会直接生成新的包装类。
makeWrapper()根据传递的类获取新的包装类。
在makeWrapper()方法中,通过javassist根据所需要包装的类动态生成了包装类。由于Invoker的作用,大概在这里动态实现的最重要的方法应该是invokeMothed()方法。
if (hasMethod) { c3.append(" try{"); } for (Method m: methods) { if (m.getDeclaringClass() == Object.class) //ignore Object's method. continue; String mn = m.getName(); c3.append(" if( \"") .append(mn) .append("\".equals( $2 ) "); int len = m.getParameterTypes() .length; c3.append(" && ") .append(" $3.length == ") .append(len); boolean override = false; for (Method m2: methods) { if (m != m2 && m.getName() .equals(m2.getName())) { override = true; break; } } if (override) { if (len > 0) { for (int l = 0; l < len; l++) { c3.append(" && ") .append(" $3[") .append(l) .append("].getName().equals(\"") .append(m.getParameterTypes()[l].getName()) .append("\")"); } } } c3.append(" ) { "); if (m.getReturnType() == Void.TYPE) c3.append(" w.") .append(mn) .append('(') .append(args(m.getParameterTypes(), "$4")) .append(");") .append(" return null;"); else c3.append(" return ($w)w.") .append(mn) .append('(') .append(args(m.getParameterTypes(), "$4")) .append(");"); c3.append(" }"); mns.add(mn); if (m.getDeclaringClass() == c) dmns.add(mn); ms.put(ReflectUtils.getDesc(m), m); } if (hasMethod) { c3.append(" } catch(Throwable e) { "); c3.append(" throw new java.lang.reflect.InvocationTargetException(e); "); c3.append(" }"); } c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");
在这段动态生成了invokeMethod的方法体。将会根据方法名调用包装类中真正所需要被调用的类的方法。
大致的invokeMethod()方法如下。
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException { dubbo.provider.hello.service.impl.HelloWorld w; try { w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1); } catch (Throwable e) { throw new IllegalArgumentException(e); } try { if ("helloWorld".equals($2) && $3.length == 0) { w.helloWorld(); return null; } if ("getName".equals($2) && $3.length == 0) { return ($w) w.getNanme(); } } catch (Throwable e) { throw new java.lang.reflect.InvocationTargetException(e); } throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class dubbo.provider.hello.service.impl.HelloServiceImpl."); }
可以见到,当外部代用被包装的类的相应的方法,都会在这里被找到方法并调用。
在得到了包装类之后,在javassistProxyFactory的getInvoker()方法最后会生成新的AbstractProxyInvoker(),重写了doInvoke()方法,会直接调用刚刚生成的包装类的invokeMethod()方法。
在得到invoker之后,会根据获得的invoker开始服务的暴露,通过protocol的export()方法。配置有filter或者listener的情况下,会在这里产生关于具体服务暴露操作的过滤与监听。值得一提的是,如果采用的是registry协议,那么并不会经过ProtocolListenerWrapper的监听,而是直接进入export()方法开始服务的暴露。
public < T > Exporter < T > export (final Invoker < T > originInvoker) throws RpcException { //export invoker final ExporterChangeableWrapper < T > exporter = doLocalExport(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); registry.register(registedProviderUrl); // 订阅override数据 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保证每次export都返回一个新的exporter实例 return new Exporter < T > () { public Invoker < T > getInvoker() { return exporter.getInvoker(); } public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } }; }
以上是registryProtocol的暴露服务export()方法的流程。
首先会通过doLocalExport()方法开始本地服务的暴露。
private < T > ExporterChangeableWrapper < T > doLocalExport(final Invoker < T > originInvoker) { String key = getCacheKey(originInvoker); ExporterChangeableWrapper < T > exporter = (ExporterChangeableWrapper < T > ) bounds.get(key); if (exporter == null) { synchronized(bounds) { exporter = (ExporterChangeableWrapper < T > ) bounds.get(key); if (exporter == null) { final Invoker < ? > invokerDelegete = new InvokerDelegete < T > (originInvoker, getProviderUrl(originInvoker)); exporter = new ExporterChangeableWrapper < T > ((Exporter < T > ) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return (ExporterChangeableWrapper < T > ) exporter; }
在本地暴露的过程中,首先会根据传入的invoker获取其提供方的url来获得key,根据获得的key来尝试获得ExporterChangeableWrapper,如果没有,则会先通过原本采用的protocol协议进行服务暴露,默认dubbo。
在dubboProtocol的export()方法中,会根据url生成server key,根据invoker和key生成新的dubboExporter并根据key和exporter存放在map里。但是最关键的是,将在这里调用openServer()方法开启与注册中心的连接。
在openServer中,如果已经建立过与服务器的连接,那么会直接在这里返回,否则会通过netty新建与注册中心的网络连接。
private ExchangeServer createServer(URL url) { //默认开启server关闭时发送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class) .hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set < String > supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class) .getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
在这里会通过生成ExchangeServer开始监听相应的channel并绑定对应的requestHandler进行相关的回调处理。
可见,在dubboProtocol的服务暴露过程中,完成了对网络监听的配置与开启。
在完成dubboProtocol的export()方法之后,回到RegistryProtocol的doLocalExport()方法,根据dubboProtocol暴露生成的dubboExporter()以及invoker生成新的ExportChangeableWrapper返回。
之后,通过getRegistry获得注册中心的实例。会根据配置的registryFactory生成对应的registry实例。
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } this.root = group; zkClient = zookeeperTransporter.connect(url); zkClient.addStateListener(new StateListener() { public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }
以zookeeper为例,在其构造方法当中会根据url新生成一个zkClient得到与zookeeper注册中心的连接。
在得到了注册中心的实例之后,通过register()方法正式注册服务到注册中心当中去。
在调用注册方法的过程中,被注册的url将会在zookeeperRegistry的父类的父类abstractRegistry中保存被注册的url。然后在zookeeperRegistry的父类当中调用zookeeperRegistry的doRegistry()方法。也就说在zookeeperRegistry的doRegistry方法当中,会将要暴露的服务信息提交给注册中心。
之后,会向注册中心订阅这一服务,以保证注册数据变动时的自动推送。这就是dubbo服务暴露原理,最后大家如果想要了解更多java架构师知识,敬请关注奇Q工具网。
推荐阅读: