简介
"Guava is to Java what Curator is to ZooKeeper" --Patrick Hunt, ZooKeeper committer
这句话是Curator核心代码提交者对Curator形象而生动的定义。
总所周知,zk有原生的api包,那么我们为什么还要使用Curator呢?
-
zk原生api弊端
- 超时不能自动重连
- Watch注册一次后失效
- 不能递归创建节点等
- 其他坑...
-
Curator优势
- 超时重连
- watcher一次不失效
- 简易api和fluent编程风格
- 丰富的zk工具类及解决方案
因此尽量不要直接使用zk原生客户端。Curator都帮你把坑填好了,又何必费时间去挖坑呢。
样例
下面封装了Curator的常用方法,并给出了使用demo。
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
/**
* Curator操作zk客户端
*
* @author guangcai.ji
* @create 2019-09-12
*/
public class ZkClient {
private CuratorFramework client;
private static final int sessionTimeout = 10000;
private static final String zkServerPath = "172.16.0.2:2181,172.16.0.3:2181,172.16.0.4:2181";
/**
* 获取客户端
*
* @return
* @throws Exception
*/
public CuratorFramework obtainClient() throws Exception {
synchronized (ZkClient.class) {
if (client != null) {
return client;
}
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(new ExponentialBackoffRetry(100, 10, 5000))
.build();
client.start();
client.blockUntilConnected();
return client;
}
}
/**
* 保存(创建或修改)临时数据
*
* @param path zk路径
* @param data 临时数据,失联时自动删除数据
* @throws Exception
*/
public void saveTemporaryData(String path, byte[] data) throws Exception {
CuratorFramework curatorFramework = obtainClient();
Stat stat = curatorFramework.checkExists().forPath(path);
if (stat != null) {
curatorFramework.setData().forPath(path, data);
} else {
curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, data);
}
}
/**
* 保存(创建或修改)数据
*
* @param path zk路径
* @param data 要保存的数据
* @throws Exception
*/
public void saveData(String path, byte[] data) throws Exception {
CuratorFramework curatorFramework = obtainClient();
Stat stat = curatorFramework.checkExists().forPath(path);
if (stat != null) {
curatorFramework.setData().forPath(path, data);
} else {
curatorFramework.create().creatingParentsIfNeeded().forPath(path, data);
}
}
/**
* 删除数据(包含其路径下的子节点)
*
* @param path zk路径
* @throws Exception
*/
public void deleteData(String path) throws Exception {
obtainClient().delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
}
/**
* 查询数据
*
* @param path zk路径
* @throws Exception
*/
public String getData(String path) throws Exception {
byte[] bytes = obtainClient().getData().forPath(path);
return new String(bytes, StandardCharsets.UTF_8);
}
/**
* 监控某一节点数据变化
* <p>
* 如果节点被删除,则无法触发监听
*
* @param path zk路径
* @throws Exception
*/
public void nodeListener(String path) throws Exception {
final NodeCache nodeCache = new NodeCache(obtainClient(), path, false);
nodeCache.getListenable().addListener(
() -> {
if (nodeCache.getCurrentData() != null) {
System.out.println("\n .nodeCache------节点数据发生了改变,发生的路径为:" + nodeCache.getCurrentData().getPath() + ",节点数据发生了改变 ,新的数据为:" + new String(nodeCache.getCurrentData().getData()) + "\n");
}
}
);
nodeCache.start(true);
}
/**
* 监控某一节点数据变化
* <p>
* 如果节点被删除,则无法触发监听
*
* @param path zk路径
* @throws Exception
*/
public void pathChildrenListener(String path) throws Exception {
final PathChildrenCache pathChildrenCache = new PathChildrenCache(obtainClient(), path, true);
pathChildrenCache.getListenable().addListener(
(client, event) -> {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("\n 添加节点 path = " + event.getData().getPath() + " data = " + new String(event.getData().getData(), StandardCharsets.UTF_8) + "\n");
break;
case CHILD_UPDATED:
System.out.println("\n 更新节点 path = " + event.getData().getPath() + " data = " + new String(event.getData().getData(), StandardCharsets.UTF_8) + "\n");
break;
case CHILD_REMOVED:
System.out.println("\n 删除节点 path = " + event.getData().getPath() + " data = " + new String(event.getData().getData(), StandardCharsets.UTF_8) + "\n");
break;
default:
break;
}
}
);
// * POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
// * NORMAL:异步初始化
// * BUILD_INITIAL_CACHE:同步初始化
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
}
public static void main(String[] args) throws Exception {
ZkClient zkClient = new ZkClient();
CountDownLatch countDownLatch = new CountDownLatch(1);
zkClient.saveTemporaryData("/test/temporary", "temp".getBytes());
zkClient.nodeListener("/test/temporary");
zkClient.pathChildrenListener("/test/temporary");
zkClient.saveData("/test/temporary", "temp1".getBytes());
String data = zkClient.getData("/test/temporary");
System.out.println("getData res = " + data);
zkClient.saveTemporaryData("/test/temporary", "temp3".getBytes());
//阻塞
countDownLatch.await();
}
}
经典使用场景
- 数据发布/订阅
- 分布式协调/通知
- 分布式锁
- ...
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名,转载请标明出处
最后编辑时间为: