apollo--发布配置

1. 流程图

image-20200924224935805

"

2. 发布配置api

admin service发布配置api。先发布配置,再发送发布消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Transactional
@RequestMapping(path = "/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/releases", method = RequestMethod.POST)
public ReleaseDTO publish(@PathVariable("appId") String appId,
@PathVariable("clusterName") String clusterName,
@PathVariable("namespaceName") String namespaceName,
@RequestParam("name") String releaseName,
@RequestParam(name = "comment", required = false) String releaseComment,
@RequestParam("operator") String operator,
@RequestParam(name = "isEmergencyPublish", defaultValue = "false") boolean isEmergencyPublish) {
// 校验对应的 Namespace 对象是否存在。若不存在,抛出 NotFoundException 异常
Namespace namespace = namespaceService.findOne(appId, clusterName, namespaceName);
if (namespace == null) {
throw new NotFoundException(String.format("Could not find namespace for %s %s %s", appId, clusterName, namespaceName));
}
// 发布 Namespace 的配置
Release release = releaseService.publish(namespace, releaseName, releaseComment, operator, isEmergencyPublish);

// send release message
// 获得 Cluster 名
Namespace parentNamespace = namespaceService.findParentNamespace(namespace);
String messageCluster;
if (parentNamespace != null) { // 灰度发布
messageCluster = parentNamespace.getClusterName();
} else {
messageCluster = clusterName; // 使用请求的 ClusterName
}
// 发送 Release 消息
messageSender.sendMessage(ReleaseMessageKeyGenerator.generate(appId, messageCluster, namespaceName), Topics.APOLLO_RELEASE_TOPIC);

// 将 Release 转换成 ReleaseDTO 对象
return BeanUtils.transfrom(ReleaseDTO.class, release);
}

3. 发布配置

根据是否存在父Namespace,判断是否走灰度发布还是主干发布

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//ReleaseService
@Transactional
public Release publish(Namespace namespace, String releaseName, String releaseComment, String operator, boolean isEmergencyPublish) {
// 校验NamespaceLock
checkLock(namespace, isEmergencyPublish, operator);
// 获得 Namespace 的配置
Map<String, String> operateNamespaceItems = getNamespaceItems(namespace);
// 获得父 Namespace
Namespace parentNamespace = namespaceService.findParentNamespace(namespace);
// 若有父 Namespace ,说明当前是灰度发布,即branch release
if (parentNamespace != null) {
return publishBranchNamespace(parentNamespace, namespace, operateNamespaceItems, releaseName, releaseComment, operator, isEmergencyPublish);
}
// 运行到这,说明当前不是灰度发布。此时尝试获得子 Namespace 对象
Namespace childNamespace = namespaceService.findChildNamespace(namespace);
// 如果有子Namespace,说明存在灰度配置。此时缓存一份主干分支上次的 Release 对象
Release previousRelease = null;
if (childNamespace != null) {
previousRelease = findLatestActiveRelease(namespace);
}
// 创建操作 Context,记录到发布历史记录表中
Map<String, Object> operationContext = Maps.newHashMap();
operationContext.put(ReleaseOperationContext.IS_EMERGENCY_PUBLISH, isEmergencyPublish);
// 主干发布,即master release
Release release = masterRelease(namespace, releaseName, releaseComment, operateNamespaceItems, operator, ReleaseOperation.NORMAL_RELEASE, operationContext); // 是否紧急发布。
// 若有子 Namespace 时,自动将主干合并到子 Namespace ,并进行一次子 Namespace 的发布(merge to branch and auto release)
if (childNamespace != null) {
mergeFromMasterAndPublishBranch(namespace, childNamespace, operateNamespaceItems,
releaseName, releaseComment, operator, previousRelease,
release, isEmergencyPublish);
}
return release;
}

3.1 灰度发布

  • 灰度命名空间就是一个集群的子集群对应的命名空间,灰度命名空间的appId,appNamespace与父命名空间的appId,appNamespace一致。

  • 灰度key就是灰度命名空间对应配置的key,即子Namespace的Item表中的key

    灰度发布其实就是:发布“灰度key的配置和主干分支的配置合并后的配置”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
