ZooKeeper 深入浅出之二:Zookeeper 开发实例

原创 zookeeper

ZooKeeper 深入浅出系列根据 《Hadoop 权威指南 第四版》 英文原版翻译整理。

这一节我们将讲解如何编写 Zookeeper 客户端的程序,来控制 zookeeper 上的数据,以达到管理客户端所在集群的成员关系。

ZooKeeper 中的组和成员

我们可以把 Zookeeper 理解为一个高可用的文件系统。但是它没有文件和文件夹的概念,只有一个叫做 znode 的节点概念。 那么 znode 即是数据的容器,也是其他节点的容器。(其实 znode 就可以理解为文件或者是文件夹)我们用父节点和子节点的关系来表示组和成员的关系。 那么一个节点代表一个组,组节点下的子节点代表组内的成员。如下图所示:

创建组

我们使用 zookeeper 的 Java API 来创建一个 /zoo 的组节点:

public class CreateGroup implements Watcher {
 private static final int SESSION_TIMEOUT = 5000;
 private ZooKeeper zk;
 private CountDownLatch connectedSignal = new CountDownLatch(1);
 public void connect(String hosts) throws IOException, InterruptedException {
     zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
     connectedSignal.await();
 }

 @Override
 public void process(WatchedEvent event) { // Watcher interface
     if (event.getState() == KeeperState.SyncConnected) {
         connectedSignal.countDown();
     }
 }

 public void create(String groupName) throws KeeperException,
 InterruptedException {
     String path = "/" + groupName;
     String createPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     System.out.println("Created " + createPath);
 }
}

main() 执行时,首先创建了一个 CreateGroup 的对象,然后调用 connect() 方法,通过 zookeeper 的 API 与 zookeeper服务器连接。 创建连接我们需要3个参数:一是服务器端主机名称以及端口号,二是客户端连接服务器 session 的超时时间,三是 Watcher 接口的一个实例。 Watcher 实例负责接收 Zookeeper 数据变化时产生的事件回调。

在连接函数中创建了 zookeeper 的实例,然后建立与服务器的连接。 建立连接函数会立即返回,所以我们需要等待连接建立成功后再进行其他的操作。 我们使用 CountDownLatch 来阻塞当前线程,直到 zookeeper 准备就绪。 这时,我们就看到 Watcher 的作用了。我们实现了 Watcher 接口的一个方法:

public void process(WatchedEvent event);

当客户端连接上了 zookeeper 服务器,Watcher 将由 process() 函数接收一个连接成功的事件。 我们接下来调用 CountDownLatch,释放之前的阻塞。

连接成功后,我们调用 create() 方法。我们在这个方法中调用 zookeeper 实例的 create() 方法来创建一个 znode。 参数包括:一是 znode 的 path;二是 znode 的内容(一个二进制数组),三是一个 access control list(ACL,访问控制列表,这里使用完全开放模式),最后是 znode 的性质。

znode 的性质分为 ephemeralpersistent 两种。ephemeral 性质的 znode 在创建他的客户端的会话结束,或者客户端以其他原因断开与服务器的连接时,会被自动删除。而 persistent 性质的 znode 就不会被自动删除,除非客户端主动删除,而且不一定是创建它的客户端可以删除它,其他客户端也可以删除它。这里我们创建一个 persistent 的 znode。

create() 将返回 znode 的 path。我们将新建 znode 的 path 打印出来。

我们执行如上程序:

% export CLASSPATH=ch21-zk/target/classes/:$ZOOKEEPER_HOME/*:\
$ZOOKEEPER_HOME/lib/*:$ZOOKEEPER_HOME/conf
% java CreateGroup localhost zoo
Created /zoo

加入组

接下来我们实现如何在一个组中注册成员。我们将使用 ephemeral znode 来创建这些成员节点。那么当客户端程序退出时,这些成员将被删除。

我们创建一个 ConnetionWatcher 类,然后继承实现一个 JoinGroup 类:

public class ConnectionWatcher implements Watcher {

  private static final int SESSION_TIMEOUT = 5000;

  protected ZooKeeper zk;
  private CountDownLatch connectedSignal = new CountDownLatch(1);

  public void connect(String hosts) throws IOException, InterruptedException {
    zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
    connectedSignal.await();
  }

  @Override
  public void process(WatchedEvent event) {
    if (event.getState() == KeeperState.SyncConnected) {
      connectedSignal.countDown();
    }
  }

  public void close() throws InterruptedException {
    zk.close();
  }
}

JoinGroup 测试类

public class JoinGroup extends ConnectionWatcher {

  public void join(String groupName, String memberName) throws KeeperException,
      InterruptedException {
    String path = "/" + groupName + "/" + memberName;
    String createdPath = zk.create(path, null/*data*/, Ids.OPEN_ACL_UNSAFE,
      CreateMode.EPHEMERAL);
    System.out.println("Created " + createdPath);
  }

  public static void main(String[] args) throws Exception {
    JoinGroup joinGroup = new JoinGroup();
    joinGroup.connect(args[0]);
    joinGroup.join(args[1], args[2]);

    // stay alive until process is killed or thread is interrupted
    Thread.sleep(Long.MAX_VALUE);
  }
}

