apollo--客户端设计

UML

UML

"

源码分析

ConfigService

com.ctrip.framework.apollo.ConfigService ,客户端配置服务,作为配置使用的入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
 * Client-side config service; holds a lazily resolved {@link ConfigManager}.
 */
public class ConfigService {
  private static final ConfigService s_instance = new ConfigService();

  private volatile ConfigManager m_configManager;

  /**
   * Returns the ConfigManager, resolving it through ApolloInjector on first use.
   * Uses double-checked locking on this instance so the lookup happens once.
   */
  private ConfigManager getManager() {
    // Fast path: already resolved, no lock needed.
    ConfigManager resolved = m_configManager;
    if (resolved != null) {
      return resolved;
    }

    synchronized (this) {
      // Re-check under the lock; another thread may have resolved it meanwhile.
      if (m_configManager == null) {
        m_configManager = ApolloInjector.getInstance(ConfigManager.class);
      }
      return m_configManager;
    }
  }
}

ConfigManager

com.ctrip.framework.apollo.internals.ConfigManager ,配置管理器**接口**。提供获得 Config 和 ConfigFile 对象的接口,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Config manager contract: hands out {@link Config} and {@link ConfigFile}
 * instances looked up by namespace.
 */
public interface ConfigManager {
/**
* Get the config instance for the namespace specified.
*
* @param namespace the namespace
* @return the config instance for the namespace
*/
Config getConfig(String namespace);

/**
* Get the config file instance for the namespace specified.
*
* @param namespace the namespace
* @param configFileFormat the config file format
* @return the config file instance for the namespace
*/
ConfigFile getConfigFile(String namespace, ConfigFileFormat configFileFormat);
}

DefaultConfigManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class DefaultConfigManager implements ConfigManager {
private ConfigFactoryManager m_factoryManager;

private Map<String, Config> m_configs = Maps.newConcurrentMap();
private Map<String, ConfigFile> m_configFiles = Maps.newConcurrentMap();

public DefaultConfigManager() {
m_factoryManager = ApolloInjector.getInstance(ConfigFactoryManager.class);
}

@Override
public Config getConfig(String namespace) {
Config config = m_configs.get(namespace);

if (config == null) {
synchronized (this) {
config = m_configs.get(namespace);

if (config == null) {
ConfigFactory factory = m_factoryManager.getFactory(namespace);

config = factory.create(namespace);
// 添加到缓存
m_configs.put(namespace, config);
}
}
}

return config;
}

...getConfigFile代码省略...
}

ConfigFactoryManager

com.ctrip.framework.apollo.spi.ConfigFactoryManager ,ConfigFactory 管理器**接口**。代码如下:

1
2
3
4
5
6
7
8
9
10
11
/**
 * Manager contract for {@link ConfigFactory} instances, looked up by namespace.
 */
public interface ConfigFactoryManager {

/**
* Get the config factory for the namespace.
*
* @param namespace the namespace
* @return the config factory for this namespace
*/
ConfigFactory getFactory(String namespace);

}
  • ConfigFactoryManager 管理的是 ConfigFactory ,而 ConfigManager 管理的是 Config 。

DefaultConfigFactoryManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Default {@link ConfigFactoryManager}. Resolves a namespace's factory by trying,
 * in order: the registry, the local cache, a namespace-specific injected factory,
 * and finally the default injected factory.
 */
public class DefaultConfigFactoryManager implements ConfigFactoryManager {
  private ConfigRegistry m_registry;

  private Map<String, ConfigFactory> m_factories = Maps.newConcurrentMap();

  public DefaultConfigFactoryManager() {
    m_registry = ApolloInjector.getInstance(ConfigRegistry.class);
  }

  /**
   * Resolves the factory for the namespace.
   *
   * @param namespace the namespace
   * @return the first factory found by the lookup chain
   */
  @Override
  public ConfigFactory getFactory(String namespace) {
    // 1) A factory registered in the ConfigRegistry wins.
    ConfigFactory registered = m_registry.getFactory(namespace);
    if (registered != null) {
      return registered;
    }

    // 2) A previously cached factory.
    ConfigFactory cached = m_factories.get(namespace);
    if (cached != null) {
      return cached;
    }

    // 3) A factory declared for this specific namespace (returned without caching).
    ConfigFactory declared = ApolloInjector.getInstance(ConfigFactory.class, namespace);
    if (declared != null) {
      return declared;
    }

    // 4) The default factory; this is the only result stored in the cache.
    ConfigFactory fallback = ApolloInjector.getInstance(ConfigFactory.class);
    m_factories.put(namespace, fallback);
    return fallback;
  }
}

