hadoop组件02-zookeeper
1.zookeeper的介绍
ZooKeeper是一种为分布式应用所设计的高可用、高性能且一致的开源协调服务,它提供了一项基本服务:分布式锁服务。分布式应用可以基于它实现更高级的服务,实现诸如同步服务、配置维护和集群管理或者命名的服务。
1.1 首先我们介绍一下集群的存在模式
1.主从集群
其中主节点的角色就是主,主节点消失了也只能从其他主节点中选出来,但是从节点就不能存在任何的选举可能
2.非主从集群
而在主从集群中就会出现单节点故障【比如皇帝挂了】
所以非主从集群中要存在两个主节点做热备份
1.2 zookeeper的作用
但是zookeeper为了让自身集群稳定自身也是集群形式的
1.3 zookeeper的集群形式
作为一个集群给其他集群做协调服务那么首先要保证自身也是稳定性较高的,自身的存在也是集群形式的
作为一个集群也是存在主点的,因为需要保证自身集群的可用和管理
那么真正的集群中zookeeper的服务节点的身份如下
1.4 zookeeper中的角色
>> 领导者(leader),负责进行投票的发起和决议,更新系统状态(数据同步),发送心跳。
>> 学习者(learner),包括跟随者(follower)和观察者(observer)。
>> 跟随者(follower),用于接受客户端请求、向客户端返回结果,在选主过程中参与投票。
>> 观察者(Observer),可以接受客户端请求,将写请求转发给leader,但observer不参加投票过程,只同步leader的状态,observer的目的是为了扩展系统,提高读取速度。
如果Zookeeper集群的读取负载很高,或者客户端多到跨机房,可以设置一些observer服务器,以提高读取的吞吐量。
需要注意的是:
1)Leader和Follower构成Zookeeper集群的法定人数,也就是说,只有他们才能参与新Leader的选举以及响应Leader的提议。
2)observer不属于法定人数,即不参加选举也不响应提议。
1)leader失效后会在follower中重新选举新的leader
2)每个follower都和leader有连接,接受leader的数据更新操作
3)客户端可以连接到每个server,每个server的数据完全相同
4)每个节点的服务Server,记录事务日志和快照到持久存储
2.zookeeper的搭建
2.1 规划
nn1、nn2、s1。
其中:一个是leader,剩余的是Follower
2.2 制作 zookeeper 机器批量分布脚本
用之前的多机脚本拷贝一套zookeeper 多机操作脚本,然后修改成 zookeeper 用的多机操作脚本。
1)拷贝并修改带有ip的文件
#生成新的ip文件
cp ips ips_zk
#修改ips_zk文件 的主机名称只留下nn1、nn2、s1三台。
2)拷贝脚本
#复制三个zk的脚本
cp scp_all.sh scp_all_zk.sh
cp ssh_all.sh ssh_all_zk.sh
cp ssh_root.sh ssh_root_zk.sh
3)修改脚本
#修改三个脚本的ip信息
vim scp_all_zk.sh
vim ssh_all_zk.sh
vim ssh_root_zk.sh
#修改其中的ips变为ips_zk
2.3 找到zookeeper的安装包
#找到每个安装zookeeper机器上的安装包
/public/software/bigdata/zookeeper-3.4.8.tar.gz
2.4 在所有机器上把zookeeper的tar包解压到/usr/local目录下
ssh_root_zk.sh tar -zxvf /public/software/bigdata/zookeeper-3.4.8.tar.gz -C /usr/local/
2.5 创建软链接
#在每台机器上给/usr/local/zookeeper-3.4.8创建zookeeper软链接
ssh_root_zk.sh ln -s /usr/local/zookeeper-3.4.8 /usr/local/zookeeper
#修改zookeeper的归属权限是hadoop用户
ssh_root_zk.sh chown hadoop:hadoop -R /usr/local/zookeeper-3.4.8
2.6 修改每个机器上的zookeeper 配置
2.6.1 zookeeper脚本所在目录——bin目录
2.6.2 zookeeper配置文件目录——conf目录
zookeeper的解压文件包含以下把内容
conf:配置文件包 bin:命令指定脚本 lib:依赖支持包
配置文件中的信息如下,我们需要修改的是zoo.cfg
1)常用配置文件 conf/zoo.cfg,常用配置说明及示例
配置示例:
4)修改配置文件
#修改配置文件名称
mv /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
vim /usr/local/zookeeper/conf/zoo.cfg
#增加图上内容
server.1=nn1:2888:3888
server.2=nn1:2888:3888
server.3=nn1:2888:3888
#其中1 2 3分别代表的节点编号 2888节点之间的通信端口 3888节点间的选举端口
dataDir=/data
#zookeeper运行时候的数据存储位置
5)将zoo.cfg 配置批量拷贝到每台机器
#将zoo.cfg 配置文件批量拷贝到每台机器上
scp_all_zk.sh /usr/local/zookeeper/conf/zoo.cfg /usr/local/zookeeper/conf/
#检查是否拷贝成功
ssh_all_zk.sh ls /usr/local/zookeeper/conf/zoo.cfg
2.6.3 修改输出日志配置文件所在目录并分发配置
修改zookeeper/bin/zkEnv.sh 脚本,在脚本中给 ZOO_LOG_DIR 设置日志所在目录
修改后
#拷贝zkEnv.sh到每台机器的zookeeper的bin目录下
scp_all_zk.sh /usr/local/zookeeper/bin/zkEnv.sh /usr/local/zookeeper/bin/
2.7 在每个机器上创建/data目录
#5个机器一起创建 /data目录,因为以后安装hadoop的时候也使用
ssh_root.sh mkdir /data
#修改五个机器的data目录是hadoop权限
ssh_root.sh chown hadoop:hadoop -R /data
#批量验证
ssh_root.sh "ls -l / | grep data"
2.8 在新建的/data目录下生成myid文件(这个只能手动,不能使用批量脚本)
#在每个机器的/data目录下创建myid文件
ssh_all_zk.sh touch /data/myid
每台机器都需要单独执行
#第一台:
echo "1" > /data/myid
#第二台:
echo "2" > /data/myid
#第三台:
echo "3" > /data/myid
#查看
ssh_all_zk.sh cat /data/myid
补充:
以下是实验室的机器文件系统目
其中链接形式展示的目录都是挂载在海牛云盘上面的,所以存储数据在这里面机器的启动速度会比较快因为直接从云盘加载,本地不用装载数据,但是这个目录中的数据使用起来需要将云盘的数据放入到自己的本地所以比较慢,小伙伴们在使用的时候要注意!!!!
2.9 给5个机器设置好环境变量
env 查看当前shell环境下已有环境变量
1./etc/profile整个系统的
2.\~/.bash_profile当前用户的。
加载顺序是先加载系统,再加载自己的。
#在nn1 nn2 s1的机器上面执行以下操作
echo 'export ZOOKEEPER_HOME=/usr/local/zookeeper' >> /etc/profile
echo 'export PATH=$PATH:$ZOOKEEPER_HOME/bin' >> /etc/profile
加到/etc/profile文件的底部,并且确认保存,注意使用root用户操作,再切换一个用户或者退出就会重新加载刚才的设置,再加env验证一下是否配置正确
#批量分发
scp_all_zk.sh /etc/profile /tmp
#曲线救国将数据从tmp目录提取出来放入到/etc目录下
ssh_root_zk.sh mv /tmp/profile /etc/profile
#批量验证
ssh_root_zk.sh tail /etc/profile
#批量source
ssh_root_zk.sh source /etc/profile
查看每个机器的JAVA 版本
ssh_all.sh java -version
2.10 在每个机器上启动zookeeper服务并查看启动结果
2.10.1 在每个机器上启动zookeeper服务
#批量启动
ssh_all_zk.sh /usr/local/zookeeper/bin/zkServer.sh start
#批量查看java进程用jps
ssh_all_zk.sh jps
#查询各个zookeeper节点的状态
ssh_all_zk.sh /usr/local/zookeeper/bin/zkServer.sh status
查看状态 status
补充问题:前台执行和后台执行
在linux或者是window电脑中执行一个程序可以是前台执行也可以是后台执行的,前台就像是我们玩游戏一样,后台就像是支撑系统运行的很多应用程序一样,我们虽然看不见但是却在后台默默执行着
对于zookeeper服务而言他的执行就是后台执行
那么在linux中怎么实现一个 程序的后台执行呢????
这要我们使用两个命令分别是nohup命令和&,其中nohup是后台一直执行,即使用户账户退出也可以一直执行,而&是让程序在后台执行
首先我们生命定义一个执行脚本
vim ~/f1.sh
#输入以下内容
#! /bin/bash
name='hainiu'
echo $name
sleep 20
echo 'end'
#增加执行权限
chmod +x ~/f1.sh
#执行脚本命令
~/f1.sh
程序前台执行处于卡死状态
#使用nohup执行
nohup ~/f1.sh &
发现程序处于后台执行过程中
默认执行输出文件内容位置nohup.out中
#所以程序运行我们要选择文件执行日志输出位置
#linux进程中2代表是错误输出结果 1代表是正常输出结果 2>&1
nohup ~/f1.sh >>test.log 2>&1 &
#后台执行并且输出到test.log中
#其中输出日志结果可以写出到 /dev/null中,这个位置是系统黑洞,什么都会消失
#所以如果不看重日志那么可以输出到这个位置
nohup ~/f1.sh >> /dev/null 2>&1 &
2.10.2 查看ZK输出日志和进程信息
#日志输出文件
/data/zookeeper.out
由于ZooKeeper集群启动的时候,每个结点都试图去连接集群中的其它结点,先启动的肯定连不上后面还没启动的,所以上面日志前面部分的异常是可以忽略的。通过后面部分可以看到,集群在选出一个Leader后,最后稳定了。
其他结点可能也出现类似问题,属于正常。
2.10.3 怎么查看系统进程输入日志文件位置
通过 JPS 查看进程ID
去查看进程ID文件,再到FD目录就能查看到当前进程所使用的管道信息
通过进程id查看linux服务运行日志
在此文件夹中可以看到所有的进程使用文件和连接信息,其中2代表错误日志输出位置 1代表正确日志输出位置
3.zookeeper的初始化选举和数据模型
zookeeper启动完毕以后我们可以查看节点的状态信息
ssh_all_zk.sh /usr/local/zookeeper/bin/zkServer.sh status
那么初始化集群的时候zookeeper是如何选举的呢?
ZooKeeper本质上是一个分布式的小文件存储系统
Zookeeper表现为一个分层的文件系统目录树结构(不同于文件系统的是,节点可以有自己的数据,而文件系统中的目录节点只有子节点),每个节点可以存少量的数据(1M左右)
每个节点称做一个ZNode。每个ZNode都可以通过其路径唯一标识
4.zookeeper的操作
4.1连接zookeeper
#zookeeper客户端的链接命令
zkCli.sh -server nn1:2181,nn2:2181,s1:2181
下面出现connected代表链接成功
#随便滚键盘就可以提示zookeeper中的命令
4.2 zookeeper中常用的命令
stat 查看节点状态信息
set 设置节点的值
ls 查看子节点
ls2 查看子节点并且查询状态信息
delete 软删除节点如果存在子节点不可以删除
sync 同步节点数据信息
rmr 强制删除节点
get 获取节点的值
create 创建节点
quit 退出客户端
close 关闭客户端和服务端的链接
connect 链接服务端
4.3 节点操作和原理
#创建节点
#create [-s序列节点] [-e临时节点] path data
zookeeper中的节点类型分为四种
#持久节点是创建完毕的节点在zookeeper中一直存在的
#持久无序列节点
create /node1 hainiu
#出现的结果就是 /node1
#持久带顺序的节点
create -s /node1 hainiu
#出现结果 /node10000000001 会自动在节点后面增加序列号
#临时节点,在客户端连接的时候使用的时候会一直存在,但是客户端退出的时候节点会自动消失
create -e /tmp tmp
#临时带有序列的节点
create -e -s /tmp tmp
#结果是/tmp0000000251自带事物编号
创建的节点在客户端断开的时候就要断开了,并且临时节点下面不能创建子节点
#close关闭客户端和服务端的链接
close
#重新链接 connect ip:port
connect nn1:2181
#查看根节点的节点信息 ls /
lS /
#可以看到临时节点全部都消失了
#quit可以直接退出客户端
quit
# close是关闭客户端和服务端的链接但是不会退出客户端
#set设置节点的值
set path value
#get获取节点的值
get path
在设置值和获取值的时候我们可以看到节点中的详情信息
这里我们引申出来一个新的概念:zxid(事物id)
zxid相当于是mysql等数据库中的事物,每次对于zookeeper中的数据或者是节点做更改操作的时候都会存在一个zxid进行记录其中的数据的版本,版本永远递增不会减小,在zookeeper中所有节点的数据版本都应该保持一致,所以所有人根据事物id进行数据同步,保证一致性,而上面的所有描述信息就是zxid的信息
在一个集群中的所有节点的数据信息都保持一致的,我们可以连接不同的节点读取刚才设置的数据的值
#connect nn1:2181
#connect nn2:2181
#connect s1:2181
#分别执行 get /node1
get /node1
每个节点的值都保持一致,并且他们的事物zxid是一致的
#sync保证数据同步一致
sync path
#在读取数据之前,防止因为网络或者卡顿等问题出现数据不一致的问题,我们可以手动同步数据
#返回值1代表数据有同步,0代表是数据已经一致不需要同步数据
#delete删除节点,但是存在子节点是无法删除的
delete path
# rmr强制删除节点,有子节点也可以删除
rmr path
#先创建/node1.并且在下面创建子节点
create /node1 node1
create /node1/child1 child1
create /node1/child2 child2
#ls查询子节点
ls path
#ls2 查询子节点并且返回当前节点的状态
ls2 path
#stat查询当前节点的信息
stat path
4.4 paxos协议和zab协议
上面的操作内容可以保证数据在多个机器上保持一致,那么数据是如何保持一致的呢?
paxos协议的原理
Paxos 算法解决的问题是一个分布式系统如何就某个值(决议)达成一致。一个典型的场景是,在一个分布式数据库系统中,如果各节点的初始状态一致,每个节点执行相同的操作序列,那么他们最后能得到一个一致的状态。为保证每个节点执行相同的命令序列,需要在每一条指令上执行一个“一致性算法”以保证每个节点看到的指令一致。一个通用的一致性算法可以应用在许多场景中,是分布式计算中的重要问题。因此从20世纪80年代起对于一致性算法的研究就没有停止过。节点通信存在两种模型:共享内存和消息传递(Messages passing)。Paxos 算法就是一种基于消息传递模型的一致性算法。
paxos协议分为以下几个阶段
1.提议者提出提议
2.接收者负责恢复确认消息
3.当提议者接收到超过一半以上的接收者的同意以后发送commit消息,确认消息执行
4.节点将修改完毕的数据同步给其他的节点
并且因为提议者过多产生的提议执行逻辑也比较多
zab协议
1. Zab协议是为分布式协调服务 Zookeeper专门设计的,是 Zookeeper保证数据一致性的核心算法。Zab借鉴了 Paxos算法,但又不像 Paxos那样,是一种通用的分布式一致性算法。支持崩溃恢复 和 原子广播协议。
2. 在 Zookeeper中主要依赖 Zab协议来实现数据一致性,基于该协议,zk实现了一种主备模型(即 Leader和 Follower模型)的系统架构来保证集群中各副本之间数据的一致性。主备系统架构模型指只有一台客户端(Leader)负责处理外部的写事务请求,然后 Leader客户端将数据同步到其他 Follower节点。
总结来说,zab协议就是paxos协议的改版,存在主从模式,整个集群的操作全部从主节点出发,而不是多提议者
保证数据的完整一致性
4.5 zookeeper的读写数据流程
1)写操作
写入数据的指定leader写入
写入数据请求follower的情况
2)读操作
1)在Client向Follwer 或 Observer 发出一个读的请求;
2)Follwer 或 Observer 把请求结果返回给Client;
4.6 zookeeper的监听机制
zookeeper的客户端可以实现对于某个节点的变化进行监听的功能
#能够提供监听功能的命令有
stat path [watch]监控节点状态变化
get path [watch] 监听值的变化
ls2 path [watch]监控节点状态变化和子节点变化
ls path [watch] 监控子节点变化
#监控/servers子节点变化
ls /servers watch
#另一台执行 create /servers/tmp tmp
create /servers/tmp tmp
发现节点变化,出现监听数据通知
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/servers
#监控/servers子节点变化
ls2 /servers watch
#另一台执行 create /servers/tmp tmp
create /servers/tmp1 tmp1
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/servers
#监控/servers值的变化
get /servers watch
#另一台执行 set /servers hainiu
set /servers hainiu
#监控/servers/tmp 值的变化
stat /servers/tmp watch
#另一台执行 rmr /servers/tmp
rmr /servers/tmp
4.7 集群的启停和数据恢复
#集群的启停命令
1)启动ZK服务: sh bin/zkServer.sh start
2)查看ZK服务状态: sh bin/zkServer.sh status
3)停止ZK服务: sh bin/zkServer.sh stop
4)重启ZK服务: sh bin/zkServer.sh restart
集群启动以后集群可以恢复之前的数据并且可以自动恢复主从节点的信息
那么新的问题来了,集群是如何进行数据恢复和节点启动二次选举的呢???
4.7.1 集群启动的数据加载
集群在重启的时候数据是如何保证内存的数据加载恢复的呢
在zookeeper的数据文件夹中会保存zookeeper的集群运行时候的数据
数据会以日志和快照的形式存储到磁盘中,在启动的时候可以加载进来
4.7.2 节点损坏后的集群leader选举
首先是集群损坏后的集群选举问题
#查看节点中的状态并且关闭节点然后重新选举
ssh_all_zk.sh /usr/local/zookeeper/bin/zkServer.sh status
#发现zookeeper第二台是主节点
zkServer.sh stop
#执行停止节点命令,继续查看各个节点状态
发现第三台机器已经重新选举为leader
所以我们看到在整个集群中,apochId是一致的,zxid也是一致的,那么myid越大则权重越高
4.7.3 集群重新选举完毕的数据恢复
首先我们要提到zab协议,这个协议才是保证集群稳定和对外提供服务的根本
Zab协议有两种模式,它们分别是恢复模式(选主)和广播模式(同步)
恢复模式:
当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,恢复模式不接受客户端请求,当领导者被选举出来,且过半的Follower完成了和leader的状态同步以后,恢复模式就结束了。剩下未同步完成的机器会继续同步,直到同步完成并加入集群后该节点的服务才可用。
失败恢复的两种情况
1.leader发送完毕提议就宕机了
2.leader接受完毕ack,但是没有commit提交就宕机了
zab协议的条件
1.所有commit的提议必须被执行
2.没有被commit的执行的必须全部丢掉
选举leader的要求
1.首先是zxid最大的,可以防止数据丢失
2.这个leader不能存在未完成的提议
leader选举完毕后的动作
1.扫清障碍,保证所有的提议都被执行了
2.将落后的follower进行数据同步,并且加入到follower的队伍中
4.7.4 集群的配置节点【补充】
众所周知,集群中的zookeeper需要超过半数,整个集群对外才可用。
这里所谓的整个集群对外才可用,是指整个集群还能选出一个Leader来,zookeeper默认采用
quorums来支持Leader的选举。
其实quorums机制有两个作用:
1.可以保证集群中选举出leader,且是唯一的一个,不会出现脑裂(split-brain)。
2.当客户端更新数据时,当大多数节点更新成功,客户端就会被通知更新成功了,
3. 其他节点可以稍后再更新,以致达到数据的最终一致性
就是说如果有2个zookeeper,那么只要有1个死了zookeeper就不能用了,因为1没有过半,所以2个zookeeper的死亡容忍度为0;同理,要是有3个zookeeper,一个死了,还剩下2个正常的,过半了,所以3个zookeeper的容忍度为1;同理你多列举几个:2->0;3->1;4->1;5->2;6->2会发现一个规律,2n和2n-1的容忍度是一样的,都是n-1,所以为了更加高效,和节省资源,2n和2n-1作用一样只需2n-1即可,不必增加那一个不必要的zookeeper。
5.zookeeper的简单api实现
5.1环境准备
pom文件的依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
log4j的配置
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
5.2 api操作
创建连接
val zk = new ZooKeeper("nn1.hadoop:2181",5000,new Watcher {
override def process(watchedEvent: WatchedEvent): Unit = {}
})
节点操作
//创建节点
zk.create("/class_45","xiaobo_gande".getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT)
//删除节点
zk.delete("/class_45",-1)
//设置值
zk.setData("/class_45","wangsai".getBytes(),-1)
//获取值
val data = zk.getData("/class_45",false,new Stat())
println(new String(data))
for(i<-1 to 10){
//创建子节点
zk.create(s"/class_45/child_${i}",s"littlesai_${i}".getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT)
}
//获取子节点
val children: util.List[String] = zk.getChildren("/class_45",false)
import scala.collection.JavaConversions._
for(c<-children){
val data = zk.getData(s"/class_45/${c}",false,new Stat())
println(new String(data))
}
6.zookeeper的使用场景
6.1 zookeeper的特点
最终一致性:client不论连接到哪个Server,展示给它都是同一个视图,这是zookeeper最重要的特性;
可靠性:具有简单、健壮、良好的性能,如果消息被某一台服务器接受,那么它将被所有的服务器接受;
实时性:Zookeeper保证客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失效的信息。 但由于网络延时等原因,Zookeeper不能保证两个客户端能同时得到刚更新的数据,如果需要最新数据,应该在读数据之前调用sync()接口;
等待无关(wait-free):慢的或者失效的client,不得干预快速的client的请求,使得每个client都能有效的等待;
原子性:更新只能成功或者失败,没有中间状态;
顺序性:按照客户端发送请求的顺序更新数据;
6.2 zookeeper的使用场景
6.2.1 集群管理
每个加入集群的机器都创建一个节点,写入自己的状态。监控父节点的用户会收到通知,进行相应的处理。离开时session失效,节点删除,监控父节点的用户同样会收到通知。
6.2.2 数据发布与订阅
发布与订阅即所谓的配置管理,顾名思义就是将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。
应用配置集中到znode上,应用启动时主动获取,并在znode上注册一个watcher,每次配置更新都会通知到应用。
6.2.3 分布式锁
Zookeeper能保证数据的强一致性,用户任何时候都可以相信集群中每个节点的数据都是相同的。
锁的两种体现方式:
1)独占锁(多人抢椅子坐,谁坐在椅子上,谁就获取锁;离开椅子释放锁)
一个用户创建一个znode作为锁,另一个用户检测该znode,如果存在,代表别的用户已经锁住,如果不存在,则可以创建一个znode,代表拥有一个锁。
2)时序锁(看谁先创建,谁先创建,谁就获取锁)
有一个znode作为父节点,其底下是带有编号的子节点,所有要获取锁的用户,需要在父节点下创建带有编号的子节点,编号最小的会持有锁;当最小编号的节点被删除后,锁被释放,再重新找最小编号的节点来持有锁,这样保证了全局有序。
7. zookeeper的使用实现
7.1 服务注册代码场景实现
监控服务端
public class MonitorServer {
private static final String path = "/servers";
private static ZooKeeper zk = null;
//创建zk连接
public static void init() throws IOException {
zk = new ZooKeeper("nn1.hadoop:2181", 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
monitorServers();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
public static void main(String[] args) throws Exception {
init();
monitorServers();
Thread.sleep(Long.MAX_VALUE);
}
public static void monitorServers() throws Exception {
List<String> childs = zk.getChildren(path, true);
for (String child : childs) {
byte[] data = zk.getData(path + "/" + child, false, null);
System.out.println("当前在线服务器有:"+new String(data));
}
}
}
监控客户端
public class RegisterClient {
private static ZooKeeper zk =null;
private static final String path = "/servers";
private static final String name = "s3";
public static void init() throws IOException {
zk = new ZooKeeper("nn1.hadoop:2181", 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
public static void main(String[] args) throws Exception {
init();
String path = zk.create(RegisterClient.path + "/" + name, name.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(path+"注册成功!!!");
Thread.sleep(Long.MAX_VALUE);
}
}
7.2独享锁的实现机制
public class RegisterClient {
private static ZooKeeper zk =null;
private static final String path = "/servers";
private static final String name = "lock";
public static void init() throws IOException {
zk = new ZooKeeper("nn1.hadoop:2181", 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
if(watchedEvent.getType().equals(Event.EventType.NodeDeleted))
regist();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
public static void regist() throws KeeperException, InterruptedException {
try {
String lock = zk.create(path + "/" + name, "nn1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("竞争到独享锁,做自己的事情!!!");
}catch (KeeperException.NodeExistsException e){
zk.exists(path+"/"+name,true);
System.out.println("没有竞争到锁,等待锁!!!!");
}catch (Exception e){
System.out.println("独享锁出错");
e.printStackTrace();
System.exit(1);
}
}
public static void main(String[] args) throws Exception {
init();
regist();
Thread.sleep(Long.MAX_VALUE);
}
}
7.3 共享锁的代码实现
public class DistributeShareLock {
private static ZooKeeper zk = null;
private String self_lock = null;
private static final String lock_path = "/servers/lock";
private static final String lock_prefix = "/servers";
public void init() throws IOException {
zk = new ZooKeeper("nn1.hadoop:2181", 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
getLock();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
public void work() throws InterruptedException {
System.out.println("当前获取到共享锁!!!做自己的事情!!!");
Thread.sleep(10000);
System.exit(1);
}
public void init_mylock() throws KeeperException, InterruptedException {
self_lock = zk.create(lock_path, "my_lock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
public void getLock() throws KeeperException, InterruptedException {
List<String> children = zk.getChildren(lock_prefix, false);
if(children.size() == 1){
//如果只有一个那么肯定是自己
work();
}else if(children.size() == 0){
System.out.println("锁出现异常!!!");
System.exit(1);
}else{
Collections.sort(children);
String my_lock_path = self_lock.substring(lock_prefix.length()+1);
int index = children.indexOf(my_lock_path);
if(index == 0){
//自己是最小的
work();
}else{
zk.exists(lock_prefix+"/"+children.get(index-1),true);
//如果不是最小的那么监控比自己小的
}
}
}
public static void main(String[] args) throws Exception {
DistributeShareLock l1 = new DistributeShareLock();
l1.init();
l1.init_mylock();
l1.getLock();
Thread.sleep(Long.MAX_VALUE);
}
}