2.zookeeper 命令和编程

教程 DER ⋅ 于 2023-05-09 15:57:55 ⋅ 1845 阅读

5.zookeeper的使用命令

3.1 ZooKeeper服务命令

1)启动ZK服务: sh bin/zkServer.sh start
2)查看ZK服务状态: sh bin/zkServer.sh status
3)停止ZK服务: sh bin/zkServer.sh stop
4)重启ZK服务: sh bin/zkServer.sh restart


#查看每个机器ZK运行的状态
./ssh_all_zookeeper.sh /usr/local/zookeeper/bin/zkServer.sh status

#整体停止服务
./ssh_all_zookeeper.sh /usr/local/zookeeper/bin/zkServer.sh stop 

3.2 zk客户端命令

  • 1. 显示根节点下的子节点: ls / 
  • 2. 显示根目录下的子节点和说明信息: ls2 
  • 3. 创建znode,并设置初始内容

                创建永久节点:create /zk "节点内容" 

                创建临时节点:create -e /zk/app "节点内容"

                创建时序节点:create -s /zk/app/s1 "节点内容"

  • 4. 获取节点内容: get /zk 
  • 5. 修改节点内容: set /zk "zkbak" 对 zk 所关联的字符串进行设置
  • 6. 删除节点(此种方式该节点下不能有子节点): delete /zk 
  • 7. 删除节点和所有子节点:rmr /zk
  • 7. 退出客户端: quit
  • 8. 帮助命令: help
#启动zkclient,并连接zookeeper集群
/usr/local/zookeeper/bin/zkCli.sh -server nn1.hadoop:2181,nn2.hadoop:2181,s1.hadoop:2181

nn1机器客户端登录

cZxid:创建节点时的事务id

pZxid:子节点列表最后一次被修改的事务id

cversion:节点版本号

dataCersion:数据版本号

aclVerson:acl权限版本号

如何查看是临时节点还是永久节点?

        当get 节点信息时,其中有一个字段是ephemeralOwner意思是这个节点的临时拥有者。

        当ephemeralOwner 值不为0时,表明这个节点是临时节点,值为会话id。

        当ephemeralOwner 值为0时,表明这个节点是永久节点。.

[zk: nn1.hadoop:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: nn1.hadoop:2181(CONNECTED) 1] ls2 /
[zookeeper]
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
[zk: nn1.hadoop:2181(CONNECTED) 2] get /

cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1

# 创建永久节点
[zk: nn1.hadoop:2181(CONNECTED) 3] create /hainiu "hainiu bigdata"
Created /hainiu
[zk: nn1.hadoop:2181(CONNECTED) 4] ls /
[hainiu, zookeeper]
[zk: nn1.hadoop:2181(CONNECTED) 5] ls /hainiu
[]
[zk: nn1.hadoop:2181(CONNECTED) 6] get /hainiu
hainiu bigdata
cZxid = 0x300000002
ctime = Sat Jul 17 16:04:50 CST 2021
mZxid = 0x300000002
mtime = Sat Jul 17 16:04:50 CST 2021
pZxid = 0x300000002
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0 # 为0:永久, 非0:临时
dataLength = 14
numChildren = 0
[zk: nn1.hadoop:2181(CONNECTED) 7] set /hainiu "hainiu bigdata base"
cZxid = 0x300000002
ctime = Sat Jul 17 16:04:50 CST 2021
mZxid = 0x300000003
mtime = Sat Jul 17 16:06:05 CST 2021
pZxid = 0x300000002
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 19
numChildren = 0
[zk: nn1.hadoop:2181(CONNECTED) 8] get /hainiu
hainiu bigdata base
cZxid = 0x300000002
ctime = Sat Jul 17 16:04:50 CST 2021
mZxid = 0x300000003
mtime = Sat Jul 17 16:06:05 CST 2021
pZxid = 0x300000002
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 19
numChildren = 0
[zk: nn1.hadoop:2181(CONNECTED) 9] ls /
[hainiu, zookeeper]
# 创建临时节点
[zk: nn1.hadoop:2181(CONNECTED) 10] create -e /hainiu/c33 "class33"
Created /hainiu/c33

# 在nn2上查看
[zk: nn2.hadoop(CONNECTED) 3] get /hainiu/c33
class33
cZxid = 0x300000005
ctime = Sat Jul 17 16:08:37 CST 2021
mZxid = 0x300000005
mtime = Sat Jul 17 16:08:37 CST 2021
pZxid = 0x300000005
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x17ab3790f740000 # 临时节点
dataLength = 7
numChildren = 0