private Release publishBranchNamespace(Namespace parentNamespace, Namespace childNamespace,
Map<String, String> childNamespaceItems,
String releaseName, String releaseComment,
String operator, boolean isEmergencyPublish, Set<String> grayDelKeys) {
// 获得父 Namespace 的最近一次有效 Release 对象
Release parentLatestRelease = findLatestActiveRelease(parentNamespace);
// 获得父 Namespace 的配置项
Map<String, String> parentConfigurations = parentLatestRelease != null ?
GSON.fromJson(parentLatestRelease.getConfigurations(),
GsonType.CONFIG) : new LinkedHashMap<>();
// 获得父 Namespace 的 releaseId 属性
long baseReleaseId = parentLatestRelease == null ? 0 : parentLatestRelease.getId();

// 合并配置。合并父配置与本次灰度发布的配置,如果有冲突,已灰度发布的配置为准
Map<String, String> configsToPublish = mergeConfiguration(parentConfigurations, childNamespaceItems);
if(!(grayDelKeys == null || grayDelKeys.size()==0)){
for (String key : grayDelKeys){
configsToPublish.remove(key);
}
}

// 发布子 Namespace 的配置,发布的配置是:合并后的配置
return branchRelease(parentNamespace, childNamespace, releaseName, releaseComment,
configsToPublish, baseReleaseId, operator, ReleaseOperation.GRAY_RELEASE, isEmergencyPublish,
childNamespaceItems.keySet());

}

/**
* 2个map合并。(合并父配置配置与灰度的配置)
*/
private Map<String, String> mergeConfiguration(Map<String, String> baseConfigurations, Map<String, String> coverConfigurations) {
Map<String, String> result = new HashMap<>();
// 父 Namespace 的配置项
for (Map.Entry<String, String> entry : baseConfigurations.entrySet()) {
result.put(entry.getKey(), entry.getValue());
}
// 子 Namespace 的配置项
for (Map.Entry<String, String> entry : coverConfigurations.entrySet()) {
result.put(entry.getKey(), entry.getValue());
}
// 返回合并后的配置项
return result;
}


/**
* 发布灰度版本。1.插入一条release记录;2:更新GrayReleaseRule的发布id;3:插入一条release发布的历史记录
*/
private Release branchRelease(Namespace parentNamespace, Namespace childNamespace,
String releaseName, String releaseComment,
Map<String, String> configurations, long baseReleaseId,
String operator, int releaseOperation, boolean isEmergencyPublish, Collection<String> branchReleaseKeys) {

// 获得父 Namespace 最后有效的 Release 对象
Release previousRelease = findLatestActiveRelease(childNamespace.getAppId(),
childNamespace.getClusterName(),
childNamespace.getNamespaceName());
// 获得父 Namespace 最后有效的 Release 对象的编号
long previousReleaseId = previousRelease == null ? 0 : previousRelease.getId();

// 创建 Map ,用于 ReleaseHistory 对象的 `operationContext` 属性。
Map<String, Object> releaseOperationContext = Maps.newLinkedHashMap();
releaseOperationContext.put(ReleaseOperationContext.BASE_RELEASE_ID, baseReleaseId);
releaseOperationContext.put(ReleaseOperationContext.IS_EMERGENCY_PUBLISH, isEmergencyPublish);
// 此处记录灰度分支中的灰度key
releaseOperationContext.put(ReleaseOperationContext.BRANCH_RELEASE_KEYS, branchReleaseKeys);

// 创建子 Namespace 的 Release 对象,并保存
Release release =
createRelease(childNamespace, releaseName, releaseComment, configurations, operator);

// 更新 GrayReleaseRule 的 releaseId 属性
GrayReleaseRule grayReleaseRule = namespaceBranchService.updateRulesReleaseId(childNamespace.getAppId(),parentNamespace.getClusterName(),childNamespace.getNamespaceName(),childNamespace.getClusterName(),release.getId(), operator);

// 创建 ReleaseHistory 对象,并保存
if (grayReleaseRule != null) {
releaseOperationContext.put(ReleaseOperationContext.RULES, GrayReleaseRuleItemTransformer.batchTransformFromJSON(grayReleaseRule.getRules()));
}
releaseHistoryService.createReleaseHistory(parentNamespace.getAppId(), parentNamespace.getClusterName(),parentNamespace.getNamespaceName(), childNamespace.getClusterName(),release.getId(),previousReleaseId, releaseOperation, releaseOperationContext, operator);

return release;
}

