`
sheungxin
  • 浏览: 103444 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ZooKeeper客户端框架Curator

阅读更多
  • Curator介绍
关于Curator不多介绍,网上很多,可以参考这篇:
引用

  • Curator工具类
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;

import com.zgjky.hec.common.constants.Constants;


/**
 * 客户端连接zookeeper的工具类 通过Curator 客户端框架实现
 * @author sheungxin
 *
 */
public class CuratorClient {
	private CuratorFramework curatorClient;
	private String namespace;
	private String connStr;
	private int retryInterval;
	private int maxRetries;
	private int connTimeoutMs;
	private int sessionTimeoutMs;
	
	public CuratorClient(String namespace,String connStr,int retryInterval,int maxRetries,int connTimeoutMs,int sessionTimeoutMs){
		this.namespace=namespace;
		this.connStr=connStr;
		this.retryInterval=retryInterval;
		this.maxRetries=maxRetries;
		this.connTimeoutMs=connTimeoutMs;
		this.sessionTimeoutMs=sessionTimeoutMs;
		connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
	}
	
	public CuratorClient(String namespace,String connStr,int retryInterval,int maxRetries){
		this.namespace=namespace;
		this.connStr=connStr;
		this.retryInterval=retryInterval;
		this.maxRetries=maxRetries;
		this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT;
		this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT;
		connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
	}

	public CuratorClient(String namespace,String connStr){
		this.namespace=namespace;
		this.connStr=connStr;
		this.retryInterval=Constants.ZK_RETRY_INTERVAL;
		this.maxRetries=Constants.ZK_MAX_RETRIES;
		this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT;
		this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT;
		connect(namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
	}
	
	public CuratorClient(String connStr){
		this.namespace=Constants.ZK_NAMESPACE;
		this.connStr=connStr;
		this.retryInterval=Constants.ZK_RETRY_INTERVAL;
		this.maxRetries=Constants.ZK_MAX_RETRIES;
		this.connTimeoutMs=Constants.ZK_CONNECTION_TIMEOUT;
		this.sessionTimeoutMs=Constants.ZK_SESSION_TIMEOUT;
		connect(this.namespace, this.connStr, this.retryInterval, this.maxRetries, this.connTimeoutMs, this.sessionTimeoutMs);
	}
	
	/**
	 * 创建到zookeeper的连接
	 * @param connectionString zookeeper服务器连接字符串 格式: ip/域名:port  127.0.0.1:2181
	 * @param retryInterval 重试间隔
	 * @param maxRetries 重试次数
	 * @param connectionTimeoutMs 连接timeout时间
	 * @param sessionTimeoutMs session 失效时间
	 * @return
	 */
    public void connect(String namespace,String connStr, int retryInterval,int maxRetries, int connTimeoutMs,int sessionTimeoutMs){
        ACLProvider aclProvider=new ACLProvider() {
			private List<ACL> aclList;
			
			@Override
			public List<ACL> getDefaultAcl() {
				if(aclList==null){
					aclList=ZooDefs.Ids.CREATOR_ALL_ACL;
					aclList.clear();
					aclList.add(new ACL(Perms.ALL, new Id("auth",Constants.ZK_AUTH)));
				}
				return aclList;
			}
			
			@Override
			public List<ACL> getAclForPath(String arg0) {
				return aclList;
			}
		};
    	curatorClient = CuratorFrameworkFactory.builder()
        	.aclProvider(aclProvider)
        	.authorization("digest", Constants.ZK_AUTH.getBytes())
        	.namespace(namespace)
            .connectString(connStr)
            .retryPolicy(new ExponentialBackoffRetry(retryInterval, maxRetries))
            .connectionTimeoutMs(connTimeoutMs)
            .sessionTimeoutMs(sessionTimeoutMs)
            .build();
    	start();
    }
    
    /**
     * 添加连接状态监听器
     * @param listener
     */
    public void addConnListener(ConnectionStateListener listener){
    	curatorClient.getConnectionStateListenable().addListener(listener);
    }
    
    public void start(){
    	if(curatorClient.getState()!=CuratorFrameworkState.STARTED){
    		curatorClient.start();
    	}
    }
    
    public void disconnect(){
    	CloseableUtils.closeQuietly(curatorClient);
    }
    
    public CuratorFrameworkState getState(){
    	return curatorClient.getState();
    }
    
    /**
     * 创建全局可见的持久化节点
     * @param path 节点路径
     * @param payload 节点数据
     * @throws Exception
     */
    public void createNodes(String path, byte[] payload) throws Exception{
    	curatorClient.create().creatingParentsIfNeeded().forPath(path, payload);
    }
    
    /**
     * 创建全局可见的持久化节点
     * @param path 节点路径
     * @throws Exception
     */
    public void createNodes(String path) throws Exception{
    	curatorClient.create().creatingParentsIfNeeded().forPath(path);
    }
    
    /**
     * 创建全局可见的临时节点
     * @param path 节点路径
     * @param payload 节点数据
     * @throws Exception
     */
    public void createEphemeral(String path, byte[] payload) throws Exception{    	 
    	curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
    }
    
    /**
     *  创建全局可见的临时节点
     * @param path
     * @throws Exception
     */
    public String createEphemeral(String path) throws Exception{    	 
    	return curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
    }
    /**
     * 临时顺序节点
     * @param path
     * @return
     * @throws Exception
     */
    public String createEphemeralSeq(String path) throws Exception{    	 
    	return curatorClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
    }
    /**
     * 写入节点数据
     * @param path 节点路径
     * @param payload 节点数据
     * @throws Exception
     */
    public void setData(String path, byte[] payload) throws Exception{    	
    	curatorClient.setData().forPath(path,payload);
    }
    
    /**
     * 获取节点数据
     * @param path 节点路径
     * @return
     * @throws Exception
     */
    public String getData(String path) throws Exception{
    	String result=null;
    	if(isNodeExists(path)){
    		 result=new String(curatorClient.getData().forPath(path),Constants.ZK_CHARSET);
    	}
        return result;
    }  
    
    /**
     * 获取节点数据
     * @param path 节点路径
     * @return
     * @throws Exception
     */
    public List<String> getChildData(String path) throws Exception{
    	List<String> resultList=null;
    	if(isNodeExists(path)){
    		resultList=curatorClient.getChildren().forPath(path);
    	}
        return resultList;
    } 
    
    /**
     * 判断节点是否存在
     * @param path
     * @return
     */
    public boolean isNodeExists(String path){
    	boolean flag=false;
    	try {
    		flag=curatorClient.checkExists().forPath(path)!=null?true:false;
		} catch (Exception e) {
			e.printStackTrace();
		}
    	return flag;
    }
    
    /**
     * 删除指定节点
     * @param path
     * @throws Exception
     */
    public void delNode(String path) throws Exception{
    	curatorClient.delete().guaranteed().inBackground().forPath(path);
    }
    
    /**
     * 监控子节点的变化:添加、修改、删除
     * @param path
     * @param childCacheListener
     * @return 
     * @throws Exception
     */
    public PathChildrenCache pathChildrenCache(String path,PathChildrenCacheListener childCacheListener) throws Exception {
        final PathChildrenCache childCache = new PathChildrenCache(curatorClient, path, true);
        childCache.getListenable().addListener(childCacheListener);
        return childCache;
    }
    
    /**
     * 监控节点的变化:增加、修改
     * @param path
     * @param nodeCacheListener
     * @return
     */
    public NodeCache nodeCache(String path,NodeCacheListener nodeCacheListener) {
        final NodeCache nodeCache = new NodeCache(curatorClient, path);
        nodeCache.getListenable().addListener(nodeCacheListener);
        return nodeCache;
    }
 
    /**
     * 监控节点及子节点的变化:增加、修改、删除
     * @param path
     * @param treeCacheListener
     * @return
     */
    public TreeCache nodeCache(String path,TreeCacheListener treeCacheListener) {
        final TreeCache treeCache = new TreeCache(curatorClient, path);
        treeCache.getListenable().addListener(treeCacheListener);
        return treeCache;
    }
    
    public void addCuratorListener(CuratorListener curatorListener){
    	curatorClient.getCuratorListenable().addListener(curatorListener);
    }
    
}

1
0
分享到:
评论

相关推荐

    zookeeper开源客户端Curator

    Curator是Netflix公司开源的一套ZooKeeper客户端框架,Curator解决了很多ZooKeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等,实现了Fluent风格的API接口,目前已经...

    Zookeeper开源客户端框架Curator简介与示例

    NULL 博文链接:https://m635674608.iteye.com/blog/2232760

    浅谈Zookeeper开源客户端框架Curator

    主要介绍了浅谈Zookeeper开源客户端框架Curator的相关内容,具有一定参考价值,需要的朋友可以了解下。

    深入探索Zookeeper:从客户端使用到集群特性的全面指南

    本文深入探讨了Zookeeper的关键应用和集群特性,涉及官方客户端的使用、Apache Curator客户端框架的应用,以及Zookeeper集群的不停机动态扩容和缩容。通过实际代码示例,详细说明了Zookeeper客户端实例的创建、节点...

    一文彻底理解ZooKeeper分布式锁的实现原理

    本文基于比较常用的Curator这个开源框架,聊一下这个框架对ZooKeeper(以下简称zk)分布式锁的实现。 一般除了大公司是自行封装分布式锁框架之外,建议大家用这些开源框架封装好的分布式锁实现,这是一个比较快捷省...

    当当网开源的分布式作业调度组件 Elastic-Job.zip

    该项目基于成熟的开源产品Quartz和Zookeeper及其客户端Curator进行二次开发。 ddframe其他模块也有可独立开源的部分,之前当当曾开源过dd-soa的基石模块DubboX。elastic-job和ddframe关系见下图Elastic-Job 主要...

    java定时任务框架elasticjob详解

    主要介绍了java定时任务框架elasticjob详解,Elastic-Job是ddframe中dd-job的作业模块中分离出来的分布式弹性作业...该项目基于成熟的开源产品Quartz和Zookeeper及其客户端Curator进行二次开发。,需要的朋友可以参考下

    ElasticJob分布式调度解决方案.rar

    作业注册中心: 基于Zookeeper和其客户端Curator实现的全局作业注册控制中心。用于注册,控制和协调分布式作业执行。 作业分片: 将一个任务分片成为多个小任务项在多服务器上同时执行。 弹性扩容缩容: 运行中的...

    finagle-thriftmux_2.9.2-6.4.1.zip

    curator.zip,由netflixzookeeper客户端包装和富zookeeper框架开发的馆长

    策展人:Apache Curator

    Apache Curator包括一个高级API框架和实用程序,使使用Apache ZooKeeper变得更加轻松和可靠。 它还包括针对常见用例和扩展的配方,例如服务发现和Java 8异步DSL。 更多细节: Apache Curator网站: : Maven ...

    curator

    Apache Curator是用于Apache ZooKeeper(一种分布式协调服务)的Java / JVM客户端库。 它包括一个高级API框架和实用程序,使使用Apache ZooKeeper变得更加轻松和可靠。 它还包括常见用例和扩展的配方,例如服务发现...

    java_learning_practice:java进阶之路:面试高频算法,akka,多线程,NIO,Netty,SpringBoot,Spark && Flink等

    多线程等框架:akka,zookeeper,Disruptor等核心网springboot相关练习球衣封装Spring安全练习基于curator fremework的分布式锁封装hbase封装javapoet尝试elasticserach封装基于德鲁伊池的蜂巢jdbc客户端池封装;...

Global site tag (gtag.js) - Google Analytics