ConfigFactory

com.ctrip.framework.apollo.spi.ConfigFactory ,配置工厂接口,其每个接口方法和 ConfigManager 一一对应。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ConfigFactory {
/**
* Create the config instance for the namespace.
*
* @param namespace the namespace
* @return the newly created config instance
*/
Config create(String namespace);

/**
* Create the config file instance for the namespace
* @param namespace the namespace
* @return the newly created config file instance
*/
ConfigFile createConfigFile(String namespace, ConfigFileFormat configFileFormat);

DefaultConfigFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class DefaultConfigFactory implements ConfigFactory {
private static final Logger logger = LoggerFactory.getLogger(DefaultConfigFactory.class);
private ConfigUtil m_configUtil;

public DefaultConfigFactory() {
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
}

// 创建 Config 对象
@Override
public Config create(String namespace) {
ConfigFileFormat format = determineFileFormat(namespace);
if (ConfigFileFormat.isPropertiesCompatible(format)) {
// YAML,YML格式的配置 特殊处理
return new DefaultConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
}
// properties格式的配置
return new DefaultConfig(namespace, createLocalConfigRepository(namespace));
}

LocalFileConfigRepository createLocalConfigRepository(String namespace) {
// 本地模式,使用 LocalFileConfigRepository 对象
if (m_configUtil.isInLocalMode()) {
logger.warn(
"==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",
namespace);
return new LocalFileConfigRepository(namespace);
}
// 非本地模式,使用 LocalFileConfigRepository + RemoteConfigRepository 对象
return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
}

// 创建RemoteConfigRepository 对象
RemoteConfigRepository createRemoteConfigRepository(String namespace) {
return new RemoteConfigRepository(namespace);
}

..public ConfigFile createConfigFile(String namespace, ConfigFileFormat configFileFormat) ..
}

Config

Uml

Config

"

AbstractConfig

com.ctrip.framework.apollo.internals.AbstractConfig ,实现 Config 接口的抽象类,实现了 1)缓存读取属性值、2)异步通知监听器、3)计算属性变化等特性。

获取属性值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
public abstract class AbstractConfig implements Config {
private static final Logger logger = LoggerFactory.getLogger(AbstractConfig.class);

private static final ExecutorService m_executorService;

// 观察者
private final List<ConfigChangeListener> m_listeners = Lists.newCopyOnWriteArrayList();
// cache
private final Map<ConfigChangeListener, Set<String>> m_interestedKeys = Maps.newConcurrentMap();
private final Map<ConfigChangeListener, Set<String>> m_interestedKeyPrefixes = Maps.newConcurrentMap();
private final ConfigUtil m_configUtil;
private volatile Cache<String, Integer> m_integerCache;
private volatile Cache<String, Long> m_longCache;
private volatile Cache<String, Short> m_shortCache;
private volatile Cache<String, Float> m_floatCache;
private volatile Cache<String, Double> m_doubleCache;
private volatile Cache<String, Byte> m_byteCache;
private volatile Cache<String, Boolean> m_booleanCache;
private volatile Cache<String, Date> m_dateCache;
private volatile Cache<String, Long> m_durationCache;
private final Map<String, Cache<String, String[]>> m_arrayCache;
private final List<Cache> allCaches;
// 缓存版本号,用于解决更新缓存可能存在的并发问题详细见getValueAndStoreToCache
private AtomicLong m_configVersion; //indicate config version

...........
@Override
public Integer getIntProperty(String key, Integer defaultValue) {
try {
if (m_integerCache == null) {
synchronized (this) {
if (m_integerCache == null) {
// 初始化缓存
m_integerCache = newCache();
}
}
}
// 从缓存中,读取属性值
return getValueFromCache(key, Functions.TO_INT_FUNCTION, m_integerCache, defaultValue);
} catch (Throwable ex) {
Tracer.logError(new ApolloConfigException(
String.format("getIntProperty for %s failed, return default value %d", key,
defaultValue), ex));
}
return defaultValue;
}
}


