apollo--拉取配置

整体流程

image-20201108165818701

"

约定

  • 通知编号 = ReleaseMessage.id
  • Watch Key = ReleaseMessage.message

推送客户端发布消息

推送配置流程图

image-20201109103707448

"

ReleaseMessageScanner

com.ctrip.framework.apollo.biz.message.ReleaseMessageScanner ,实现 org.springframework.beans.factory.InitializingBean 接口,ReleaseMessage 扫描器,被 Config Service 使用。

初始化 Scan 任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Override
public void afterPropertiesSet() throws Exception {
databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
// 获得最大的 ReleaseMessage 的编号
maxIdScanned = loadLargestMessageId();
// 创建从 DB 中扫描 ReleaseMessage 表的定时任务
executorService.scheduleWithFixedDelay(() -> {
Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
try {
// 扫描任务
scanMessages();
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
logger.error("Scan and send message failed", ex);
} finally {
transaction.complete();
}
}, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);

}

/**
* find largest message id as the current start point
* @return current largest message id
*/
private long loadLargestMessageId() {
ReleaseMessage releaseMessage = releaseMessageRepository.findTopByOrderByIdDesc();
return releaseMessage == null ? 0 : releaseMessage.getId();
}

/**
* Scan messages, continue scanning until there is no more messages
*/
private void scanMessages() {
boolean hasMoreMessages = true;
while (hasMoreMessages && !Thread.currentThread().isInterrupted()) {
hasMoreMessages = scanAndSendMessages();
}
}

/**
* scan messages and send
*
* @return whether there are more messages
*/
private boolean scanAndSendMessages() {
//current batch is 500
List<ReleaseMessage> releaseMessages =
releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
if (CollectionUtils.isEmpty(releaseMessages)) {
return false;
}
fireMessageScanned(releaseMessages);
int messageScanned = releaseMessages.size();
maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
return messageScanned == 500;
}

fireMessageScanned

#fireMessageScanned(List<ReleaseMessage> messages) 方法,触发监听器,处理 ReleaseMessage 们。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void fireMessageScanned(List<ReleaseMessage> messages) {
for (ReleaseMessage message : messages) { // 循环 ReleaseMessage
for (ReleaseMessageListener listener : listeners) { // 循环 ReleaseMessageListener
try {
// 触发监听器
listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
} catch (Throwable ex) {
Tracer.logError(ex);
logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
}
}
}
}

// 通过 #addMessageListener(ReleaseMessageListener) 方法,注册 ReleaseMessageListener 。在 MessageScannerConfiguration 中,调用该方法,初始化 ReleaseMessageScanner 的监听器们
@Configuration
static class MessageScannerConfiguration {
.......
@Bean
public ReleaseMessageScanner releaseMessageScanner() {
ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();
// 0. handle release message cache
releaseMessageScanner.addMessageListener(releaseMessageServiceWithCache);
// 1. handle gray release rule
releaseMessageScanner.addMessageListener(grayReleaseRulesHolder);
// 2. handle server cache
releaseMessageScanner.addMessageListener(configService);
releaseMessageScanner.addMessageListener(configFileController);
// 3. notify clients
releaseMessageScanner.addMessageListener(notificationControllerV2);
releaseMessageScanner.addMessageListener(notificationController);
return releaseMessageScanner;
}
}

NotificationControllerV2

com.ctrip.framework.apollo.configservice.controller.NotificationControllerV2 ,实现 ReleaseMessageListener 接口,通知 Controller ,仅提供 notifications/v2 接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
 @GetMapping
public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
@RequestParam(value = "appId") String appId,
@RequestParam(value = "cluster") String cluster,
@RequestParam(value = "notifications") String notificationsAsString,
@RequestParam(value = "dataCenter", required = false) String dataCenter,
@RequestParam(value = "ip", required = false) String clientIp) {
List<ApolloConfigNotification> notifications = null;

// 反序列化客户端的通知信息
try {
notifications =
gson.fromJson(notificationsAsString, notificationsTypeReference);
} catch (Throwable ex) {
Tracer.logError(ex);
}

if (CollectionUtils.isEmpty(notifications)) {
throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
}

// 过滤并创建 ApolloConfigNotification Map 。其中,KEY 为 Namespace 的名字
Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);

