golang操作zookeeper


golang操作zookeeper

官方文档

https://pkg.go.dev/github.com/samuel/go-zookeeper@v0.0.0-20201211165307-7117e9ea2414/zk

下载包

go get github.com/samuel/go-zookeeper

连接到Server

package main

import (
	"fmt"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

func getConn() *zk.Conn {
    var hosts = []string{"192.168.74.138:2181"} //server端ip:host
	conn, _, err := zk.Connect(hosts, time.Second*5)
	defer conn.Close()
	if err != nil {
		fmt.Println(err)
		return nil
	} else {
		fmt.Println("zookeeper 连接成功!")
		return conn
	}

}

func main() {
	conn := getConn()
	fmt.Println(&conn)
}

运行结果:

[Running] go run "e:\golang开发学习\test_zk_golang\test.go"
zookeeper 连接成功!
2022/09/13 13:10:15 Connected to 192.168.74.128:2181
2022/09/13 13:10:15 authenticated: id=144115412815642627, timeout=4000
2022/09/13 13:10:15 re-submitting `0` credentials after reconnect
0xc000006028

[Done] exited with code=0 in 2.261 seconds

创建节点

func (c *Conn) Create(path string, data []byte, flags int32, acl []ACL) (string, error)
//path:路径
//data:数据
// flags有4中取值:
	// 0:永久,除非手动删除
	// zk.FlagEphemeral = 1:短暂,session断开则该节点也会被删除
	// zk.FlagSequence = 2:会自动还在节点后面添加序号
	// 3:Ephemeral和Sequence,即短暂且自动添加序号
//acl:权限

实例演示:

package main

import (
	"fmt"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

func getConn() *zk.Conn {
	var hosts = []string{"192.168.74.128:2181"} //server端host
	conn, _, err := zk.Connect(hosts, time.Second*1)
	// defer conn.Close()
	if err != nil {
		fmt.Println(err)
		return nil
	} else {
		fmt.Println("zookeeper 连接成功!")
		return conn
	}

}

func create() {
	var conn *zk.Conn = getConn()
	defer conn.Close()

	var path = "/home"
	var data = []byte("Psych")
	var flags int32 = 0

	var acls = zk.WorldACL(zk.PermAll)
	// zk.WorldACL() 生成一个 ACL 列表,其中包含一个使用所提供权限的 ACL,方案为“world”,ID 为“anyone”,ZooKeeper 使用它来表示任何用户。

	p, err_create := conn.Create(path, data, flags, acls)
	if err_create != nil {
		fmt.Println(err_create)
		return
	}
	fmt.Println("create:", p)

	b, _, err := conn.Get(path)
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println("创建节点:", string(b))

}

func main() {
	/* conn := getConn()
	fmt.Println(&conn) */

	create()

}

运行结果:

[Running] go run "e:\golang开发学习\test_zk_golang\test.go"
zookeeper 连接成功!
2022/09/13 13:10:15 Connected to 192.168.74.128:2181
2022/09/13 13:10:15 authenticated: id=144115412815642627, timeout=4000
2022/09/13 13:10:15 re-submitting `0` credentials after reconnect
create: /home
创建节点: Psych
2022/09/13 13:10:15 recv loop terminated: err=EOF

[Done] exited with code=0 in 1.437 seconds

zk.Cli客户端查看:

在这里插入图片描述

节点创建成功

修改节点

func (c *Conn) Set(path string, data []byte, version int32) (*Stat, error)
// version:版本号

实例演示:

// 修改节点
func set() {
	var conn *zk.Conn = getConn()
	defer conn.Close()

	var path = "/home"
	var data = []byte("Psych-18")

	conn.Set(path, data, -1)
	b, _, err := conn.Get(path)
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println("数据:" + string(b))
}

运行结果:

[Running] go run "e:\golang开发学习\test_zk_golang\test.go"
zookeeper 连接成功!
2022/09/13 13:18:39 Connected to 192.168.74.128:2181
2022/09/13 13:18:39 authenticated: id=144115412815642628, timeout=4000
2022/09/13 13:18:39 re-submitting `0` credentials after reconnect
数据:Psych-18

[Done] exited with code=0 in 1.376 seconds

zk.Cli客户端查看:

在这里插入图片描述

节点数据修改成功

删除节点

func (c *Conn) Delete(path string, version int32) error

实例演示:

// 删除节点
func del() {
	var conn *zk.Conn = getConn()
	defer conn.Close()

	var path = "/home"
	err := conn.Delete(path, -1)
	if err != nil {
		fmt.Println("删除失败")
	} else {
		fmt.Println("删除成功")
	}
}

运行结果:

zookeeper 连接成功!
2022/09/13 13:28:37 Connected to 192.168.74.128:2181
2022/09/13 13:28:37 authenticated: id=144115412815642629, timeout=4000
2022/09/13 13:28:37 re-submitting `0` credentials after reconnect
删除成功

zk.Cli客户端查看:

在这里插入图片描述

删除成功

删改不同在于其函数中的version参数,其中version是用于 CAS支持
可以通过此种方式保证原子性

watch机制

在go-zookeeper中通过Event进行监听

全局监听

调用zk.WithEventCallback(callback)设置回调

package main

import (
	"fmt"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

// 创建节点
func create(conn *zk.Conn) {
	var path = "/home"
	var data = []byte("Psych")
	var flags int32 = 0

	var acls = zk.WorldACL(zk.PermAll)
	// zk.WorldACL() 生成一个 ACL 列表,其中包含一个使用所提供权限的 ACL,方案为“world”,ID 为“anyone”,ZooKeeper 使用它来表示任何用户。

	p, err_create := conn.Create(path, data, flags, acls)
	if err_create != nil {
		fmt.Println(err_create)
		return
	}
	fmt.Println("create:", p)

	b, _, err := conn.Get(path)
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println("创建节点:", string(b))

}

// 修改节点
func set(conn *zk.Conn) {
	var path = "/home"
	var data = []byte("Psych-18")

	conn.Set(path, data, -1)
	b, _, err := conn.Get(path)

	if err != nil {
		fmt.Println(err)
	}
	fmt.Println("数据:" + string(b))
}

// 删除节点
func del(conn *zk.Conn) {
	var path = "/home"
	err := conn.Delete(path, -1)
	if err != nil {
		fmt.Println("删除失败")
	} else {
		fmt.Println("删除成功")
	}
}

// 回调函数
func callback(event zk.Event) {
	fmt.Println("---------------------")
	fmt.Printf("Path: %v\n", event.Path)
	fmt.Printf("Type: %v\n", event.Type.String())
	fmt.Printf("State: %v\n", event.State.String())
	fmt.Println("---------------------")
}

// 监听
func watch() {
	hosts := []string{"192.168.74.128:2181"}
	option := zk.WithEventCallback(callback)
	// WithEventCallback 返回一个指定事件回调的连接选项。回调不能阻塞 - 这样做会延迟 ZK go 例程。

	conn, _, err := zk.Connect(hosts, time.Second*5, option)
	//	option: 指定条件
	defer conn.Close()
	if err != nil {
		fmt.Println(err)
		return
	}

	var path = "/home"
	_, _, _, err = conn.ExistsW(path)
   // 判断路径是否存在
	if err != nil {
		fmt.Println(err)
		return
	}

	// 创建
	create(conn)

	time.Sleep(time.Second * 2)

	_, _, _, err = conn.ExistsW(path)
	if err != nil {
		fmt.Println(err)
		return
	}

	// 删除
	del(conn)

}

func main() {
	watch()
}

运行结果:

[Running] go run "e:\golang开发学习\test_zk_golang\test.go"
---------------------
Path: 
Type: EventSession
State: StateConnecting
---------------------
---------------------
Path: 
Type: EventSession
State: StateConnected
---------------------
2022/09/13 13:53:11 Connected to 192.168.74.128:2181
---------------------
Path: 
Type: EventSession
State: StateHasSession
---------------------
2022/09/13 13:53:11 authenticated: id=144115412815642630, timeout=5000
2022/09/13 13:53:11 re-submitting `0` credentials after reconnect
---------------------
Path: /home
Type: EventNodeCreated
State: unknown state
---------------------
create: /home
创建节点: Psych
---------------------
Path: /home
Type: EventNodeDeleted
State: unknown state
---------------------
删除成功
2022/09/13 13:53:13 recv loop terminated: err=EOF

[Done] exited with code=0 in 3.228 seconds

部分监听

  1. 调用conn.ExistsW(path)GetW(path)为对应节点设置监听,该监听只生效一次

  2. 开启一个协程处理chanel中传来的event事件

    注意:watchCreataNode一定要放在一个协程中,不能直接在main中调用,不然会阻塞main

//部分代码如下:
func watchPart() {
	hosts := []string{"192.168.74.128:2181"}
	conn, _, err := zk.Connect(hosts, time.Second*5)
	defer conn.Close()
	if err != nil {
		fmt.Println(err)
		return
	}

	var path = "/home"
	_, _, ech, err := conn.ExistsW(path)
	if err != nil {
		fmt.Println(err)
		return
	}
	go watchCreataNode(ech)
	create(conn)

}

func watchCreataNode(ech <-chan zk.Event) {
	event := <-ech
	fmt.Println("*******************")
	fmt.Println("path:", event.Path)
	fmt.Println("type:", event.Type.String())
	fmt.Println("state:", event.State.String())
	fmt.Println("-------------------")
}

注意:

  1. 如果即设置了全局监听有设置了部分监听,那么最终是都会触发的,并且全局监听在先执行
  2. 如果设置了监听子节点,那么事件的触发是先子节点后父节点

版权声明:本文为qq_39280718原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。