Create your Gitee Account
Explore and code with more than 6 million developers,Free private repositories !:)
Sign up
Clone or download
nacos.md 71.29 KB
Copy Edit Web IDE Raw Blame History
Patrick authored 2021-01-31 19:01 . fix some content

img

Nacos Config 源码阅读与分析

动态配置管理是 Nacos 的三大功能之一,通过动态配置服务,我们可以在所有环境中以集中和动态的方式管理所有应用程序或服务的配置信息。这种服务能够让我们的服务拥有更多的灵活性,不需要重启服务即可做到配置实时生效,非常适合于“配置优先”的服务开发。动态配置中心可以实现配置更新时无需重新部署应用程序和服务即可使相应的配置信息生效,这极大了增加了系统的运维能力。

本文将重点对 Nacos 配置管理部分进行分析。

说明:本文基于 nacos-1.4.1版本 进行分析,请注意区分。

[TOC]

Nacos 简介

Nacos 是 Alibaba 公司推出的开源工具,用于实现分布式系统的服务发现与配置管理。

英文全称 Dynamic Naming and Configuration Service,Na 为 Naming / Name Server 即注册中心,co 为 Configuration 即配置中心,Service 是指该注册/配置中心都是以服务为核心。是一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。

Nacos 致力于帮助用户发现、配置和管理微服务。提供了一组简单易用的特性集,帮助用户快速实现动态服务发现、服务配置、服务元数据及流量管理。详情可以参考Nacos官网介绍

Nacos 的主要目标:

  • 构建简单易用的,服务相关的工具集,包括服务发现、配置管理、服务元数据存储、推送、一致性及元数据管理等;
  • 与包括 Spring CloudKubernetesDubbo 等开源生态做无缝的融合与支持,同时给这些生态带来很多面向生产时需要的优秀特性。

Nacos 官网:https://nacos.io/

Nacos 源码 GitHub 仓库:https://github.com/alibaba/nacos

Nacos 配置管理(Configuration Management,CM)简介

系统配置的编辑、存储、分发、变更管理、历史版本管理、变更审计等所有与配置相关的活动。

配置 (Configuration)

在系统开发过程中,开发者通常会将一些需要变更的参数、变量等从代码中分离出来独立管理,以独立的配置文件的形式存在。目的是让静态的系统工件或者交付物(如 WAR,JAR 包等)更好地和实际的物理运行环境进行适配。配置管理一般包含在系统部署的过程中,由系统管理员或者运维人员完成。配置变更是调整系统运行时的行为的有效手段。

为什么要配置管理

微服务背景下,配置管理呈现两大特征:分散、动态。这两点都很易于理解:微服务下是不可能一个配置文件管理多个服务,同时同一个服务会分散在海量机器上。这会带来程序配置管理的碎片化,也就是“分散”。同时微服务需要更灵活地更及时地获取到配置,也就是“动态”。所以传统的静态配置文件\代码写死的管理方式无法满足现在的要求。

配置管理策略

  • 降低运维困难
  • 实时推送能力

Nacos 动态配置管理

动态修改配置并实时生效,让服务拥有更多的灵活性,不需要重启服务即可做到配置实时生效,非常适合于“配置优先”的服务开发。

动态配置服务可以让您以中心化、外部化和动态化的方式管理所有环境的应用配置和服务配置。

动态配置消除了配置变更时重新部署应用和服务的需要,让配置管理变得更加高效和敏捷。

配置中心化管理让实现无状态服务变得更简单,让服务按需弹性扩展变得更容易。

Nacos 提供了一个简洁易用的UI (控制台样例 Demo) 帮助您管理所有的服务和应用的配置。Nacos 还提供包括配置版本跟踪、金丝雀发布、一键回滚配置以及客户端配置更新状态跟踪在内的一系列开箱即用的配置管理特性,帮助您更安全地在生产环境中管理配置变更和降低配置变更带来的风险。

Nacos 中的一些基本概念

下面列出一些本文会用到的概念,其他概念及解释详见 Nacos概念

Namespace(命名空间)

用于进行租户粒度的配置隔离。不同的命名空间下,可以存在相同的 Group 或 Data ID 的配置。Namespace 的常用场景之一是不同环境的配置的区分隔离,例如开发测试环境和生产环境的资源(如配置、服务)隔离等。

Configuration Item(配置项)

一个具体的可配置的参数与其值域,通常以 param-key=param-value 的形式存在。例如我们常配置系统的日志输出级别(logLevel=INFO|WARN|ERROR) 就是一个配置项。

Configuration Set(配置集)

一组相关或者不相关的配置项的集合称为配置集。在系统中,一个配置文件通常就是一个配置集,包含了系统各个方面的配置。例如,一个配置集可能包含了数据源、线程池、日志级别等配置项。

Data ID(配置集 ID)

Nacos 中的某个配置集的 ID。配置集 ID 是组织划分配置的维度之一。Data ID 通常用于组织划分系统的配置集。一个系统或者应用可以包含多个配置集,每个配置集都可以被一个有意义的名称标识。Data ID 通常采用类 Java 包(如 com.taobao.tc.refund.log.level)的命名规则保证全局唯一性。此命名规则非强制。

Group(配置分组)

Nacos 中的一组配置集,是组织配置的维度之一。通过一个有意义的字符串(如 Buy 或 Trade )对配置集进行分组,从而区分 Data ID 相同的配置集。当您在 Nacos 上创建一个配置时,如果未填写配置分组的名称,则配置分组的名称默认采用 DEFAULT_GROUP 。配置分组的常见场景:不同的应用或组件使用了相同的配置类型,如 database_url 配置和 MQ_topic 配置。

Configuration Snapshot(配置快照)

Nacos 的客户端 SDK 会在本地生成配置的快照。当客户端无法连接到 Nacos Server 时,可以使用配置快照显示系统的整体容灾能力。配置快照类似于 Git 中的本地 commit,也类似于缓存,会在适当的时机更新,但是并没有缓存过期(expiration)的概念。

配置领域模型(Config Entity Relationship Model)

围绕配置,主要有两个关联的实体,一个是配置变更历史,一个是服务标签(用于打标分类,方便索引),由 ID 关联。

![Config Entity Relationship Model](nacos.assets/Config Entity Relationship Model.jpeg)

Nacos 使用 Namespace + Group + Data Id 来确定一个配置的内容。(下图是 Nacos 的数据模型)

Nacos Data Model

Nacos 的接口

Nacos支持三类接口:

  • OPEN-API:纯 HTTP 接口
curl -x GET "http://serverIp:8848/nacos/v1/cs/configs?dataId=dataIdparam&group=groupParam&tenant=tenantParam"
  • SDK:目前官方社区已有JAVA版本
public String getConfig(String dataId, String group, long timeoutMs) throws NacosException
  • Spring 注解
@NacosInjected
private ConfigService configService;

@Test
public void testPublishConfig() throws NacosException {
    configService.publishConfig(DATA_ID, DEFAULT_GROUP, "9527");
}

主要功能分析与建模