加入组与创建组非常相似。我们加入了一个 ephemeral znode 后,让线程阻塞住。然后我们可以使用命令行查看 zookeeper 中我们创建的 znode。当我们将阻塞的程序强行关闭后,我们会发现我们创建的 znode 会自动消失。

成员列表

下面我们实现一个程序来列出一个组中的所有成员。

public class ListGroup extends ConnectionWatcher {
  public void list(String groupName) throws KeeperException,
      InterruptedException {
    String path = "/" + groupName;

    try {
      List<String> children = zk.getChildren(path, false);
      if (children.isEmpty()) {
        System.out.printf("No members in group %s\n", groupName);
        System.exit(1);
      }
      for (String child : children) {
        System.out.println(child);
      }
    } catch (KeeperException.NoNodeException e) {
      System.out.printf("Group %s does not exist\n", groupName);
      System.exit(1);
    }
  }

  public static void main(String[] args) throws Exception {
    ListGroup listGroup = new ListGroup();
    listGroup.connect(args[0]);
    listGroup.list(args[1]);
    listGroup.close();
  }
}

我们在 list() 方法中通过调用 getChildren() 方法来获得某一个 path 下的子节点,然后打印出来。我们这里会试着捕获 KeeperException.NoNodeException,当 znode 不存在时会抛出这个异常。我们运行程序,会看见如下结果,说明我们还没在 zoo 组中添加任何成员几点:

% java ListGroup localhost zoo
No members in group zoo

我们可以运行之前的 JoinGroup 来添加成员。在后台运行一些 JoinGroup 程序,这些程序添加节点后都处于 sleep 状态:

% java JoinGroup localhost zoo duck &
% java JoinGroup localhost zoo cow &
% java JoinGroup localhost zoo goat &
% goat_pid=$!

最后一行命令的作用是将最后一个启动的 java 程序的 pid 记录下来,我们好在列出 zoo 下面的成员后,将该进程 kill 掉。

下面我们将zoo下的成员打印出来:

% java ListGroup localhost zoo
goat
duck
cow

然后我们将 kill 掉最后启动的 JoinGroup 客户端:

% kill $goat_pid

过几秒后,我们发现 goat 节点不见了。因为之前我们创建的 goat 节点是一个 ephemeral 节点,而创建这个节点的客户端在 ZooKeeper 上的会话已经被终结了,因为这个回话在5秒后失效了(我们设置了会话的超时时间为5秒):

% java ListGroup localhost zoo
duck
cow

让我们回过头来看看,我们到底都做了一些什么?我们首先创建了一个节点组,这些节点的创建者都在同一个分布式系统中。这些节点的创建者之间互相都不知情。一个创建者想使用这些节点数据进行一些工作,例如通过 znode 节点是否存在来判断节点的创建者是否存在。

最后一点,我们不能只依靠组成员关系来完全解决在与节点通信时的网络错误。当与一个集群组成员节点进行通信时,发生了通信失败,我们需要使用重试或者试验与组中其他的节点通信,来解决这次通信失败。

Zookeeper 的命令行工具

Zookeeper 有一套命令行工具。我们可以像如下使用,来查找 zoo 下的成员节点:

% zkCli.sh -server localhost ls /zoo
[cow, duck]

你可以不加参数运行这个工具,来获得帮助。

删除分组

下面让我们来看一下如何删除一个分组?

ZooKeeper 的 API 提供一个 delete() 方法来删除一个 znode。我们通过输入 znode 的 path 和版本号(version number)来删除想要删除的 znode。我们除了使用 path 来定位我们要删除的 znode,还需要一个参数是版本号。只有当我们指定要删除的本版号,与 znode 当前的版本号一致时,ZooKeeper 才允许我们将 znode 删除掉。这是一种 optimistic locking 机制,用来处理 znode 的读写冲突。我们也可以忽略版本号一致检查,做法就是版本号赋值为 -1

删除一个 znode 之前,我们需要先删除它的子节点,就下如下代码中实现的那样:

public class DeleteGroup extends ConnectionWatcher {
  public void delete(String groupName) throws KeeperException,
      InterruptedException {
    String path = "/" + groupName;

    try {
      List<String> children = zk.getChildren(path, false);
      for (String child : children) {
        zk.delete(path + "/" + child, -1);
      }
      zk.delete(path, -1);
    } catch (KeeperException.NoNodeException e) {
      System.out.printf("Group %s does not exist\n", groupName);
      System.exit(1);
    }
  }

  public static void main(String[] args) throws Exception {
    DeleteGroup deleteGroup = new DeleteGroup();
    deleteGroup.connect(args[0]);
    deleteGroup.delete(args[1]);
    deleteGroup.close();
  }
}

最后我们执行如下操作来删除 zoo group:

% java DeleteGroup localhost zoo
% java ListGroup localhost zoo
Group zoo does not exist
如果觉得这对你有用,请随意赞赏,给与作者支持
评论 0
最新评论