private <T> T getValueFromCache(String key, Function<String, T> parser, Cache<String, T> cache, T defaultValue) {
T result = cache.getIfPresent(key);

if (result != null) {
return result;
}
// 获得值,并更新到缓存
return getValueAndStoreToCache(key, parser, cache, defaultValue);
}

private <T> T getValueAndStoreToCache(String key, Function<String, T> parser, Cache<String, T> cache, T defaultValue) {
long currentConfigVersion = m_configVersion.get();
String value = getProperty(key, null);

if (value != null) {
T result = parser.apply(value);

if (result != null) {
synchronized (this) {
// 若版本号未变化,则更新到缓存,从而解决并发的问题。
if (m_configVersion.get() == currentConfigVersion) {
cache.put(key, result);
}
}
return result;
}
}

return defaultValue;
}
}

/**
 * String-to-value converters used when parsing property values.
 */
public interface Functions {
  /** Parses a decimal string into an Integer; throws NumberFormatException on bad input. */
  Function<String, Integer> TO_INT_FUNCTION = new Function<String, Integer>() {
    @Override
    public Integer apply(String input) {
      return Integer.valueOf(input);
    }
  };
}

添加配置变更监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Registers a listener without a key filter; delegates to the two-argument
 * overload with {@code null} interested keys.
 *
 * @param listener the listener to register
 */
@Override
public void addChangeListener(ConfigChangeListener listener) {
addChangeListener(listener, null);
}

/**
 * Registers a listener with an optional set of interested keys; delegates to the
 * three-argument overload with {@code null} interested key prefixes.
 *
 * @param listener the listener to register
 * @param interestedKeys keys the listener cares about, may be {@code null}
 */
@Override
public void addChangeListener(ConfigChangeListener listener, Set<String> interestedKeys) {
addChangeListener(listener, interestedKeys, null);
}

/**
 * Registers a listener together with its optional key and key-prefix filters.
 * A listener that is already registered is left untouched, filters included.
 *
 * @param listener the listener to register
 * @param interestedKeys keys the listener cares about; ignored when null or empty
 * @param interestedKeyPrefixes key prefixes the listener cares about; ignored when null or empty
 */
@Override
public void addChangeListener(ConfigChangeListener listener, Set<String> interestedKeys, Set<String> interestedKeyPrefixes) {
  if (m_listeners.contains(listener)) {
    return;
  }
  m_listeners.add(listener);

  boolean hasKeys = interestedKeys != null && !interestedKeys.isEmpty();
  if (hasKeys) {
    // Store a copy so later changes to the caller's set do not leak in.
    m_interestedKeys.put(listener, Sets.newHashSet(interestedKeys));
  }

  boolean hasPrefixes = interestedKeyPrefixes != null && !interestedKeyPrefixes.isEmpty();
  if (hasPrefixes) {
    m_interestedKeyPrefixes.put(listener, Sets.newHashSet(interestedKeyPrefixes));
  }
}

触发配置变更监听器们

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Notifies every interested listener of the change event. Each notification is
 * submitted to the executor service as its own task.
 *
 * @param changeEvent the change event to deliver
 */
protected void fireConfigChange(final ConfigChangeEvent changeEvent) {
  for (final ConfigChangeListener listener : m_listeners) {
    // Only listeners interested in this change event are notified.
    if (isConfigChangeListenerInterested(listener, changeEvent)) {
      Runnable notification = new Runnable() {
        @Override
        public void run() {
          String listenerName = listener.getClass().getName();
          Transaction transaction = Tracer.newTransaction("Apollo.ConfigChangeListener", listenerName);
          try {
            // Deliver the event to the listener.
            listener.onChange(changeEvent);
            transaction.setStatus(Transaction.SUCCESS);
          } catch (Throwable ex) {
            // A failing listener is recorded and logged; it does not propagate.
            transaction.setStatus(ex);
            Tracer.logError(ex);
            logger.error("Failed to invoke config change listener {}", listenerName, ex);
          } finally {
            transaction.complete();
          }
        }
      };
      m_executorService.submit(notification);
    }
  }
}

提交到线程池中,异步并发通知监听器们,从而避免某个监听器执行时间过长而阻塞其他监听器的通知。

DefaultConfig

