Nacos Registration Source Code Analysis

Registry Center (Part 2): Nacos Registration Source Code Analysis

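When a consumer starts, it reads the instance list for the given service name from the Nacos server and caches it in local memory.

It also starts a scheduled task that pulls the service list from the Nacos server every 10s.

Nacos push mechanism:

When heartbeat detection finds that a service provider's heartbeat has timed out, the server pushes a message to the consumer, which then updates its local cache.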
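
The pull and push paths described above both end in the same place: the consumer's local cache. The following is a minimal sketch of that idea, not Nacos code. LocalInstanceCache, fetchFromServer and the method names are hypothetical, and instances are represented as plain "ip:port" strings for brevity.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical consumer-side cache; the real client keeps richer ServiceInfo objects.
class LocalInstanceCache {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Function<String, List<String>> fetchFromServer; // stand-in for the remote query

    LocalInstanceCache(Function<String, List<String>> fetchFromServer) {
        this.fetchFromServer = fetchFromServer;
    }

    // Pull: read the instance list from the server and overwrite the cached copy.
    void pull(String serviceName) {
        cache.put(serviceName, fetchFromServer.apply(serviceName));
    }

    // Push: the server sends a fresh list, e.g. after a provider's heartbeat timed out.
    void onPush(String serviceName, List<String> instances) {
        cache.put(serviceName, instances);
    }

    List<String> instances(String serviceName) {
        return cache.get(serviceName);
    }

    // Starts the periodic pull; the caller is responsible for shutting the scheduler down.
    ScheduledExecutorService schedulePull(String serviceName, long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> pull(serviceName), periodSeconds, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

With this shape, schedulePull("order-service", 10) would correspond to the 10s pull task, while onPush models the server-initiated update.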

Nacos Registration Source Code Analysis

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.8.RELEASE</version>
    <relativePath/>
</parent>
<dependencies>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-alibaba-dependencies</artifactId>
        <version>2.2.8.RELEASE</version>
        <type>pom</type>
        <scope>import</scope>
    </dependency>
</dependencies>

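The analysis below is based on the source code of the versions above.

Client

Once a project uses Nacos as its registry, it needs at least the following property: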

spring.cloud.nacos.discovery.server-addr=10.130.16.110:8848
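# In this version, ephemeral also decides the transport used for registration: false registers over HTTP/1.x, true over gRPC. The default is true (gRPC), which performs considerably better than HTTP/1.x.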
spring.cloud.nacos.discovery.ephemeral=false

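The server-addr property tells the application where the Nacos server is so that it can register. Without it, the application keeps reporting errors.

Spring Boot's @EnableAutoConfiguration is not covered here; a reader who has made it to the Nacos source is assumed to be familiar with Spring Boot.

Next, open the NacosServiceRegistryAutoConfiguration class.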

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
        matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
        AutoServiceRegistrationAutoConfiguration.class,
        NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

    @Bean
    public NacosServiceRegistry nacosServiceRegistry(
            NacosServiceManager nacosServiceManager,
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosServiceManager, nacosDiscoveryProperties);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(
            ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
            NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        return new NacosRegistration(registrationCustomizers.getIfAvailable(),
                nacosDiscoveryProperties, context);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(
            NacosServiceRegistry registry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry,
                autoServiceRegistrationProperties, registration);
    }

}

The third bean, NacosAutoServiceRegistration, extends the abstract class AbstractAutoServiceRegistration.

public abstract class AbstractAutoServiceRegistration<R extends Registration>
        implements AutoServiceRegistration, ApplicationContextAware,
        ApplicationListener<WebServerInitializedEvent> {

    @Override
    @SuppressWarnings("deprecation")
    public void onApplicationEvent(WebServerInitializedEvent event) {
        bind(event);
    }

    @Deprecated
    public void bind(WebServerInitializedEvent event) {
        ApplicationContext context = event.getApplicationContext();
        if (context instanceof ConfigurableWebServerApplicationContext) {
            if ("management".equals(((ConfigurableWebServerApplicationContext) context)
                    .getServerNamespace())) {
                return;
            }
        }
        this.port.compareAndSet(0, event.getWebServer().getPort());
        this.start();
    }
    public void start() {
        if (!isEnabled()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Discovery Lifecycle disabled. Not starting");
            }
            return;
        }

        if (!this.running.get()) {
            this.context.publishEvent(
                    new InstancePreRegisteredEvent(this, getRegistration()));
            register();
            if (shouldRegisterManagement()) {
                registerManagement();
            }
            this.context.publishEvent(
                    new InstanceRegisteredEvent<>(this, getConfiguration()));
            this.running.compareAndSet(false, true);
        }

    }
}

This class implements ApplicationListener<WebServerInitializedEvent>, a Spring event listener (observer pattern). The event fires once the web server has been initialized. onApplicationEvent calls bind(), and bind() in turn calls start().

start() calls register(), which ends up in NacosServiceRegistry.register().
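
The listener, bind(), start(), register() chain can be mimicked in plain Java without Spring. The sketch below is a simplified stand-in rather than the Spring Cloud code: RegistrationLifecycle, onWebServerReady and the registered list are hypothetical names. Only the two atomic guards (the port is set once, running is flipped once) mirror what the listing above does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for AbstractAutoServiceRegistration; no Spring types involved.
class RegistrationLifecycle {
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    final List<String> registered = new ArrayList<>();

    // Plays the role of onApplicationEvent/bind: called when the "web server" is ready.
    void onWebServerReady(int serverPort) {
        port.compareAndSet(0, serverPort); // only the first reported port is kept
        start();
    }

    void start() {
        if (!running.get()) {
            register();
            running.compareAndSet(false, true);
        }
    }

    // Stand-in for the call into the service registry.
    private void register() {
        registered.add("localhost:" + port.get());
    }
}
```

Firing onWebServerReady twice registers only once, which is the point of the running flag: a second web-server event does not trigger a second registration.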

register
public class NacosServiceRegistry implements ServiceRegistry<Registration> {

    @Override
    public void register(Registration registration) {

        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
            return;
        }

        NamingService namingService = namingService();
        String serviceId = registration.getServiceId();
        String group = nacosDiscoveryProperties.getGroup();

        Instance instance = getNacosInstanceFromRegistration(registration);

        try {
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                    instance.getIp(), instance.getPort());
        }
        catch (Exception e) {
            if (nacosDiscoveryProperties.isFailFast()) {
                log.error("nacos registry, {} register failed...{},", serviceId,
                        registration.toString(), e);
                rethrowRuntimeException(e);
            }
            else {
                log.warn("Failfast is false. {} register failed...{},", serviceId,
                        registration.toString(), e);
            }
        }
    }
}

  • getNacosInstanceFromRegistration builds the instance information to register.
private Instance getNacosInstanceFromRegistration(Registration registration) {
    Instance instance = new Instance();
    instance.setIp(registration.getHost());
    instance.setPort(registration.getPort());
    instance.setWeight(nacosDiscoveryProperties.getWeight());
    instance.setClusterName(nacosDiscoveryProperties.getClusterName());
    instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
    instance.setMetadata(registration.getMetadata());
    instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
    return instance;
}
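  • namingService.registerInstance(serviceId, group, instance);

namingService here is a NacosNamingService, and its registerInstance hands the call to its clientProxy field. NamingClientProxy has three implementations: NamingClientProxyDelegate, NamingGrpcClientProxy, and NamingHttpClientProxy.

The NacosNamingService constructor calls init(properties), which assigns clientProxy a NamingClientProxyDelegate. As a rule of thumb, a class with Delegate in its name follows the delegation pattern.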

public NacosNamingService(String serverList) throws NacosException {
    Properties properties = new Properties();
    properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
    init(properties);
}

public NacosNamingService(Properties properties) throws NacosException {
    init(properties);
}

private void init(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    this.namespace = InitUtils.initNamespaceForNaming(properties);
    InitUtils.initSerialization();
    InitUtils.initWebRootContext(properties);
    initLogName(properties);

    this.changeNotifier = new InstancesChangeNotifier();
    NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
    NotifyCenter.registerSubscriber(changeNotifier);
    this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
    this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
}
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    clientProxy.registerService(serviceName, groupName, instance);
}
Registration over HTTP/1.x
  • NamingClientProxyDelegate.registerService The delegate decides here which proxy actually executes the call.
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
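  • NamingClientProxyDelegate.getExecuteClientProxy With ephemeral=false the call goes over HTTP; otherwise it goes over gRPC. Note that this fails against a nacos-server that is still on 1.x.x: gRPC support was added in 2.x.x and requires an additional port, so by default the server exposes ports 8848 and 9848.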
private NamingClientProxy getExecuteClientProxy(Instance instance) {
    return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}
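
Stripped of the Nacos types, the delegation above amounts to one object that owns both transports and picks one per call. A minimal sketch, with all type names (RegisterProxy, GrpcProxy, HttpProxy, ProxyDelegate) invented for illustration:

```java
// Hypothetical, trimmed-down types; the real NamingClientProxy has many more methods.
interface RegisterProxy {
    String register(String serviceName, boolean ephemeral);
}

class GrpcProxy implements RegisterProxy {
    public String register(String serviceName, boolean ephemeral) {
        return "grpc:" + serviceName;
    }
}

class HttpProxy implements RegisterProxy {
    public String register(String serviceName, boolean ephemeral) {
        return "http:" + serviceName;
    }
}

// The delegate implements the same interface and forwards to the chosen transport.
class ProxyDelegate implements RegisterProxy {
    private final RegisterProxy grpc = new GrpcProxy();
    private final RegisterProxy http = new HttpProxy();

    public String register(String serviceName, boolean ephemeral) {
        return choose(ephemeral).register(serviceName, ephemeral);
    }

    // Same rule as getExecuteClientProxy: ephemeral instances use gRPC, persistent ones use HTTP.
    private RegisterProxy choose(boolean ephemeral) {
        return ephemeral ? grpc : http;
    }
}
```

Callers only ever see RegisterProxy, so the choice of transport stays an internal detail of the delegate.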
  • NamingHttpClientProxy.registerService Here the selected proxy is NamingHttpClientProxy.
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
                       instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
        beatReactor.addBeatInfo(groupedServiceName, beatInfo);
    }
    final Map<String, String> params = new HashMap<String, String>(32);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, groupedServiceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put(IP_PARAM, instance.getIp());
    params.put(PORT_PARAM, String.valueOf(instance.getPort()));
    params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight()));
    params.put(REGISTER_ENABLE_PARAM, String.valueOf(instance.isEnabled()));
    params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy()));
    params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral()));
    params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata()));

    reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);

}
  • NamingHttpClientProxy.reqApi
public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
    return reqApi(api, params, Collections.EMPTY_MAP, method);
}

public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)
    throws NacosException {
    return reqApi(api, params, body, serverListManager.getServerList(), method);
}

public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
                     String method) throws NacosException {

    params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

    if (CollectionUtils.isEmpty(servers) && !serverListManager.isDomain()) {
        throw new NacosException(NacosException.INVALID_PARAM, "no server available");
    }

    NacosException exception = new NacosException();

    if (serverListManager.isDomain()) {
        String nacosDomain = serverListManager.getNacosDomain();
        for (int i = 0; i < maxRetry; i++) {
            try {
                return callServer(api, params, body, nacosDomain, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                }
            }
        }
    } else {
        Random random = new Random(System.currentTimeMillis());
        int index = random.nextInt(servers.size());

        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return callServer(api, params, body, server, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", server, e);
                }
            }
            index = (index + 1) % servers.size();
        }
    }

    NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(),
                        exception.getErrMsg());

    throw new NacosException(exception.getErrCode(),
                             "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());

}

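serverListManager.isDomain() checks how many Nacos server addresses are configured. With exactly one, the if branch runs and retries that single address up to maxRetry times. With more than one, the else branch runs. There, servers is the list of Nacos servers: a Random picks a starting index and callServer() is invoked; if that node fails, index is advanced by one (wrapping around) and the next node is tried. If every node fails, an exception is thrown.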
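
The multi-server branch is a random-start, wrap-around failover loop. The sketch below isolates that loop under a few assumptions: FailoverCaller is a hypothetical name, the call parameter stands in for callServer(), and the starting index is passed in rather than drawn from Random so the behavior is easy to follow.

```java
import java.util.List;
import java.util.function.Function;

class FailoverCaller {
    // Tries each server at most once, starting at startIndex and wrapping around.
    // 'call' returns the response body or throws a RuntimeException on failure.
    static String callWithFailover(List<String> servers, int startIndex, Function<String, String> call) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        RuntimeException last = null;
        int index = Math.floorMod(startIndex, servers.size());
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next node
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("all servers failed: " + servers, last);
    }
}
```

Starting at a random index spreads load across the cluster, while the wrap-around guarantees that every node is tried once before giving up.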
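  • NamingHttpClientProxy.callServer Skipping the preliminary branches, the method assembles the URL and then enters the try block, where the request is sent through the wrapped nacosRestTemplate. The call returns a restResult, whose status code is recorded in MetricsMonitor. The name suggests a monitoring class, and it is indeed Prometheus-related; it is not explored further here. Back on the main path: if the result is OK (200), restResult.getData() is returned.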

public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
            String method) throws NacosException {
    long start = System.currentTimeMillis();
    long end = 0;
    String namespace = params.get(CommonParams.NAMESPACE_ID);
    String group = params.get(CommonParams.GROUP_NAME);
    String serviceName = params.get(CommonParams.SERVICE_NAME);
    params.putAll(getSecurityHeaders(namespace, group, serviceName));
    Header header = NamingHttpUtil.builderHeader();

    String url;
    if (curServer.startsWith(HTTPS_PREFIX) || curServer.startsWith(HTTP_PREFIX)) {
        url = curServer + api;
    } else {
        if (!InternetAddressUtil.containsPort(curServer)) {
            curServer = curServer + InternetAddressUtil.IP_PORT_SPLITER + serverPort;
        }
        url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
    }

    try {
        HttpRestResult<String> restResult = nacosRestTemplate
            .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
        end = System.currentTimeMillis();

        MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
            .observe(end - start);

        if (restResult.ok()) {
            return restResult.getData();
        }
        if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
            return StringUtils.EMPTY;
        }
        throw new NacosException(restResult.getCode(), restResult.getMessage());
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to request", e);
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }
}
  • NacosRestTemplate.exchangeForm The key call is this.requestClient().execute().
public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues,
                                          String httpMethod, Type responseType) throws Exception {
    RequestHttpEntity requestHttpEntity = new RequestHttpEntity(
        header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);
    return execute(url, httpMethod, requestHttpEntity, responseType);
}

private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity,
                                      Type responseType) throws Exception {
    URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
    if (logger.isDebugEnabled()) {
        logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, uri, requestEntity.getBody());
    }

    ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
    HttpClientResponse response = null;
    try {
        response = this.requestClient().execute(uri, httpMethod, requestEntity);
        return responseHandler.handle(response);
    } finally {
        if (response != null) {
            response.close();
        }
    }
}
private final HttpClientRequest requestClient;

private final List<HttpClientRequestInterceptor> interceptors = new ArrayList<HttpClientRequestInterceptor>();

public NacosRestTemplate(Logger logger, HttpClientRequest requestClient) {
    super(logger);
    this.requestClient = requestClient;
}
private HttpClientRequest requestClient() {
    if (CollectionUtils.isNotEmpty(interceptors)) {
        if (logger.isDebugEnabled()) {
            logger.debug("Execute via interceptors :{}", interceptors);
        }
        return new InterceptingHttpClientRequest(requestClient, interceptors.iterator());
    }
    return requestClient;
}

requestClient is passed in through the constructor, so the next step is to find where the NacosRestTemplate is created.

public class NamingHttpClientProxy extends AbstractNamingClientProxy {

    private final NacosRestTemplate nacosRestTemplate = NamingHttpClientManager.getInstance().getNacosRestTemplate();

}
public NacosRestTemplate getNacosRestTemplate() {
    return HttpClientBeanHolder.getNacosRestTemplate(HTTP_CLIENT_FACTORY);
}

HttpClientBeanHolder.getNacosRestTemplate

A textbook case of double-checked locking.

public static NacosRestTemplate getNacosRestTemplate(HttpClientFactory httpClientFactory) {
    if (httpClientFactory == null) {
        throw new NullPointerException("httpClientFactory is null");
    }
    String factoryName = httpClientFactory.getClass().getName();
    NacosRestTemplate nacosRestTemplate = SINGLETON_REST.get(factoryName);
    if (nacosRestTemplate == null) {
        synchronized (SINGLETON_REST) {
            nacosRestTemplate = SINGLETON_REST.get(factoryName);
            if (nacosRestTemplate != null) {
                return nacosRestTemplate;
            }
            nacosRestTemplate = httpClientFactory.createNacosRestTemplate();
            SINGLETON_REST.put(factoryName, nacosRestTemplate);
        }
    }
    return nacosRestTemplate;
}
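
The same double-checked pattern, reduced to its core, looks roughly like this. TemplateHolder and getOrCreate are hypothetical names, and the map here is a ConcurrentHashMap so that the unlocked first read is safe; that is a choice made for this sketch and not a statement about the Nacos field.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class TemplateHolder {
    private static final Map<String, Object> SINGLETONS = new ConcurrentHashMap<>();

    static Object getOrCreate(String key, Supplier<Object> factory) {
        Object instance = SINGLETONS.get(key);      // first check, without the lock
        if (instance == null) {
            synchronized (SINGLETONS) {
                instance = SINGLETONS.get(key);     // second check, under the lock
                if (instance == null) {
                    instance = factory.get();       // runs at most once per key
                    SINGLETONS.put(key, instance);
                }
            }
        }
        return instance;
    }
}
```

In new code, SINGLETONS.computeIfAbsent(key, k -> factory.get()) would give the same once-per-key guarantee with less ceremony.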

So what is httpClientFactory here?

It is a NamingHttpClientFactory, a subclass of AbstractHttpClientFactory. Because NamingHttpClientFactory does not override createNacosRestTemplate, the call resolves to AbstractHttpClientFactory.createNacosRestTemplate.

private static final HttpClientFactory HTTP_CLIENT_FACTORY = new NamingHttpClientFactory();
public NacosRestTemplate getNacosRestTemplate() {
    return HttpClientBeanHolder.getNacosRestTemplate(HTTP_CLIENT_FACTORY);
}

private static class NamingHttpClientFactory extends AbstractHttpClientFactory {

    @Override
    protected HttpClientConfig buildHttpClientConfig() {
        return HttpClientConfig.builder().setConTimeOutMillis(CON_TIME_OUT_MILLIS)
            .setReadTimeOutMillis(READ_TIME_OUT_MILLIS).setMaxRedirects(MAX_REDIRECTS).build();
    }

    @Override
    protected Logger assignLogger() {
        return NAMING_LOGGER;
    }
}

AbstractHttpClientFactory.createNacosRestTemplate

@Override
public NacosRestTemplate createNacosRestTemplate() {
    HttpClientConfig httpClientConfig = buildHttpClientConfig();
    final JdkHttpClientRequest clientRequest = new JdkHttpClientRequest(httpClientConfig);

    initTls(new BiConsumer<SSLContext, HostnameVerifier>() {
        @Override
        public void accept(SSLContext sslContext, HostnameVerifier hostnameVerifier) {
            clientRequest.setSSLContext(loadSSLContext());
            clientRequest.replaceSSLHostnameVerifier(hostnameVerifier);
        }
    }, new TlsFileWatcher.FileChangeListener() {
        @Override
        public void onChanged(String filePath) {
            clientRequest.setSSLContext(loadSSLContext());
        }
    });

    return new NacosRestTemplate(assignLogger(), clientRequest);
}

JdkHttpClientRequest clientRequest = new JdkHttpClientRequest(httpClientConfig);

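As shown, a JdkHttpClientRequest is created here.

Following it further leads to java.net.HttpURLConnection, which sends the request to the nacos-server address. That is the HTTP transport layer, so the analysis stops here.

A result is eventually returned: a 200 means the registration succeeded, and a failure results in an exception.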

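Registration over gRPC (HTTP/2)

The analysis again starts from the delegate that chooses between gRPC and HTTP.

  • NamingClientProxyDelegate.registerService This code was covered above, so go straight to the gRPC implementation.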
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
  • NamingGrpcClientProxy.registerService Judging by its name, redoService.cacheInstanceForRedo belongs to a redo (retry) mechanism.
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
                       instance);
    redoService.cacheInstanceForRedo(serviceName, groupName, instance);
    doRegisterService(serviceName, groupName, instance);
}
  • NamingGrpcRedoService.cacheInstanceForRedo This only stores a redoData entry in a ConcurrentMap, with no other logic; it is probably used later. Back to the main path.
private final ConcurrentMap<String, InstanceRedoData> registeredInstances = new ConcurrentHashMap<>();
public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
    String key = NamingUtils.getGroupedName(serviceName, groupName);
    InstanceRedoData redoData = InstanceRedoData.build(serviceName, groupName, instance);
    synchronized (registeredInstances) {
        registeredInstances.put(key, redoData);
    }
}
  • NamingGrpcClientProxy.doRegisterService request is an InstanceRequest built through its constructor; requestToServer sends it to perform the registration.
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
    InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
                                                  NamingRemoteConstants.REGISTER_INSTANCE, instance);
    requestToServer(request, Response.class);
    redoService.instanceRegistered(serviceName, groupName);
}
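
The pairing of cacheInstanceForRedo (before the request) with instanceRegistered (after it succeeds) suggests a simple bookkeeping scheme. The sketch below is a guess at that scheme and not the Nacos implementation: RedoBook and its methods are hypothetical, the "@@" separator in the key is an assumption about the grouped-name format, and the idea that unacknowledged entries are what a later redo pass would resend is not confirmed by the listings shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RedoBook {
    // key -> whether the server has acknowledged the registration
    private final Map<String, Boolean> entries = new ConcurrentHashMap<>();

    // Called before the request is sent.
    void cacheForRedo(String serviceName, String groupName) {
        entries.put(groupName + "@@" + serviceName, false);
    }

    // Called after the server answered successfully.
    void markRegistered(String serviceName, String groupName) {
        entries.computeIfPresent(groupName + "@@" + serviceName, (k, v) -> true);
    }

    // Assumption: anything cached but never acknowledged is a candidate for redo.
    List<String> pendingRedo() {
        List<String> pending = new ArrayList<>();
        entries.forEach((key, registered) -> {
            if (!registered) {
                pending.add(key);
            }
        });
        return pending;
    }
}
```

Recording the intent first means a registration whose request failed or whose connection dropped midway is still known to the client and can be replayed.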
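  • NamingGrpcClientProxy.requestToServer request.putAllHeader is presumably related to authentication; my setup has no auth configured, so the headers are empty. The request is then sent through rpcClient.request. Which overload is used depends on requestTimeout, but both branches end up in the same method, with a default timeout of 3s. The result is a response.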
private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
    throws NacosException {
    try {
        request.putAllHeader(
            getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
        Response response =
            requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
        if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
            throw new NacosException(response.getErrorCode(), response.getMessage());
        }
        if (responseClass.isAssignableFrom(response.getClass())) {
            return (T) response;
        }
        NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
                            response.getClass().getName(), responseClass.getName());
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
    }
    throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}
  • RpcClient.request

Leaving the checks aside, the main line is response = this.currentConnection.request(request, timeoutMills);

That request method is the next one to step into.

public Response request(Request request, long timeoutMills) throws NacosException {
    int retryTimes = 0;
    Response response;
    Exception exceptionThrow = null;
    long start = System.currentTimeMillis();
    while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
        boolean waitReconnect = false;
        try {
            if (this.currentConnection == null || !isRunning()) {
                waitReconnect = true;
                throw new NacosException(NacosException.CLIENT_DISCONNECT,
                                         "Client not connected, current status:" + rpcClientStatus.get());
            }
            response = this.currentConnection.request(request, timeoutMills);
            if (response == null) {
                throw new NacosException(SERVER_ERROR, "Unknown Exception.");
            }
            if (response instanceof ErrorResponse) {
                if (response.getErrorCode() == NacosException.UN_REGISTER) {
                    synchronized (this) {
                        waitReconnect = true;
                        if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                            LoggerUtils.printIfErrorEnabled(LOGGER,
                                                            "Connection is unregistered, switch server, connectionId = {}, request = {}",
                                                            currentConnection.getConnectionId(), request.getClass().getSimpleName());
                            switchServerAsync();
                        }
                    }

                }
                throw new NacosException(response.getErrorCode(), response.getMessage());
            }

            lastActiveTimeStamp = System.currentTimeMillis();
            return response;

        } catch (Exception e) {
            if (waitReconnect) {
                try {

                    Thread.sleep(Math.min(100, timeoutMills / 3));
                } catch (Exception exception) {

                }
            }

            LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
                                            request, retryTimes, e.getMessage());

            exceptionThrow = e;

        }
        retryTimes++;

    }

    if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
        switchServerAsyncOnRequestFail();
    }

    if (exceptionThrow != null) {
        throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow
            : new NacosException(SERVER_ERROR, exceptionThrow);
    } else {
        throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
    }
}
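
The core of RpcClient.request is a loop bounded by both a retry count and a deadline. A minimal sketch of just that control flow follows; DeadlineRetry is a hypothetical name, the value 3 for RETRY_TIMES is an assumption, and the reconnect and server-switching logic is left out entirely.

```java
import java.util.concurrent.Callable;

class DeadlineRetry {
    static final int RETRY_TIMES = 3; // assumed value, for illustration only

    // Retries 'attempt' until it succeeds, the retry budget is spent, or the deadline passes.
    static <T> T request(Callable<T> attempt, long timeoutMillis) throws Exception {
        long start = System.currentTimeMillis();
        Exception last = null;
        int retryTimes = 0;
        while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < start + timeoutMillis) {
            try {
                return attempt.call();
            } catch (Exception e) {
                last = e; // keep the most recent failure for the final throw
            }
            retryTimes++;
        }
        throw last != null ? last : new IllegalStateException("request failed, unknown error");
    }
}
```

The two conditions guard against different problems: the counter stops a fast-failing call from spinning, and the deadline stops slow attempts from exceeding the caller's timeout.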
  • GrpcConnection.request

This is the wrapped RPC call that talks to the server. The request is converted into a Payload before it is sent.

@Override
public Response request(Request request, long timeouts) throws NacosException {
    Payload grpcRequest = GrpcUtils.convert(request);
    ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
    Payload grpcResponse;
    try {
        grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }

    return (Response) GrpcUtils.parse(grpcResponse);
}
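
GrpcConnection.request turns an asynchronous call into a blocking one by waiting on the future with a timeout. The sketch below shows the same idea using the JDK's CompletableFuture in place of the ListenableFuture used in the listing. BlockingCall and await are hypothetical names, and the String reply type is a placeholder for Payload.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingCall {
    // Waits for the async reply for at most timeoutMillis and maps every failure to one exception type.
    static String await(CompletableFuture<String> reply, long timeoutMillis) {
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("request failed", e);
        }
    }
}
```

A reply that never arrives surfaces as an exception whose cause is a TimeoutException, which is what lets the retry loop in the caller decide whether to try again.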

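That concludes the client-side source.

Server

Receiving the registration

For the client and server to interact, a network connection must be established first. The gRPC source itself is fairly complex, so only the Nacos-related parts are analyzed briefly here.

The project to look at is nacos-console.

On startup, BaseGrpcServer binds many handlers.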


A registration sent over gRPC arrives at InstanceRequestHandler on the server.

  • InstanceRequestHandler.handle Based on the request type, handle dispatches to registerInstance, which ends up in EphemeralClientOperationServiceImpl.registerInstance.
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {

    private final EphemeralClientOperationServiceImpl clientOperationService;

    public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
        this.clientOperationService = clientOperationService;
    }

    @Override
    @Secured(action = ActionTypes.WRITE)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        Service service = Service
                .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        switch (request.getType()) {
            case NamingRemoteConstants.REGISTER_INSTANCE:

                return registerInstance(service, request, meta);
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:

                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM,
                        String.format("Unsupported request type %s", request.getType()));
        }
    }

    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
            throws NacosException {

        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
                meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
                request.getInstance().getIp(), request.getInstance().getPort()));
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }

    private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
        clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
        NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(System.currentTimeMillis(),
                meta.getClientIp(), true, DeregisterInstanceReason.REQUEST, service.getNamespace(),
                service.getGroup(), service.getName(), request.getInstance().getIp(), request.getInstance().getPort()));
        return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
    }
}
  • EphemeralClientOperationServiceImpl.registerInstance For where clientManager.getClient(clientId) gets its client from, see the section on establishing the long-lived connection below.
@Override
public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);

    Service singleton = ServiceManager.getInstance().getSingleton(service);
    if (!singleton.isEphemeral()) {
        throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                                        String.format("Current service %s is persistent service, can't register ephemeral instance.",
                                                      singleton.getGroupedServiceName()));
    }

    Client client = clientManager.getClient(clientId);
    if (!clientIsLegal(client, clientId)) {
        return;
    }
    InstancePublishInfo instanceInfo = getPublishInfo(instance);

    client.addServiceInstance(singleton, instanceInfo);
    client.setLastUpdatedTime();
    client.recalculateRevision();
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
    NotifyCenter
        .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
  • AbstractClient.addServiceInstance

protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
    if (null == publishers.put(service, instancePublishInfo)) {
        if (instancePublishInfo instanceof BatchInstancePublishInfo) {
            MetricsMonitor.incrementIpCountWithBatchRegister(instancePublishInfo);
        } else {
            MetricsMonitor.incrementInstanceCount();
        }
    }

    NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
    Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
    return true;
}
  • ClientServiceIndexesManager

private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();

private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
@Override
public void onEvent(Event event) {
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
    } else if (event instanceof ClientOperationEvent) {
        handleClientOperation((ClientOperationEvent) event);
    }
}

private void handleClientOperation(ClientOperationEvent event) {
    Service service = event.getService();
    String clientId = event.getClientId();
    if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
        // a client registered an instance: index it as a publisher of the service
        addPublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
        // a client deregistered: drop it from the publisher index
        removePublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
        // a client subscribed: index it as a subscriber of the service
        addSubscriberIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
        // a client unsubscribed: drop it from the subscriber index
        removeSubscriberIndexes(service, clientId);
    }
}
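
The listing above shows the dispatch, but not how the index itself is maintained. The sketch below is one way to keep a service-to-client-IDs index consistent under concurrent updates. It is an assumption for illustration and is not copied from Nacos: the class name, the String keys and the publishersOf helper are hypothetical, and the real methods may also publish change events.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of the publisher index in ClientServiceIndexesManager.
class PublisherIndexSketch {

    // service name -> IDs of the clients that publish it
    private final ConcurrentMap<String, Set<String>> publisherIndexes = new ConcurrentHashMap<>();

    void addPublisherIndexes(String service, String clientId) {
        // Create the per-service set on first use, then add the client ID to it.
        publisherIndexes
                .computeIfAbsent(service, key -> ConcurrentHashMap.<String>newKeySet())
                .add(clientId);
    }

    void removePublisherIndexes(String service, String clientId) {
        // Returning null from computeIfPresent removes the mapping,
        // so a service with no publishers left disappears from the index.
        publisherIndexes.computeIfPresent(service, (key, ids) -> {
            ids.remove(clientId);
            return ids.isEmpty() ? null : ids;
        });
    }

    Set<String> publishersOf(String service) {
        Set<String> ids = publisherIndexes.get(service);
        return ids == null ? Collections.<String>emptySet() : Collections.unmodifiableSet(ids);
    }
}
```

With an index of this shape, looking up which clients provide a service is a single map read, which is what makes pushing changes to subscribers cheap.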
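Establishing the long-lived connection (this part is harder to follow, and I am still studying it)

GrpcBiStreamRequestAcceptor is the class that establishes the connection.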

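When a client opens its gRPC bidirectional stream, the call enters GrpcBiStreamRequestAcceptor.requestBiStream; ordinary request/response calls are handled separately by GrpcRequestAcceptor.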

The ID of this long-lived session connection is the connectionId seen here.

@Service
public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestStreamImplBase {
    @Autowired
    ConnectionManager connectionManager;

    private void traceDetailIfNecessary(Payload grpcRequest) {
        String clientIp = grpcRequest.getMetadata().getClientIp();
        String connectionId = CONTEXT_KEY_CONN_ID.get();
        try {
            if (connectionManager.traced(clientIp)) {
                Loggers.REMOTE_DIGEST.info("[{}]Bi stream request receive, meta={},body={}", connectionId,
                        grpcRequest.getMetadata().toByteString().toStringUtf8(),
                        grpcRequest.getBody().toByteString().toStringUtf8());
            }
        } catch (Throwable throwable) {
            Loggers.REMOTE_DIGEST.error("[{}]Bi stream request error,payload={},error={}", connectionId,
                    grpcRequest.toByteString().toStringUtf8(), throwable);
        }

    }
    @Override
    public StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) {

        StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() {

            final String connectionId = CONTEXT_KEY_CONN_ID.get();

            final Integer localPort = CONTEXT_KEY_CONN_LOCAL_PORT.get();

            final int remotePort = CONTEXT_KEY_CONN_REMOTE_PORT.get();

            String remoteIp = CONTEXT_KEY_CONN_REMOTE_IP.get();

            String clientIp = "";

            @Override
            public void onNext(Payload payload) {

                clientIp = payload.getMetadata().getClientIp();
                traceDetailIfNecessary(payload);

                Object parseObj;
                try {
                    parseObj = GrpcUtils.parse(payload);
                } catch (Throwable throwable) {
                    Loggers.REMOTE_DIGEST
                            .warn("[{}]Grpc request bi stream,payload parse error={}", connectionId, throwable);
                    return;
                }

                if (parseObj == null) {
                    Loggers.REMOTE_DIGEST
                            .warn("[{}]Grpc request bi stream,payload parse null ,body={},meta={}", connectionId,
                                    payload.getBody().getValue().toStringUtf8(), payload.getMetadata());
                    return;
                }
                if (parseObj instanceof ConnectionSetupRequest) {
                    ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj;
                    Map<String, String> labels = setUpRequest.getLabels();
                    String appName = "-";
                    if (labels != null && labels.containsKey(Constants.APPNAME)) {
                        appName = labels.get(Constants.APPNAME);
                    }

                    ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),
                            remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),
                            setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());
                    metaInfo.setTenant(setUpRequest.getTenant());
                    Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());
                    connection.setAbilities(setUpRequest.getAbilities());
                    boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();

                    if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {

                        try {
                            Loggers.REMOTE_DIGEST.warn("[{}]Connection register fail,reason:{}", connectionId,
                                    rejectSdkOnStarting ? " server is not started" : " server is over limited.");
                            connection.request(new ConnectResetRequest(), 3000L);
                            connection.close();
                        } catch (Exception e) {

                            if (connectionManager.traced(clientIp)) {
                                Loggers.REMOTE_DIGEST
                                        .warn("[{}]Send connect reset request error,error={}", connectionId, e);
                            }
                        }
                    }

                } else if (parseObj instanceof Response) {
                    Response response = (Response) parseObj;
                    if (connectionManager.traced(clientIp)) {
                        Loggers.REMOTE_DIGEST
                                .warn("[{}]Receive response of server request  ,response={}", connectionId, response);
                    }
                    RpcAckCallbackSynchronizer.ackNotify(connectionId, response);
                    connectionManager.refreshActiveTime(connectionId);
                } else {
                    Loggers.REMOTE_DIGEST
                            .warn("[{}]Grpc request bi stream,unknown payload receive ,parseObj={}", connectionId,
                                    parseObj);
                }

            }

            @Override
            public void onError(Throwable t) {
                if (connectionManager.traced(clientIp)) {
                    Loggers.REMOTE_DIGEST.warn("[{}]Bi stream on error,error={}", connectionId, t);
                }

                if (responseObserver instanceof ServerCallStreamObserver) {
                    ServerCallStreamObserver serverCallStreamObserver = ((ServerCallStreamObserver) responseObserver);
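                    if (serverCallStreamObserver.isCancelled()) {
                        // the client has already closed the stream; nothing to do
                    } else {
                        try {
                            serverCallStreamObserver.onCompleted();
                        } catch (Throwable throwable) {
                            // ignore: closing the stream is best-effort
                        }
                    }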
                }

            }

            @Override
            public void onCompleted() {
                if (connectionManager.traced(clientIp)) {
                    Loggers.REMOTE_DIGEST.warn("[{}]Bi stream on completed", connectionId);
                }
                if (responseObserver instanceof ServerCallStreamObserver) {
                    ServerCallStreamObserver serverCallStreamObserver = ((ServerCallStreamObserver) responseObserver);
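                    if (serverCallStreamObserver.isCancelled()) {
                        // the client has already closed the stream; nothing to do
                    } else {
                        try {
                            serverCallStreamObserver.onCompleted();
                        } catch (Throwable throwable) {
                            // ignore: closing the stream is best-effort
                        }
                    }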
                }
            }
        };

        return streamObserver;
    }
}
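
Stripped of logging, onNext is a three-way dispatch on the parsed payload: a ConnectionSetupRequest registers a new connection (or is rejected with a ConnectResetRequest), a Response acknowledges a server-initiated request and refreshes the connection's active time, and anything else is logged as unknown. The plain-Java sketch below models only that branching, with no gRPC dependency. The nested classes are empty stand-ins, the returned strings are hypothetical labels, and the two boolean parameters simplify the real checks (the actual code rejects only SDK-sourced connections while the server is starting).

```java
// Hypothetical model of the branching in GrpcBiStreamRequestAcceptor.onNext.
class BiStreamDispatchSketch {

    // Empty stand-ins for the real Nacos payload types.
    static class ConnectionSetupRequest { }

    static class Response { }

    static String dispatch(Object parsed, boolean serverStarted, boolean registerSucceeds) {
        if (parsed == null) {
            return "ignore: payload could not be parsed";
        }
        if (parsed instanceof ConnectionSetupRequest) {
            // Reject while the server is still starting or when registration fails.
            if (!serverStarted || !registerSucceeds) {
                return "reject: send ConnectResetRequest and close";
            }
            return "register connection";
        }
        if (parsed instanceof Response) {
            return "ack server request and refresh active time";
        }
        return "warn: unknown payload";
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new ConnectionSetupRequest(), true, true));
        System.out.println(dispatch(new ConnectionSetupRequest(), false, true));
        System.out.println(dispatch(new Response(), true, true));
        System.out.println(dispatch("something else", true, true));
    }
}
```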
  • ConnectionManager.register: the connections map here manages all long-lived connections

Map<String, Connection> connections = new ConcurrentHashMap<>();

public synchronized boolean register(String connectionId, Connection connection) {

    if (connection.isConnected()) {
        String clientIp = connection.getMetaInfo().clientIp;
        if (connections.containsKey(connectionId)) {
            return true;
        }
        if (checkLimit(connection)) {
            return false;
        }
        if (traced(clientIp)) {
            connection.setTraced(true);
        }
        connections.put(connectionId, connection);
        if (!connectionForClientIp.containsKey(clientIp)) {
            connectionForClientIp.put(clientIp, new AtomicInteger(0));
        }
        connectionForClientIp.get(clientIp).getAndIncrement();

        clientConnectionEventListenerRegistry.notifyClientConnected(connection);

        LOGGER.info("new connection registered successfully, connectionId = {},connection={} ", connectionId,
                    connection);
        return true;

    }
    return false;
}
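
The register method combines three ideas: it is idempotent for a connection ID that is already known, it refuses new connections once a limit is hit, and it keeps a per-IP counter alongside the main map. The sketch below reproduces just those three behaviors. It is an assumption for illustration: a String client IP stands in for the Connection object, and the maxConnections field is a hypothetical global cap in place of the real checkLimit logic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of ConnectionManager.register.
class ConnectionRegistrySketch {

    // connectionId -> client IP (the real map stores Connection objects)
    private final Map<String, String> connections = new ConcurrentHashMap<>();

    // client IP -> number of live connections from that IP
    private final Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<>();

    // hypothetical global cap standing in for checkLimit(connection)
    private final int maxConnections;

    ConnectionRegistrySketch(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    synchronized boolean register(String connectionId, String clientIp) {
        if (connections.containsKey(connectionId)) {
            return true; // already registered: succeed without counting again
        }
        if (connections.size() >= maxConnections) {
            return false; // over the limit: the caller resets and closes the connection
        }
        connections.put(connectionId, clientIp);
        connectionForClientIp.computeIfAbsent(clientIp, ip -> new AtomicInteger(0)).incrementAndGet();
        return true;
    }

    int countFor(String clientIp) {
        AtomicInteger count = connectionForClientIp.get(clientIp);
        return count == null ? 0 : count.get();
    }
}
```

The method is synchronized so that the existence check, the limit check and the insert happen as one step; the concurrent maps alone would not make that sequence atomic.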

Original: https://blog.csdn.net/u012663412/article/details/128717357
Author: Monkey KevinWu
Title: nacos注册源码分析 (Nacos registration source code analysis)

