etcd v3客户端用法
本文将示例etcd v3客户端的主要API用法,在开始之前建议你先阅读《etcd v3原理分析》,以便对etcd的数据结构和模型有深刻的印象。
准备
服务端
建议是直接下载github上release出来的二进制包,免得自己编译安装:https://github.com/coreos/etcd/releases/。
有兴趣也可以下载代码自己去编译整个项目,这个参考:https://coreos.com/etcd/docs/latest/dl_build.html。
启动一个单点服务端用于测试,只需要执行./etcd即可前台运行,默认服务端口:2379。
客户端
v3版本客户端的主页:https://github.com/coreos/etcd/tree/master/clientv3。
按照标准GO项目的组织结构,你应该先export GOPATH=xxx目录,xxx是所有Golang项目的公共存储目录。
按照常规流程,现在进入xxx目录,执行如下命令即可下载整个clientv3包并完成编译:
1 |
go get github.com/coreos/etcd/clientv3 |
该命令会将包下载到xxx/src/github.com/coreos/etcd/clientv3中,所有相关依赖包会自动下载编译,包括protobuf、grpc等…
此后你可以继续在xxx目录中,执行go get自己的github项目,并在自己的github项目中成功调用到clientv3的API,例如我的项目:
1 |
go get https://github.com/owenliang/go-etcd |
那么就会出现xxx/src/github.com/owenliang/go-etcd目录,在里面编写你的测试代码即可。
可恶的GFW
对于国内朋友们来说,除非你有高速稳定的海外VPN(不是shadowsocks),否则很难成功下载代码,这里面有2个原因:
- google的包都托管在golang.org域名下,完全被墙。
- go get实际是调用git clone下载包源码,但是git不支持socks5代理,比较麻烦。
对于只有shadowsocks的朋友们,下面是一个不错的解决方法,大家可以访问:https://www.golangtc.com/download/package,输入包名github.com/coreos/etcd/clientv3,这个平台会在海外服务器替我们执行go get拿到所有的依赖包以及源代码,分别打包成tar.gz。
我们要做的就是通过shadowsocks代理的浏览器,将这些源代码下载回来,并在xxx/src下解压,这样就相当于完成了go get的第一步(下载源代码)。
类似的,如果你自己有海外VPS,那么直接去海外go get下载所有源码再拷贝回国内即可。
在你准备好了所有包源码后,你需要执行一次go install来编译所有的源代码,会在xxx/pkg下生成各种静态库.a文件,这就算搞定了客户端安装了。
使用
文档
地址:https://godoc.org/github.com/coreos/etcd/clientv3。
乍一看API挺多,实际上这个文档经过梳理后还是很简单的,下面我把主要的API逐个演示一下。
建议你安装一个etcd v3客户端环境,这样对于好奇的数据结构你也可以跳转到源码里去大概了解,有助于你梳理整个脉络。
连接
1 2 3 4 |
cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"localhost:2378"}, DialTimeout: 5 * time.Second, }) |
要访问etcd第一件事就是创建client,它需要传入一个Config配置,这里传了2个选项:
- Endpoints:etcd的多个节点服务地址,因为我是单点测试,所以只传1个。
- DialTimeout:创建client的首次连接超时,这里传了5秒,如果5秒都没有连接成功就会返回err;值得注意的是,一旦client创建成功,我们就不用再关心后续底层连接的状态了,client内部会重连。
当然,如果上述err != nil,那么一般情况下我们可以选择重试几次,或者退出程序(重启)。
这里重点需要了解一下client到底长什么样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
type Client struct { Cluster KV Lease Watcher Auth Maintenance // Username is a user name for authentication. Username string // Password is a password for authentication. Password string // contains filtered or unexported fields } |
Cluster、KV、Lease…,你会发现它们其实就代表了整个客户端的几大核心功能板块,分别用于:
- Cluster:向集群里增加etcd服务端节点之类,属于管理员操作。
- KV:我们主要使用的功能,即操作K-V。
- Lease:租约相关操作,比如申请一个TTL=10秒的租约。
- Watcher:观察订阅,从而监听最新的数据变化。
- Auth:管理etcd的用户和权限,属于管理员操作。
- Maintenance:维护etcd,比如主动迁移etcd的leader节点,属于管理员操作。
我们需要使用什么功能,就去获取对应的对象即可。
获取KV对象
实际上client.KV是一个interface,提供了关于k-v操作的所有方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
type KV interface { // Put puts a key-value pair into etcd. // Note that key,value can be plain bytes array and string is // an immutable representation of that bytes array. // To get a string of bytes, do string([]byte{0x10, 0x20}). Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) // Get retrieves keys. // By default, Get will return the value for "key", if any. // When passed WithRange(end), Get will return the keys in the range [key, end). // When passed WithFromKey(), Get returns keys greater than or equal to key. // When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision; // if the required revision is compacted, the request will fail with ErrCompacted . // When passed WithLimit(limit), the number of returned keys is bounded by limit. // When passed WithSort(), the keys will be sorted. Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) // Delete deletes a key, or optionally using WithRange(end), [key, end). Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) // Compact compacts etcd KV history before the given rev. Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) // Do applies a single Op on KV without a transaction. // Do is useful when creating arbitrary operations to be issued at a // later time; the user can range over the operations, calling Do to // execute them. Get/Put/Delete, on the other hand, are best suited // for when the operation should be issued at the time of declaration. Do(ctx context.Context, op Op) (OpResponse, error) // Txn creates a transaction. Txn(ctx context.Context) Txn } |
但是我们并不是直接获取client.KV来使用,而是通过一个方法来获得一个经过装饰的KV实现(内置错误重试机制的高级KV):
1 |
kv := clientv3.NewKV(cli) |
接下来,我们将通过kv对象操作etcd中的数据。
Put
1 |
putResp, err := kv.Put(context.TODO(),"/test/a", "something") |
第一个参数context经常用golang的同学比较熟悉,很多API利用context实现取消操作,比如希望超过一定时间就让API立即返回,但是通常我们不需要用到。
后面2个参数分别是key和value,还记得etcd是k-v存储引擎么?对于etcd来说,key=/test/a只是一个字符串而已,但是对我们而言却可以模拟出目录层级关系,先继续往下看。
其函数原型如下:
1 2 3 4 5 |
// Put puts a key-value pair into etcd. // Note that key,value can be plain bytes array and string is // an immutable representation of that bytes array. // To get a string of bytes, do string([]byte{0x10, 0x20}). Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) |
除了我们传递的参数,还支持一个可变参数,主要是传递一些控制项来影响Put的行为,例如可以携带一个lease ID来支持key过期,这个后面再说。
上述Put操作返回的是PutResponse,不同的KV操作对应不同的response结构,这里顺便一提。
1 2 3 4 5 6 7 |
type ( CompactResponse pb.CompactionResponse PutResponse pb.PutResponse GetResponse pb.RangeResponse DeleteResponse pb.DeleteRangeResponse TxnResponse pb.TxnResponse ) |
你可以通过IDE跳转到PutResponse,详细看看有哪些可用的信息:
1 2 3 4 5 |
type PutResponse struct { Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` // if prev_kv is set in the request, the previous key-value pair will be returned. PrevKv *mvccpb.KeyValue `protobuf:"bytes,2,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"` } |
Header里保存的主要是本次更新的revision信息,而PrevKv可以返回Put覆盖之前的value是什么(目前是nil,后面会说原因),打印给大家看看:
1 |
&{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:182 raft_term:3 <nil>} |
记得,我们需要判断err来确定操作是否成功。
我们再Put其他2个key,用于后续演示:
1 2 3 4 5 |
// 再写一个孩子 kv.Put(context.TODO(),"/test/b", "another") // 再写一个同前缀的干扰项 kv.Put(context.TODO(), "/testxxx", "干扰") |
现在理论上来说,/test目录下有2个孩子:a与b,而/testxxx并不是/test目录的孩子。
Get
我们可以先来读取一下/test/a:
1 |
getResp, err := kv.Get(context.TODO(), "/test/a") |
其函数原型如下:
1 2 3 4 5 6 7 8 9 |
// Get retrieves keys. // By default, Get will return the value for "key", if any. // When passed WithRange(end), Get will return the keys in the range [key, end). // When passed WithFromKey(), Get returns keys greater than or equal to key. // When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision; // if the required revision is compacted, the request will fail with ErrCompacted . // When passed WithLimit(limit), the number of returned keys is bounded by limit. // When passed WithSort(), the keys will be sorted. Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) |
和Put类似,函数注释里提示我们可以传递一些控制参数来影响Get的行为,比如:WithFromKey表示读取从参数key开始递增的所有key,而不是读取单个key。
在上面的例子中,我没有传递opOption,所以就是获取key=/test/a的最新版本数据。
这里err并不能反馈出key是否存在(只能反馈出本次操作因为各种原因异常了),我们需要通过GetResponse(实际上是pb.RangeResponse)判断key是否存在:
1 2 3 4 5 6 7 8 9 10 |
type RangeResponse struct { Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` // kvs is the list of key-value pairs matched by the range request. // kvs is empty when count is requested. Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"` // more indicates if there are more keys to return in the requested range. More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"` // count is set to the number of keys within the range when requested. Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"` } |
Kvs字段,保存了本次Get查询到的所有k-v对,因为上述例子只Get了一个单key,所以只需要判断一下len(Kvs)是否==1即可知道是否存在。
而mvccpb.KeyValue在上一篇博客中有所提及,它就是etcd在bbolt中保存的K-v对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
type KeyValue struct { // key is the key in bytes. An empty key is not allowed. Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` // create_revision is the revision of last creation on this key. CreateRevision int64 `protobuf:"varint,2,opt,name=create_revision,json=createRevision,proto3" json:"create_revision,omitempty"` // mod_revision is the revision of last modification on this key. ModRevision int64 `protobuf:"varint,3,opt,name=mod_revision,json=modRevision,proto3" json:"mod_revision,omitempty"` // version is the version of the key. A deletion resets // the version to zero and any modification of the key // increases its version. Version int64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"` // value is the value held by the key, in bytes. Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"` // lease is the ID of the lease that attached to key. // When the attached lease expires, the key will be deleted. // If lease is 0, then no lease is attached to the key. Lease int64 `protobuf:"varint,6,opt,name=lease,proto3" json:"lease,omitempty"` } |
至于RangeResponse.More和Count,当我们使用withLimit()选项进行Get时会发挥作用,相当于翻页查询。
接下来,我们通过一个特别的Get选项,获取/test目录下的所有孩子:
1 |
rangeResp, err := kv.Get(context.TODO(), "/test/", clientv3.WithPrefix()) |
WithPrefix()是指查找以/test/为前缀的所有key,因此可以模拟出查找子目录的效果。
我们知道etcd是一个有序的k-v存储,因此/test/为前缀的key总是顺序排列在一起。
withPrefix实际上会转化为范围查询,它根据前缀/test/生成了一个key range,[“/test/”, “/test0”),为什么呢?因为比/大的字符是’0’,所以以/test0作为范围的末尾,就可以扫描到所有的/test/打头的key了。
在之前,我Put了一个/testxxx干扰项,因为不符合/test/前缀(注意末尾的/),所以就不会被这次Get获取到。但是,如果我查询的前缀是/test,那么/testxxx也会被扫描到,这就是etcd k-v模型导致的,编程时一定要特别注意。
打印rangeResp.Kvs可以看到2个孩子:
1 |
[key:"/test/a" create_revision:3 mod_revision:182 version:44 value:"something" key:"/test/b" create_revision:10 mod_revision:183 version:38 value:"another" ] |
获取Lease对象
和获取KV对象一样,通过下面代码获取它:
1 |
lease := clientv3.NewLease(cli) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
type Lease interface { // Grant creates a new lease. Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) // Revoke revokes the given lease. Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) // TimeToLive retrieves the lease information of the given lease ID. TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) // Leases retrieves all leases. Leases(ctx context.Context) (*LeaseLeasesResponse, error) // KeepAlive keeps the given lease alive forever. KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) // KeepAliveOnce renews the lease once. In most of the cases, KeepAlive // should be used instead of KeepAliveOnce. KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) // Close releases all resources Lease keeps for efficient communication // with the etcd server. Close() error } |
Lease提供了几个功能:
- Grant:分配一个租约。
- Revoke:释放一个租约。
- TimeToLive:获取剩余TTL时间。
- Leases:列举所有etcd中的租约。
- KeepAlive:自动定时的续约某个租约。
- KeepAliveOnce:为某个租约续约一次。
- Close:貌似是关闭当前客户端建立的所有租约。
Grant与TTL
要想实现key自动过期,首先得创建一个租约,它有10秒的TTL:
1 |
grantResp, err := lease.Grant(context.TODO(), 10) |
grantResp中主要使用到了ID,也就是租约ID:
1 2 3 4 5 6 7 |
// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse. type LeaseGrantResponse struct { *pb.ResponseHeader ID LeaseID TTL int64 Error string } |
接下来,我们用这个租约来Put一个会自动过期的Key:
1 |
kv.Put(context.TODO(), "/test/expireme", "gone...", clientv3.WithLease(grantResp.ID)) |
这里特别需要注意,有一种情况是在Put之前Lease已经过期了,那么这个Put操作会返回error,此时你需要重新分配Lease。
当我们实现服务注册时,需要主动给Lease进行续约,这需要调用KeepAlive/KeepAliveOnce,你可以在一个循环中定时的调用:
1 2 |
keepResp, err := lease.KeepAliveOnce(context.TODO(), grantResp.ID) // sleep一会.. |
keepResp结构如下:
1 2 3 4 5 6 |
// LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse. type LeaseKeepAliveResponse struct { *pb.ResponseHeader ID LeaseID TTL int64 } |
没什么特别需要用到的字段。
KeepAlive和Put一样,如果在执行之前Lease就已经过期了,那么需要重新分配Lease。Etcd并没有提供API来实现原子的Put with Lease。
Op
Op字面意思就是”操作”,Get和Put都属于Op,只是为了简化用户开发而开放的特殊API。
实际上,KV有一个Do方法接受一个Op:
1 2 3 4 5 6 |
// Do applies a single Op on KV without a transaction. // Do is useful when creating arbitrary operations to be issued at a // later time; the user can range over the operations, calling Do to // execute them. Get/Put/Delete, on the other hand, are best suited // for when the operation should be issued at the time of declaration. Do(ctx context.Context, op Op) (OpResponse, error) |
其参数Op是一个抽象的操作,可以是Put/Get/Delete…;而OpResponse是一个抽象的结果,可以是PutResponse/GetResponse…
可以通过一些函数来分配Op:
- func OpDelete(key string, opts …OpOption) Op
- func OpGet(key string, opts …OpOption) Op
- func OpPut(key, val string, opts …OpOption) Op
- func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op
其实和直接调用KV.Put,KV.GET没什么区别。
下面是一个例子:
1 2 |
op1 := clientv3.OpPut("/hi", "hello", clientv3.WithPrevKV()) opResp, err := kv.Do(context.TODO(), op1) |
这里设置一个key=/hi,value=hello,希望结果中返回覆盖之前的value。
把这个op交给Do方法执行,返回的opResp结构如下:
1 2 3 4 5 6 |
type OpResponse struct { put *PutResponse get *GetResponse del *DeleteResponse txn *TxnResponse } |
你的操作是什么类型,你就用哪个指针来访问对应的结果,仅此而已。
Txn事务
etcd中事务是原子执行的,只支持if … then … else …这种表达,能实现一些有意思的场景。
首先,我们需要开启一个事务,这是通过KV对象的方法实现的:
1 |
txn := kv.Txn(context.TODO()) |
我写了如下的测试代码,Then和Else还比较好理解,If是比较陌生的。
1 2 3 4 |
txnResp, err := txn.If(clientv3.Compare(clientv3.Value("/hi"), "=", "hello")). Then(clientv3.OpGet("/hi")). Else(clientv3.OpGet("/test/", clientv3.WithPrefix())). Commit() |
我们先看下Txn支持的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
type Txn interface { // If takes a list of comparison. If all comparisons passed in succeed, // the operations passed into Then() will be executed. Or the operations // passed into Else() will be executed. If(cs ...Cmp) Txn // Then takes a list of operations. The Ops list will be executed, if the // comparisons passed in If() succeed. Then(ops ...Op) Txn // Else takes a list of operations. The Ops list will be executed, if the // comparisons passed in If() fail. Else(ops ...Op) Txn // Commit tries to commit the transaction. Commit() (*TxnResponse, error) } |
Txn必须是这样使用的:If(满足条件) Then(执行若干Op) Else(执行若干Op)。
If中支持传入多个Cmp比较条件,如果所有条件满足,则执行Then中的Op(上一节介绍过Op),否则执行Else中的Op。
在我的例子中只传入了1个比较条件:
1 |
clientv3.Compare(clientv3.Value("/hi"), "=", "hello") |
Value(“/hi”)是指key=/hi对应的value,它是条件表达式的”主语”,类型是Cmp:
1 2 3 |
func Value(key string) Cmp { return Cmp{Key: []byte(key), Target: pb.Compare_VALUE} } |
这个Value(“/hi”)返回的Cmp表达了:”/hi这个key对应的value”。
接下来,利用Compare函数来继续为”主语”增加描述,形成了一个完整条件语句,即”/hi这个key对应的value”必须等于”hello”。
Compare函数实际上是对Value返回的Cmp对象进一步修饰,增加了”=”与”hello”两个描述信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
func Compare(cmp Cmp, result string, v interface{}) Cmp { var r pb.Compare_CompareResult switch result { case "=": r = pb.Compare_EQUAL case "!=": r = pb.Compare_NOT_EQUAL case ">": r = pb.Compare_GREATER case "<": r = pb.Compare_LESS default: panic("Unknown result op") } cmp.Result = r switch cmp.Target { case pb.Compare_VALUE: val, ok := v.(string) if !ok { panic("bad compare value") } cmp.TargetUnion = &pb.Compare_Value{Value: []byte(val)} case pb.Compare_VERSION: cmp.TargetUnion = &pb.Compare_Version{Version: mustInt64(v)} case pb.Compare_CREATE: cmp.TargetUnion = &pb.Compare_CreateRevision{CreateRevision: mustInt64(v)} case pb.Compare_MOD: cmp.TargetUnion = &pb.Compare_ModRevision{ModRevision: mustInt64(v)} case pb.Compare_LEASE: cmp.TargetUnion = &pb.Compare_Lease{Lease: mustInt64orLeaseID(v)} default: panic("Unknown compare type") } return cmp } |
Cmp可以用于描述”key=xxx的yyy属性,必须=、!=、<、>,kkk值”,比如:
- key=xxx的value,必须!=,hello。
- key=xxx的create版本号,必须=,11233。
- key=xxx的lease id,必须=,12319231231238。
经过Compare函数修饰的Cmp对象,内部包含了完整的条件信息,传递给If函数即可。
类似于Value的函数用于指定yyy属性,有这么几个方法:
- func CreateRevision(key string) Cmp:key=xxx的创建版本必须满足…
- func LeaseValue(key string) Cmp:key=xxx的Lease ID必须满足…
- func ModRevision(key string) Cmp:key=xxx的最后修改版本必须满足…
- func Value(key string) Cmp:key=xxx的创建值必须满足…
- func Version(key string) Cmp:key=xxx的累计更新次数必须满足…
最后Commit提交整个Txn事务,我们需要判断txnResp获知If条件是否成立:
1 2 3 4 5 |
if txnResp.Succeeded { // If = true fmt.Println("~~~", txnResp.Responses[0].GetResponseRange().Kvs) } else { // If =false fmt.Println("!!!", txnResp.Responses[0].GetResponseRange().Kvs) } |
Succeed=true表示If条件成立,接下来我们需要获取Then或者Else中的OpResponse列表(因为可以传多个Op),可以看一下txnResp的结构:
1 2 3 4 5 6 7 8 |
type TxnResponse struct { Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"` // succeeded is set to true if the compare evaluated to true or false otherwise. Succeeded bool `protobuf:"varint,2,opt,name=succeeded,proto3" json:"succeeded,omitempty"` // responses is a list of responses corresponding to the results from applying // success if succeeded is true or failure if succeeded is false. Responses []*ResponseOp `protobuf:"bytes,3,rep,name=responses" json:"responses,omitempty"` } |
每个Op有一个ResponseOp对象,而ResponseOp的结构我们在前面贴过,可以翻回去看看。
最后
etcd v3客户端主要功能就是这些,希望可以帮你理清API脉络,更快的上手编码。
watch因为比较重要,所以会在下一篇博客里单独说明,暂时掌握这些即可。
如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~

go get 跟在哪个目录执行没关系
这里只是描述操作流程,不是描述go get的工作机制。
很好
“每个Op有一个ResponseOp对象,而ResponseOp的结构我们在前面贴过,可以翻回去看看。”
ResponseOp和OpResponse不一样吧
我也记不清了…你自己看看吧..
你好, etcd的事务能够实现两个key同时删除的原子操作吗? 即必须保证都删除,任何一个失败,都还原到删除前?
etcd有事务,关键字叫做txn,但是它的用法是if … then … else … ,总是需要传1个if条件,你可以制造一个总是不满足的条件,然后在else里放置多个Op,例如:
Txn(context.TODO()).If(
Compare(Value(k1), ">", v1),
Compare(Version(k1), "=", 2)
).Then(
OpPut(k2,v2), OpPut(k3,v3)
).Else(
OpPut(k4,v4), OpPut(k5,v5)
).Commit()
dg
31313
clientv3使用gomod各种冲突,解决方案:将包改为 go.etcd.io/etcd/client/v3