Nacos 配置中心在微服务架构中主要负责各个微应用自有或者公有的配置信息管理,包括配置信息的添加编辑、读取、变化后的通知等。Nacos 还提供了配置信息的版本控制、监听管理等增强功能。下面一起去看一下 Nacos 配置中心的主要功能吧。

主要功能点

配置CRUD(增加(Create)、检索(Retrieve)、更新(Update)和删除(Delete)),版本管理,灰度管理,监听管理,推送轨迹,聚合数据等。

为实现这些与配置相关的操作,构造了配置服务的一个接口 ConfigService 如下。

ConfigService

ConfigService是客户端对Nacos Server配置操作顶层抽象接口,有唯一实现类NacosConfigService(单例模式),可以增、删、盖、查配置信息,还可以通过该 API 给配置添加 Listener。

public interface ConfigService {
 
    String getConfig(String dataId, String group, long timeoutMs) throws NacosException;
 
    // 获取配置同时添加Listener,用于监听变更事件
    String getConfigAndSignListener(String dataId, String group, long timeoutMs, Listener listener) throws NacosException;
 
    // 添加Listener,用于监听变更事件
    void addListener(String dataId, String group, Listener listener) throws NacosException;
 
    // 发布配置
    boolean publishConfig(String dataId, String group, String content) throws NacosException;
 
    // 移除配置
    boolean removeConfig(String dataId, String group) throws NacosException;
 
    // 移除Listener
    void removeListener(String dataId, String group, Listener listener);
 
    // 获取Server状态 UP or DOWN
    String getServerStatus();
    
    void shutDown() throws NacosException;
}
ConfigService Class
配置CRUD

/nacos-config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigController.java

ConfigController
发布配置

publishConfig方法

    /**
     * Adds or updates non-aggregated data.
     *
     * @throws NacosException NacosException.
     */
    public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
			...
            @RequestParam(value = "schema", required = false) String schema) throws NacosException {      
		...
            
        if (StringUtils.isBlank(betaIps)) {
            if (StringUtils.isBlank(tag)) {
                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
                ConfigChangePublisher
                        .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
            } else {
          		...
            }
        } else {
            // beta publish
			...
        }       
    }

persistService.insertOrUpdate将提交的配置信息持久化,最终的持久化通过 /nacos-config/src/main/java/com/alibaba/nacos/config/server/service/PersistService.javaaddConfigInfo 方法实现。

    public void addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo,
            final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {
        boolean result = tjt.execute(status -> {
            try {
                long configId = addConfigInfoAtomic(-1, srcIp, srcUser, configInfo, time, configAdvanceInfo);
                String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");
                addConfigTagsRelation(configId, configTags, configInfo.getDataId(), configInfo.getGroup(),
                        configInfo.getTenant());
                insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, time, "I");
            } catch (CannotGetJdbcConnectionException e) {
                LogUtil.FATAL_LOG.error("[db-error] " + e.toString(), e);
                throw e;
            }
            return Boolean.TRUE;
        });
    }

addConfigInfo 方法里面使用了编程式事务(Transaction Template),首先调用 addConfigInfoAtomic 向数据库添加了一条配置信息,然后 addConfiTagsRelationAtomic 添加关系表数据(config和tag之间的关系表),insertConfigHistoryAtomic 添加配置历史的记录 。然后判断如果是需要通知,再向 EventDispatcher 添加一条配置变更的事件。

获取配置

doGetConfig方法

入口:/nacos-config/src/main/java/com/alibaba/nacos/config/server/controller/ConfigServletInner.java

ConfigServletInner

该方法是同步的,在调用前会先加锁(加锁是在 groupKey 内部,即在使用同一个 group 的配置,在读取时会加锁)。

        final String groupKey = GroupKey2.getKey(dataId, group, tenant);
        String autoTag = request.getHeader("Vipserver-Tag");
        String requestIpApp = RequestUtil.getAppName(request);
        int lockResult = tryConfigReadLock(groupKey);

进入同步区域后,读取数据库以获得相应的配置数据。

configInfoBase = persistService.findConfigInfo(dataId, group, tenant);

读取数据库返回配置数据的 findConfigInfo 方法如下:

    public ConfigInfo findConfigInfo(final String dataId, final String group, final String tenant) {
        final String tenantTmp = StringUtils.isBlank(tenant) ? StringUtils.EMPTY : tenant;
        try {
            return this.jt.queryForObject(
                    "SELECT ID,data_id,group_id,tenant_id,app_name,content,md5,type FROM config_info WHERE data_id=? AND group_id=? AND tenant_id=?",
                    new Object[] {dataId, group, tenantTmp}, CONFIG_INFO_ROW_MAPPER);
        } catch (EmptyResultDataAccessException e) { // Indicates that the data does not exist, returns null.
            return null;
        } catch (CannotGetJdbcConnectionException e) {
            LogUtil.FATAL_LOG.error("[db-error] " + e.toString(), e);
            throw e;
        }
    }
版本管理

/nacos-config/src/main/java/com/alibaba/nacos/config/server/controller/HistoryController.java

HistoryController

版本管理的主要功能是要将配置信息的变更过程全部以历史表的形式记录下来。

/**
 * History management controller.
 *
 * @author Nacos
 */
@RestController
@RequestMapping(Constants.HISTORY_CONTROLLER_PATH)
public class HistoryController {
    
    @Autowired
    protected PersistService persistService;
    
    @GetMapping(params = "search=accurate")
    public Page<ConfigHistoryInfo> listConfigHistory(@RequestParam("dataId") String dataId, //
            @RequestParam("group") String group, //
            @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
            @RequestParam(value = "appName", required = false) String appName,
            @RequestParam(value = "pageNo", required = false) Integer pageNo,
            //
            @RequestParam(value = "pageSize", required = false) Integer pageSize, //
            ModelMap modelMap) {
        pageNo = null == pageNo ? 1 : pageNo;
        pageSize = null == pageSize ? 100 : pageSize;
        pageSize = Math.min(500, pageSize);
        // configInfoBase has no appName field. configInfoBase 没有 appName 字段
        return persistService.findConfigHistory(dataId, group, tenant, pageNo, pageSize);
    }
    
    /**
     * Query the detailed configuration history information.
     * 查看配置历史信息详情
     * @param nid history_config_info nid
     * @return history config info
     */
    @GetMapping
    public ConfigHistoryInfo getConfigHistoryInfo(@RequestParam("nid") Long nid) {
        return persistService.detailConfigHistory(nid);
    }
    
    /**
     * Query previous config history information.
     * 根据 id 查看配置历史信息详情
     * @param id config_info id
     * @return history config info
     * @since 1.4.0
     */
    @GetMapping(value = "/previous")
    public ConfigHistoryInfo getPreviousConfigHistoryInfo(@RequestParam("id") Long id) {
        return persistService.detailPreviousConfigHistory(id);
    }
    
}

