golang操作zookeeper
官方文档
https://pkg.go.dev/github.com/samuel/go-zookeeper@v0.0.0-20201211165307-7117e9ea2414/zk
下载包
go get github.com/samuel/go-zookeeper
连接到Server
package main
import (
"fmt"
"time"
"github.com/samuel/go-zookeeper/zk"
)
func getConn() *zk.Conn {
var hosts = []string{"192.168.74.138:2181"} //server端ip:host
conn, _, err := zk.Connect(hosts, time.Second*5)
defer conn.Close()
if err != nil {
fmt.Println(err)
return nil
} else {
fmt.Println("zookeeper 连接成功!")
return conn
}
}
func main() {
conn := getConn()
fmt.Println(&conn)
}
运行结果:
[Running] go run "e:\golang开发学习\test_zk_golang\test.go"
zookeeper 连接成功!
2022/09/13 13:10:15 Connected to 192.168.74.128:2181
2022/09/13 13:10:15 authenticated: id=144115412815642627, timeout=4000
2022/09/13 13:10:15 re-submitting `0` credentials after reconnect
0xc000006028
[Done] exited with code=0 in 2.261 seconds
创建节点
func (c *Conn) Create(path string, data []byte, flags int32, acl []ACL) (string, error)
//path:路径
//data:数据
// flags有4中取值:
// 0:永久,除非手动删除
// zk.FlagEphemeral = 1:短暂,session断开则该节点也会被删除
// zk.FlagSequence = 2:会自动还在节点后面添加序号
// 3:Ephemeral和Sequence,即短暂且自动添加序号
//acl:权限
实例演示:
package main
import (
"fmt"
"time"
"github.com/samuel/go-zookeeper/zk"
)
func getConn() *zk.Conn {
var hosts = []string{"192.168.74.128:2181"} //server端host
conn, _, err := zk.Connect(hosts, time.Second*1)
// defer conn.Close()
if err != nil {
fmt.Println(err)
return nil
} else {
fmt.Println("zookeeper 连接成功!")
return conn
}
}
func create() {
var conn *zk.Conn = getConn()
defer conn.Close()
var path = "/home"
var data = []byte("Psych")
var flags int32 = 0
var acls = zk.WorldACL(zk.PermAll)
// zk.WorldACL() 生成一个 ACL 列表,其中包含一个使用所提供权限的 ACL,方案为“world”,ID 为“anyone”,ZooKeeper 使用它来表示任何用户。
p, err_create := conn.Create(path, data, flags, acls)
if err_create != nil {
fmt.Println(err_create)
return
}
fmt.Println("create:", p)
b, _, err := conn.Get(path)
if err != nil {
fmt.Println(err)
}
fmt.Println("创建节点:", string(b))
}
func main() {
/* conn := getConn()
fmt.Println(&conn) */
create()
}
运行结果:
[Running] go run "e:\golang开发学习\test_zk_golang\test.go"
zookeeper 连接成功!
2022/09/13 13:10:15 Connected to 192.168.74.128:2181
2022/09/13 13:10:15 authenticated: id=144115412815642627, timeout=4000
2022/09/13 13:10:15 re-submitting `0` credentials after reconnect
create: /home
创建节点: Psych
2022/09/13 13:10:15 recv loop terminated: err=EOF
[Done] exited with code=0 in 1.437 seconds
zk.Cli客户端查看:
节点创建成功
修改节点
func (c *Conn) Set(path string, data []byte, version int32) (*Stat, error)
// version:版本号
实例演示:
// 修改节点
func set() {
var conn *zk.Conn = getConn()
defer conn.Close()
var path = "/home"
var data = []byte("Psych-18")
conn.Set(path, data, -1)
b, _, err := conn.Get(path)
if err != nil {
fmt.Println(err)
}
fmt.Println("数据:" + string(b))
}
运行结果:
[Running] go run "e:\golang开发学习\test_zk_golang\test.go"
zookeeper 连接成功!
2022/09/13 13:18:39 Connected to 192.168.74.128:2181
2022/09/13 13:18:39 authenticated: id=144115412815642628, timeout=4000
2022/09/13 13:18:39 re-submitting `0` credentials after reconnect
数据:Psych-18
[Done] exited with code=0 in 1.376 seconds
zk.Cli客户端查看:
节点数据修改成功
删除节点
func (c *Conn) Delete(path string, version int32) error
实例演示:
// 删除节点
func del() {
var conn *zk.Conn = getConn()
defer conn.Close()
var path = "/home"
err := conn.Delete(path, -1)
if err != nil {
fmt.Println("删除失败")
} else {
fmt.Println("删除成功")
}
}
运行结果:
zookeeper 连接成功!
2022/09/13 13:28:37 Connected to 192.168.74.128:2181
2022/09/13 13:28:37 authenticated: id=144115412815642629, timeout=4000
2022/09/13 13:28:37 re-submitting `0` credentials after reconnect
删除成功
zk.Cli客户端查看:
删除成功
删改与增不同在于其函数中的version参数,其中version是用于 CAS支持
可以通过此种方式保证原子性
watch机制
在go-zookeeper中通过Event进行监听
全局监听
调用zk.WithEventCallback(callback)设置回调
package main
import (
"fmt"
"time"
"github.com/samuel/go-zookeeper/zk"
)
// 创建节点
func create(conn *zk.Conn) {
var path = "/home"
var data = []byte("Psych")
var flags int32 = 0
var acls = zk.WorldACL(zk.PermAll)
// zk.WorldACL() 生成一个 ACL 列表,其中包含一个使用所提供权限的 ACL,方案为“world”,ID 为“anyone”,ZooKeeper 使用它来表示任何用户。
p, err_create := conn.Create(path, data, flags, acls)
if err_create != nil {
fmt.Println(err_create)
return
}
fmt.Println("create:", p)
b, _, err := conn.Get(path)
if err != nil {
fmt.Println(err)
}
fmt.Println("创建节点:", string(b))
}
// 修改节点
func set(conn *zk.Conn) {
var path = "/home"
var data = []byte("Psych-18")
conn.Set(path, data, -1)
b, _, err := conn.Get(path)
if err != nil {
fmt.Println(err)
}
fmt.Println("数据:" + string(b))
}
// 删除节点
func del(conn *zk.Conn) {
var path = "/home"
err := conn.Delete(path, -1)
if err != nil {
fmt.Println("删除失败")
} else {
fmt.Println("删除成功")
}
}
// 回调函数
func callback(event zk.Event) {
fmt.Println("---------------------")
fmt.Printf("Path: %v\n", event.Path)
fmt.Printf("Type: %v\n", event.Type.String())
fmt.Printf("State: %v\n", event.State.String())
fmt.Println("---------------------")
}
// 监听
func watch() {
hosts := []string{"192.168.74.128:2181"}
option := zk.WithEventCallback(callback)
// WithEventCallback 返回一个指定事件回调的连接选项。回调不能阻塞 - 这样做会延迟 ZK go 例程。
conn, _, err := zk.Connect(hosts, time.Second*5, option)
// option: 指定条件
defer conn.Close()
if err != nil {
fmt.Println(err)
return
}
var path = "/home"
_, _, _, err = conn.ExistsW(path)
// 判断路径是否存在
if err != nil {
fmt.Println(err)
return
}
// 创建
create(conn)
time.Sleep(time.Second * 2)
_, _, _, err = conn.ExistsW(path)
if err != nil {
fmt.Println(err)
return
}
// 删除
del(conn)
}
func main() {
watch()
}
运行结果:
[Running] go run "e:\golang开发学习\test_zk_golang\test.go"
---------------------
Path:
Type: EventSession
State: StateConnecting
---------------------
---------------------
Path:
Type: EventSession
State: StateConnected
---------------------
2022/09/13 13:53:11 Connected to 192.168.74.128:2181
---------------------
Path:
Type: EventSession
State: StateHasSession
---------------------
2022/09/13 13:53:11 authenticated: id=144115412815642630, timeout=5000
2022/09/13 13:53:11 re-submitting `0` credentials after reconnect
---------------------
Path: /home
Type: EventNodeCreated
State: unknown state
---------------------
create: /home
创建节点: Psych
---------------------
Path: /home
Type: EventNodeDeleted
State: unknown state
---------------------
删除成功
2022/09/13 13:53:13 recv loop terminated: err=EOF
[Done] exited with code=0 in 3.228 seconds
部分监听
调用
conn.ExistsW(path)
或GetW(path)
为对应节点设置监听,该监听只生效一次开启一个协程处理chanel中传来的event事件
注意:watchCreataNode一定要放在一个协程中,不能直接在main中调用,不然会阻塞main
//部分代码如下:
func watchPart() {
hosts := []string{"192.168.74.128:2181"}
conn, _, err := zk.Connect(hosts, time.Second*5)
defer conn.Close()
if err != nil {
fmt.Println(err)
return
}
var path = "/home"
_, _, ech, err := conn.ExistsW(path)
if err != nil {
fmt.Println(err)
return
}
go watchCreataNode(ech)
create(conn)
}
func watchCreataNode(ech <-chan zk.Event) {
event := <-ech
fmt.Println("*******************")
fmt.Println("path:", event.Path)
fmt.Println("type:", event.Type.String())
fmt.Println("state:", event.State.String())
fmt.Println("-------------------")
}
注意:
- 如果即设置了全局监听有设置了部分监听,那么最终是都会触发的,并且全局监听在先执行
- 如果设置了监听子节点,那么事件的触发是先子节点后父节点
版权声明:本文为qq_39280718原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。