if (CollectionUtils.isEmpty(filteredNotifications)) {
throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
}

// 创建DefferedResult对象
// 当一个请求到达API接口,如果该API接口的return返回值是DeferredResult,在没有超时或者
// DeferredResult对象设置setResult时,接口不会返回,但是Servlet容器线程会结束,
// 如此以来这个请求不会占用服务连接池太久,如果超时或设置setResult,接口会立即返回。
DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(bizConfig.longPollingTimeoutInMilli());
// 客户端所有的命名空间
Set<String> namespaces = Sets.newHashSetWithExpectedSize(filteredNotifications.size());
// 客户端所有命名空间以及通知id的对应关系
Map<String, Long> clientSideNotifications = Maps.newHashMapWithExpectedSize(filteredNotifications.size());

for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {
String normalizedNamespace = notificationEntry.getKey();
ApolloConfigNotification notification = notificationEntry.getValue();
namespaces.add(normalizedNamespace);
clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());
if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {
// 记录名字被格式化的 Namespace 。因为,最终返回给客户端,使用原始的 Namespace 名字,否则客户端无法识别。
deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);
}
}

// Assemble Watch Key Multimap with namespace as the key and watch keys as the value
// 如果是关联类型的namespace,有2个watch key
Multimap<String, String> watchedKeysMap =
watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);

Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());

/**
* 1、set deferredResult before the check, for avoid more waiting
* If the check before setting deferredResult,it may receive a notification the next time
* when method handleMessage is executed between check and set deferredResult.
*/
deferredResultWrapper
.onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));

deferredResultWrapper.onCompletion(() -> {
//unregister all keys
for (String key : watchedKeys) {
deferredResults.remove(key, deferredResultWrapper);
}
logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
});

//register all keys
for (String key : watchedKeys) {
this.deferredResults.put(key, deferredResultWrapper);
}

logWatchedKeys(watchedKeys, "Apollo.LongPoll.RegisteredKeys");
logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}",
watchedKeys, appId, cluster, namespaces, dataCenter);

/**
* 2、check new release
* 最近的发布记录
*/
List<ReleaseMessage> latestReleaseMessages =
releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);

/**
* Manually close the entity manager.
* Since for async request, Spring won't do so until the request is finished,
* which is unacceptable since we are doing long polling - means the db connection would be hold
* for a very long time
*/
entityManagerUtil.closeEntityManager();

List<ApolloConfigNotification> newNotifications =
getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap,
latestReleaseMessages);

if (!CollectionUtils.isEmpty(newNotifications)) {
deferredResultWrapper.setResult(newNotifications);
}

return deferredResultWrapper.getResult();
}


/**
* 组装watch key
* Assemble watch keys for the given appId, cluster, namespaces, dataCenter combination
*
* @return a multimap with namespace as the key and watch keys as the value
*/
public Multimap<String, String> assembleAllWatchKeys(String appId, String clusterName,
Set<String> namespaces,
String dataCenter) {
Multimap<String, String> watchedKeysMap =
assembleWatchKeys(appId, clusterName, namespaces, dataCenter);

if (!(namespaces.size() == 1 && namespaces.contains(ConfigConsts.NAMESPACE_APPLICATION))) {
Set<String> namespacesBelongToAppId = namespacesBelongToAppId(appId, namespaces);
// 关联类型的namespace
Set<String> publicNamespaces = Sets.difference(namespaces, namespacesBelongToAppId);

//Listen on more namespaces if it's a public namespace
if (!publicNamespaces.isEmpty()) {
// 组装关联类型的watch key
watchedKeysMap
.putAll(findPublicConfigWatchKeys(appId, clusterName, publicNamespaces, dataCenter));
}
}

return watchedKeysMap;
}