3.2 主干发布

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
private Release masterRelease(Namespace namespace, String releaseName, String releaseComment,
Map<String, String> configurations, String operator,
int releaseOperation, Map<String, Object> operationContext) {
// 获得最后有效的 Release 对象
Release lastActiveRelease = findLatestActiveRelease(namespace);
long previousReleaseId = lastActiveRelease == null ? 0 : lastActiveRelease.getId();
// 创建 Release 对象,并保存
Release release = createRelease(namespace, releaseName, releaseComment, configurations, operator);

// 创建 ReleaseHistory 对象,并保存
releaseHistoryService.createReleaseHistory(namespace.getAppId(), namespace.getClusterName(),
namespace.getNamespaceName(), namespace.getClusterName(),
release.getId(), previousReleaseId, releaseOperation,
operationContext, operator);
return release;
}

/**
* 创建一条发布对象
*/
private Release createRelease(Namespace namespace, String name, String comment,
Map<String, String> configurations, String operator) {
// 创建 Release 对象
Release release = new Release();
release.setReleaseKey(ReleaseKeyGenerator.generateReleaseKey(namespace));
release.setDataChangeCreatedTime(new Date());
release.setDataChangeCreatedBy(operator);
release.setDataChangeLastModifiedBy(operator);
release.setName(name);
release.setComment(comment);
release.setAppId(namespace.getAppId());
release.setClusterName(namespace.getClusterName());
release.setNamespaceName(namespace.getNamespaceName());
release.setConfigurations(gson.toJson(configurations)); // 使用 Gson ,将配置 Map 格式化成字符串。
// 保存 Release 对象
release = releaseRepository.save(release);
// 释放 NamespaceLock
namespaceLockService.unlock(namespace.getId());
// 记录 Audit 到数据库中
auditService.audit(Release.class.getSimpleName(), release.getId(), Audit.OP.INSERT, release.getDataChangeCreatedBy());
return release;
}

/**
* 记录发布历史
*/
@Transactional
public ReleaseHistory createReleaseHistory(String appId, String clusterName, String
namespaceName, String branchName, long releaseId, long previousReleaseId, int operation,
Map<String, Object> operationContext, String operator) {
// 创建 ReleaseHistory 对象
ReleaseHistory releaseHistory = new ReleaseHistory();
releaseHistory.setAppId(appId);
releaseHistory.setClusterName(clusterName);
releaseHistory.setNamespaceName(namespaceName);
releaseHistory.setBranchName(branchName);
releaseHistory.setReleaseId(releaseId);
releaseHistory.setPreviousReleaseId(previousReleaseId);
releaseHistory.setOperation(operation);
if (operationContext == null) {
releaseHistory.setOperationContext("{}"); //default empty object
} else {
releaseHistory.setOperationContext(GSON.toJson(operationContext));
}
releaseHistory.setDataChangeCreatedTime(new Date());
releaseHistory.setDataChangeCreatedBy(operator);
releaseHistory.setDataChangeLastModifiedBy(operator);

// 保存 ReleaseHistory 对象
releaseHistoryRepository.save(releaseHistory);

auditService.audit(ReleaseHistory.class.getSimpleName(), releaseHistory.getId(),
Audit.OP.INSERT, releaseHistory.getDataChangeCreatedBy());

return releaseHistory;
}

合并分支

