dubbo怎么暴露接口的?dubbo服务暴露原理

阳光 2022-06-24 15:27:11 java常见问答 7881

Dubbo 3.0 为了能够更好的适配云原生,将原来的接口级服务发现机制演进为应用级服务发现机制。这样能大幅降低框架带来的额外资源消耗,大幅提升资源利用率,那dubbo怎么暴露接口的?下面来我们就来给大家讲解一下。

在dubbo中默认采用javassistProxyFactory来获取由javassist代理的invoker。

public < T > Invoker < T > getInvoker(T proxy, Class < T > type, URL url)
{
    // TODO Wrapper类不能正确处理带$的类名
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass()
        .getName()
        .indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker < T > (proxy, type, url)
    {
        @Override
        protected Object doInvoke(T proxy, String methodName
            , Class < ? > [] parameterTypes
            , Object[] arguments) throws Throwable
        {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

首先会根据所要代理的类名产生相应的wrapper来对所要代理的类进行包装。

在getWrapper中,会先根据所要包装的类是否已经被包装过。如果已经包装过,则会直接在来保存已经生成的包装类的map里去寻找,否则会直接生成新的包装类。

makeWrapper()根据传递的类获取新的包装类。

在makeWrapper()方法中,通过javassist根据所需要包装的类动态生成了包装类。由于Invoker的作用,大概在这里动态实现的最重要的方法应该是invokeMothed()方法。

if (hasMethod)
{
    c3.append(" try{");
}
for (Method m: methods)
{
    if (m.getDeclaringClass() == Object.class) //ignore Object's method.
        continue;
    String mn = m.getName();
    c3.append(" if( \"")
        .append(mn)
        .append("\".equals( $2 ) ");
    int len = m.getParameterTypes()
        .length;
    c3.append(" && ")
        .append(" $3.length == ")
        .append(len);
    boolean override = false;
    for (Method m2: methods)
    {
        if (m != m2 && m.getName()
            .equals(m2.getName()))
        {
            override = true;
            break;
        }
    }
    if (override)
    {
        if (len > 0)
        {
            for (int l = 0; l < len; l++)
            {
                c3.append(" && ")
                    .append(" $3[")
                    .append(l)
                    .append("].getName().equals(\"")
                    .append(m.getParameterTypes()[l].getName())
                    .append("\")");
            }
        }
    }
    c3.append(" ) { ");
    if (m.getReturnType() == Void.TYPE)
        c3.append(" w.")
        .append(mn)
        .append('(')
        .append(args(m.getParameterTypes(), "$4"))
        .append(");")
        .append(" return null;");
    else
        c3.append(" return ($w)w.")
        .append(mn)
        .append('(')
        .append(args(m.getParameterTypes(), "$4"))
        .append(");");
    c3.append(" }");
    mns.add(mn);
    if (m.getDeclaringClass() == c)
        dmns.add(mn);
    ms.put(ReflectUtils.getDesc(m), m);
}
if (hasMethod)
{
    c3.append(" } catch(Throwable e) { ");
    c3.append("     throw new java.lang.reflect.InvocationTargetException(e); ");
    c3.append(" }");
}
c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");

在这段动态生成了invokeMethod的方法体。将会根据方法名调用包装类中真正所需要被调用的类的方法。

大致的invokeMethod()方法如下。

public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException
{
    dubbo.provider.hello.service.impl.HelloWorld w;
    try
    {
        w = ((dubbo.provider.hello.service.impl.HelloServiceImpl) $1);
    }
    catch (Throwable e)
    {
        throw new IllegalArgumentException(e);
    }
    try
    {
        if ("helloWorld".equals($2) && $3.length == 0)
        {
            w.helloWorld();
            return null;
        }
        if ("getName".equals($2) && $3.length == 0)
        {
            return ($w) w.getNanme();
        }
    }
    catch (Throwable e)
    {
        throw new java.lang.reflect.InvocationTargetException(e);
    }
    throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class dubbo.provider.hello.service.impl.HelloServiceImpl.");
}

可以见到,当外部代用被包装的类的相应的方法,都会在这里被找到方法并调用。

在得到了包装类之后,在javassistProxyFactory的getInvoker()方法最后会生成新的AbstractProxyInvoker(),重写了doInvoke()方法,会直接调用刚刚生成的包装类的invokeMethod()方法。

在得到invoker之后,会根据获得的invoker开始服务的暴露,通过protocol的export()方法。配置有filter或者listener的情况下,会在这里产生关于具体服务暴露操作的过滤与监听。值得一提的是,如果采用的是registry协议,那么并不会经过ProtocolListenerWrapper的监听,而是直接进入export()方法开始服务的暴露。

public < T > Exporter < T >
    export (final Invoker < T > originInvoker) throws RpcException
    {
        //export invoker
        final ExporterChangeableWrapper < T > exporter = doLocalExport(originInvoker);
        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // 订阅override数据
        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保证每次export都返回一个新的exporter实例
        return new Exporter < T > ()
        {
            public Invoker < T > getInvoker()
            {
                return exporter.getInvoker();
            }
            public void unexport()
            {
                try
                {
                    exporter.unexport();
                }
                catch (Throwable t)
                {
                    logger.warn(t.getMessage(), t);
                }
                try
                {
                    registry.unregister(registedProviderUrl);
                }
                catch (Throwable t)
                {
                    logger.warn(t.getMessage(), t);
                }
                try
                {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                }
                catch (Throwable t)
                {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }

以上是registryProtocol的暴露服务export()方法的流程。

首先会通过doLocalExport()方法开始本地服务的暴露。

private < T > ExporterChangeableWrapper < T > doLocalExport(final Invoker < T > originInvoker)
{
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper < T > exporter = (ExporterChangeableWrapper < T > ) bounds.get(key);
    if (exporter == null)
    {
        synchronized(bounds)
        {
            exporter = (ExporterChangeableWrapper < T > ) bounds.get(key);
            if (exporter == null)
            {
                final Invoker < ? > invokerDelegete = new InvokerDelegete < T > (originInvoker, getProviderUrl(originInvoker));
                exporter = new ExporterChangeableWrapper < T > ((Exporter < T > ) protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return (ExporterChangeableWrapper < T > ) exporter;
}

在本地暴露的过程中,首先会根据传入的invoker获取其提供方的url来获得key,根据获得的key来尝试获得ExporterChangeableWrapper,如果没有,则会先通过原本采用的protocol协议进行服务暴露,默认dubbo。

在dubboProtocol的export()方法中,会根据url生成server key,根据invoker和key生成新的dubboExporter并根据key和exporter存放在map里。但是最关键的是,将在这里调用openServer()方法开启与注册中心的连接。

在openServer中,如果已经建立过与服务器的连接,那么会直接在这里返回,否则会通过netty新建与注册中心的网络连接。

private ExchangeServer createServer(URL url)
{
    //默认开启server关闭时发送readonly事件
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    //默认开启heartbeat
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class)
        .hasExtension(str))
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
    ExchangeServer server;
    try
    {
        server = Exchangers.bind(url, requestHandler);
    }
    catch (RemotingException e)
    {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0)
    {
        Set < String > supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class)
            .getSupportedExtensions();
        if (!supportedTypes.contains(str))
        {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

在这里会通过生成ExchangeServer开始监听相应的channel并绑定对应的requestHandler进行相关的回调处理。

可见,在dubboProtocol的服务暴露过程中,完成了对网络监听的配置与开启。

在完成dubboProtocol的export()方法之后,回到RegistryProtocol的doLocalExport()方法,根据dubboProtocol暴露生成的dubboExporter()以及invoker生成新的ExportChangeableWrapper返回。

之后,通过getRegistry获得注册中心的实例。会根据配置的registryFactory生成对应的registry实例。

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter)
{
    super(url);
    if (url.isAnyHost())
    {
        throw new IllegalStateException("registry address == null");
    }
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR))
    {
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    zkClient = zookeeperTransporter.connect(url);
    zkClient.addStateListener(new StateListener()
    {
        public void stateChanged(int state)
        {
            if (state == RECONNECTED)
            {
                try
                {
                    recover();
                }
                catch (Exception e)
                {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

以zookeeper为例,在其构造方法当中会根据url新生成一个zkClient得到与zookeeper注册中心的连接。

在得到了注册中心的实例之后,通过register()方法正式注册服务到注册中心当中去。

在调用注册方法的过程中,被注册的url将会在zookeeperRegistry的父类的父类abstractRegistry中保存被注册的url。然后在zookeeperRegistry的父类当中调用zookeeperRegistry的doRegistry()方法。也就说在zookeeperRegistry的doRegistry方法当中,会将要暴露的服务信息提交给注册中心。

之后,会向注册中心订阅这一服务,以保证注册数据变动时的自动推送。这就是dubbo服务暴露原理,最后大家如果想要了解更多java架构师知识,敬请关注奇Q工具网。

推荐阅读:

redis是什么意思?redis有哪些数据类型?

java三元运算符的值如何确定?java三元运算符怎么使用?

电脑json怎么打开?json如何写注释?