配置历史信息表的数据插入是在调用 PersistService.addConfigInfo 或者 PersistService.updateConfigInfo 去更新配置信息时内部执行的。

    @Override
    public void insertConfigHistoryAtomic(long id, ConfigInfo configInfo, String srcIp, String srcUser,
            final Timestamp time, String ops) {
        String appNameTmp = StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
        String tenantTmp = StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
        final String md5Tmp = MD5Utils.md5Hex(configInfo.getContent(), Constants.ENCODE);
        try {
            jt.update(
                    "INSERT INTO his_config_info (id,data_id,group_id,tenant_id,app_name,content,md5,src_ip,src_user,gmt_modified,op_type) "
                            + "VALUES(?,?,?,?,?,?,?,?,?,?,?)", id, configInfo.getDataId(), configInfo.getGroup(),
                    tenantTmp, appNameTmp, configInfo.getContent(), md5Tmp, srcIp, srcUser, time, ops);
        } catch (DataAccessException e) {
            LogUtil.FATAL_LOG.error("[db-error] " + e.toString(), e);
            throw e;
        }
    }
灰度发布

灰度发布主要是在配置信息发布或者更新时,考虑到对系统的影响,所以先只针对一部分用户进行生效,等待实际测试确认后,再正式发布。

Nacos 配置管理中主要通过 IP 名单实现(publishConfig方法中)。

String betaIps = request.getHeader("betaIps");
        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
        if (StringUtils.isBlank(betaIps)) {
			...
        } else { 
            // beta publish
            persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
            EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
        }

如果发布时,提交了 betaIps 的话,会进入到 else 分支,而在 persistService.insertOrUpdateBeta 方法中,数据会被插入到 config_info_beta 这个表中;如果非灰度模式,数据被插入到 config_info 表中(见发布配置的代码段)。

在 获取配置的方法ConfigServletInner.doGetConfig 中会根据 ip 判断,如果是在beta灰度列表中的,则会变成灰度读取模式。具体读取的时候,如果是beta灰度模式,则会调用 persistService.findConfigInfo4Betaconfig_info_beta 表中读取数据。

if (isBeta) {
                    md5 = cacheItem.getMd54Beta();
                    lastModified = cacheItem.getLastModifiedTs4Beta();
                    if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
                        configInfoBase = persistService.findConfigInfo4Beta(dataId, group, tenant);
                    } else {
                        file = DiskUtil.targetBetaFile(dataId, group, tenant);
                    }
                    response.setHeader("isBeta", "true");
                }
监听管理

监听管理主要是提供了配置订阅者即监听者查询能力,同时提供客户端当前配置的MD5校验值,以便帮助用户更好的检查配置变更是否推送到 Client 端。数据的监听管理是在 ConfigController 控制器中的 getListeners 方法。

    /**
     * Subscribe to configured client information. 订阅改配置的客户端信息
     */
    @GetMapping("/listener")
    @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
    public GroupkeyListenserStatus getListeners(@RequestParam("dataId") String dataId,
            @RequestParam("group") String group, @RequestParam(value = "tenant", required = false) String tenant,
            @RequestParam(value = "sampleTime", required = false, defaultValue = "1") int sampleTime) throws Exception {
        group = StringUtils.isBlank(group) ? Constants.DEFAULT_GROUP : group;
        SampleResult collectSampleResult = configSubService.getCollectSampleResult(dataId, group, tenant, sampleTime);
        GroupkeyListenserStatus gls = new GroupkeyListenserStatus();
        gls.setCollectStatus(200);
        if (collectSampleResult.getLisentersGroupkeyStatus() != null) {
            gls.setLisentersGroupkeyStatus(collectSampleResult.getLisentersGroupkeyStatus());
        }
        return gls;
    }

getListeners 方法调用了 getCollectSampleResult 方法,该方法将调用每个节点的监听者获取接口,即 Constants.COMMUNICATION_CONTROLLER_PATH + "/configWatchers" ,然后将所有结果合并起来。

在每个节点 CommunicationController 中的 configWatchers 接口中,调用了 longPollingService.getCollectSubscribleInfo 方法来获取本机的订阅者信息。在 getCollectSubscribleInfo 中又调用了getSubscribleInfo 方法。以下是 getSubscribleInfo 的实现。

    public SampleResult getSubscribleInfo(String dataId, String group, String tenant) {
        String groupKey = GroupKey.getKeyTenant(dataId, group, tenant);
        SampleResult sampleResult = new SampleResult();
        Map<String, String> lisentersGroupkeyStatus = new HashMap<String, String>(50);
        
        for (ClientLongPolling clientLongPolling : allSubs) {
            if (clientLongPolling.clientMd5Map.containsKey(groupKey)) {
                lisentersGroupkeyStatus.put(clientLongPolling.ip, clientLongPolling.clientMd5Map.get(groupKey));
            }
        }
        sampleResult.setLisentersGroupkeyStatus(lisentersGroupkeyStatus);
        return sampleResult;
    }

说明:allSubs 是一个长轮询的队列,异步用定时任务的方式来实时维护 allSubs 的状态,也就是当前本机所有监听者的数据,以上代码主要是将 allSubs 的数据组装到 Map 中进行返回。

推送轨迹

Nacos Config 采用了日志的方法保存了持久化、推送、拉取等动作的记录。

/nacos-config/src/main/java/com/alibaba/nacos/config/server/service/trace/ConfigTraceService.java

ConfigTraceService

功能建模

配置中心最基础的功能就是存储一个键值对,用户发布一个配置(ConfigKey),然后客户端获取这个配置项(ConfigValue);进阶的功能就是当某个配置项发生变更时,将变更告知客户端刷新旧值。

下方的架构图,简要描述了一个配置中心的大致架构,用户可以通过管理平台发布配置,通过 HTTP 调用将配置注册到服务端,服务端将之保存在 MySQL 等持久化存储引擎中;用户通过客户端 SDK 访问服务端的配置,同时建立 HTTP 的长轮询监听配置项变更,同时为了减轻服务端压力和保证容灾特性,配置项拉取到客户端之后会保存一份快照在本地文件中,SDK 优先读取文件里的内容。

config client & server

这里省略了许多细节问题,例如配置分层设计,权限校验,客户端长轮询的间隔设置,服务端每次查询都需要访问 MySQL 么,配置变更是主动推送还是等定时轮询触发等,

下面我们通过对于交互流程的分析来重点看看 Nacos 的配置中心是如何实现配置更新的。

核心流程设计分析

客户端 Client

一个完整的客户端实例

在 Nacos 的源代码中,子项目 example 中有一个示例类 ConfigExample

  • 实例化 configService(第 9 行)

  • 获取配置:configService 获取配置 (getConfig) (SDK 层面看是拉配置) (第 10 行)

    ​ 通常情况下该方法直接从本地文件中取得配置的值,如果本地文件不存在或者内容为空,则再通过 HTTP GET 方法从远端拉取配置,并保存到本地快照中。当通过 HTTP 获取远端配置时,Nacos 提供了两种熔断策略,一是超时时间,二是最大重试次数,默认重试三次。

  • 配置监听:configService 调用 addListener 方法监听服务端的变化 (SDK 层面看是服务端在推送配置) (第 12 行)