com.ctrip.framework.apollo.internals.DefaultConfig ,实现 RepositoryChangeListener 接口,继承 AbstractConfig 抽象类,默认 Config 实现类。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private static final Logger logger = LoggerFactory.getLogger(DefaultConfig.class);
// Namespace this config instance serves.
private final String m_namespace;
// Properties loaded from the classpath resource for this namespace (see loadFromResource).
private final Properties m_resourceProperties;
// Current properties obtained from the config repository; replaced as a whole on update.
private final AtomicReference<Properties> m_configProperties;
// Repository this config loads from and listens to.
private final ConfigRepository m_configRepository;
// Throttles the "could not load config" warning emitted by getProperty.
private final RateLimiter m_warnLogRateLimiter;

// Source type of the currently held properties; NONE until the first update.
private volatile ConfigSourceType m_sourceType = ConfigSourceType.NONE;

/**
* Constructor. Loads the classpath resource properties, then calls initialize()
* as the last step, after all fields have been assigned.
*
* @param namespace the namespace of this config instance
* @param configRepository the config repository for this config instance
*/
public DefaultConfig(String namespace, ConfigRepository configRepository) {
m_namespace = namespace;
m_resourceProperties = loadFromResource(m_namespace);
m_configRepository = configRepository;
m_configProperties = new AtomicReference<>();
m_warnLogRateLimiter = RateLimiter.create(0.017); // 1 warning log output per minute
initialize();
}

