关于Curator不多介绍,网上很多,可以参考这篇:
引用
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import com.zgjky.hec.common.constants.Constants;
/**
* 客户端连接zookeeper的工具类 通过Curator 客户端框架实现
* @author sheungxin
*
*/
public class CuratorClient {
private CuratorFramework curatorClient;
private String namespace;
private String connStr;
private int retryInterval;
private int maxRetries;
private int connTimeoutMs;
private int sessionTimeoutMs;
public CuratorClient(String namespace,String connStr,int retryInterval,int maxRetries,int connTimeoutMs,int sessionTimeoutMs){
this.namespace=namespace;
this.connStr=connStr;
this.retryInterval=retryInterval;
this.maxRetries=maxRetries;
this.connTimeoutMs=connTimeoutMs;
this.sessionTimeoutMs=sessionTimeoutMs;
connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
}
public CuratorClient(String namespace,String connStr,int retryInterval,int maxRetries){
this.namespace=namespace;
this.connStr=connStr;
this.retryInterval=retryInterval;
this.maxRetries=maxRetries;
this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT;
this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT;
connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
}
public CuratorClient(String namespace,String connStr){
this.namespace=namespace;
this.connStr=connStr;
this.retryInterval=Constants.ZK_RETRY_INTERVAL;
this.maxRetries=Constants.ZK_MAX_RETRIES;
this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT;
this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT;
connect(namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
}
public CuratorClient(String connStr){
this.namespace=Constants.ZK_NAMESPACE;
this.connStr=connStr;
this.retryInterval=Constants.ZK_RETRY_INTERVAL;
this.maxRetries=Constants.ZK_MAX_RETRIES;
this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT;
this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT;
connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
}
/**
* 创建到zookeeper的连接
* @param connectionString zookeeper服务器连接字符串 格式: ip/域名:port 127.0.0.1:2181
* @param retryInterval 重试间隔
* @param maxRetries 重试次数
* @param connectionTimeoutMs 连接timeout时间
* @param sessionTimeoutMs session 失效时间
* @return
*/
public void connect(String namespace,String connStr, int retryInterval,int maxRetries, int connTimeoutMs,int sessionTimeoutMs){
ACLProvider aclProvider=new ACLProvider() {
private List<ACL> aclList;
@Override
public List<ACL> getDefaultAcl() {
if(aclList==null){
aclList=ZooDefs.Ids.CREATOR_ALL_ACL;
aclList.clear();
aclList.add(new ACL(Perms.ALL, new Id("auth",Constants.ZK_AUTH)));
}
return aclList;
}
@Override
public List<ACL> getAclForPath(String arg0) {
return aclList;
}
};
curatorClient = CuratorFrameworkFactory.builder()
.aclProvider(aclProvider)
.authorization("digest", Constants.ZK_AUTH.getBytes())
.namespace(namespace)
.connectString(connStr)
.retryPolicy(new ExponentialBackoffRetry(retryInterval, maxRetries))
.connectionTimeoutMs(connTimeoutMs)
.sessionTimeoutMs(sessionTimeoutMs)
.build();
start();
}
/**
* 添加连接状态监听器
* @param listener
*/
public void addConnListener(ConnectionStateListener listener){
curatorClient.getConnectionStateListenable().addListener(listener);
}
public void start(){
if(curatorClient.getState()!=CuratorFrameworkState.STARTED){
curatorClient.start();
}
}
public void disconnect(){
CloseableUtils.closeQuietly(curatorClient);
}
public CuratorFrameworkState getState(){
return curatorClient.getState();
}
/**
* 创建全局可见的持久化节点
* @param path 节点路径
* @param payload 节点数据
* @throws Exception
*/
public void createNodes(String path, byte[] payload) throws Exception{
curatorClient.create().creatingParentsIfNeeded().forPath(path, payload);
}
/**
* 创建全局可见的持久化节点
* @param path 节点路径
* @throws Exception
*/
public void createNodes(String path) throws Exception{
curatorClient.create().creatingParentsIfNeeded().forPath(path);
}
/**
* 创建全局可见的临时节点
* @param path 节点路径
* @param payload 节点数据
* @throws Exception
*/
public void createEphemeral(String path, byte[] payload) throws Exception{
curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
}
/**
* 创建全局可见的临时节点
* @param path
* @throws Exception
*/
public String createEphemeral(String path) throws Exception{
return curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
}
/**
* 临时顺序节点
* @param path
* @return
* @throws Exception
*/
public String createEphemeralSeq(String path) throws Exception{
return curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
/**
* 写入节点数据
* @param path 节点路径
* @param payload 节点数据
* @throws Exception
*/
public void setData(String path, byte[] payload) throws Exception{
curatorClient.setData().forPath(path,payload);
}
/**
* 获取节点数据
* @param path 节点路径
* @return
* @throws Exception
*/
public String getData(String path) throws Exception{
String result=null;
if(isNodeExists(path)){
result=new String(curatorClient.getData().forPath(path),Constants.ZK_CHARSET);
}
return result;
}
/**
* 获取节点数据
* @param path 节点路径
* @return
* @throws Exception
*/
public List<String> getChildData(String path) throws Exception{
List<String> resultList=null;
if(isNodeExists(path)){
resultList=curatorClient.getChildren().forPath(path);
}
return resultList;
}
/**
* 判断节点是否存在
* @param path
* @return
*/
public boolean isNodeExists(String path){
boolean flag=false;
try {
flag=curatorClient.checkExists().forPath(path)!=null?true:false;
} catch (Exception e) {
e.printStackTrace();
}
return flag;
}
/**
* 删除指定节点
* @param path
* @throws Exception
*/
public void delNode(String path) throws Exception{
curatorClient.delete().guaranteed().inBackground().forPath(path);
}
/**
* 监控子节点的变化:添加、修改、删除
* @param path
* @param childCacheListener
* @return
* @throws Exception
*/
public PathChildrenCache pathChildrenCache(String path,PathChildrenCacheListener childCacheListener) throws Exception {
final PathChildrenCache childCache = new PathChildrenCache(curatorClient, path, true);
childCache.getListenable().addListener(childCacheListener);
return childCache;
}
/**
* 监控节点的变化:增加、修改
* @param path
* @param nodeCacheListener
* @return
*/
public NodeCache nodeCache(String path,NodeCacheListener nodeCacheListener) {
final NodeCache nodeCache = new NodeCache(curatorClient, path);
nodeCache.getListenable().addListener(nodeCacheListener);
return nodeCache;
}
/**
* 监控节点及子节点的变化:增加、修改、删除
* @param path
* @param treeCacheListener
* @return
*/
public TreeCache nodeCache(String path,TreeCacheListener treeCacheListener) {
final TreeCache treeCache = new TreeCache(curatorClient, path);
treeCache.getListenable().addListener(treeCacheListener);
return treeCache;
}
public void addCuratorListener(CuratorListener curatorListener){
curatorClient.getCuratorListenable().addListener(curatorListener);
}
}
分享到:
相关推荐
Curator是Netflix公司开源的一套ZooKeeper客户端框架,Curator解决了很多ZooKeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等,实现了Fluent风格的API接口,目前已经...
NULL 博文链接:https://m635674608.iteye.com/blog/2232760
主要介绍了浅谈Zookeeper开源客户端框架Curator的相关内容,具有一定参考价值,需要的朋友可以了解下。
本文深入探讨了Zookeeper的关键应用和集群特性,涉及官方客户端的使用、Apache Curator客户端框架的应用,以及Zookeeper集群的不停机动态扩容和缩容。通过实际代码示例,详细说明了Zookeeper客户端实例的创建、节点...
本文基于比较常用的Curator这个开源框架,聊一下这个框架对ZooKeeper(以下简称zk)分布式锁的实现。 一般除了大公司是自行封装分布式锁框架之外,建议大家用这些开源框架封装好的分布式锁实现,这是一个比较快捷省...
该项目基于成熟的开源产品Quartz和Zookeeper及其客户端Curator进行二次开发。 ddframe其他模块也有可独立开源的部分,之前当当曾开源过dd-soa的基石模块DubboX。elastic-job和ddframe关系见下图Elastic-Job 主要...
主要介绍了java定时任务框架elasticjob详解,Elastic-Job是ddframe中dd-job的作业模块中分离出来的分布式弹性作业...该项目基于成熟的开源产品Quartz和Zookeeper及其客户端Curator进行二次开发。,需要的朋友可以参考下
作业注册中心: 基于Zookeeper和其客户端Curator实现的全局作业注册控制中心。用于注册,控制和协调分布式作业执行。 作业分片: 将一个任务分片成为多个小任务项在多服务器上同时执行。 弹性扩容缩容: 运行中的...
curator.zip,由netflixzookeeper客户端包装和富zookeeper框架开发的馆长
Apache Curator包括一个高级API框架和实用程序,使使用Apache ZooKeeper变得更加轻松和可靠。 它还包括针对常见用例和扩展的配方,例如服务发现和Java 8异步DSL。 更多细节: Apache Curator网站: : Maven ...
Apache Curator是用于Apache ZooKeeper(一种分布式协调服务)的Java / JVM客户端库。 它包括一个高级API框架和实用程序,使使用Apache ZooKeeper变得更加轻松和可靠。 它还包括常见用例和扩展的配方,例如服务发现...
多线程等框架:akka,zookeeper,Disruptor等核心网springboot相关练习球衣封装Spring安全练习基于curator fremework的分布式锁封装hbase封装javapoet尝试elasticserach封装基于德鲁伊池的蜂巢jdbc客户端池封装;...