一. 基本操作
- org.apache.zookeeper.Zookeeper是客户端入口主类,负责建立与server的会话,提供了下表中的9种操作
操作 | 描述 |
---|---|
create | 创建一个znode |
delete | 删除一个znode(该znode不能有任何子节点) |
exists | 测试一个znode是否存在并查询它的元数据 |
getACL, setACL | 获取/设置一个znode的ACL |
getChildren | 获取一个znode的子节点列表 |
getData, setData | 获取/设置一个znode所保存的数据 |
sync | 将客户端的znode视图与Zookeeper同步 |
二. 常用API
- 建立连接,创建节点
public class SimpleZkClient { private static final String connectString = "hadoop2:2181,hadoop3:2181,hadoop4:2181"; private static final int sessionTimeout = 2000; ZooKeeper zkClient = null; @Before public void init() throws Exception { //构造方法Zookeeper(String connectString, int sessionTimeout, Watcher watcher) //Watcher接口中需要实现 收到事件通知后的回调函数 // ----- abstract public void process(WatchedEvent event); //回调函数收到一次事件通知后就失效,如果想处理多次事件,需要在回调函数中再次注册Watcher zkClient = new ZooKeeper(connectString, sessionTimeout, event -> System.out.println(event.getType() + "------" + event.getPath())); } @Test public void testCreate() throws KeeperException, InterruptedException { //创建一个znode //参数1: 要创建的节点 //参数2: 节点数据 //参数3: 节点的权限 //参数4: 节点的类型 String nodeCreated = zkClient.create("/intellij", "It_this_node_data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } }
- 获取子节点
@Test public void getChildren() throws KeeperException, InterruptedException { // List<String> getChildren(String path, boolean watch) List<String> children = zkClient.getChildren("/", true); children.forEach(System.out::println); }
- 判断是否存在
@Test public void testExist() throws KeeperException, InterruptedException { // Stat exists(String path, boolean watch) Stat stat = zkClient.exists("/intellij", false); System.out.println(stat == null ? "not exist" : "exist"); }
三. 服务器上下线动态感知
- 考虑如下场景:
集群中服务器会有动态上下线的情况,客户端怎么知道当前可用的服务器。可以使用Zookeeper完成业务需求
- 服务器启动时,即去Zookeeper注册信息(短暂序列化znode)
- 客户端启动后通过Zookeeper获取当前在线服务器列表,并且注册监听(通过getChildren方法)
- 简单的实现如上功能
- 服务器实现
public class DistributeServer { private static final String connectString = "hadoop2:2181,hadoop3:2181,hadoop4:2181"; private static final int sessionTimeout = 2000; private static final String parentNode = "/servers"; private ZooKeeper zk = null; public static void main(String[] args) throws Exception { // 获取zk连接 DistributeServer server = new DistributeServer(); server.getConnect(); // 利用zk连接注册服务器信息 server.registerServer(args[0]); // 注册完Zookeeper信息后,服务器处理自己原本的业务便可 System.out.println(args[0] + " is doing its business..."); Thread.sleep(Long.MAX_VALUE); } public void getConnect() throws IOException { zk = new ZooKeeper(connectString, sessionTimeout, event -> { System.out.println(event.getType() + "------" + event.getPath()); try { // 再次注册事件 zk.getChildren("/", true); } catch (Exception e) { e.printStackTrace(); } }); } public void registerServer(String hostname) throws KeeperException, InterruptedException { String createdNode = zk.create(parentNode + "/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname + " is online.." + createdNode); } }
- 客户端实现
public class DistributeClient { private static final String connectString = "hadoop2:2181,hadoop3:2181,hadoop4:2181"; private static final int sessionTimeout = 2000; private static final String parentNode = "/servers"; private List<String> serverList; private ZooKeeper zk = null; public static void main(String[] args) throws Exception { // 获取zk连接 DistributeClient client = new DistributeClient(); client.getConnect(); // 利用zk连接注册服务器信息 client.getServerList(); // 获取服务器列表后客户端继续处理自己原本的业务便可 System.out.println("client is doing its business..."); Thread.sleep(Long.MAX_VALUE); } public void getConnect() throws IOException { zk = new ZooKeeper(connectString, sessionTimeout, event -> { System.out.println(event.getType() + "------" + event.getPath()); try { // 更新服务器列表,再次注册事件 getServerList(); } catch (Exception e) { e.printStackTrace(); } }); } public void getServerList() throws KeeperException, InterruptedException { // 获取服务器子节点信息,并监听父节点 List<String> children = zk.getChildren("/servers", true); // 获取节点数据(这里子节点保存的数据是对应主机的主机名),存入局部list中 List<String> servers = new ArrayList<>(); for (String child : children) { byte[] data = zk.getData(parentNode + "/" + child, false, null); servers.add(new String(data)); } serverList = servers; serverList.forEach(System.out::println); } }
- 服务器实现
四. 分布式共享锁
- 在分布式系统中,如何保证同一时刻,只有一个客户端程序能够使用某一共享资源。因为这些线程运行在不同节点上,所以不能使用传统的java同步方式(如synchronized)。在分布式领域,这样的问题,通常可以通过引入第三方解决(这里使用的是Zookeeper,当然也有其他第三方能解决这个问题)
- 程序启动时到zk上注册一个ephemeral_sequential类型的znode,并监听父节点
- 获取父节点下的所有子节点,比较序号的大小
- 序号最小的获得“锁”, 访问资源后删除自己的节点(相当于释放),并重新注册一个新的子节点
- 其他程序收到事件通知,则可以去zk上获取锁。
public class DistributeClientLock { private static final String connectString = "hadoop2:2181,hadoop3:2181,hadoop4:2181"; private static final int sessionTimeout = 2000; private String groupNode = "locks"; private String subNode = "sub"; private ZooKeeper zk = null; // 记录自己创建子节点的路径 private String thisPath = null; public static void main(String[] args) throws Exception { // 建立连接 DistributeClientLock distributeClientLock = new DistributeClientLock(); distributeClientLock.getConnect(); Thread.sleep(Long.MAX_VALUE); } public void getConnect() throws IOException, KeeperException, InterruptedException { zk = new ZooKeeper(connectString, sessionTimeout, event -> { try { // 判断时间类型,只处理子节点变化事件 if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) { // 获取子节点,并对父节点进行监听 List<String> childrenNodes = zk.getChildren("/" + groupNode, true); String thisNode = thisPath.substring(("/" + groupNode + "/").length()); // 比较自己是否是最小id,如果是最小ID获得"锁",访问资源 Collections.sort(childrenNodes); if (childrenNodes.indexOf(thisNode) == 0) { // 访问共享资源并且删除"锁" doSomething(); // 重新注册一把"锁" thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } } } catch (Exception e) { e.printStackTrace(); } }); thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 如果当前只有该程序访问资源,则可以直接使用资源 List<String> childrenNodes = zk.getChildren("/" + groupNode, true); if (childrenNodes.size() == 1) { doSomething(); thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } } private void doSomething() throws KeeperException, InterruptedException { System.out.println("获得锁: " + thisPath); System.out.println("访问资源ing..."); Thread.sleep(5000); zk.delete(this.thisPath, -1); } }