etcd v3服务注册与服务发现
使用etcd最常见的场景就是服务注册与服务发现,现在微服务流行网关架构:
- 提供具体service的节点称为agent。
- gateway充当agent集群的代理节点,客户端访问gateway间接得到agent的数据。
- 同一类service在etcd中注册在相同目录下。
- gateway代理若干不同的service,需要监听etcd中多个目录的内容变化。
我在demo中编写了2个程序,一个agent,一个gateway。
可以启动多个agent实时的上线下线,gateway可以很快的感知到他们的变化。
agent
agent程序启动后,需要在对应service目录下创建一个代表自己的节点。
因为agent会异常退出,所以这个节点必须支持TTL自动过期。
属于同一个service的多个agent会注册到同一个目录下。
key的组装规则如下:
/serviceName/leaseId -> endpoint
第一级是service的名称,第二级是lease ID,每个节点首先申请lease租约,然后拼装成一个整key进行Put,value就是agent自己的服务地址(比如ip:port)。
当agent工作正常时,应该定时的刷新租约,避免key过期。考虑到各种极端场景,极有可能在Put或者Keepalive时Lease已经过期,所以需要进行容错处理,分配新的Lease进行重试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
func Register(dir string, value string) { dir = strings.TrimRight(dir, "/") + "/" client, err := clientv3.New(clientv3.Config{ Endpoints: []string{"localhost:2379"}, DialTimeout: 5 * time.Second, }) if err != nil { os.Exit(1) } kv := clientv3.NewKV(client) lease := clientv3.NewLease(client) var curLeaseId clientv3.LeaseID = 0 for { if curLeaseId == 0 { leaseResp, err := lease.Grant(context.TODO(), 10) if err != nil { goto SLEEP } key := dir + fmt.Sprintf("%d", leaseResp.ID) if _, err := kv.Put(context.TODO(), key, value, clientv3.WithLease(leaseResp.ID)); err != nil { goto SLEEP } curLeaseId = leaseResp.ID } else { fmt.Printf("keepalive curLeaseId=%d\n", curLeaseId) if _, err := lease.KeepAliveOnce(context.TODO(), curLeaseId); err == rpctypes.ErrLeaseNotFound { curLeaseId = 0 continue } } SLEEP: time.Sleep(time.Duration(1) * time.Second) } } func main() { go Register("/agent", "192.168.1.2:80") |
相关的API在之前的博客里都涉及过,整个register函数就是一个死循环,每隔1秒进行一次lease续约,若发生异常则重建lease。
gateway
网关作为代理,需要实时获取到service的agent列表,所以需要watch监听service对应的目录。
etcd v3的watch机制在最早的博客中有过详细说明,整个etcd中发生过的所有revision在bbolt中顺序存储,因此我们watch事件时需要告知etcd从哪个revision版本开始追踪。
我们要做的,首先是获取一次当前目录下的所有孩子并保存起来,作为起始状态。
那么应该从哪个revision开始监听后续变化呢?从revision=0开始会遍历历史上所有的版本,完全没有意义。一个比较可行的方法是,获取这批孩子中最大的ModRevision(最后修改版本),然后从这个版本+1开始监听,因为后续的revision一定大于这个版本。
实际上,有一个更加简单的方法,就是任何一个KV的操作,在应答里都会返回一个Header,里面记录的是执行这个操作时的etcd全局版本号,相当于GetRevision and GetChildren是一个原子操作。
所以,后续孩子的变化一定是大于response.Header.Revision的,比获取孩子列表最大ModRevision性能更好,因为孩子可能很久没有更新,而其他的Key带来了很多Revision,遍历这些Revision是没有意义的。
先获取所有孩子,得到监听的起始Revision:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
// 先读当前所有孩子, 直到成功为止 kv := clientv3.NewKV(client) for { rangeResp, err := kv.Get(context.TODO(), discover.dir, clientv3.WithPrefix()) if err != nil { continue } discover.mutex.Lock() for _, kv := range rangeResp.Kvs { discover.nodes[string(kv.Key)] = string(kv.Value) } discover.mutex.Unlock() // 从当前版本开始订阅 curRevision = rangeResp.Header.Revision + 1 break } |
Header的结构如下,其中Revision表示请求提交时的系统Revision:
1 2 3 4 5 6 7 8 9 10 |
type ResponseHeader struct { // cluster_id is the ID of the cluster which sent the response. ClusterId uint64 `protobuf:"varint,1,opt,name=cluster_id,json=clusterId,proto3" json:"cluster_id,omitempty"` // member_id is the ID of the member which sent the response. MemberId uint64 `protobuf:"varint,2,opt,name=member_id,json=memberId,proto3" json:"member_id,omitempty"` // revision is the key-value store revision when the request was applied. Revision int64 `protobuf:"varint,3,opt,name=revision,proto3" json:"revision,omitempty"` // raft_term is the raft term when the request was applied. RaftTerm uint64 `protobuf:"varint,4,opt,name=raft_term,json=raftTerm,proto3" json:"raft_term,omitempty"` } |
然后从这个Revision开始监听后续变化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
// 监听后续的PUT与DELETE事件 watcher := clientv3.NewWatcher(client) watchChan := watcher.Watch(context.TODO(), discover.dir, clientv3.WithPrefix(), clientv3.WithRev(curRevision)) for watchResp := range watchChan { for _, event := range watchResp.Events { discover.mutex.Lock() switch (event.Type) { case mvccpb.PUT: fmt.Println("PUT事件") discover.nodes[string(event.Kv.Key)] = string(event.Kv.Value) case mvccpb.DELETE: delete(discover.nodes, string(event.Kv.Key)) fmt.Println("DELETE事件") } discover.mutex.Unlock() } } |
NewWatcher如同NewKV一样,得到一个API子集,它的方法如下:
1 2 3 4 5 6 7 8 9 10 |
type Watcher interface { // Watch watches on a key or prefix. The watched events will be returned // through the returned channel. If revisions waiting to be sent over the // watch are compacted, then the watch will be canceled by the server, the // client will post a compacted error watch response, and the channel will close. Watch(ctx context.Context, key string, opts ...OpOption) WatchChan // Close closes the watcher and cancels all watch requests. Close() error } |
一般我们只会用到Watch方法,因为要监听一个目录,所以传递了WithPrefix。
因为要监听大于curRevision的版本,所以传递了withRev:
1 2 3 |
// WithRev specifies the store revision for 'Get' request. // Or the start revision of 'Watch' request. func WithRev(rev int64) OpOption { return func(op *Op) { op.rev = rev } } |
我们通过管道获取etcd发送来的watchResp:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
type WatchResponse struct { Header pb.ResponseHeader Events []*Event // CompactRevision is the minimum revision the watcher may receive. CompactRevision int64 // Canceled is used to indicate watch failure. // If the watch failed and the stream was about to close, before the channel is closed, // the channel sends a final response that has Canceled set to true with a non-nil Err(). Canceled bool // Created is used to indicate the creation of the watcher. Created bool closeErr error // cancelReason is a reason of canceling watch cancelReason string } |
主要关注Events数组,里面是若干etcd中顺序发生的revision:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
type Event struct { // type is the kind of event. If type is a PUT, it indicates // new data has been stored to the key. If type is a DELETE, // it indicates the key was deleted. Type Event_EventType `protobuf:"varint,1,opt,name=type,proto3,enum=mvccpb.Event_EventType" json:"type,omitempty"` // kv holds the KeyValue for the event. // A PUT event contains current kv pair. // A PUT event with kv.Version=1 indicates the creation of a key. // A DELETE/EXPIRE event contains the deleted key with // its modification revision set to the revision of deletion. Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"` // prev_kv holds the key-value pair before the event happens. PrevKv *KeyValue `protobuf:"bytes,3,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"` } |
每个Event的类型要么是PUT要么是DELETE。
这里需要关注一下,watchResponse有一个重要字段叫做Canceled,也就是说watcher可能会出错,进而被关闭,至于什么场景下会失败我目前还在测试中。
最后,如何实现选主?
实现公平抢主比较复杂,我建议大家都去抢同一个key。
大概逻辑是:
有个”抢主函数”,它首先申请一个lease ID(全局唯一ID,永不重复),它代表你本次抢主的唯一票据。
然后利用txn事务执行Put if not set,需要携带lease ID确保其会自动过期。
如果设置成功,说明”抢成功”。
如果设置失败,说明”别人抢到了”,接下里进入”观察函数”。
在”观察函数”中,根据”抢环节”得到的最后revision作为起始位置,创建一个watcher。
每次收到DELETE事件,就调用”抢主函数”,一旦”抢成功”就关闭watcher。”抢失败”则继续等待DELETE事件。
上述流程中,一旦”抢主成功”,那么就为lease ID定时续租即可。
在上述所有流程中,一旦遇到err报错,那么就要关闭所有资源,整个函数从头执行,无论当前是不是”主”,即放弃当前身份。
如果文章帮助您解决了工作难题,您可以帮我点击屏幕上的任意广告,或者赞助少量费用来支持我的持续创作,谢谢~

watch的时候我不提供revison会怎么样
记不太清楚了,不是从最新就是从最旧,试试便知。