public class ConfigExample {
    public static void main(String[] args) throws NacosException, InterruptedException {
        String serverAddr = "localhost";
        String dataId = "test";
        String group = "DEFAULT_GROUP";
        Properties properties = new Properties();
        properties.put("serverAddr", serverAddr);
        ConfigService configService = NacosFactory.createConfigService(properties);
        String content = configService.getConfig(dataId, group, 5000);
        System.out.println(content);
        configService.addListener(dataId, group, new Listener() {
            @Override
            public void receiveConfigInfo(String configInfo) {
                System.out.println("receive:" + configInfo);
            }

            @Override
            public Executor getExecutor() {
                return null;
            }
        });
        ...
    }
}

下面我们详细分析一下各个步骤:

ConfigService 实例化

NacosFactory 创建 ConfigService 对象(工厂模式)
public class NacosFactory {
    /**
     * Create config service.
     *
     * @param properties init param
     * @return config
     * @throws NacosException Exception
     */
    public static ConfigService createConfigService(Properties properties) throws NacosException {
        return ConfigFactory.createConfigService(properties);
    }
    
    public static ConfigService createConfigService(String serverAddr) throws NacosException {
        return ConfigFactory.createConfigService(serverAddr);
    }
    ...
    // Create naming service.
    // Create maintain service.
}

ConfigFactory 利用反射调用 NacosConfigService 有参构造的方法实例化了 ConfigService

  1. 通过 Class.forName 得到 NacosConfigService 的Class对象
  2. 拿到以Properties为参数的构造方法
  3. 通过反射创建实例对象并返回
public class ConfigFactory {
    
    /**
     * Create Config.
     *
     * @param properties init param
     * @return ConfigService
     * @throws NacosException Exception
     */
    public static ConfigService createConfigService(Properties properties) throws NacosException {
        try {
            Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
            Constructor constructor = driverImplClass.getConstructor(Properties.class);
            ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
            return vendorImpl;
        } catch (Throwable e) {
            throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
        }
    }
    
    /**
     * Create Config.
     *
     * @param serverAddr serverList
     * @return Config
     * @throws ConfigService Exception
     */
    public static ConfigService createConfigService(String serverAddr) throws NacosException {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
        return createConfigService(properties);
    }
}

以上构造过程如下图所示:

create ConfigService
NacosConfigService

Nacos 给客户端提供的 API ,拥有ConfigService 接口的所有方法。

  1. 配置检查
  2. 设置编码格式
  3. 初始化 namespace
  4. 构造 HTTP 代理并 start()
  5. 创建 ClientWorker

其构造函数如下:

public NacosConfigService(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
    if (StringUtils.isBlank(encodeTmp)) {
        this.encode = Constants.ENCODE;
    } else {
        this.encode = encodeTmp.trim();
    }
    initNamespace(properties);

    // ServerHttpAgent http代理
    // MetricsHttpAgent 又代理了一次
    // 另外,屏蔽了集群逻辑。提供的方法只是统一的http调用。    
    this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
    this.agent.start();
    this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}

初始化了一个 MetricsHttpAgent 和一个 ClientWorker。其中,agent 实际上是在 ClientWorker 中发挥作用的。

NacosConfigService do

MetricsHttpAgent(代理模式)

主要是用来记录一些 Metric 信息。内部持有一个ServerHttpAgent对象,是实际工作的类,MetricsHttpAgent 在内部也是调用了 ServerHttpAgent 的方法,另外加上了一些统计操作。实际上就是对 ServerHttpAgent 进行代理,包装了一层 prometheus 用于指标采集。

ServerHttpAgent

封装了一些 HTTP 交互的通用方法,比如GETPUTDELETE等方法,主要是针对时间数据采集。在 ServerHttpAgent 构造函数中,主要做了下面几件事情:

ServerHttpAgent
  1. 构造集群管理类 ServerListManager 对象 serverListMgr,这个对象的主要作用就是根据 Properties 解析出服务端的地址,然后维护在一个List<String>
  2. 构造安全代理类 SecurityProxy 、初始化命名空间
  3. init 方法中主要做了三件事情:初始化编码格式,如果没有就默认 UTF-8;初始化 accessKeysecretKey,这个应该是用来验证客户端的身份的;初始化重试次数 maxRetry,如果没传默认为 3
  4. 通过 SecurityProxy 进行登录认证,然后启动个任务,每隔5秒重新调用 login 接口进行认证
    public ServerHttpAgent(Properties properties) throws NacosException {
        // 集群管理类
        this.serverListMgr = new ServerListManager(properties);
        // 安全代理类
        this.securityProxy = new SecurityProxy(properties, NACOS_RESTTEMPLATE);
        // 命名空间
        this.namespaceId = properties.getProperty(PropertyKeyConst.NAMESPACE);
    	// 初始化配置 encoding、maxRetry、accesskey、secretkey
    	// accesskey、secretkey在登录认证时未用到。发送get、post、delete请求时需要验证
        init(properties);
        // 登录认证
        this.securityProxy.login(this.serverListMgr.getServerUrls());
        
        // init executorService daemon线程
        this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.config.security.updater");
                t.setDaemon(true);
                return t;
            }
        });
        
        // 5秒一次的登录认证(会校验是否在token窗口内,如果不在,则重新获取token),刷新token和token窗口
        this.executorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                securityProxy.login(serverListMgr.getServerUrls());
            }
        }, 0, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);   
    }
功能
  1. 初始化 ServerHttpAgent 对象后,会进行和服务端的认证通信,并启动一个定时任务动态更新 token 保证 token 不失效。
  2. 调用 ServerHttpAgentstart 方法,主要是监控服务列表是否有变更,当发现有变更时发布 ServerlistChangeEvent 事件。

核心类 ClientWorker

封装了客户端从服务端主动 pull 配置信息的关键逻辑。注意这里很明确指出了 ClientWorker 是通过 pull模式 从服务端获取配置信息的,而在使用的时候通常会给它添加 Listener

    public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
            final Properties properties) {
        // HTTP代理
        this.agent = agent;
        // 过滤
        this.configFilterChainManager = configFilterChainManager;
        
        // Initialize the timeout parameter 初始化配置  
        init(properties);
		// 初始化线程池
        this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
        // 初始化长轮询的拉取线程
    	// cpu核数的线程,用来做长轮询的,每次检查配置,
        // 如果LongPollingRunnable任务的配置缓存超过一定数量,默认3000个,就要去开启一个新任务去检查配置
    	// Runtime.getRuntime().availableProcessors()获取了cpu核数
        this.executorService = Executors
               .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                        t.setDaemon(true);
                        return t;
                    }
                });
        // 定时任务,每隔10ms调用一次checkConfigInfo()方法
        this.executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }
  1. init(properties)方法主要就是初始化一些 timeout 的参数
  2. executor线程池里面只有一个线程,它的唯一作用就是让另一个线程池开始执行,每10s中执行一次
  3. executorService线程池用于更新配置信息,核心任务是长轮询 LongPollingRunnable
checkConfigInfo