// 从本地资源加载配置
private Properties loadFromResource(String namespace) {
String name = String.format("META-INF/config/%s.properties", namespace);
InputStream in = ClassLoaderUtil.getLoader().getResourceAsStream(name);
Properties properties = null;

if (in != null) {
properties = propertiesFactory.getPropertiesInstance();

try {
properties.load(in);
} catch (IOException ex) {
Tracer.logError(ex);
logger.error("Load resource config for namespace {} failed", namespace, ex);
} finally {
try {
in.close();
} catch (IOException ex) {
// ignore
}
}
}

/**
 * Loads the initial properties from the config repository and registers this
 * instance as a change listener on it. Registration happens even when the
 * initial load fails.
 */
private void initialize() {
  try {
    // Pull the current properties and their source type, then store both.
    Properties loaded = m_configRepository.getConfig();
    ConfigSourceType loadedFrom = m_configRepository.getSourceType();
    updateConfig(loaded, loadedFrom);
  } catch (Throwable ex) {
    Tracer.logError(ex);
    logger.warn("Init Apollo Local Config failed - namespace: {}, reason: {}.",
        m_namespace, ExceptionUtil.getDetailMessage(ex));
  } finally {
    // Always subscribe to repository changes, regardless of the load outcome.
    m_configRepository.addChangeListener(this);
  }
}

/**
 * Replaces the held properties and records where they came from.
 *
 * @param newConfigProperties the properties to hold from now on
 * @param sourceType the source type of those properties
 */
private void updateConfig(Properties newConfigProperties, ConfigSourceType sourceType) {
m_configProperties.set(newConfigProperties);
m_sourceType = sourceType;
}

获得属性值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Resolves a property by consulting, in order: JVM system properties, the
 * properties held from the config repository, environment variables, and the
 * classpath resource properties.
 *
 * @param key the property key; environment lookups use it as-is, so the caller
 *            must supply the right case
 * @param defaultValue the value returned when no source has the key
 * @return the first value found, or {@code defaultValue}
 */
public String getProperty(String key, String defaultValue) {
  // 1) System properties, i.e. -Dkey=value
  String resolved = System.getProperty(key);

  // 2) Properties held from the config repository
  if (resolved == null) {
    if (m_configProperties.get() != null) {
      resolved = m_configProperties.get().getProperty(key);
    }
  }

  // 3) Environment variables, i.e. PATH=...
  //    These are normally UPPERCASE, but exceptions exist, hence no case folding.
  if (resolved == null) {
    resolved = System.getenv(key);
  }

  // 4) Properties file from the classpath
  if (resolved == null) {
    if (m_resourceProperties != null) {
      resolved = m_resourceProperties.getProperty(key);
    }
  }

  if (resolved == null) {
    // Nothing found: warn (rate-limited) when no repository properties are held at all.
    if (m_configProperties.get() == null && m_warnLogRateLimiter.tryAcquire()) {
      logger.warn("Could not load config for namespace {} from Apollo, please check whether the configs are released in Apollo! Return default value now!", m_namespace);
    }
    return defaultValue;
  }

  return resolved;
}

onRepositoryChange

#onRepositoryChange(namespace, newProperties) 方法,当 ConfigRepository 读取到配置发生变更时,计算配置变更集合,并通知监听器们。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
 * Handles a change reported by the config repository: swaps in a copy of the new
 * properties, works out the effective changes, and notifies listeners when there
 * are any.
 *
 * @param namespace the namespace reported by the repository (not used here)
 * @param newProperties the properties now held by the repository
 */
public synchronized void onRepositoryChange(String namespace, Properties newProperties) {
  // Nothing to do when the incoming properties equal what is already held.
  if (newProperties.equals(m_configProperties.get())) {
    return;
  }

  // Copy the incoming properties into a fresh instance.
  ConfigSourceType repositorySourceType = m_configRepository.getSourceType();
  Properties copied = propertiesFactory.getPropertiesInstance();
  copied.putAll(newProperties);

  // Apply the update and compute the effective set of changes.
  Map<String, ConfigChange> effectiveChanges =
      updateAndCalcConfigChanges(copied, repositorySourceType);

  if (!effectiveChanges.isEmpty()) {
    // Notify the listeners, then record the event.
    this.fireConfigChange(new ConfigChangeEvent(m_namespace, effectiveChanges));

    Tracer.logEvent("Apollo.Client.ConfigChanges", m_namespace);
  }
}

/**
 * Applies the new properties and returns the changes that are actually visible
 * through getProperty. Because getProperty consults several sources, each raw
 * repository change is re-evaluated: its old value is read before the swap and
 * its new value after it, and entries whose effective value did not change are
 * dropped.
 *
 * @param newConfigProperties the properties to switch to
 * @param sourceType the source type of the new properties
 * @return an immutable map of property name to effective change
 */
private Map<String, ConfigChange> updateAndCalcConfigChanges(Properties newConfigProperties,
ConfigSourceType sourceType) {
//0. first compute the raw changes between the held and the new repository properties
List<ConfigChange> configChanges =
calcPropertyChanges(m_namespace, m_configProperties.get(), newConfigProperties);

ImmutableMap.Builder<String, ConfigChange> actualChanges =
new ImmutableMap.Builder<>();

/** === Double check since DefaultConfig has multiple config sources ==== **/

//1. use getProperty to update configChanges's old value
// (must run before the swap below so getProperty still sees the old properties)
for (ConfigChange change : configChanges) {
change.setOldValue(this.getProperty(change.getPropertyName(), change.getOldValue()));
}

//2. switch to the new properties and drop the cached typed values
updateConfig(newConfigProperties, sourceType);
clearConfigCache();

//3. use getProperty to update configChange's new value and calc the final changes
for (ConfigChange change : configChanges) {
change.setNewValue(this.getProperty(change.getPropertyName(), change.getNewValue()));
switch (change.getChangeType()) {
case ADDED:
// effective value unchanged: not a visible change
if (Objects.equals(change.getOldValue(), change.getNewValue())) {
break;
}
// an old effective value existed, so this is reported as a modification
if (change.getOldValue() != null) {
change.setChangeType(PropertyChangeType.MODIFIED);
}
actualChanges.put(change.getPropertyName(), change);
break;
case MODIFIED:
if (!Objects.equals(change.getOldValue(), change.getNewValue())) {
actualChanges.put(change.getPropertyName(), change);
}
break;
case DELETED:
// effective value unchanged: not a visible change
if (Objects.equals(change.getOldValue(), change.getNewValue())) {
break;
}
// a new effective value is still resolved, so this is reported as a modification
if (change.getNewValue() != null) {
change.setChangeType(PropertyChangeType.MODIFIED);
}
actualChanges.put(change.getPropertyName(), change);
break;
default:
//do nothing
break;
}
}
return actualChanges.build();
}

/**
 * Invalidates every typed value cache and then bumps the config version.
 */
protected void clearConfigCache() {
  // Holds the same monitor as getValueAndStoreToCache uses for its cache write,
  // so invalidation and cache population cannot interleave.
  synchronized (this) {
    for (Cache cache : allCaches) {
      if (cache == null) {
        continue;
      }
      // Drop all entries of this cache.
      cache.invalidateAll();
    }
    // After all caches are cleared, move to the next version.
    m_configVersion.incrementAndGet();
  }
}

备注

  1. Apollo 源码解析 —— 客户端 API 配置(一)之一览

  2. Apollo 源码解析 —— 客户端 API 配置(二)之 Config