主干分支发布的时候,会把主干分支的配置合并到灰度分支的配置。如果合并后的配置与灰度分支的配置不同,那么把合并后的配置发布到灰度分支。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
private void mergeFromMasterAndPublishBranch(Namespace parentNamespace, Namespace childNamespace,
Map<String, String> parentNamespaceItems,
String releaseName, String releaseComment,
String operator, Release masterPreviousRelease,
Release parentRelease, boolean isEmergencyPublish) {
// 获得子 Namespace 的配置
Release childNamespaceLatestActiveRelease = findLatestActiveRelease(childNamespace);

Map<String, String> childReleaseConfiguration;
Collection<String> branchReleaseKeys;
if (childNamespaceLatestActiveRelease != null) {
childReleaseConfiguration = GSON.fromJson(childNamespaceLatestActiveRelease.getConfigurations(), GsonType.CONFIG);
branchReleaseKeys = getBranchReleaseKeys(childNamespaceLatestActiveRelease.getId());
} else {
childReleaseConfiguration = Collections.emptyMap();
branchReleaseKeys = null;
}

// 获得父 Namespace 的配置
Map<String, String> parentNamespaceOldConfiguration = masterPreviousRelease == null ?
null : GSON.fromJson(masterPreviousRelease.getConfigurations(),
GsonType.CONFIG);
// 合并主干分支的配置和灰度分支的配置
Map<String, String> childNamespaceToPublishConfigs =
calculateChildNamespaceToPublishConfiguration(parentNamespaceOldConfiguration, parentNamespaceItems,
childReleaseConfiguration, branchReleaseKeys);

// 若配置发生了变化,则进行一次子 Namespace 的发布
if (!childNamespaceToPublishConfigs.equals(childReleaseConfiguration)) {
branchRelease(parentNamespace, childNamespace, releaseName, releaseComment,
childNamespaceToPublishConfigs, parentRelease.getId(), operator,
ReleaseOperation.MASTER_NORMAL_RELEASE_MERGE_TO_GRAY, isEmergencyPublish, branchReleaseKeys);
}

}

/**
* 合并分支
*/
private Map<String, String> calculateChildNamespaceToPublishConfiguration(
Map<String, String> parentNamespaceOldConfiguration, Map<String, String> parentNamespaceNewConfiguration,
Map<String, String> childNamespaceLatestActiveConfiguration, Collection<String> branchReleaseKeys) {

//first. calculate child namespace modified configs
// 1. 找到灰度分支中灰度key对应的配置。灰度key在灰度发布的时候有记录(可以查看灰度发布章节)
Map<String, String> childNamespaceModifiedConfiguration = calculateBranchModifiedItemsAccordingToRelease(
parentNamespaceOldConfiguration, childNamespaceLatestActiveConfiguration, branchReleaseKeys);

//second. append child namespace modified configs to parent namespace new latest configuration
// 2. 主分支配置合并灰度配置。冲突的配置,以灰度配置为准
return mergeConfiguration(parentNamespaceNewConfiguration, childNamespaceModifiedConfiguration);
}

/**
* 计算出灰度分支中的灰度配置(此处是,被灰度的key对应的配置)
*/
private Map<String, String> calculateBranchModifiedItemsAccordingToRelease(
Map<String, String> masterReleaseConfigs, Map<String, String> branchReleaseConfigs,
Collection<String> branchReleaseKeys) {

Map<String, String> modifiedConfigs = new LinkedHashMap<>();

if (CollectionUtils.isEmpty(branchReleaseConfigs)) {
return modifiedConfigs;
}

// new logic, retrieve modified configurations based on branch release keys
// 新逻辑,获取灰度key对应的配置
if (branchReleaseKeys != null) {
for (String branchReleaseKey : branchReleaseKeys) {
if (branchReleaseConfigs.containsKey(branchReleaseKey)) {
modifiedConfigs.put(branchReleaseKey, branchReleaseConfigs.get(branchReleaseKey));
}
}

return modifiedConfigs;
}

// old logic, retrieve modified configurations by comparing branchReleaseConfigs with masterReleaseConfigs
// 此处是为了兼容老逻辑。可以不看
if (CollectionUtils.isEmpty(masterReleaseConfigs)) {
return branchReleaseConfigs;
}

for (Map.Entry<String, String> entry : branchReleaseConfigs.entrySet()) {

if (!Objects.equals(entry.getValue(), masterReleaseConfigs.get(entry.getKey()))) {
modifiedConfigs.put(entry.getKey(), entry.getValue());
}
}

return modifiedConfigs;

}