更新配置信息,目前已经获取到的配置信息会缓存到一个Map<String, CacheData>中,然后对map中的数据分批次,每个批次的数据对应一个线程负责更新,如下:

    public void checkConfigInfo() {
        // Dispatch taskes.
        // 监听数目
        int listenerSize = cacheMap.size();
        // Round up the longingTaskCount. 监听数量/3000 向上取整
        // ParamUtil.getPerTaskConfigSize()默认是3000
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // The task list is no order.So it maybe has issues when changing.
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }
checkConfigInfo
LongPollingRunnable

执行长轮询,检查服务端哪些 DataId 发生了变化,然后通过得到的 DataId 到 Server 端查询配置信息,更新到 CacheData 中。

  1. 检查本地配置,忽略本地快照不存在的配置项,检查是否存在需要回调监听器的配置项
  2. 如果本地没有配置项的,从服务端拿,返回配置内容发生变更的键值列表
  3. 每个键值再到服务端获取最新配置,更新本地快照,补全之前缺失的配置
  4. 检查 MD5 标签是否一致,不一致需要回调监听器
检查本地

首先取出与该 taskId 相关的 CacheData,然后对 CacheData 进行检查,包括本地配置检查和监听器的 md5 检查。

        public void run() {

            List<CacheData> cacheDatas = new ArrayList<CacheData>();
            //是否是在初始化的CacheData,会影响服务器是否挂起或者立即返回
            List<String> inInitializingCacheList = new ArrayList<String>();
            try {
                // check failover config
                for (CacheData cacheData : cacheMap.get().values()) {
                    // 是否属于当前长轮询任务的
                    if (cacheData.getTaskId() == taskId) {
                        cacheDatas.add(cacheData);
                        try {
                            checkLocalConfig(cacheData);
                            //用本地配置
                            if (cacheData.isUseLocalConfigInfo()) {
                                //有改变的话会通知监听者
                                cacheData.checkListenerMd5();
                            }
                        } catch (Exception e) {
                            LOGGER.error("get local config info error", e);
                        }
                    }
                }
			
            ...

            } catch (Throwable e) {

                LOGGER.error("longPolling error : ", e);
                executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
            }
        }
    }
检查服务端

通过 checkUpdateDataIds() 方法从服务端获取那些值发生了变化的 DataId 列表。如果有的话,那么就通过 getServerConfig 方法,根据 DataId 到服务端获取最新的配置信息,接着将最新的配置信息保存到 CacheData 中。

// check server config 获取有变化的配置列表dataid+group,访问的url是/listener
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
if (!CollectionUtils.isEmpty(changedGroupKeys)) {
    LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
}

// 轮询有配置改变的,然后去获取内容
for (String groupKey : changedGroupKeys) {
    String[] key = GroupKey.parseKey(groupKey);
    String dataId = key[0];
    String group = key[1];
    String tenant = null;
    if (key.length == 3) {
        tenant = key[2];
    }
    try {
        // 有更新的就获取一次配置
        String[] ct = getServerConfig(dataId, group, tenant, 3000L);
        CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
        // 设置配置内容
        cache.setContent(ct[0]);
        if (null != ct[1]) {
            // 设置配置类型
            cache.setType(ct[1]);
        }
        LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                agent.getName(), dataId, group, tenant, cache.getMd5(),
                ContentUtils.truncateContent(ct[0]), ct[1]);
    } catch (NacosException ioe) {
        String message = String
                .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                        agent.getName(), dataId, group, tenant);
        LOGGER.error(message, ioe);
    }
}
// 不是初始化中的或者初始化集合里存在的
for (CacheData cacheData : cacheDatas) {
    if (!cacheData.isInitializing() || inInitializingCacheList
            .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
        // 检查是否有变化,有变化就通知
        cacheData.checkListenerMd5();
        // 请求过了后就设置为不在初始化中,这样就会被挂起,如果服务器配置有更新,就会立即返回,
        // 这样就可以实现动态配置更新,又不会太多的空轮询消耗
        cacheData.setInitializing(false);
    }
}
inInitializingCacheList.clear();

executorService.execute(this);

注意在第 21 行 setcontent 方法中,更新了 md5 的值。

Listener

从上面的分析中我们可以看到,在长轮询的任务中,当服务端配置信息发生变更时,客户端将最新的数据获取下来之后,保存在了 CacheData 中,同时更新了该 CacheData 的 md5 值。所以当下次执行 checkListenerMd5 方法时,就会发现当前 Listener 所持有的 md5 值已经和 CacheData 的 md5 值不一样了,也就意味着服务端的配置信息发生改变了,这时就需要将最新的数据通知给 Listener 的持有者。

添加 Listener

首先根据 dataIdGroup 和当前的场景获取一个 CacheData 对象,然后将当前要添加的 Listener 对象添加到 CacheData 中去。

// NacosConfigService#addListener
public void addListener(String dataId, String group, Listener listener) throws NacosException {
    worker.addTenantListeners(dataId, group, Arrays.asList(listener));
}

// ClientWorker#addTenantListeners
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException {
    group = null2defaultGroup(group);
    String tenant = agent.getTenant();
    CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
    for (Listener listener : listeners) {
        cache.addListener(listener);
    }
}
CacheData

listeners 是该 CacheData 所关联的所有 Listener,不过不是保存的原始的 Listener 对象,而是包装后的 ManagerListenerWrap 对象,该对象除了持有 Listener 对象,还持有了一个 lastCallMd5 属性。

public class CacheData {
    
    private static final Logger LOGGER = LogUtils.logger(CacheData.class);

    private final String name;
    
    private final ConfigFilterChainManager configFilterChainManager;
    
    public final String dataId;
    
    public final String group;
    
    public final String tenant;
    
    private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
    
    private volatile String md5;
    
    /**
     * whether use local config.
     */
    private volatile boolean isUseLocalConfig = false;
    
    /**
     * last modify time.
     */
    private volatile long localConfigLastModified;
    
    private volatile String content;
    
    private int taskId;
    
    private volatile boolean isInitializing = true;
    
    private String type; 
    
    ...
    }
}

下图中,content 是配置内容,MD5 值是用来检测配置是否发生变更的关键,内部还维护着一个若干监听器组成的数组,一旦发生变更则依次回调这些监听器。

CacheData
触发回调

checkListenerMd5 方法会检查 CacheData 当前的 md5 与 CacheData 持有的所有 Listener 中保存的 md5 的值是否一致,如果不一致,就执行一个安全的监听器的通知方法:safeNotifyListener

checkListenerMd5

如果 CacheData 当前的 md5 与 CacheData 持有的所有 Listener 中保存的 md5 的值不一致,则执行一个安全的通知 Listener 的方法。

    void checkListenerMd5() {
        for (ManagerListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                safeNotifyListener(dataId, group, content, type, md5, wrap);
            }
        }
    }
safeNotifyListener

safeNotifyListener 方法中,它获取最新的配置信息(第 24 行),调用 Listener 的回调方法(第 25 行),将最新的配置信息作为参数传入(第 26 行),这样 Listener 的使用者就能接收到变更后的配置信息了,最后更新 ListenerWrap 的 md5 值。