# 把nn1 的session关掉
[zk: nn1.hadoop:2181(CONNECTED) 11] quit
# 退出后,发现 /hainiu/c33 节点消失了

# 测试时序节点
[zk: nn1.hadoop:2181(CONNECTED) 0] ls /
[hainiu, zookeeper]
[zk: nn1.hadoop:2181(CONNECTED) 1] ls /hainiu
[]
[zk: nn1.hadoop:2181(CONNECTED) 2] create -s /hainiu/class "c1" 
Created /hainiu/class0000000001
[zk: nn1.hadoop:2181(CONNECTED) 3] create -s /hainiu/class "c2"
Created /hainiu/class0000000002
[zk: nn1.hadoop:2181(CONNECTED) 4] create -s /hainiu/class "c3"
Created /hainiu/class0000000003
[zk: nn1.hadoop:2181(CONNECTED) 5] create -s /hainiu/class "c4"
Created /hainiu/class0000000004
[zk: nn1.hadoop:2181(CONNECTED) 6] ls /hainiu
[class0000000003, class0000000002, class0000000001, class0000000004]
[zk: nn1.hadoop:2181(CONNECTED) 7] get /hainiu/class000000000

class0000000003   class0000000002   class0000000001   class0000000004
[zk: nn1.hadoop:2181(CONNECTED) 7] get /hainiu/class0000000001
c1
cZxid = 0x300000008
ctime = Sat Jul 17 16:10:57 CST 2021
mZxid = 0x300000008
mtime = Sat Jul 17 16:10:57 CST 2021
pZxid = 0x300000008
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
[zk: nn1.hadoop:2181(CONNECTED) 8] 

# 测试删除
[zk: nn1.hadoop:2181(CONNECTED) 8] ls /hainiu
[class0000000003, class0000000002, class0000000001, class0000000004]
[zk: nn1.hadoop:2181(CONNECTED) 9] create /app "app"
Created /app
[zk: nn1.hadoop:2181(CONNECTED) 10] ls /
[app, hainiu, zookeeper]
[zk: nn1.hadoop:2181(CONNECTED) 11] delete /app
[zk: nn1.hadoop:2181(CONNECTED) 12] ls /
[hainiu, zookeeper]
[zk: nn1.hadoop:2181(CONNECTED) 13] 
[zk: nn1.hadoop:2181(CONNECTED) 13] delete /hainiu
Node not empty: /hainiu
[zk: nn1.hadoop:2181(CONNECTED) 14] rmr /hainiu
[zk: nn1.hadoop:2181(CONNECTED) 15] ls /
[zookeeper]

3.3 zookeeper的读写数据流程

1)写操作

写入数据的指定leader写入

file

写入数据请求follower的情况

file

2)读操作

1)在Client向Follwer 或 Observer 发出一个读的请求;

2)Follwer 或 Observer 把请求结果返回给Client;

6.zookeeper的使用场景

6.1 zookeeper的特点

最终一致性:client不论连接到哪个Server,展示给它都是同一个视图,这是zookeeper最重要的特性;

可靠性:具有简单、健壮、良好的性能,如果消息被某一台服务器接受,那么它将被所有的服务器接受;

实时性:Zookeeper保证客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失效的信息。 但由于网络延时等原因,Zookeeper不能保证两个客户端能同时得到刚更新的数据,如果需要最新数据,应该在读数据之前调用sync()接口;

等待无关(wait-free):慢的或者失效的client,不得干预快速的client的请求,使得每个client都能有效的等待;

原子性:更新只能成功或者失败,没有中间状态;

顺序性:按照客户端发送请求的顺序更新数据;

6.2 zookeeper的使用场景

6.2.1 集群管理

    每个加入集群的机器都创建一个节点,写入自己的状态。监控父节点的用户会收到通知,进行相应的处理。离开时session失效,节点删除,监控父节点的用户同样会收到通知。

file

6.2.2 数据发布与订阅

        发布与订阅即所谓的配置管理,顾名思义就是将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。

        应用配置集中到znode上,应用启动时主动获取,并在znode上注册一个watcher,每次配置更新都会通知到应用。

file

6.2.3 分布式锁

        Zookeeper能保证数据的强一致性,用户任何时候都可以相信集群中每个节点的数据都是相同的。

        锁的两种体现方式:

        1)独占锁(多人抢椅子坐,谁坐在椅子上,谁就获取锁;离开椅子释放锁)

        一个用户创建一个znode作为锁,另一个用户检测该znode,如果存在,代表别的用户已经锁住,如果不存在,则可以创建一个znode,代表拥有一个锁。