// 根据最近发布记录,获取最新的通知消息;如果发布记录的id比客户端缓存的通知id大,那么就是最新的通知消息
private List<ApolloConfigNotification> getApolloConfigNotifications(Set<String> namespaces,
Map<String, Long> clientSideNotifications,
Multimap<String, String> watchedKeysMap,
List<ReleaseMessage> latestReleaseMessages) {
List<ApolloConfigNotification> newNotifications = Lists.newArrayList();
if (!CollectionUtils.isEmpty(latestReleaseMessages)) {
Map<String, Long> latestNotifications = Maps.newHashMap();
for (ReleaseMessage releaseMessage : latestReleaseMessages) {
latestNotifications.put(releaseMessage.getMessage(), releaseMessage.getId());
}

for (String namespace : namespaces) {
// 客户端缓存的通知id
long clientSideId = clientSideNotifications.get(namespace);
// 最新的通知id
long latestId = ConfigConsts.NOTIFICATION_ID_PLACEHOLDER;
// 根据命名空间,得到该命名空间所关心的watch key。然后比较大小,得到这些watch key中最大的通知id
Collection<String> namespaceWatchedKeys = watchedKeysMap.get(namespace);
for (String namespaceWatchedKey : namespaceWatchedKeys) {
long namespaceNotificationId =
latestNotifications.getOrDefault(namespaceWatchedKey, ConfigConsts.NOTIFICATION_ID_PLACEHOLDER);
if (namespaceNotificationId > latestId) {
latestId = namespaceNotificationId;
}
}
// 如果发布记录的id比客户端缓存的通知id大,那么就是最新的通知消息
if (latestId > clientSideId) {
ApolloConfigNotification notification = new ApolloConfigNotification(namespace, latestId);
namespaceWatchedKeys.stream().filter(latestNotifications::containsKey).forEach(namespaceWatchedKey ->
notification.addMessage(namespaceWatchedKey, latestNotifications.get(namespaceWatchedKey)));
newNotifications.add(notification);
}
}
}
return newNotifications;
}

客户端拉取配置

拉取配置流程图

image-20201111161110474

"

RemoteConfigRepository

com.ctrip.framework.apollo.internals.RemoteConfigRepository ,实现 AbstractConfigRepository 抽象类,远程配置 Repository 。实现从 Config Service 拉取配置,并缓存在内存中。并且,定时 + 实时刷新缓存。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public RemoteConfigRepository(String namespace) {
// 一个 RemoteConfigRepository 对应一个 Namespace
m_namespace = namespace;
// 缓存配置
m_configCache = new AtomicReference<>();
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
// 远程配置长轮询服务
remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
// 长轮询到通知的 Config Service 信息。在下一次轮询配置时,优先从该 Config Service 请求
m_longPollServiceDto = new AtomicReference<>();
// 缓存该命名空间的所有watch key,以及对应的通知id
m_remoteMessages = new AtomicReference<>();
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
// 是否强制拉取缓存的标记,若为 true ,则多一轮从 Config Service 拉取配置。
m_configNeedForceRefresh = new AtomicBoolean(true);
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
m_configUtil.getOnErrorRetryInterval() * 8);
// 尝试同步配置,作为初次的配置缓存初始化。
this.trySync();
// 初始化定时刷新配置的任务
this.schedulePeriodicRefresh();
// 将自己注册到 RemoteConfigLongPollService 中,实现配置更新的实时通知
this.scheduleLongPollingRefresh();
}

// 尝试同步配置
protected boolean trySync() {
try {
sync();
return true;
} catch (Throwable ex) {
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
logger
.warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
.getDetailMessage(ex));
}
return false;
}

// 初始化定时刷新配置的任务
private void schedulePeriodicRefresh() {
logger.debug("Schedule periodic refresh with interval: {} {}",
m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
m_executorService.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
logger.debug("refresh config for namespace: {}", m_namespace);
trySync();
Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
}
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshIntervalTimeUnit());
}

// 将自己注册到 RemoteConfigLongPollService 中,实现配置更新的实时通知
private void scheduleLongPollingRefresh() {
remoteConfigLongPollService.submit(m_namespace, this);
}

sync

#sync() 实现方法,从 Config Service 同步配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
 @Override
protected synchronized void sync() {
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");

try {
ApolloConfig previous = m_configCache.get();
ApolloConfig current = loadApolloConfig();

//reference equals means HTTP 304
if (previous != current) {
logger.debug("Remote Config refreshed!");
m_configCache.set(current);
this.fireRepositoryChange(m_namespace, this.getConfig());
}

if (current != null) {
Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
current.getReleaseKey());
}

transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}