private void safeNotifyListener(final String dataId, final String group, final String content,
                                final String md5, final ManagerListenerWrap listenerWrap) {
    final Listener listener = listenerWrap.listener;
 
    Runnable job = new Runnable() {
        @Override
        public void run() {
            ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
            ClassLoader appClassLoader = listener.getClass().getClassLoader();
            try {
                if (listener instanceof AbstractSharedListener) {
                    AbstractSharedListener adapter = (AbstractSharedListener) listener;
                    adapter.fillContext(dataId, group);
                    LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                }
                // 执行回调之前先将线程classloader设置为具体webapp的classloader,以免回调方法中调用spi接口是出现异常或错用(多应用部署才会有该问题)。
                Thread.currentThread().setContextClassLoader(appClassLoader);
 
                ConfigResponse cr = new ConfigResponse();
                cr.setDataId(dataId);
                cr.setGroup(group);
                cr.setContent(content);
                configFilterChainManager.doFilter(null, cr);
                String contentTmp = cr.getContent();
                listener.receiveConfigInfo(contentTmp);
                listenerWrap.lastCallMd5 = md5;
                LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
                    listener);
            } catch (NacosException de) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,
                    dataId, group, md5, listener, de.getErrCode(), de.getErrMsg());
            } catch (Throwable t) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group,
                    md5, listener, t.getCause());
            } finally {
                Thread.currentThread().setContextClassLoader(myClassLoader);
            }
        }
    };
 
    final long startNotify = System.currentTimeMillis();
    try {
        if (null != listener.getExecutor()) {
            listener.getExecutor().execute(job);
        } else {
            job.run();
        }
    } catch (Throwable t) {
        LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group,
            md5, listener, t.getCause());
    }
    final long finishNotify = System.currentTimeMillis();
    LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
        name, (finishNotify - startNotify), dataId, group, md5, listener);
}

客户端小结

当 Nacos 服务端创建了相关的配置项后,客户端就可以进行监听了。客户端通过一个定时的后台进程来检查自己监听的配置项的数据的,并从服务端查询变化的数据对应的 dataId 列表,一旦服务端的数据发生变化时,客户端将会获取到最新的数据,并将最新的数据根据 dataId 列表保存到CacheData 中,然后会重新计算 CacheData 的 md5 属性的值,此时就会对该 CacheData 所绑定的 Listener 触发回调。同时启动一个后台线程,校验内存中缓存的数据是否和缓存文件中的一致,并进行相应处理,检查监听的数据是否和内存中的一致,若不一致则回调监听。

服务端 Server

核心类

PersistService - 数据库访问服务,封装了所有SQL操作

ConfigService - 本地文件 & 内存Cache 数据结构维护

DumpService - 通过 PersistService & ConfigService 将配置信息 dump 到本地

AsyncNotifyService - 配置入库之后,响应 ConfigDataChangeEvent 发起 dataChange 调用,触发 dump 操作

LongPollingService - 管理长轮询 Client,主要利用了 Servlet 3.0 中的 AsyncContenxt 机制

ConfigController - 处理 Config 获取/订阅/变更相关的 HTTP 请求

CommunicationController - 处理数据同步(dump)请求

![Config server class](nacos.assets/Config server class.jpg)

配置 Dump

服务端启动时就会依赖 DumpServiceinit 方法,从数据库中 load 配置存储在本地磁盘上,并将一些重要的元信息例如 MD5 值缓存在内存中。服务端会根据心跳文件中保存的最后一次心跳时间,来判断到底是从数据库 dump 全量配置数据还是部分增量配置数据(如果机器上次心跳间隔是 6h 以内的话)。

全量 dump 当然先清空磁盘缓存,然后根据主键 ID 每次捞取一千条配置刷进磁盘和内存。增量 dump 就是捞取最近六小时的新增配置(包括更新的和删除的),先按照这批数据刷新一遍内存和文件,再根据内存里所有的数据全量去比对一遍数据库,如果有改变的再同步一次,相比于全量 dump 的话会减少一定的数据库 IO 和磁盘 IO 次数。

配置注册

Nacos 服务端是一个 SpringBoot 实现的服务,注册配置主要代码位于 ConfigControllerConfigServletInner 中。服务端一般是多节点部署的集群,因此请求一开始只会打到一台机器,这台机器将配置插入 MySQL 中进行持久化。

因为服务端并不是针对每次配置查询都去访问 MySQL 的,而是会依赖 dump 功能在本地文件中将配置缓存起来。因此当单台机器保存完毕配置之后,需要通知其他机器刷新内存和本地磁盘中的文件内容,因此它会发布一个名为 ConfigDataChangeEvent 的事件,这个事件会通过 HTTP 调用通知所有集群节点(包括自身),触发本地文件和内存的刷新。

server register

工作机制

Server支持两种形式的长轮询机制:

  • 固定间隔轮训(Fixed Polling)- 每次长轮询的时间是固定的,在固定间隔轮询时间结束后,计算期间发生的配置变更,返回group keys。

  • 非固定间隔轮训 - 长轮询时间不固定,一旦订阅的配置反生变更就立即返回 group key。

Fixed Polling机制性能更好,但实时性较差。Unfixed Polling机制实时性好,但http请求数明显更多,性能较差。

工作流程

客户端的请求到达服务端后,服务端将该请求加入到一个叫 allSubs 的队列中,等待配置发生变更时 DataChangeTask 主动去触发,并将变更后的数据写入响应对象。

与此同时服务端也将该请求封装成一个调度任务去执行,等待调度的期间就是等待 DataChangeTask 主动触发的,如果延迟时间到了 DataChangeTask 还未触发的话,则调度任务开始执行数据变更的检查,然后将检查的结果写入响应对象。

轮询接口
    public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
            Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
        
        // Long polling.
        if (LongPollingService.isSupportLongPolling(request)) {
            longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
            return HttpServletResponse.SC_OK + "";
        }
        
        // Compatible with short polling logic.
        List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
        
        // Compatible with short polling result.
        String oldResult = MD5Util.compareMd5OldResult(changedGroups);
        String newResult = MD5Util.compareMd5ResultString(changedGroups);
        
        String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
        if (version == null) {
            version = "2.0.0";
        }
        int versionNum = Protocol.getVersionNumber(version);
        
        // Befor 2.0.4 version, return value is put into header.
        if (versionNum < START_LONG_POLLING_VERSION_NUM) {
            response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
            response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
        } else {
            request.setAttribute("content", newResult);
        }
        
        Loggers.AUTH.info("new content:" + newResult);
        
        // Disable cache.
        response.setHeader("Pragma", "no-cache");
        response.setDateHeader("Expires", 0);
        response.setHeader("Cache-Control", "no-cache,no-store");
        response.setStatus(HttpServletResponse.SC_OK);
        return HttpServletResponse.SC_OK + "";
    }

