ZooKeeper 深入浅出之四:Zookeeper 应用程序一(配置服务)
一个基本的 ZooKeeper 实现的服务就是“配置服务”,集群中的服务器可以通过 ZooKeeper 共享一个通用的配置数据。从表面上,ZooKeeper 可以理解为一个配置数据的高可用存储服务,为应用提供检索和更新配置数据服务。我们可以使用 ZooKeeper 的观察模式实现一个活动的配置服务,当配置数据发生变化时,可以通知与配置相关客户端。
接下来,我们来实现一个这样的活动配置服务。首先,我们设计用 znode 来存储 key-value 对,我们在 znode 中存储一个 String 类型的数据作为 value,用 znode 的 path 来表示 key。然后,我们实现一个 client,这个 client 可以在任何时候对数据进行跟新操作。那么这个设计的 ZooKeeper 数据模型应该是:master 来更新数据,其他的 worker 也随之将数据更新,就像 HDFS 的 namenode 那样。
我们在一个叫做 ActiveKeyValueStore
的类中编写代码如下:
public class ActiveKeyValueStore extends ConnectionWatcher {
private static final Charset CHARSET = Charset.forName("UTF-8");
public void write(String path, String value) throws InterruptedException,
KeeperException {
Stat stat = zk.exists(path, false);
if (stat == null) {
zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} else {
zk.setData(path, value.getBytes(CHARSET), -1);
}
}
}
write()
方法主要实现将给定的 key-value 对写入到 ZooKeeper 中。这其中隐含了创建一个新的 znode 和更新一个已存在的 znode 的实现方法的不同。那么操作之前,我们需要根据 exists()
来判断 znode 是否存在,然后再根据情况进行相关的操作。其他值得一提的就是 String 类型的数据在转换成 byte[]
时,使用的字符集是 UTF-8。
我们为了说明 ActiveKeyValueStore
怎么使用,我们考虑实现一个 ConfigUpdater
类来实现更新配置。下面代码实现了一个在一些随机时刻更新配置数据的应用。
public class ConfigUpdater {
public static final String PATH = "/config";
private ActiveKeyValueStore store;
private Random random = new Random();
public ConfigUpdater(String hosts) throws IOException, InterruptedException {
store = new ActiveKeyValueStore();
store.connect(hosts);
}
public void run() throws InterruptedException, KeeperException {
while (true) {
String value = random.nextInt(100) + "";
store.write(PATH, value);
System.out.printf("Set %s to %s\n", PATH, value);
TimeUnit.SECONDS.sleep(random.nextInt(10));
}
}
public static void main(String[] args) throws Exception {
ConfigUpdater configUpdater = new ConfigUpdater(args[0]);
configUpdater.run();
}
}
上面的代码很简单。在 ConfigUpdater
的构造函数中,ActiveKeyValueStore
对象连接到 ZooKeeper 服务。然后 run()
不断的循环运行,使用一个随机数不断的随机更新 /config
znode 上的值。
下面我们来看一下,如何读取 /config
上的值。首先,我们在 ActiveKeyValueStore
中实现一个读方法。
public String read(String path, Watcher watcher) throws InterruptedException,
KeeperException {
byte[] data = zk.getData(path, watcher, null/*stat*/);
return new String(data, CHARSET);
}
ZooKeeper 的 getData()
方法的参数包含:path,一个 Watcher 对象和一个 Stat 对象。Stat 对象中含有从 getData()
返回的值,并且负责接收回调信息。这种方式下,调用者不仅可以获得数据,还能够获得 znode 的 metadata。
做为服务的 consumer,ConfigWatcher
以观察者身份,创建一个 ActiveKeyValueStore
对象,并且在启动以后调用 read()
函数(在 dispalayConfig()
函数中)获得相关数据。
下面的代码实现了一个以观察模式获得 ZooKeeper 中的数据更新的应用,并将值到后台中。
public class ConfigWatcher implements Watcher {
private ActiveKeyValueStore store;
public ConfigWatcher(String hosts) throws IOException, InterruptedException {
store = new ActiveKeyValueStore();
store.connect(hosts);
}
public void displayConfig() throws InterruptedException, KeeperException {
String value = store.read(ConfigUpdater.PATH, this);
System.out.printf("Read %s as %s\n", ConfigUpdater.PATH, value);
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeDataChanged) {
try {
displayConfig();
} catch (InterruptedException e) {
System.err.println("Interrupted. Exiting.");
Thread.currentThread().interrupt();
} catch (KeeperException e) {
System.err.printf("KeeperException: %s. Exiting.\n", e);
}
}
}
public static void main(String[] args) throws Exception {
ConfigWatcher configWatcher = new ConfigWatcher(args[0]);
configWatcher.displayConfig();
// stay alive until process is killed or thread is interrupted
Thread.sleep(Long.MAX_VALUE);
}
}
当 ConfigUpadater
更新 znode 时,ZooKeeper 将触发一个 EventType.NodeDataChanged
的事件给观察者。ConfigWatcher
将在他的 process()
函数中获得这个时间,并将显示读取到的最新的版本的配置数据。
由于观察模式的触发是一次性的,所以每次都要调用 ActiveKeyValueStore
的 read()
方法,这样才能获得未来的更新数据。我们不能确保一定能够接受到更新通知事件,因为在接受观察事件和下一次读取之间的窗口期内,znode 可能被改变了(有可能很多次),但是 client 可能没有注册观察模式,所以 client 不会接到 znode 改变的通知。在配置服务中这不是一个什么问题,因为 client 只关心配置数据的最新版本。然而,建议读者关注一下这个潜在的问题。
让我们来看一下控制台打印的 ConfigUpdater
运行结果:
% java ConfigUpdater localhost
Set /config to 79
Set /config to 14
Set /config to 78
然后立即在另外的控制台终端窗口中运行 ConfigWatcher
:
% java ConfigWatcher localhost
Read /config as 79
Read /config as 14
Read /config as 78