/**
* 加载配置
*/
private ApolloConfig loadApolloConfig() {
// 限流
if (!m_loadConfigRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
//wait at most 5 seconds
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
// 获得 appId cluster dataCenter 配置信息
String appId = m_configUtil.getAppId();
String cluster = m_configUtil.getCluster();
String dataCenter = m_configUtil.getDataCenter();
String secret = m_configUtil.getAccessKeySecret();
Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));
// 计算重试次数
int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;
long onErrorSleepTime = 0; // 0 means no sleep
Throwable exception = null;

// 获得所有的 Config Service 的地址
List<ServiceDTO> configServices = getConfigServices();
String url = null;
retryLoopLabel:
// 循环读取配置重试次数直到成功。每一次,都会循环所有的 ServiceDTO 数组。
for (int i = 0; i < maxRetries; i++) {
// 随机所有的 Config Service 的地址
List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
Collections.shuffle(randomConfigServices);
// 优先访问通知配置变更的 Config Service 的地址。并且,获取到时,需要置空,避免重复优先访问。
//Access the server which notifies the client first
if (m_longPollServiceDto.get() != null) {
randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
}

// 循环所有的 Config Service 的地址
for (ServiceDTO configService : randomConfigServices) {
if (onErrorSleepTime > 0) {
logger.warn(
"Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",
onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);

try {
m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);
} catch (InterruptedException e) {
//ignore
}
}

// 组装查询配置的地址
url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
dataCenter, m_remoteMessages.get(), m_configCache.get());

logger.debug("Loading config from {}", url);

HttpRequest request = new HttpRequest(url);
if (!StringUtils.isBlank(secret)) {
Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
request.setHeaders(headers);
}

Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "queryConfig");
transaction.addData("Url", url);
try {

// http请求
HttpResponse<ApolloConfig> response = m_httpUtil.doGet(request, ApolloConfig.class);
m_configNeedForceRefresh.set(false);
m_loadConfigFailSchedulePolicy.success();

transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Transaction.SUCCESS);

// 无新的配置,直接返回缓存的 ApolloConfig 对象
if (response.getStatusCode() == 304) {
logger.debug("Config server responds with 304 HTTP status code.");
return m_configCache.get();
}

// 有新的配置,进行返回新的 ApolloConfig 对象
ApolloConfig result = response.getBody();

logger.debug("Loaded config for {}: {}", m_namespace, result);

return result;
} catch (ApolloConfigStatusCodeException ex) {
ApolloConfigStatusCodeException statusCodeException = ex;
// 若返回的状态码是 404 ,说明查询配置的 Config Service 不存在该 Namespace
//config not found
if (ex.getStatusCode() == 404) {
String message = String.format(
"Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " +
"please check whether the configs are released in Apollo!",
appId, cluster, m_namespace);
statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(),
message);
}
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(statusCodeException));
transaction.setStatus(statusCodeException);
exception = statusCodeException;
//
if(ex.getStatusCode() == 404) {
break retryLoopLabel;
}
} catch (Throwable ex) {
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
transaction.setStatus(ex);
exception = ex;
} finally {
transaction.complete();
}

// 计算延迟时间
// if force refresh, do normal sleep, if normal config load, do exponential sleep
onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
m_loadConfigFailSchedulePolicy.fail();
}

}
String message = String.format(
"Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s, url: %s",
appId, cluster, m_namespace, url);
throw new ApolloConfigException(message, exception);
}