进入 longPollingServiceaddLongPollingClient 方法。

    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
            int probeRequestSize) {

        String str = req.getHeader(
            LongPollingService.LONG_POLLING_HEADER);
        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
        String tag = req.getHeader("Vipserver-Tag");
        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);

        // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
        if (isFixedPolling()) {
            timeout = Math.max(10000, getFixedPollingInterval());
            // Do nothing but set fix polling timeout.
        } else {
            long start = System.currentTimeMillis();
            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
            if (changedGroups.size() > 0) {
                generateResponse(req, rsp, changedGroups);
                LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
                        RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                        changedGroups.size());
                return;
            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                        RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                        changedGroups.size());
                return;
            }
        }
        String ip = RequestUtil.getRemoteIp(req);

        // Must be called by http thread, or send response.
        final AsyncContext asyncContext = req.startAsync();

        // AsyncContext.setTimeout() is incorrect, Control by oneself
        asyncContext.setTimeout(0L);

        ConfigExecutor.executeLongPolling(
                new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    }

服务端将客户端的长轮询请求封装成一个叫 ClientLongPolling 的任务,交给 scheduler 去执行该任务持有一个 AsyncContext 响应对象(Servlet 3.0 的新机制),通过定时线程池延后 29.5s 执行。

为什么比客户端 30s 的超时时间提前 500ms 返回是为了最大程度上保证客户端不会因为网络延时造成超时

    /**
     * ClientLongPolling subscibers.
     */
    final Queue<ClientLongPolling> allSubs;
class ClientLongPolling implements Runnable {

        @Override
        public void run() {
            asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
                @Override
                public void run() {
                    try {
                        getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());

                        // Delete subsciber's relations.
                        allSubs.remove(ClientLongPolling.this);

                        if (isFixedPolling()) {
                            LogUtil.CLIENT_LOG
                                    .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix",
                                            RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                            "polling", clientMd5Map.size(), probeRequestSize);
                            List<String> changedGroups = MD5Util
                                    .compareMd5((HttpServletRequest) asyncContext.getRequest(),
                                            (HttpServletResponse) asyncContext.getResponse(), clientMd5Map);
                            if (changedGroups.size() > 0) {
                                sendResponse(changedGroups);
                            } else {
                                sendResponse(null);
                            }
                        } else {
                            LogUtil.CLIENT_LOG
                                    .info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
                                            RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                                            "polling", clientMd5Map.size(), probeRequestSize);
                            sendResponse(null);
                        }
                    } catch (Throwable t) {
                        LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
                    }

                }

            }, timeoutTime, TimeUnit.MILLISECONDS);

            allSubs.add(this);
        }

allSubs 是一个 ConcurrentLinkedQueue 队列,可以把allSubs当作是一个订阅者列表,其保存了所有正在被夯住的轮询请求。这是因为在配置项被夯住的期间内,如果用户通过管理平台操作了配置项变更、或者服务端该节点收到了来自其他节点的 dump 刷新通知,那么都应立即取消夯住的任务,及时通知客户端数据发生了变更。具体描述如下:当服务端在请求被夯住的期间接收到某项配置变更时,就会发布一个 LocalDataChangeEvent 类型的事件通知,之后会将这个变更包装成一个 DataChangeTask 异步执行,内容就是从 allSubs 中找出夯住的 ClientLongPolling 请求,写入变更强制其立即返回。

ClientLongPolling 被提交给 scheduler 执行之后,实际执行的内容可以拆分成以下四个步骤:

  • 1.创建一个调度的任务,把一个任务延迟了 timeoutTime 29.5s
  • 2.将该 ClientLongPolling 自身的实例添加到一个 allSubs 中去
  • 3.延时时间到了之后,首先将该 ClientLongPolling 自身的实例从 allSubs 中移除
  • 4.通过 AsyncContext 获取服务端中保存的对应客户端请求的 groupKeys 是否发生变更,将结果写入 response 返回给客户端

timeoutTime 超时时间到了之后,服务端会直接结束本次请求;然后客户端又会立马重新发起新一轮请求,重复这个过程;相当于是说客户端每隔 timeoutTime 时间之后,就发起一次请求判断服务端是否有变更数据。该过程得示意图如下:

config sever workflow
配置变更

修改配置后,服务端首先将配置的值进行了持久化层的更新,然后触发了一个 ConfigDataChangeEvent 的事件。

    public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,

        final Timestamp time = TimeUtils.getCurrentTime();
        String betaIps = request.getHeader("betaIps");
        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
        configInfo.setType(type);
        if (StringUtils.isBlank(betaIps)) {
            if (StringUtils.isBlank(tag)) {
// 持久化
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
// 触发事件
                ConfigChangePublisher
                        .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
            } else {
                persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
                ConfigChangePublisher.notifyConfigChange(
                        new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
            }
        } else {
            // beta publish
            persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, true);
            ConfigChangePublisher
                    .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
        }
        ConfigTraceService
                .logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(),
                        ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
        return true;
    }

执行了一个叫 DataChangeTask 的任务,应该是通过该任务来通知客户端服务端的数据已经发生了变更。

  class DataChangeTask implements Runnable {
        @Override
        public void run() {
            try {
                for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                    ClientLongPolling clientSub = iter.next();
                    if (clientSub.clientMd5Map.containsKey(groupKey)) {
                        iter.remove(); // 删除订阅关系
                      clientSub.sendResponse(Arrays.asList(groupKey));
                    }
                }
            } catch (Throwable t) {
            }
        }
  1. 遍历的allSubs(所有的订阅者数组);
  2. 比较客户端订阅的配置数据MD5与当前是否一致,这个时候基本是不一致的,因为有修改
  3. 做了一些过滤操作之后,sendResponse(Arrays.asList(groupKey)) 这个操作,就是向客户端发送已经变更的配置项;发送了之后,本次请求也就结束了;客户端又会重新再发起新的一轮请求。

服务端小结

  1. 客户端发起订阅请求
  2. 服务端接收到请求之后,立马去查询一次数据是否变更 ① 如果有变更立马返回,然后客户端又回到步骤1 ② 如果没有变更,则把当前请求hold住一定的时间(默认29.5s)
  3. ① 如果这期间客户端所监听的数据都一直没有变更,在时间到达之后,结束客户端的本次请求,客户端又回到步骤1 ② 如果期间有变更,服务端会轮询所有监听了这个变更了配置项的客户端,然后将结果写入 response 中,立马返回响应变更了的配置项,本次请求也结束,客户端又回到步骤1
nacos-client-request

Nacos Config 配置更新小结

基于上述的分析,最终总结了以下结论:

  1. Nacos 客户端会循环请求服务端变更的数据,并且超时时间设置为30s,当配置发生变化时,请求的响应会立即返回,否则会一直等到 29.5s+ 之后再返回响应
  2. Nacos 客户端能够实时感知到服务端配置发生了变化
  3. 实时感知是建立在客户端拉和服务端“推”的基础上,但是这里的服务端“推”需要打上引号,因为服务端和客户端直接本质上还是通过 http 进行数据通讯的,之所以有“推”的感觉,是因为服务端主动将变更后的数据通过 http 的 response 对象提前写入了。
