ZooKeeper 深入浅出之四:Zookeeper 应用程序一(配置服务)

原创 zookeeper

一个基本的 ZooKeeper 实现的服务就是“配置服务”,集群中的服务器可以通过 ZooKeeper 共享一个通用的配置数据。从表面上,ZooKeeper 可以理解为一个配置数据的高可用存储服务,为应用提供检索和更新配置数据服务。我们可以使用 ZooKeeper 的观察模式实现一个活动的配置服务,当配置数据发生变化时,可以通知与配置相关客户端。

接下来,我们来实现一个这样的活动配置服务。首先,我们设计用 znode 来存储 key-value 对,我们在 znode 中存储一个 String 类型的数据作为 value,用 znode 的 path 来表示 key。然后,我们实现一个 client,这个 client 可以在任何时候对数据进行跟新操作。那么这个设计的 ZooKeeper 数据模型应该是:master 来更新数据,其他的 worker 也随之将数据更新,就像 HDFS 的 namenode 那样。

我们在一个叫做 ActiveKeyValueStore 的类中编写代码如下:

public class ActiveKeyValueStore extends ConnectionWatcher {

  private static final Charset CHARSET = Charset.forName("UTF-8");

  public void write(String path, String value) throws InterruptedException,
      KeeperException {
    Stat stat = zk.exists(path, false);
    if (stat == null) {
      zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE,
          CreateMode.PERSISTENT);
    } else {
      zk.setData(path, value.getBytes(CHARSET), -1);
    }
  }
}

write() 方法主要实现将给定的 key-value 对写入到 ZooKeeper 中。这其中隐含了创建一个新的 znode 和更新一个已存在的 znode 的实现方法的不同。那么操作之前,我们需要根据 exists() 来判断 znode 是否存在,然后再根据情况进行相关的操作。其他值得一提的就是 String 类型的数据在转换成 byte[] 时,使用的字符集是 UTF-8。

我们为了说明 ActiveKeyValueStore 怎么使用,我们考虑实现一个 ConfigUpdater 类来实现更新配置。下面代码实现了一个在一些随机时刻更新配置数据的应用。

public class ConfigUpdater {

  public static final String PATH = "/config";

  private ActiveKeyValueStore store;
  private Random random = new Random();

  public ConfigUpdater(String hosts) throws IOException, InterruptedException {
    store = new ActiveKeyValueStore();
    store.connect(hosts);
  }

  public void run() throws InterruptedException, KeeperException {
    while (true) {
      String value = random.nextInt(100) + "";
      store.write(PATH, value);
      System.out.printf("Set %s to %s\n", PATH, value);
      TimeUnit.SECONDS.sleep(random.nextInt(10));
    }
  }

  public static void main(String[] args) throws Exception {
    ConfigUpdater configUpdater = new ConfigUpdater(args[0]);
    configUpdater.run();
  }
}

上面的代码很简单。在 ConfigUpdater 的构造函数中,ActiveKeyValueStore 对象连接到 ZooKeeper 服务。然后 run() 不断的循环运行,使用一个随机数不断的随机更新 /config znode 上的值。

下面我们来看一下,如何读取 /config 上的值。首先,我们在 ActiveKeyValueStore 中实现一个读方法。

public String read(String path, Watcher watcher) throws InterruptedException,
      KeeperException {
    byte[] data = zk.getData(path, watcher, null/*stat*/);
    return new String(data, CHARSET);
}

ZooKeeper 的 getData() 方法的参数包含:path,一个 Watcher 对象和一个 Stat 对象。Stat 对象中含有从 getData() 返回的值,并且负责接收回调信息。这种方式下,调用者不仅可以获得数据,还能够获得 znode 的 metadata。

做为服务的 consumer,ConfigWatcher 以观察者身份,创建一个 ActiveKeyValueStore 对象,并且在启动以后调用 read() 函数(在 dispalayConfig() 函数中)获得相关数据。

下面的代码实现了一个以观察模式获得 ZooKeeper 中的数据更新的应用,并将值到后台中。

public class ConfigWatcher implements Watcher {

  private ActiveKeyValueStore store;

  public ConfigWatcher(String hosts) throws IOException, InterruptedException {
    store = new ActiveKeyValueStore();
    store.connect(hosts);
  }

  public void displayConfig() throws InterruptedException, KeeperException {
    String value = store.read(ConfigUpdater.PATH, this);
    System.out.printf("Read %s as %s\n", ConfigUpdater.PATH, value);
  }

  @Override
  public void process(WatchedEvent event) {
    if (event.getType() == EventType.NodeDataChanged) {
      try {
        displayConfig();
      } catch (InterruptedException e) {
        System.err.println("Interrupted. Exiting.");        
        Thread.currentThread().interrupt();
      } catch (KeeperException e) {
        System.err.printf("KeeperException: %s. Exiting.\n", e);        
      }
    }
  }

  public static void main(String[] args) throws Exception {
    ConfigWatcher configWatcher = new ConfigWatcher(args[0]);
    configWatcher.displayConfig();

    // stay alive until process is killed or thread is interrupted
    Thread.sleep(Long.MAX_VALUE);
  }
}

ConfigUpadater 更新 znode 时,ZooKeeper 将触发一个 EventType.NodeDataChanged 的事件给观察者。ConfigWatcher 将在他的 process() 函数中获得这个时间,并将显示读取到的最新的版本的配置数据。

由于观察模式的触发是一次性的,所以每次都要调用 ActiveKeyValueStoreread() 方法,这样才能获得未来的更新数据。我们不能确保一定能够接受到更新通知事件,因为在接受观察事件和下一次读取之间的窗口期内,znode 可能被改变了(有可能很多次),但是 client 可能没有注册观察模式,所以 client 不会接到 znode 改变的通知。在配置服务中这不是一个什么问题,因为 client 只关心配置数据的最新版本。然而,建议读者关注一下这个潜在的问题。

让我们来看一下控制台打印的 ConfigUpdater 运行结果:

% java ConfigUpdater localhost
Set /config to 79
Set /config to 14
Set /config to 78

然后立即在另外的控制台终端窗口中运行 ConfigWatcher:

% java ConfigWatcher localhost
Read /config as 79
Read /config as 14
Read /config as 78
如果觉得这对你有用,请随意赞赏,给与作者支持
评论 0
最新评论