// 组装http请求参数
String assembleQueryConfigUrl(String uri, String appId, String cluster, String namespace,
String dataCenter, ApolloNotificationMessages remoteMessages, ApolloConfig previousConfig) {

String path = "configs/%s/%s/%s";
List<String> pathParams =
Lists.newArrayList(pathEscaper.escape(appId), pathEscaper.escape(cluster),
pathEscaper.escape(namespace));
Map<String, String> queryParams = Maps.newHashMap();

if (previousConfig != null) {
queryParams.put("releaseKey", queryParamEscaper.escape(previousConfig.getReleaseKey()));
}

if (!Strings.isNullOrEmpty(dataCenter)) {
queryParams.put("dataCenter", queryParamEscaper.escape(dataCenter));
}

String localIp = m_configUtil.getLocalIp();
if (!Strings.isNullOrEmpty(localIp)) {
queryParams.put("ip", queryParamEscaper.escape(localIp));
}

if (remoteMessages != null) {
queryParams.put("messages", queryParamEscaper.escape(GSON.toJson(remoteMessages)));
}

String pathExpanded = String.format(path, pathParams.toArray());

if (!queryParams.isEmpty()) {
pathExpanded += "?" + MAP_JOINER.join(queryParams);
}
if (!uri.endsWith("/")) {
uri += "/";
}
return uri + pathExpanded;
}

RemoteConfigLongPollService

com.ctrip.framework.apollo.internals.RemoteConfigLongPollService ,远程配置长轮询服务。负责长轮询 Config Service 的配置变更通知 /notifications/v2 接口。当有新的通知时,触发 RemoteConfigRepository ,立即轮询 Config Service 的配置读取 /configs/{appId}/{clusterName}/{namespace:.+} 接口。

1
2
3
4
5
6
7
8
9
10
11
public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
// 添加到 m_longPollNamespaces 中; m_longPollNamespaces:注册的长轮询的 Namespace Multimap 缓存
boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
// 添加到 m_notifications 中,通知id默认-1; m_notifications:通知编号 Map 缓存
m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
if (!m_longPollStarted.get()) {
// 若未启动长轮询定时任务,进行启动
startLongPolling();
}
return added;
}

startLongPolling

#startLongPolling() 方法,启动长轮询任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
private void startLongPolling() {
// CAS 设置长轮询任务已经启动。若已经启动,不重复启动
if (!m_longPollStarted.compareAndSet(false, true)) {
//already started
return;
}
try {
// 获得 appId cluster dataCenter 配置信息
final String appId = m_configUtil.getAppId();
final String cluster = m_configUtil.getCluster();
final String dataCenter = m_configUtil.getDataCenter();
final String secret = m_configUtil.getAccessKeySecret();
// 获得长轮询任务的初始化延迟时间,单位毫秒
final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
// 提交长轮询任务。该任务会持续且循环执行
m_longPollingService.submit(new Runnable() {
@Override
public void run() {
if (longPollingInitialDelayInMills > 0) {
try {
logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);
TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
} catch (InterruptedException e) {
//ignore
}
}
// 执行长轮询
doLongPollingRefresh(appId, cluster, dataCenter, secret);
}
});
} catch (Throwable ex) {
m_longPollStarted.set(false);
ApolloConfigException exception =
new ApolloConfigException("Schedule long polling refresh failed", ex);
Tracer.logError(exception);
logger.warn(ExceptionUtil.getDetailMessage(exception));
}
}

// 持续执行长轮询
private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
final Random random = new Random();
ServiceDTO lastServiceDto = null;
// 循环执行,直到停止或线程中断
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
// 限流
if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
//wait at most 5 seconds
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
String url = null;
try {
// 获得 最近一次使用的Config Service 的地址,默认为null
if (lastServiceDto == null) {
// 获得所有的 Config Service 的地址
List<ServiceDTO> configServices = getConfigServices();
lastServiceDto = configServices.get(random.nextInt(configServices.size()));
}

// 组装长轮询通知变更的地址
url =
assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
m_notifications);

logger.debug("Long polling from {}", url);

// 创建 HttpRequest 对象,并设置超时时间:90s
HttpRequest request = new HttpRequest(url);
request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
if (!StringUtils.isBlank(secret)) {
Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
request.setHeaders(headers);
}

transaction.addData("Url", url);

// http请求
final HttpResponse<List<ApolloConfigNotification>> response =
m_httpUtil.doGet(request, m_responseType);

logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);

// 有新的通知,刷新本地的缓存
if (response.getStatusCode() == 200 && response.getBody() != null) {
// 更新 m_notifications:namespace->通知id
updateNotifications(response.getBody());
// 更新 m_remoteNotificationMessages
updateRemoteNotifications(response.getBody());
transaction.addData("Result", response.getBody().toString());
// 通知对应的 RemoteConfigRepository 们
notify(lastServiceDto, response.getBody());
}