/**
* map合并
*/
private Map<String, String> mergeConfiguration(Map<String, String> baseConfigurations,
Map<String, String> coverConfigurations) {
Map<String, String> result = new LinkedHashMap<>();
//copy base configuration
for (Map.Entry<String, String> entry : baseConfigurations.entrySet()) {
result.put(entry.getKey(), entry.getValue());
}

//update and publish
for (Map.Entry<String, String> entry : coverConfigurations.entrySet()) {
result.put(entry.getKey(), entry.getValue());
}

return result;
}

4. 发送发布消息

发布配置后,会生产一条ReleaseMessage记录到表中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92

// ReleaseController
// send release message
// 获得 Cluster 名
Namespace parentNamespace = namespaceService.findParentNamespace(namespace);
String messageCluster;
if (parentNamespace != null) {
// 有父 Namespace ,说明是灰度发布,使用父 Namespace 的集群名
messageCluster = parentNamespace.getClusterName();
} else {
// 使用请求的 ClusterName
messageCluster = clusterName;
}
// 发送 Release 消息
messageSender.sendMessage(ReleaseMessageKeyGenerator.generate(appId, messageCluster, namespaceName), Topics.APOLLO_RELEASE_TOPIC);


/**
* DatabaseMessageSender
*/
private BlockingQueue<Long> toClean = Queues.newLinkedBlockingQueue(CLEAN_QUEUE_MAX_SIZE);
public void sendMessage(String message, String channel) {
logger.info("Sending message {} to channel {}", message, channel);
// 仅允许发送 APOLLO_RELEASE_TOPIC
if (!Objects.equals(channel, Topics.APOLLO_RELEASE_TOPIC)) {
logger.warn("Channel {} not supported by DatabaseMessageSender!");
return;
}

Tracer.logEvent("Apollo.AdminService.ReleaseMessage", message);
Transaction transaction = Tracer.newTransaction("Apollo.AdminService", "sendMessage");
try {
// 保存 ReleaseMessage 对象
ReleaseMessage newMessage = releaseMessageRepository.save(new ReleaseMessage(message));
// 添加到清理 Message 队列。若队列已满,添加失败,不阻塞等待。
toClean.offer(newMessage.getId());

transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
logger.error("Sending message to database failed", ex);
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}

/**
* 初始化一个定时器,定时清理旧的发布消息。
*/
@PostConstruct
private void initialize() {
cleanExecutorService.submit(() -> {
while (!cleanStopped.get() && !Thread.currentThread().isInterrupted()) {
try {
// 从队列中拉取最新的发布消息
Long rm = toClean.poll(1, TimeUnit.SECONDS);
if (rm != null) {
// 清理旧的发布消息
cleanMessage(rm);
} else {
TimeUnit.SECONDS.sleep(5);
}
} catch (Throwable ex) {
Tracer.logError(ex);
}
}
});
}

/**
* 清理发布消息
*/
private void cleanMessage(Long id) {
//double check in case the release message is rolled back
ReleaseMessage releaseMessage = releaseMessageRepository.findById(id).orElse(null);
if (releaseMessage == null) {
return;
}
boolean hasMore = true;
while (hasMore && !Thread.currentThread().isInterrupted()) {
// 删除比当前releaseId小的记录
List<ReleaseMessage> messages = releaseMessageRepository.findFirst100ByMessageAndIdLessThanOrderByIdAsc(
releaseMessage.getMessage(), releaseMessage.getId());

releaseMessageRepository.deleteAll(messages);
hasMore = messages.size() == 100;

messages.forEach(toRemove -> Tracer.logEvent(
String.format("ReleaseMessage.Clean.%s", toRemove.getMessage()), String.valueOf(toRemove.getId())));
}
}

参考