推 or 拉?

Nacos Config 并不是通过推的方式将服务端最新的配置信息发送给客户端的,而是客户端维护了一个长轮询的任务,定时去拉取发生变更的配置信息,然后将最新的数据推送给 Listener 的持有者。

拉的优势

客户端拉取服务端的数据与服务端推送数据给客户端相比,优势在哪呢,为什么 Nacos 不设计成主动推送数据,而是要客户端去拉取呢?如果用推的方式,服务端需要维持与客户端的长连接,这样的话需要耗费大量的资源,并且还需要考虑连接的有效性,例如需要通过心跳来维持两者之间的连接。而用拉的方式,客户端只需要通过一个无状态的 http 请求即可获取到服务端的数据。

高级设计意图分析

设计模式

Nacos Config 中用到的设计模式有单例模式、工厂模式和代理模式等。

单例模式 Singleton Pattern

单例模式简介

单例模式属于创建型模式,它提供了一种创建对象的最佳方式。这种模式涉及到一个单一的类,该类负责创建自己的对象,同时确保只有单个对象被创建。这个类提供了一种访问其唯一的对象的方式,可以直接访问,不需要实例化该类的对象。

**意图:**保证一个类仅有一个实例,并提供一个访问它的全局访问点。

**主要解决:**一个全局使用的类频繁地创建与销毁。

注意:

  • 1、单例类只能有一个实例。
  • 2、单例类必须自己创建自己的唯一实例。
  • 3、单例类必须给所有其他对象提供这一实例。
Nacos Config 中的单例模式

在客户端中,ConfigService有唯一实现类NacosConfigService,且在当前版本建构中,只允许实例化一个配置服务 ConfigService。作为一个客户端全局使用的类,这样可以避免配置服务ConfigService 被频繁地创建与销毁,从而节省内存。

NacosConfigService

工厂模式 Factory Pattern

工厂模式简介

工厂模式属于创建型模式,它提供了一种创建对象的最佳方式。在工厂模式中,我们在创建对象时不会对客户端暴露创建逻辑,并且是通过使用一个共同的接口来指向新创建的对象。

**意图:**定义一个创建对象的接口,让其子类自己决定实例化哪一个工厂类,工厂模式使其创建过程延迟到子类进行。

**主要解决:**主要解决接口选择的问题。

**何时使用:**我们明确地计划不同条件下创建不同实例时。

**如何解决:**让其子类实现工厂接口,返回的也是一个抽象的产品。

**关键代码:**创建过程在其子类执行。

优点:

  1. 一个调用者想创建一个对象,只要知道其名称就可以了。
  2. 扩展性高,如果想增加一个产品,只要扩展一个工厂类就可以。
  3. 屏蔽产品的具体实现,调用者只关心产品的接口。
Nacos Config 中的工厂模式

从下方 Nacos 官方所给出的 Nacos-SDK 类视图中可以很清晰地看出 Nacos 中的工厂分为两级,顶层工厂 NacosFactory 包含了 Nacos 两个主要功能点服务发现与配置管理的工厂的创建(很显然这里官方给出的图出了一点小错误)。在 ConfigFactory 中,通过传递类型信息来获取了配置服务实体类的对象。

Nacos 中这样的设计将类的具体实现封装起来,调用者可以十分方便地根据所提供的接口创建对象,将对象的创建和使用的过程解耦,从而降低了代码的重复率和维护成本。

![Factory Pattern](nacos.assets/Factory Pattern.jpeg)

代理模式 Proxy Pattern

代理模式简介

代理模式给某一个对象提供一个代理对象,并由代理对象控制对原对象的引用。通俗的来讲代理模式就是我们生活中常见的中介。

意图:为其他对象提供一种代理以控制对这个对象的访问。

主要解决:在直接访问对象时带来的问题,比如说:要访问的对象在远程的机器上。在面向对象系统中,有些对象由于某些 原因(比如对象创建开销很大,或者某些操作需要安全控制,或者需要进程外的访问),直接访问会给使用者或者系统结构带来很多麻烦,我们可以在访问此对象时加上一个对此对象的访问层

何时使用:想在访问一个类时加一些限制

如何解决:增加中间层

关键代码:实现与被代理类结合

Nacos Config 中的代理模式

在前述的流程分析中,已经提到了MetricsHttpAgent ServerHttpAgent进行静态代理。

this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));

这种代理模式让MetricsHttpAgent 在不改变目标对象ServerHttpAgent的前提下,还可以通过给代理类增加额外的功能来扩展委托类的功能,这样做我们只需要修改代理类而不需要再修改委托类,符合代码设计的开闭原则

MetricsHttpAgent

![ServerHttpAgent class](nacos.assets/ServerHttpAgent class.png)

代理类主要负责为委托类预处理消息、过滤消息、把消息转发给委托类,以及事后对返回结果的处理等。代理类本身并不真正实现服务,而是同过调用委托类的相关方法,来提供特定的服务。真正的业务功能还是由委托类来实现,但是可以在业务功能执行的前后加入一些公共的服务。

例如,在 Nacos 中,通过加入代理对象 MetricsHttpAgent 来屏蔽对真实对象 ServerHttpAgent 的直接访问。我认为主要是为了提高安全性和系统性能,所以对真实对象进行封装,从而达到延迟加载的目的,也就没有必要打开已经封装好的委托类。

以一个简单的示例来阐述使用代理模式实现延迟加载的方法及其意义。假设某客户端软件有根据用户请求去数据库查询数据的功能。在查询数据前,需要获得数据库连接,软件开启时初始化系统的所有类,此时尝试获得数据库连接。当系统有大量的类似操作存在时 (比如 XML 解析等),所有这些初始化操作的叠加会使得系统的启动速度变得非常缓慢。为此,使用代理模式的代理类封装对数据库查询中的初始化操作,当系统启动时,初始化这个代理类,而非真实的数据库查询类,而代理类什么都没有做。因此,它的构造是相当迅速的。

在系统启动时,将消耗资源最多的方法都使用代理模式分离,可以加快系统的启动速度,减少用户的等待时间。而在用户真正做查询操作时再由代理类单独去加载真实的数据库查询类,完成用户的请求。这个过程就是使用代理模式实现了延迟加载。

结语

Nacos Config 源码的阅读是我第一次阅读这种规模的项目,由于对 Spring Cloud 等一些相关内容并不熟悉,造成自己阅读源码还不是很深刻,只是粗浅地了解了一些配置管理的相关内容,知道不同的对象运用什么方法干了点什么,以后还需慢慢补课(填坑),争取能看懂的多一点(比如:注解等)。

感谢助教大大和各位读者的耐心阅读,在此祝大家新年大吉!

参考链接

  1. Nacos 文档
  2. Spring Cloud Alibaba Nacos Config Wiki
  3. nacos-config工作原理解析
  4. 微服务配置中心-Nacos
  5. Nacos配置中心源码解析
  6. 图文解析 Nacos 配置中心的实现

Comment ( 0 )

Sign in for post a comment