Zookeeper是一个开源的分布式协调服务。其设计目标是将那些复杂的容易出错的分布式一致性服务封装起来,以简单的接口提供给用户使用。它是一个典型的分布式数据一致性的解决方案,分布式应用程序可以基于它实现如:发布/订阅、负载均衡、集群管理、分布式锁、分布式队列等功能。
名词概念
Zookeeper集群中有三种角色:Leader、Follower、Observer。Leader提供读和写服务。Follower和Observer能提供读服务。Observer和Follower的区别就在于Ovserver不参与Leader选举,不参与写操作的过半成功策略。
因此Observer可以在不影响写性能的情况下,提升集群的性能。如果没有Observer的话,一个Leader和N个Follower,如果Follower特别多的话,虽然读性能提高了,但是写性能和选举的性能会受影响。
客户端会话,一个客户端连接是指客户端和服务端之间的一个TCP长连接。客户端启动的时候,首先会和服务器建立一个TCP长连接,从第一次建立连接开始,会话(session)的生命周期就开始了。通过这个连接,客户端能够通过心跳检测与服务器保持有效的会话,也能够向Zookeeper服务器发送请求,同时还能接受服务器的Watch事件通知。
Zookeeper数据模型中的一个单元,我们称之为数据节点。Zookeeper将所有数据存储在内存中,数据模型是一棵树,由斜杠进行分割的路径,就是一个Znode。如/app。每个Znode都能保存自己的数据内容,还会保存属性信息。
4. 版本
每个Znode都有一个叫作Stat的数据结构,Stat里记录了Znode的三个数据版本,分别是version(当前Znode的版本)、cversion(当前Znode子节点的版本)、aversion(当前Znode的ACL版本)
5. Watcher(事件监听器)
Watcher是Zookeeper中一个很重要的特性,Zookeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,Zookeeper服务端会将事件通知到感兴趣的客户端,该机制是Zookeeper实现分布式协调服务的重要特性。
6. ACL
Zookeeper采用ACL策略来进行权限控制,它定义了五种权限:
注意:CREATE和DELETE这两种权限是针对子节点的权限控制
./zkCli.sh #连接本地的zookeeper服务器 ./zkCli.sh -server ip:port # 连接远程的服务器连接成功之后,系统会输出Zookeeper的相关环境及配置信息等信息
create [-s] [-e] [-c] [-t ttl] path [data] [acl] #创建顺序节点 create -s /cc #创建临时节点,客户端会话结束后,节点会自动删除 create -e /temp #创建带内容的节点 create /hi nihao#ls命令会列出path节点下所有的子节点ls path#get命令会查询到path节点的数据内容,加上-s可以查询更详细的信息get [-s] pathset [-s] [-v version] path data#修改/abc节点的内容为hello,加上-s会返回更详细的信息set /abc hello#删除,如果带上了版本参数,那么删除的时候就会校验版本是否正确,正确才进行删除delete [-v version] path<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.14</version></dependency>很少直接使用了,接口介绍省略
ZkClient是github上一个开源的zookeeper客户端,在原生API的基础上进行了包装,更加易用。同时还实现了如Session超时重连、Watcher反复注册等功能。
<dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.2</version></dependency>直接上代码:
public class ClientDemo { private static final String connectStr="192.168.56.115:2181"; public static void main(String[] args) throws InterruptedException {// create();// delete();// get(); getData(); } public static void create() throws InterruptedException { ZkClient zkClient=new ZkClient(connectStr); //第二个参数,true代表递归创建节点,没有父节点先创建父节点 zkClient.createPersistent("/test/node01",true); //持久有序的node zkClient.createPersistentSequential("/test/node02","data"); //临时node zkClient.createEphemeral("/ephemeral"); } public static void delete(){ ZkClient zkClient=new ZkClient(connectStr); //普通删除 zkClient.delete("/test"); //遍历删除,先删除该节点的所有子节点,再删除它本身 zkClient.deleteRecursive("/test"); } public static void get() throws InterruptedException { ZkClient zkClient=new ZkClient(connectStr); List<String> children = zkClient.getChildren("/test"); System.out.println("子节点:"+children); zkClient.subscribeChildChanges("/watch", new IZkChildListener() { @Override public void handleChildChange(String s, List<String> list) throws Exception { System.out.println(s + " child changed,list:" + list); } }); zkClient.createPersistent("/watch"); Thread.sleep(1000); zkClient.createPersistent("/watch/test"); Thread.sleep(1000); zkClient.delete("/watch/test"); Thread.sleep(1000); zkClient.delete("/watch"); } //获取数据 public static void getData() throws InterruptedException { ZkClient zkClient=new ZkClient(connectStr); String path="/abc"; boolean exists = zkClient.exists(path); System.out.println("节点是否存在:"+exists); if(!exists){ zkClient.createEphemeral(path,"123"); } //数据改变监听 zkClient.subscribeDataChanges(path, new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { System.out.println("节点:"+s+" 数据被改变:"+o); } @Override public void handleDataDeleted(String s) throws Exception { System.out.println("节点被删除:"+s); } }); //读取数据 Object o = zkClient.readData(path); System.out.println("读取节点的数据:"+o); Thread.sleep(3000); //更新数据 Stat stat = zkClient.writeData(path, "456"); Thread.sleep(3000); //删除数据 zkClient.delete(path); Thread.sleep(3000); }}Curator是Netflix公司开源的客户端框架。它实现了连接重连、Watcher反复注册、重试策略和NodeExistsException异常解决等等。
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.12.0</version></dependency>直接上代码:
public class CuratorDemo { private static final String connectStr="192.168.56.115:2181"; private static String path ="/abc"; public static void main(String[] args) throws Exception {// connect();// create();// delete();// get(); update(); } public static void connect(){ //连接方式1 /* 构造器含有三个参数 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) baseSleepTimeMs:初始的sleep时间,?于计算之后的每次重试的sleep时间, 计算公式:当前sleep时间=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1))) maxRetries:最?重试次数 maxSleepMs:最?sleep时间,如果上述的当前sleep计算出来?这个?,那么sleep? 这个时间,默认的最?时间是Integer.MAX_VALUE毫秒。 */ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient(connectStr, 5000, 3000, retryPolicy); client.start(); //连接方式2 CuratorFramework Client = CuratorFrameworkFactory.builder() .connectString("server1:2181,server2:2181,server3:2181") .sessionTimeoutMs(50000) .connectionTimeoutMs(30000) .retryPolicy(retryPolicy) .build(); client.start(); } public static void create() throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient(connectStr, 5000, 30000, retryPolicy); client.start(); Thread.sleep(2000); //创建一个内容为空的节点,curator默认是创建持久节点// client.create().forPath(path); //创建一个有内容的节点 client.create().forPath(path,"123".getBytes()); //调用creatingParentsIfNeeded 接口,Curator 就能够自动地递归创建所有需要的父节点 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/tt"); } public static void delete() throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient(connectStr, 5000, 30000, retryPolicy); client.start(); //删除节点 client.delete().forPath(path); //删除节点,并递归删除子节点 client.delete().deletingChildrenIfNeeded().forPath(path); //指定版本删除ls client.delete().withVersion(1).forPath(path); //强制删除。只要客户端会话有效,那么Curator会在后台持续进?删除操作,直到节点删除成功。?如遇到?些? //络异常的情况,此guaranteed的强制删除就会很有效果 client.delete().guaranteed().forPath(path); } public static void get() throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient(connectStr, 5000, 30000, retryPolicy); client.start(); //普通查询 byte[] bytes = client.getData().forPath(path); System.out.println("节点内容:"+new String(bytes)); // 包含状态查询 Stat stat = new Stat(); byte[] bytes1 = client.getData().storingStatIn(stat).forPath(path); System.out.println(stat.getVersion()); } public static void update() throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient(connectStr, 5000, 30000, retryPolicy); client.start(); // 普通更新 client.setData().forPath(path,"新内容".getBytes()); // 指定版本更新 当携带数据版本不?致时,无法完成更新操作 // 异常信息:Exception in thread "main" org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /abc client.setData().withVersion(1).forPath(path,"abcd".getBytes()); }