//try to load balance
if (response.getStatusCode() == 304 && random.nextBoolean()) {
// 无新的通知,重置连接的 Config Service 的地址,下次请求不同的 Config Service ,实现负载均衡。
lastServiceDto = null;
}

// 标记成功,会重置policy的等待时间为0
m_longPollFailSchedulePolicyInSecond.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
// 重置连接的 Config Service 的地址,下次请求不同的 Config Service
lastServiceDto = null;
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
transaction.setStatus(ex);
// 标记失败,计算下一次延迟执行时间,每次延迟的时间翻倍,一直到最大值
long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
logger.warn(
"Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
// 根据policy等待一定时间,下次失败重试
try {
TimeUnit.SECONDS.sleep(sleepTimeInSecond);
} catch (InterruptedException ie) {
//ignore
}
} finally {
transaction.complete();
}
}
}

长轮询到新的通知消息,会更新本地缓存,通知观察者(RemoteConfigRepository),触发监听事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
 //  更新通知消息中,命名空间对应的最新通知id 
private void updateNotifications(List<ApolloConfigNotification> deltaNotifications) {
for (ApolloConfigNotification notification : deltaNotifications) {
if (Strings.isNullOrEmpty(notification.getNamespaceName())) {
continue;
}
String namespaceName = notification.getNamespaceName();
if (m_notifications.containsKey(namespaceName)) {
// 更新通知id
m_notifications.put(namespaceName, notification.getNotificationId());
}
//since .properties are filtered out by default, so we need to check if there is notification with .properties suffix
String namespaceNameWithPropertiesSuffix =
String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue());
if (m_notifications.containsKey(namespaceNameWithPropertiesSuffix)) {
m_notifications.put(namespaceNameWithPropertiesSuffix, notification.getNotificationId());
}
}
}

// 更新通知消息的明细,即:命名空间下,所有watch key的最新通知id
private void updateRemoteNotifications(List<ApolloConfigNotification> deltaNotifications) {
for (ApolloConfigNotification notification : deltaNotifications) {
if (Strings.isNullOrEmpty(notification.getNamespaceName())) {
continue;
}

if (notification.getMessages() == null || notification.getMessages().isEmpty()) {
continue;
}

ApolloNotificationMessages localRemoteMessages =
m_remoteNotificationMessages.get(notification.getNamespaceName());
if (localRemoteMessages == null) {
localRemoteMessages = new ApolloNotificationMessages();
m_remoteNotificationMessages.put(notification.getNamespaceName(), localRemoteMessages);
}

// 合并明细
localRemoteMessages.mergeFrom(notification.getMessages());
}
}

// 合并命名空间下,watch key以及对应的通知id;如果watch key已存在,保留最大的通知id
public void mergeFrom(ApolloNotificationMessages source) {
if (source == null) {
return;
}

// key: watch key;value:通知id
for (Map.Entry<String, Long> entry : source.getDetails().entrySet()) {
//to make sure the notification id always grows bigger
if (this.has(entry.getKey()) &&
this.get(entry.getKey()) >= entry.getValue()) {
continue;
}
this.put(entry.getKey(), entry.getValue());
}
}

// 通知RemoteConfigRepository
private void notify(ServiceDTO lastServiceDto, List<ApolloConfigNotification> notifications) {
if (notifications == null || notifications.isEmpty()) {
return;
}
// 遍历所有 有发布消息的命名空间
for (ApolloConfigNotification notification : notifications) {
String namespaceName = notification.getNamespaceName();
// 根据命名空间,获取观察者
//create a new list to avoid ConcurrentModificationException
List<RemoteConfigRepository> toBeNotified =
Lists.newArrayList(m_longPollNamespaces.get(namespaceName));

ApolloNotificationMessages originalMessages = m_remoteNotificationMessages.get(namespaceName);
ApolloNotificationMessages remoteMessages = originalMessages == null ? null : originalMessages.clone();
// 因为 .properties 在默认情况下被过滤掉,所以我们需要检查是否有监听器。若有,添加到 RemoteConfigRepository 数组
//since .properties are filtered out by default, so we need to check if there is any listener for it
toBeNotified.addAll(m_longPollNamespaces
.get(String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue())));
for (RemoteConfigRepository remoteConfigRepository : toBeNotified) {
try {
// 通知观察者
remoteConfigRepository.onLongPollNotified(lastServiceDto, remoteMessages);
} catch (Throwable ex) {
Tracer.logError(ex);
}
}
}
}