file

  2)时序锁(看谁先创建,谁先创建,谁就获取锁)

        有一个znode作为父节点,其底下是带有编号的子节点,所有要获取锁的用户,需要在父节点下创建带有编号的子节点,编号最小的会持有锁;当最小编号的节点被删除后,锁被释放,再重新找最小编号的节点来持有锁,这样保证了全局有序。

file

7. zookeeper的简单api和使用实现

7.1 环境搭建

pom文件的依赖

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.8</version>
        </dependency>

log4j的配置

log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  
log4j.appender.logfile=org.apache.log4j.FileAppender  
log4j.appender.logfile.File=target/spring.log  
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n  

7.2简单api的实现

创建连接

 val zk = new ZooKeeper("nn1.hadoop:2181",5000,new Watcher {
      override def process(watchedEvent: WatchedEvent): Unit = {}
    })

节点操作

//创建节点 
zk.create("/class_45","xiaobo_gande".getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT)
//删除节点
    zk.delete("/class_45",-1)
  //设置值
    zk.setData("/class_45","wangsai".getBytes(),-1)
    //获取值
    val data = zk.getData("/class_45",false,new Stat())
    println(new String(data))
     for(i<-1 to 10){
    //创建子节点   
  zk.create(s"/class_45/child_${i}",s"littlesai_${i}".getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT)
     }
//获取子节点
val children: util.List[String] = zk.getChildren("/class_45",false)
    import scala.collection.JavaConversions._
    for(c<-children){
      val data =  zk.getData(s"/class_45/${c}",false,new Stat())
      println(new String(data))
    }

7.3 服务注册代码场景实现

监控服务端

