kubernetes – cluster-autoscaler (CA): analyzing how to implement a CloudProvider
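cluster-autoscaler is a service that scales at the node level. It is deployed as a Deployment with a single replica.
It does two things:
- If pods in the cluster are Pending because of insufficient resources, it asks the cloud platform, through the cloud provider plugin implemented for that platform, to create new nodes and join them to the cluster.
- If the request utilization of nodes obtained from the cloud platform is too low and the other nodes have enough room for their pods, it evicts those pods, waits for a while, and then asks the cloud platform, through the cloud provider plugin, to delete the node.
Your cloud vendor's documentation shows how this feature behaves in practice.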
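Startup flags
As a cloud vendor, you need to care about the following flags:
- --cloud-config: passed straight through to the cloud provider; the plugin loads its own configuration from that path.
- --cloud-provider: the identifier of the vendor's cloud provider, which is compiled into CA when the plugin is integrated.
- --nodes: may be passed multiple times; each occurrence describes one nodeGroup (scaling group). A vendor's scaling group is usually described by a minimum node count, a maximum node count, and the vendor's own identifier for the group. The string format is arbitrary as long as the cloud provider understands it.
- --node-group-auto-discovery: instead of hard-coding scaling groups with --nodes, this option works with a hook in the cloud provider to discover the scaling groups configured on the cloud platform automatically, so you don't have to restart CA to add --nodes entries. Cloud platforms usually don't implement this.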
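Because --nodes may appear several times, the flag has to collect every occurrence rather than keep only the last one. The sketch below shows one way to do that with Go's standard flag package; multiStringFlag and parseNodesFlags are hypothetical names for illustration, not CA's own code.

```go
package main

import (
	"flag"
	"fmt"
	"strings"
)

// multiStringFlag collects every occurrence of a repeatable flag such as
// --nodes. Hypothetical type for illustration only.
type multiStringFlag []string

// String implements flag.Value.
func (m *multiStringFlag) String() string {
	return strings.Join(*m, ",")
}

// Set implements flag.Value; the flag package calls it once per occurrence.
func (m *multiStringFlag) Set(v string) error {
	*m = append(*m, v)
	return nil
}

// parseNodesFlags parses args and returns all --nodes values in order.
func parseNodesFlags(args []string) ([]string, error) {
	fs := flag.NewFlagSet("ca-sketch", flag.ContinueOnError)
	var nodes multiStringFlag
	fs.Var(&nodes, "nodes", "minNodes:maxNodes:groupName, may be repeated")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return nodes, nil
}

func main() {
	specs, err := parseNodesFlags([]string{"--nodes=1:10:asg-a", "--nodes=0:5:asg-b"})
	if err != nil {
		panic(err)
	}
	fmt.Println(specs) // [1:10:asg-a 0:5:asg-b]
}
```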
Building
Download the code: https://github.com/kubernetes/autoscaler.
The code must be placed under ${GOPATH}/src/k8s.io/autoscaler/.
Then change into ${GOPATH}/src/k8s.io/autoscaler/cluster-autoscaler and run make to build the binary.
Development approach
First define the cloud identifier in cloudprovider/cloud_provider.go, which already contains these vendors:
```go
const (
	// AzureProviderName gets the provider name of azure
	AzureProviderName = "azure"
	// AlicloudProviderName gets the provider name of alicloud
	AlicloudProviderName = "alicloud"
	// AwsProviderName gets the provider name of aws
	AwsProviderName = "aws"
	// BaiducloudProviderName gets the provider name of baiducloud
	BaiducloudProviderName = "baiducloud"
	// DigitalOceanProviderName gets the provider name of digitalocean
	DigitalOceanProviderName = "digitalocean"
	// GceProviderName gets the provider name of gce
	GceProviderName = "gce"
	// MagnumProviderName gets the provider name of magnum
	MagnumProviderName = "magnum"
	// KubemarkProviderName gets the provider name of kubemark
	KubemarkProviderName = "kubemark"
)
```
Just add one of our own; we will need it shortly.
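Adding our own identifier could look like the sketch below. The name mycloud, the trimmed-down cloudProvider interface and the buildCloudProvider switch are all hypothetical; they only illustrate how a --cloud-provider value might be mapped to a provider constructor, not where CA actually does this.

```go
package main

import "fmt"

// MyCloudProviderName is a hypothetical identifier for our own provider,
// added next to existing constants such as "baiducloud".
const MyCloudProviderName = "mycloud"

// cloudProvider is a trimmed-down stand-in for cloudprovider.CloudProvider.
type cloudProvider interface {
	Name() string
}

type myCloudProvider struct{}

// Name returns the identifier passed via --cloud-provider.
func (p *myCloudProvider) Name() string { return MyCloudProviderName }

// buildCloudProvider picks a provider implementation by name.
// Hypothetical helper for illustration.
func buildCloudProvider(name string) (cloudProvider, error) {
	switch name {
	case MyCloudProviderName:
		return &myCloudProvider{}, nil
	default:
		return nil, fmt.Errorf("unknown cloud provider %q", name)
	}
}

func main() {
	p, err := buildCloudProvider("mycloud")
	if err != nil {
		panic(err)
	}
	fmt.Println(p.Name()) // mycloud
}
```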
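Then, following the other vendors, implement two standard interfaces:
- CloudProvider: the vendor plugin. Its main job is to sync, from the cloud platform, the latest state of N NodeGroups (scaling groups; each is a pool of machines with identical specs and a bounded count), for example how many machines have been allocated and how many remain.
- NodeGroup: one specific NodeGroup, representing a scaling group (machine pool) on the cloud platform. It is mainly used to ask that scaling group to create and delete Nodes.
Let's take the baiducloud implementation as an example and work out what each method does.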
CloudProvider interface
Creation
This is the function that creates the Baidu Cloud Provider; it returns the concrete CloudProvider implementation:
```go
func BuildBaiducloud(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider
```
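opts holds the already-parsed command-line flags; what we care about is CloudConfig, i.e. the configuration file for accessing the Baidu Cloud API.
do is the NodeGroup (scaling group) configuration. It may hold the static scaling groups configured with --nodes, or the specs for auto-discovering scaling groups on the cloud platform: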
```go
// NodeGroupDiscoveryOptions contains various options to configure how a cloud provider discovers node groups
type NodeGroupDiscoveryOptions struct {
	// NodeGroupSpecs is specified to statically discover node groups listed in it
	NodeGroupSpecs []string
	// NodeGroupAutoDiscoverySpec is specified for automatically discovering node groups according to the specs
	NodeGroupAutoDiscoverySpecs []string
}
```
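Judging from the source, Baidu Cloud does not support auto-discovery.
This function parses the strings passed in through the N --nodes flags. Their format is: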
```
minNodes:maxNodes:asgName
```
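The plugin parses these strings and creates N concrete implementations of the NodeGroup interface, i.e. N scaling groups, or N machine pools.
Each scaling group has a range for its node count and a unique name; both are configured in the Baidu Cloud console.
The term asg used by Baidu Cloud stands for auto scaling group, i.e. a scaling group, or NodeGroup.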
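A minimal sketch of parsing one minNodes:maxNodes:asgName string, assuming the format above. asgSpec and parseAsgSpec are hypothetical names, and the validation rules (non-negative min, max not below min, non-empty name) are our own choice rather than Baidu's code.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// asgSpec holds one parsed --nodes value. Hypothetical type for illustration.
type asgSpec struct {
	MinSize int
	MaxSize int
	Name    string
}

// parseAsgSpec parses "minNodes:maxNodes:asgName".
func parseAsgSpec(spec string) (asgSpec, error) {
	parts := strings.SplitN(spec, ":", 3)
	if len(parts) != 3 {
		return asgSpec{}, fmt.Errorf("invalid node group spec %q: want minNodes:maxNodes:asgName", spec)
	}
	minSize, err := strconv.Atoi(parts[0])
	if err != nil {
		return asgSpec{}, fmt.Errorf("invalid min size in %q: %v", spec, err)
	}
	maxSize, err := strconv.Atoi(parts[1])
	if err != nil {
		return asgSpec{}, fmt.Errorf("invalid max size in %q: %v", spec, err)
	}
	if minSize < 0 || maxSize < minSize {
		return asgSpec{}, fmt.Errorf("invalid size range in %q: min=%d max=%d", spec, minSize, maxSize)
	}
	if parts[2] == "" {
		return asgSpec{}, fmt.Errorf("empty group name in %q", spec)
	}
	return asgSpec{MinSize: minSize, MaxSize: maxSize, Name: parts[2]}, nil
}

func main() {
	s, err := parseAsgSpec("1:10:asg-a")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", s) // {MinSize:1 MaxSize:10 Name:asg-a}
}
```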
Name
Just return the plugin's name.
```go
// Name returns name of the cloud provider.
func (baiducloud *baiducloudCloudProvider) Name() string
```
NodeGroups
```go
// NodeGroups returns all node groups configured for this cloud provider.
func (baiducloud *baiducloudCloudProvider) NodeGroups() []cloudprovider.NodeGroup
```
Just return the N NodeGroup objects created during parsing; CA uses whichever NodeGroups it needs.
NodeGroupForNode
```go
// NodeGroupForNode returns the node group for the given node, nil if the node
// should not be processed by cluster autoscaler, or non-nil error if such
// occurred. Must be implemented.
NodeGroupForNode(*apiv1.Node) (NodeGroup, error)
```
Given a K8S Node object, return the NodeGroup it belongs to; if it belongs to none, return nil.
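One way to implement the lookup is to extract the instance ID from node.Spec.ProviderID and search the groups' cached instance lists. The sketch below assumes a providerID of the form <scheme>://<instance-id> (the same "//" split the Baidu DeleteNodes code uses) and a hypothetical map from group name to instance IDs; returning an empty name corresponds to returning nil for nodes CA should ignore.

```go
package main

import (
	"fmt"
	"strings"
)

// instanceIDFromProviderID extracts the instance ID from a providerID of the
// assumed form "<scheme>://<instance-id>".
func instanceIDFromProviderID(providerID string) (string, error) {
	parts := strings.Split(providerID, "//")
	if len(parts) != 2 || parts[1] == "" {
		return "", fmt.Errorf("unexpected providerID %q", providerID)
	}
	return parts[1], nil
}

// groupForProviderID returns the name of the group owning the instance, or ""
// when no configured group owns it. instancesByGroup is a hypothetical cache
// mapping group name to instance IDs.
func groupForProviderID(providerID string, instancesByGroup map[string][]string) (string, error) {
	id, err := instanceIDFromProviderID(providerID)
	if err != nil {
		return "", err
	}
	for group, ids := range instancesByGroup {
		for _, candidate := range ids {
			if candidate == id {
				return group, nil
			}
		}
	}
	return "", nil
}

func main() {
	cache := map[string][]string{"asg-a": {"i-001", "i-002"}}
	fmt.Println(groupForProviderID("mycloud://i-002", cache)) // asg-a <nil>
}
```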
Pricing
```go
// Pricing returns pricing model for this cloud provider or error if not available.
// Implementation optional.
func (baiducloud *baiducloudCloudProvider) Pricing() (cloudprovider.PricingModel, errors.AutoscalerError) {
	return nil, cloudprovider.ErrNotImplemented
}
```
Returns an implementation of the pricing interface. It is of little use here; Baidu Cloud returns a not-implemented error.
GetAvailableMachineTypes
```go
// GetAvailableMachineTypes get all machine types that can be requested from the cloud provider.
// Implementation optional.
func (baiducloud *baiducloudCloudProvider) GetAvailableMachineTypes() ([]string, error) {
	return []string{}, cloudprovider.ErrNotImplemented
}
```
Lists the machine types the cloud supports. Not very useful; Baidu Cloud returns a not-implemented error.
NewNodeGroup
```go
// NewNodeGroup builds a theoretical node group based on the node definition provided. The node group is not automatically
// created on the cloud provider side. The node group is not returned by NodeGroups() until it is created.
// Implementation optional.
func (baiducloud *baiducloudCloudProvider) NewNodeGroup(machineType string, labels map[string]string, systemLabels map[string]string,
	taints []apiv1.Taint, extraResources map[string]resource.Quantity) (cloudprovider.NodeGroup, error) {
	return nil, cloudprovider.ErrNotImplemented
}
```
This builds a new scaling group automatically. Baidu Cloud does not support it either; if you need a new group, create it manually in the Baidu Cloud console.
GetResourceLimiter
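```go
// GetResourceLimiter returns struct containing limits (max, min) for resources (cores, memory etc.).
func (baiducloud *baiducloudCloudProvider) GetResourceLimiter() (*cloudprovider.ResourceLimiter, error) {
	return baiducloud.resourceLimiter, nil
}
```
This just returns resourceLimiter. Every cloud implementation does the same: CA passes it in when the Cloud Provider is created, and we return it unchanged.
The information in this struct comes from the --cores-total and --memory-total startup flags, which set the minimum and maximum total CPU cores and memory for the whole cluster. These flags are usually left at their defaults, which are very large, so in practice there is effectively no limit; just copy the code above.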
GPULabel
```go
// GPULabel returns the label added to nodes with GPU resource.
func (baiducloud *baiducloudCloudProvider) GPULabel() string {
	return GPULabel
}

const (
	// GPULabel is the label added to nodes with GPU resource.
	GPULabel = "baidu/nvidia_name"
)
```
GPU node support: if a node provided by the scaling group carries this label, the Node is a GPU machine type.
GetAvailableGPUTypes
```go
// GetAvailableGPUTypes returns all available GPU types cloud provider supports.
func (baiducloud *baiducloudCloudProvider) GetAvailableGPUTypes() map[string]struct{} {
	return availableGPUTypes
}

var (
	availableGPUTypes = map[string]struct{}{
		"nTeslaV100":    {},
		"nTeslaP40":     {},
		"nTeslaP4":      {},
		"nTeslaV100-16": {},
		"nTeslaV100-32": {},
	}
)
```
Returns the GPU types the cloud platform supports.
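The two GPU methods describe related data: the label name marks a GPU node, and the type set presumably lists the values that label may take. A hypothetical helper that reads both, assuming node labels are available as a plain map; gpuTypeOf is our own name and the type set is a subset of the Baidu list.

```go
package main

import "fmt"

// gpuLabel mirrors the label name in the Baidu example; replace it with
// your own provider's label.
const gpuLabel = "baidu/nvidia_name"

// availableGPUTypes is a hypothetical subset of the supported GPU types.
var availableGPUTypes = map[string]struct{}{
	"nTeslaV100": {},
	"nTeslaP40":  {},
}

// gpuTypeOf returns the GPU type recorded in a node's labels and whether the
// provider advertises that type. A node without the label yields "", false.
func gpuTypeOf(labels map[string]string) (gpuType string, known bool) {
	gpuType, ok := labels[gpuLabel]
	if !ok {
		return "", false
	}
	_, known = availableGPUTypes[gpuType]
	return gpuType, known
}

func main() {
	fmt.Println(gpuTypeOf(map[string]string{gpuLabel: "nTeslaV100"})) // nTeslaV100 true
}
```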
Cleanup
```go
// Cleanup cleans up open resources before the cloud provider is destroyed, i.e. go routines etc.
func (baiducloud *baiducloudCloudProvider) Cleanup() error {
	return nil
}
```
Cleanup before CA exits; Baidu Cloud does nothing here.
Refresh
```go
// Refresh is called before every main loop and can be used to dynamically update cloud provider state.
// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
func (baiducloud *baiducloudCloudProvider) Refresh() error {
	return nil
}
```
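CA calls Refresh before every iteration of its main loop. Here we can ask the cloud platform for updated NodeGroup information, or do nothing at all, as Baidu Cloud does.
Baidu's implementation has no nodeGroup auto-discovery, so the statically configured nodeGroup information has nothing that needs refreshing.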
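NodeGroup interface
In the simplest case we configure a single nodeGroup on the cloud platform and give that nodeGroup to CA, so that CA can create and delete Nodes in it.
So for a NodeGroup implementation, the key parts are reporting the scaling group's usage and creating and deleting Nodes.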
MaxSize
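```go
// MaxSize returns maximum size of the node group.
func (asg *Asg) MaxSize() int {
	return asg.maxSize
}
```
Asg is Baidu Cloud's NodeGroup type. MaxSize returns the maximum number of Nodes the scaling group can allocate. The value comes from --nodes and should match the setting configured on the cloud platform.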
MinSize
```go
// MinSize returns minimum size of the node group.
func (asg *Asg) MinSize() int {
	return asg.minSize
}
```
Like the previous method, this returns the minimum number of nodes the scaling group must keep.
TargetSize
```go
// TargetSize returns the current target size of the node group. It is possible that the
// number of nodes in Kubernetes is different at the moment but should be equal
// to Size() once everything stabilizes (new nodes finish startup and registration or
// removed nodes are deleted completely). Implementation required.
func (asg *Asg) TargetSize() (int, error) {
	size, err := asg.baiducloudManager.GetAsgSize(asg)
	return int(size), err
}

// ...
// ...

req, err := bce.NewRequest("GET", c.GetURL("/v1/cluster/group/instances", params), nil)
```
This method asks the cloud platform how many Nodes (VMs) have currently been requested. The number can be larger than the number of Nodes registered in K8S, because it takes a while for a newly created VM to join the K8S cluster.
IncreaseSize
```go
// IncreaseSize increases the size of the node group. To delete a node you need
// to explicitly name it and use DeleteNode. This function should wait until
// node group size is updated. Implementation required.
func (asg *Asg) IncreaseSize(delta int) error {
	if delta <= 0 {
		return fmt.Errorf("size increase must be positive")
	}
	size, err := asg.baiducloudManager.GetAsgSize(asg)
	if err != nil {
		return err
	}
	if int(size)+delta > asg.MaxSize() {
		return fmt.Errorf("size increase too large - desired:%d max:%d", int(size)+delta, asg.MaxSize())
	}
	return asg.baiducloudManager.ScaleUpCluster(delta, asg.Name)
}

// ...
// ...

req, err := bce.NewRequest("POST", c.GetURL("v1/cluster/group/scaling_up", params), nil)
```
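This method creates delta nodes. Baidu's implementation first calls the cloud API to get the number of Nodes already allocated in the scaling group, then checks whether adding delta would exceed the group's maximum; if not, it calls the cloud API to create delta nodes.
Note that the cloud platform API must not only create the VMs but also start the K8S components on them and register them with the K8S cluster. That is not CA's job; it is the job of a platform like Baidu Cloud.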
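The checks in IncreaseSize can be separated from the cloud call so they are easy to test. A sketch of that split, assuming the current size has already been fetched; increaseSize and scaleUpFunc are hypothetical names, and scaleUp stands in for a call like ScaleUpCluster.

```go
package main

import "fmt"

// scaleUpFunc is a hypothetical wrapper around the cloud's
// "add delta instances to group" API.
type scaleUpFunc func(group string, delta int) error

// increaseSize validates delta against the group's max size before calling
// the cloud API, mirroring the checks in the Baidu IncreaseSize.
func increaseSize(group string, currentSize, maxSize, delta int, scaleUp scaleUpFunc) error {
	if delta <= 0 {
		return fmt.Errorf("size increase must be positive, got %d", delta)
	}
	if currentSize+delta > maxSize {
		return fmt.Errorf("size increase too large - desired:%d max:%d", currentSize+delta, maxSize)
	}
	return scaleUp(group, delta)
}

func main() {
	fakeScaleUp := func(group string, delta int) error {
		fmt.Printf("scale up %s by %d\n", group, delta)
		return nil
	}
	fmt.Println(increaseSize("asg-a", 3, 5, 2, fakeScaleUp)) // <nil>
	fmt.Println(increaseSize("asg-a", 4, 5, 2, fakeScaleUp)) // size increase too large - desired:6 max:5
}
```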
DeleteNodes
```go
// DeleteNodes deletes nodes from this node group. Error is returned either on
// failure or if the given node doesn't belong to this node group. This function
// should wait until node group size is updated. Implementation required.
func (asg *Asg) DeleteNodes(nodes []*apiv1.Node) error {
	size, err := asg.baiducloudManager.GetAsgSize(asg)
	if err != nil {
		return err
	}
	if int(size) <= asg.MinSize() {
		return fmt.Errorf("min size reached, nodes will not be deleted")
	}
	// Length 0, capacity len(nodes): append fills it without leaving
	// empty IDs at the front of the slice.
	nodeID := make([]string, 0, len(nodes))
	for _, node := range nodes {
		klog.Infof("Delete node : %s", node.Spec.ProviderID)
		splitted := strings.Split(node.Spec.ProviderID, "//")
		if len(splitted) != 2 {
			return fmt.Errorf("Not expected name: %s\n", node.Spec.ProviderID)
		}
		belong, err := asg.Belongs(splitted[1])
		if err != nil {
			klog.Errorf("failed to check whether node:%s is belong to asg:%s", node.GetName(), asg.Id())
			return err
		}
		if !belong {
			return fmt.Errorf("%s belongs to a different asg than %s", node.Name, asg.Id())
		}
		// todo: if the node exists.
		nodeID = append(nodeID, splitted[1])
	}
	return asg.baiducloudManager.ScaleDownCluster(nodeID)
}

// ...

req, err := bce.NewRequest("POST", c.GetURL("v1/cluster", params), bytes.NewBuffer(postContent))
```
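After CA has evicted the pods from a Node, it calls this method to return the Node to the scaling group, i.e. to delete the VM.
The method receives the K8S Node objects to delete and calls the Baidu Cloud API to find out how many Nodes the scaling group currently has. If that number is already at or below the minimum node count, nothing is deleted. Note that this check does not subtract the number of Nodes in the current request.
Otherwise it loops over the Nodes, reads the ProviderID field of each K8S Node object, extracts the cloud-side instance ID from it, and checks that the instance belongs to this scaling group.
Finally, it calls the Baidu Cloud API to delete those VMs from the scaling group.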
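The min-size check in the Baidu code compares the current size with MinSize() without subtracting the number of nodes in the request, so one batch can still take the group below its minimum. A sketch of the same validation that accounts for the batch size, assuming providerIDs of the form <scheme>://<instance-id>; instanceIDsToDelete and the belongs callback are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// instanceIDsToDelete validates a delete request: the group must stay at or
// above minSize after the deletion, every providerID must parse, and every
// instance must belong to the group. belongs is a hypothetical membership
// check backed by the cloud API.
func instanceIDsToDelete(providerIDs []string, currentSize, minSize int, belongs func(id string) bool) ([]string, error) {
	if currentSize-len(providerIDs) < minSize {
		return nil, fmt.Errorf("deleting %d nodes would shrink the group below min size %d (current %d)",
			len(providerIDs), minSize, currentSize)
	}
	ids := make([]string, 0, len(providerIDs))
	for _, pid := range providerIDs {
		parts := strings.Split(pid, "//")
		if len(parts) != 2 || parts[1] == "" {
			return nil, fmt.Errorf("unexpected providerID %q", pid)
		}
		if !belongs(parts[1]) {
			return nil, fmt.Errorf("instance %s does not belong to this group", parts[1])
		}
		ids = append(ids, parts[1])
	}
	return ids, nil
}

func main() {
	owned := map[string]bool{"i-1": true}
	ids, err := instanceIDsToDelete([]string{"mycloud://i-1"}, 3, 1, func(id string) bool { return owned[id] })
	fmt.Println(ids, err) // [i-1] <nil>
}
```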
DecreaseTargetSize
```go
// DecreaseTargetSize decreases the target size of the node group. This function
// doesn't permit to delete any existing node and can be used only to reduce the
// request for new nodes that have not been yet fulfilled. Delta should be negative.
// It is assumed that cloud provider will not delete the existing nodes when there
// is an option to just decrease the target. Implementation required.
func (asg *Asg) DecreaseTargetSize(delta int) error {
	return cloudprovider.ErrNotImplemented
}
```
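Asks the cloud platform to lower the group's target size by delta. It may only cancel requests for new nodes that have not been fulfilled yet; it must not delete any existing Node.
Baidu Cloud does not implement it and returns ErrNotImplemented, even though the interface comment marks it as required.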
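For a provider that does want to support it, the core rule from the interface comment is that the new target may only cancel unfulfilled requests. A sketch assuming you know both the current target and the number of instances that already exist in the group; decreaseTargetSize is a hypothetical helper, and using "existing instances" as the lower bound is our reading of the comment.

```go
package main

import "fmt"

// decreaseTargetSize computes the new target when CA cancels pending
// scale-up requests. The target may never drop below the number of
// instances that already exist, so no running node is removed.
func decreaseTargetSize(targetSize, existingInstances, delta int) (int, error) {
	if delta >= 0 {
		return 0, fmt.Errorf("size decrease must be negative, got %d", delta)
	}
	newTarget := targetSize + delta
	if newTarget < existingInstances {
		return 0, fmt.Errorf("attempt to delete existing nodes: target %d, existing %d, delta %d",
			targetSize, existingInstances, delta)
	}
	return newTarget, nil
}

func main() {
	fmt.Println(decreaseTargetSize(5, 3, -2)) // 3 <nil>
}
```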
Id
```go
// Id returns an unique identifier of the node group.
func (asg *Asg) Id() string {
	return asg.Name
}
```
Returns the scaling group ID, which is the asgName passed in via --nodes and also identifies the group on the cloud platform.
Debug
```go
// Debug returns a string containing all information regarding this node group.
func (asg *Asg) Debug() string {
	return fmt.Sprintf("%s (%d:%d)", asg.Id(), asg.MinSize(), asg.MaxSize())
}
```
Describes the nodeGroup, typically its node-count range and scaling group ID.
Nodes
```go
// Nodes returns a list of all nodes that belong to this node group.
// It is required that Instance objects returned by this method have Id field set.
// Other fields are optional.
func (asg *Asg) Nodes() ([]cloudprovider.Instance, error) {
	asgNodes, err := asg.baiducloudManager.GetAsgNodes(asg)
	if err != nil {
		return nil, err
	}
	instances := make([]cloudprovider.Instance, len(asgNodes))
	for i, asgNode := range asgNodes {
		instances[i] = cloudprovider.Instance{Id: asgNode}
	}
	return instances, nil
}

// Instance represents a cloud-provider node. The node does not necessarily map to k8s node
// i.e it does not have to be registered in k8s cluster despite being returned by NodeGroup.Nodes()
// method. Also it is sane to have Instance object for nodes which are being created or deleted.
type Instance struct {
	// Id is instance id.
	Id string
	// Status represents status of node. (Optional)
	Status *InstanceStatus
}
```
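Returns the list of Nodes in the scaling group: call the cloud API with the scaling group ID to get the Node list. Each Node must carry the cloud platform's unique VM ID.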
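CA appears to compare Instance.Id with node.Spec.ProviderID to tell which instances have not registered as Nodes yet, so it is safest to return IDs in the same format as the ProviderID. A sketch assuming the hypothetical prefix mycloud:// and a trimmed-down instance type standing in for cloudprovider.Instance.

```go
package main

import "fmt"

// instance is a trimmed-down stand-in for cloudprovider.Instance.
type instance struct {
	Id string
}

// providerIDPrefix is an assumed scheme; it must match how your cloud
// controller fills node.Spec.ProviderID.
const providerIDPrefix = "mycloud://"

// toInstances converts raw cloud instance IDs into instance values whose Id
// uses the same format as node.Spec.ProviderID.
func toInstances(cloudIDs []string) []instance {
	out := make([]instance, len(cloudIDs))
	for i, id := range cloudIDs {
		out[i] = instance{Id: providerIDPrefix + id}
	}
	return out
}

func main() {
	for _, inst := range toInstances([]string{"i-1", "i-2"}) {
		fmt.Println(inst.Id)
	}
}
```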
TemplateNodeInfo
```go
// TemplateNodeInfo returns a schedulerframework.NodeInfo structure of an empty
// (as if just started) node. This will be used in scale-up simulations to
// predict what would a new node look like if a node group was expanded. The returned
// NodeInfo is expected to have a fully populated Node object, with all of the labels,
// capacity and allocatable information as well as all pods that are started on
// the node by default, using manifest (most likely only kube-proxy). Implementation optional.
func (asg *Asg) TemplateNodeInfo() (*schedulerframework.NodeInfo, error) {
	template, err := asg.baiducloudManager.getAsgTemplate(asg.Name)
	if err != nil {
		return nil, err
	}
	node, err := asg.baiducloudManager.buildNodeFromTemplate(asg, template)
	if err != nil {
		return nil, err
	}
	nodeInfo := schedulerframework.NewNodeInfo(cloudprovider.BuildKubeProxy(asg.Name))
	nodeInfo.SetNode(node)
	return nodeInfo, nil
}
```
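This method returns a NodeInfo object describing what a newly created Node would look like.
According to the interface comment, CA uses it in scale-up simulations to predict what a new node in this group would look like. Since it is optional, we can simply return a not-implemented error. As far as we understand, the trade-off is that CA then has to derive the template from an existing node in the group, so a group scaled down to 0 cannot be scaled up again. (Baidu does implement it, but the data it fills in is fairly complex.)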
Exist
```go
// Exist checks if the node group really exists on the cloud provider side. Allows to tell the
// theoretical node group from the real one. Implementation required.
func (asg *Asg) Exist() bool {
	return true
}
```
Checks whether the scaling group really exists on the cloud platform. Since ours is statically configured via --nodes, it must exist, so there is no need to call the Baidu Cloud API at all.
Create
```go
// Create creates the node group on the cloud provider side. Implementation optional.
func (asg *Asg) Create() (cloudprovider.NodeGroup, error) {
	return nil, cloudprovider.ErrAlreadyExist
}
```
Creating another scaling group? No need to implement it.
Delete
```go
// Delete deletes the node group on the cloud provider side.
// This will be executed only for autoprovisioned node groups, once their size drops to 0.
// Implementation optional.
func (asg *Asg) Delete() error {
	return cloudprovider.ErrNotImplemented
}
```
Deletes the scaling group. Only plugins that support automatically creating scaling groups need it. Ours are all created manually in the cloud console and hard-coded with --nodes, so CA never calls this method.
Autoprovisioned
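```go
// Autoprovisioned returns true if the node group is autoprovisioned. An autoprovisioned group
// was created by CA and can be deleted when scaled to 0.
func (asg *Asg) Autoprovisioned() bool {
	return false
}
```
This method matters if CA and the provider support automatically creating new scaling groups (when the existing groups run out of machines).
It tells CA whether this scaling group was created by the auto-provisioning mechanism.
Since ours are all statically configured with --nodes, just return false to indicate that the NodeGroup was not created automatically.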