onLongPollNotified

当 RemoteConfigLongPollService 长轮询到该 RemoteConfigRepository 的 Namespace 下的配置更新时,会回调 #onLongPollNotified(ServiceDTO, ApolloNotificationMessages) 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 监听事件
public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
// 设置长轮询到配置更新的 Config Service 。下次同步配置时,优先读取该服务
m_longPollServiceDto.set(longPollNotifiedServiceDto);
// 最新的通知消息
m_remoteMessages.set(remoteMessages);
m_executorService.submit(new Runnable() {
@Override
public void run() {
// 强制同步
m_configNeedForceRefresh.set(true);
trySync();
}
});
}

服务端拉取配置

image-20201112141922732

"

ConfigController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@GetMapping(value = "/{appId}/{clusterName}/{namespace:.+}")
public ApolloConfig queryConfig(@PathVariable String appId, @PathVariable String clusterName,
@PathVariable String namespace,
@RequestParam(value = "dataCenter", required = false) String dataCenter,
@RequestParam(value = "releaseKey", defaultValue = "-1") String clientSideReleaseKey,
@RequestParam(value = "ip", required = false) String clientIp,
@RequestParam(value = "messages", required = false) String messagesAsString,
HttpServletRequest request, HttpServletResponse response) throws IOException {
String originalNamespace = namespace;
//strip out .properties suffix
namespace = namespaceUtil.filterNamespaceName(namespace);
//fix the character case issue, such as FX.apollo <-> fx.apollo
namespace = namespaceUtil.normalizeNamespace(appId, namespace);

if (Strings.isNullOrEmpty(clientIp)) {
clientIp = tryToGetClientIp(request);
}

ApolloNotificationMessages clientMessages = transformMessages(messagesAsString);

// 创建 Release 数组
List<Release> releases = Lists.newLinkedList();

String appClusterNameLoaded = clusterName;
if (!ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {
// 获得 Release 对象
Release currentAppRelease = configService.loadConfig(appId, clientIp, appId, clusterName, namespace,
dataCenter, clientMessages);

if (currentAppRelease != null) {
releases.add(currentAppRelease);
// 获得 Release 对应的 Cluster 名字
//we have cluster search process, so the cluster name might be overridden
appClusterNameLoaded = currentAppRelease.getClusterName();
}
}

// 若 Namespace 为关联类型,则获取关联的 Namespace 的 Release 对象
//if namespace does not belong to this appId, should check if there is a public configuration
if (!namespaceBelongsToAppId(appId, namespace)) {
// 获得 Release 对象
Release publicRelease = this.findPublicConfig(appId, clientIp, clusterName, namespace,
dataCenter, clientMessages);
if (!Objects.isNull(publicRelease)) {
releases.add(publicRelease);
}
}

// 若获得不到 Release ,返回状态码为 404 的响应
if (releases.isEmpty()) {
response.sendError(HttpServletResponse.SC_NOT_FOUND,
String.format(
"Could not load configurations with appId: %s, clusterName: %s, namespace: %s",
appId, clusterName, originalNamespace));
Tracer.logEvent("Apollo.Config.NotFound",
assembleKey(appId, clusterName, originalNamespace, dataCenter));
return null;
}

auditReleases(appId, clusterName, dataCenter, clientIp, releases);

// 计算 Config Service 的合并 ReleaseKey
String mergedReleaseKey = releases.stream().map(Release::getReleaseKey)
.collect(Collectors.joining(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR));

if (mergedReleaseKey.equals(clientSideReleaseKey)) {
// 对比 Client 的合并 Release Key 。若相等,说明没有改变,返回状态码为 302 的响应
// Client side configuration is the same with server side, return 304
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
Tracer.logEvent("Apollo.Config.NotModified",
assembleKey(appId, appClusterNameLoaded, originalNamespace, dataCenter));
return null;
}

ApolloConfig apolloConfig = new ApolloConfig(appId, appClusterNameLoaded, originalNamespace,
mergedReleaseKey);
// 合并 Release 的配置,并将结果设置到 ApolloConfig 中
apolloConfig.setConfigurations(mergeReleaseConfigurations(releases));

Tracer.logEvent("Apollo.Config.Found", assembleKey(appId, appClusterNameLoaded,
originalNamespace, dataCenter));
return apolloConfig;
}

读取顺序

"

加载配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Override
public Release loadConfig(String clientAppId, String clientIp, String configAppId, String configClusterName,
String configNamespace, String dataCenter, ApolloNotificationMessages clientMessages) {
// 优先,获得指定 Cluster 的 Release 。若存在,直接返回。
// load from specified cluster fist
if (!Objects.equals(ConfigConsts.CLUSTER_NAME_DEFAULT, configClusterName)) {
Release clusterRelease = findRelease(clientAppId, clientIp, configAppId, configClusterName, configNamespace,
clientMessages);

if (!Objects.isNull(clusterRelease)) {
return clusterRelease;
}
}

// 其次,获得所属 IDC 的 Cluster 的 Release 。若存在,直接返回
// try to load via data center
if (!Strings.isNullOrEmpty(dataCenter) && !Objects.equals(dataCenter, configClusterName)) {
Release dataCenterRelease = findRelease(clientAppId, clientIp, configAppId, dataCenter, configNamespace,
clientMessages);
if (!Objects.isNull(dataCenterRelease)) {
return dataCenterRelease;
}
}

// 最后,获得默认 Cluster 的 Release 。
// fallback to default release
return findRelease(clientAppId, clientIp, configAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, configNamespace,
clientMessages);
}

// 获得release对象
private Release findRelease(String clientAppId, String clientIp, String configAppId, String configClusterName,
String configNamespace, ApolloNotificationMessages clientMessages) {
// 读取灰度发布编号
Long grayReleaseId = grayReleaseRulesHolder.findReleaseIdFromGrayReleaseRule(clientAppId, clientIp, configAppId,
configClusterName, configNamespace);

Release release = null;
// 读取灰度 Release 对象
if (grayReleaseId != null) {
release = findActiveOne(grayReleaseId, clientMessages);
}

// 非灰度,获得最新的,并且有效的 Release 对象
if (release == null) {
release = findLatestActiveRelease(configAppId, configClusterName, configNamespace, clientMessages);
}

return release;
}

判断是否关联类型的namespace,如果是需要拉取被关联的配置的发布记录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private boolean namespaceBelongsToAppId(String appId, String namespaceName) {
// Namespace 非 'application' ,因为每个 App 都有
//Every app has an 'application' namespace
if (Objects.equals(ConfigConsts.NAMESPACE_APPLICATION, namespaceName)) {
return true;
}

//if no appId is present, then no other namespace belongs to it
if (ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {
return false;
}

// 非当前 App 下的 Namespace
AppNamespace appNamespace = appNamespaceService.findByAppIdAndNamespace(appId, namespaceName);

return appNamespace != null;
}


// 获取被关联的配置的发布记录
private Release findPublicConfig(String clientAppId, String clientIp, String clusterName,
String namespace, String dataCenter, ApolloNotificationMessages clientMessages) {
// 获得公用类型的 AppNamespace 对象
AppNamespace appNamespace = appNamespaceService.findPublicNamespaceByName(namespace);

// 判断非当前 App 下的,那么就是关联类型。
//check whether the namespace's appId equals to current one
if (Objects.isNull(appNamespace) || Objects.equals(clientAppId, appNamespace.getAppId())) {
return null;
}

String publicConfigAppId = appNamespace.getAppId();

// 获得 Namespace 最新的 Release 对象
return configService.loadConfig(clientAppId, clientIp, publicConfigAppId, clusterName, namespace, dataCenter,
clientMessages);
}

参考