public class MonitorServer {
    private static final String path = "/servers";
    private static ZooKeeper zk = null;
    //创建zk连接
    public static void init() throws IOException {
        zk = new ZooKeeper("nn1.hadoop:2181", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    monitorServers();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
    public static void main(String[] args) throws Exception {
        init();
        monitorServers();
        Thread.sleep(Long.MAX_VALUE);
    }
    public static void monitorServers() throws Exception {
        List<String> childs = zk.getChildren(path, true);
        for (String child : childs) {
            byte[] data = zk.getData(path + "/" + child, false, null);
            System.out.println("当前在线服务器有:"+new String(data));
        }
    }
}

监控客户端

public class RegisterClient {
    private static ZooKeeper zk =null;
    private static final String path = "/servers";
    private static final String name = "s3";
    public static void init() throws IOException {
        zk = new ZooKeeper("nn1.hadoop:2181", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }
    public static void main(String[] args) throws Exception {
        init();
        String path = zk.create(RegisterClient.path + "/" + name, name.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println(path+"注册成功!!!");
        Thread.sleep(Long.MAX_VALUE);
    }
}

7.4独享锁的实现机制

public class RegisterClient {
    private static ZooKeeper zk =null;
    private static final String path = "/servers";
    private static final String name = "lock";
    public static void init() throws IOException {
        zk = new ZooKeeper("nn1.hadoop:2181", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    if(watchedEvent.getType().equals(Event.EventType.NodeDeleted))
                        regist();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public static void regist() throws KeeperException, InterruptedException {
        try {
            String lock = zk.create(path + "/" + name, "nn1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("竞争到独享锁,做自己的事情!!!");
        }catch (KeeperException.NodeExistsException e){
            zk.exists(path+"/"+name,true);
            System.out.println("没有竞争到锁,等待锁!!!!");
        }catch (Exception e){
            System.out.println("独享锁出错");
            e.printStackTrace();
            System.exit(1);
        }
    }

    public static void main(String[] args) throws Exception {
        init();
        regist();
        Thread.sleep(Long.MAX_VALUE);
    }
}

7.5 共享锁的代码实现

public class DistributeShareLock {
    private static ZooKeeper zk = null;
    private String self_lock = null;
    private static final String lock_path = "/servers/lock";
    private static final String lock_prefix = "/servers";

    public void init() throws IOException {
        zk = new ZooKeeper("nn1.hadoop:2181", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    getLock();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public void work() throws InterruptedException {
        System.out.println("当前获取到共享锁!!!做自己的事情!!!");
        Thread.sleep(10000);
        System.exit(1);
    }

    public void init_mylock() throws KeeperException, InterruptedException {
        self_lock = zk.create(lock_path, "my_lock".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    public void getLock() throws KeeperException, InterruptedException {
        List<String> children = zk.getChildren(lock_prefix, false);
        if(children.size() == 1){
            //如果只有一个那么肯定是自己
            work();
        }else if(children.size() == 0){
            System.out.println("锁出现异常!!!");
            System.exit(1);
        }else{
            Collections.sort(children);
            String my_lock_path = self_lock.substring(lock_prefix.length()+1);
            int index = children.indexOf(my_lock_path);
            if(index == 0){
                //自己是最小的
                work();
            }else{
                zk.exists(lock_prefix+"/"+children.get(index-1),true);
                //如果不是最小的那么监控比自己小的
            }
        }
    }

    public static void main(String[] args) throws Exception {
        DistributeShareLock l1 = new DistributeShareLock();
        l1.init();
        l1.init_mylock();
        l1.getLock();
        Thread.sleep(Long.MAX_VALUE);
    }
}

8 ZAB协议和数据恢复

8.1 paxos协议

file

paxos协议分为三个阶段

  1. 提议者发送提议到各个接受者部分,接受者给出反馈
  2. 真正提出操作提议,接受者接受并且产生动作
  3. 将所有的提议结果发送给不同的learner

paxos算法的缺陷问题

file

如果server1和server5共同竞争server3作为提议的决定者,那么server3会造成两个提议端的摇摆不定的问题

8.2 ZAB协议的出现

  Zookeeper的核心是原子广播,这个机制保证了各个Server之间的数据一致性。实现这个机制的协议叫做Zab协议。

        Zab协议有两种模式,它们分别是恢复模式(选主)和广播模式(同步)。

        恢复模式:

        当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,恢复模式不接受客户端请求,当领导者被选举出来,且过半的Follower完成了和leader的状态同步以后,恢复模式就结束了。剩下未同步完成的机器会继续同步,直到同步完成并加入集群后该节点的服务才可用。

file

失败恢复的两种情况

1.leader发送完毕提议就宕机了

2.leader接受完毕ack,但是没有commit提交就宕机了

zab协议的条件

1.所有commit的提议必须被执行

2.没有被commit的执行的必须全部丢掉

选举leader的要求

1.首先是zxid最大的,可以防止数据丢失

2.这个leader不能存在未完成的提议

leader选举完毕后的动作

1.扫清障碍,保证所有的提议都被执行了

2.将落后的follower进行数据同步,并且加入到follower的队伍中

        广播模式:

        当集群中已经有过半的 Follower 服务器完成了和 Leader 服务器的状态同步,那么整个服务框架就可以进人消息广播模式了。这时候当一个Server加入ZooKeeper服务中,它会在恢复模式下启动,发现Leader,并和Leader进行状态同步。待到同步结束,它也参与消息广播。ZooKeeper的广播状态一直到Leader崩溃了或者Leader失去了大部分的Followers支持。

8.3 为什么zookeeper最好是奇数台?

        首先需要明确zookeeper选举的规则:leader选举,要求 可用节点数量 > 总节点数量/2 。注意 是 > , 不是 ≥。

        1)防止在脑裂的情况下集群不可用

                 脑裂:集群的脑裂通常是发生通信不可达的在节点之间情况下,集群会分裂成不同的小集群,小集群各自选出自己的master节点,导致原有的集群出现多个master节点的情况,这就是脑裂。

                 集群有奇数台机器,由于网络原因导致脑裂,一端机器多,一端机器少,多数端机器数量 > 集群总机器数/2,集群可用。

                 集群有4台机器,由于网络原因导致脑裂,有可能出现两端的机器数一样,两端都不 > 集群总机器数/2,集群不可用。                    

        2)在容错能力相同的情况下,奇数台更节省资源

                leader选举,要求 可用节点数量 > 总节点数量/2 。注意 是 > , 不是 ≥。

                举两个例子:

                (1) 假如zookeeper集群1,有3个节点,3/2=1.5 , 即zookeeper想要正常对外提供服务(即leader选举成功),至少需要2个节点是正常的。换句话说,3个节点的zookeeper集群,允许有一个节点宕机。

                (2) 假如zookeeper集群2,有4个节点,4/2=2 , 即zookeeper想要正常对外提供服务(即leader选举成功),至少需要3个节点是正常的。换句话说,4个节点的zookeeper集群,也允许有一个节点宕机。

                那么问题就来了, 集群1与集群2都有 允许1个节点宕机 的容错能力,但是集群2比集群1多了1个节点。在相同容错能力的情况下,本着节约资源的原则,zookeeper集群的节点数维持奇数个更好一些。  

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-DER,http://hainiubl.com/topics/76312
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter