
Kubernetes Custom Development 50: Extending the Scheduler

Introduction

The Kubernetes Scheduler is a control-plane process responsible for assigning Pods to nodes. Based on constraints and available resources, it determines, for each Pod in the scheduling queue, the set of nodes on which that Pod can legitimately be placed. It then ranks the feasible nodes and binds the Pod to a suitable one.

kube-scheduler is the default scheduler that ships with Kubernetes. It schedules Pods onto suitable nodes based on the Pods' resource requests and the nodes' resource capacity.

If the default scheduler does not meet your needs, you can implement your own, and it can run in the cluster alongside the default scheduler or other schedulers. You specify which scheduler a Pod should use via its spec.schedulerName field.
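For example, here is a minimal Pod that asks to be placed by a scheduler named my-scheduler (the Pod name and image are placeholders):

apiVersion: v1
kind: Pod
metadata:
  name: nginx-custom-scheduled
spec:
  schedulerName: my-scheduler
  containers:
  - name: nginx
    image: nginx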

Extending the Scheduler

There are three ways to implement a custom scheduler:

Building a Custom Scheduler Image

Clone the kubernetes source code, modify the kube-scheduler source, and build it into a custom scheduler image.

git clone https://github.com/kubernetes/kubernetes.git
cd kubernetes
# modify the source code (the scheduler lives under pkg/scheduler and cmd/kube-scheduler)
make WHAT=cmd/kube-scheduler  # or plain `make` to build everything

Write a Dockerfile:

FROM alpine

COPY ./_output/local/bin/linux/amd64/kube-scheduler /usr/local/bin/kube-scheduler

Build and push the image:

docker build -t poneding/my-kube-scheduler:v1.0 .
docker push poneding/my-kube-scheduler:v1.0

Write the deployment manifest:

deploy-manifest.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
  name: my-scheduler
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: my-scheduler-as-kube-scheduler
subjects:
- kind: ServiceAccount
  name: my-scheduler
  namespace: kube-system
roleRef:
  kind: ClusterRole
  name: system:kube-scheduler
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-scheduler-config
  namespace: kube-system
data:
  my-scheduler-config.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1
    kind: KubeSchedulerConfiguration
    profiles:
      - schedulerName: my-scheduler
    leaderElection:
      leaderElect: false    
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: my-scheduler-as-volume-scheduler
subjects:
- kind: ServiceAccount
  name: my-scheduler
  namespace: kube-system
roleRef:
  kind: ClusterRole
  name: system:volume-scheduler
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: scheduler
    tier: control-plane
  name: my-scheduler
  namespace: kube-system
spec:
  selector:
    matchLabels:
      component: scheduler
      tier: control-plane
  replicas: 1
  template:
    metadata:
      labels:
        component: scheduler
        tier: control-plane
    spec:
      serviceAccountName: my-scheduler
      containers:
      - command:
        - /usr/local/bin/kube-scheduler
        - --config=/etc/kubernetes/my-scheduler/my-scheduler-config.yaml
        image: poneding/my-kube-scheduler:v1.0
        livenessProbe:
          httpGet:
            path: /healthz
            port: 10259
            scheme: HTTPS
          initialDelaySeconds: 15
        name: my-scheduler
        readinessProbe:
          httpGet:
            path: /healthz
            port: 10259
            scheme: HTTPS
        resources:
          requests:
            cpu: '0.1'
        securityContext:
          privileged: false
        volumeMounts:
          - name: config-volume
            mountPath: /etc/kubernetes/my-scheduler
      hostNetwork: false
      hostPID: false
      volumes:
        - name: config-volume
          configMap:
            name: my-scheduler-config

Deploy:

kubectl apply -f deploy-manifest.yaml

Test:

kubectl run nginx-by-my-scheduler --image=nginx --overrides='{"spec":{"schedulerName":"my-scheduler"}}'

kubectl get pod -o wide -w

If everything works, you will see the Pod scheduled onto a node.

Extending the scheduler this way requires understanding and modifying the kube-scheduler source code, which is fairly demanding for most developers.

Custom Scheduling Controller

Another option is to write a scheduling controller based on the controller-runtime package. The principle: reconcile Pods, pick a suitable node, and create a Binding object that binds the Pod to that node.
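For reference, the Binding object such a controller creates is equivalent to the following manifest (the Pod and node names are placeholders):

apiVersion: v1
kind: Binding
metadata:
  name: nginx-by-my-scheduler
  namespace: default
target:
  apiVersion: v1
  kind: Node
  name: node-1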

Create the project:

mkdir my-scheduler && cd my-scheduler
go mod init my-scheduler
go get sigs.k8s.io/controller-runtime
touch main.go

Write the scheduler logic in main.go (essentially a reconciliation controller for Pods):

package main

import (
    "context"
    "log"
    "math/rand"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/manager"
    "sigs.k8s.io/controller-runtime/pkg/predicate"
)

func main() {
    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), manager.Options{})
    if err != nil {
        log.Fatalf("new manager err: %s", err.Error())
    }

    err = (&MyScheduler{
        Client: mgr.GetClient(),
        Scheme: mgr.GetScheme(),
    }).SetupWithManager(mgr)
    if err != nil {
        log.Fatalf("setup scheduler err: %s", err.Error())
    }

    err = mgr.Start(context.Background())
    if err != nil {
        log.Fatalf("start manager err: %s", err.Error())
    }
}

const mySchedulerName = "my-scheduler"

type MyScheduler struct {
    Client client.Client
    Scheme *runtime.Scheme
}

func (s *MyScheduler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    nodes := new(corev1.NodeList)
    err := s.Client.List(ctx, nodes)
    if err != nil {
        return ctrl.Result{Requeue: true}, err
    }
    if len(nodes.Items) == 0 {
        // no nodes registered yet; retry later
        return ctrl.Result{Requeue: true}, nil
    }

    // pick a random node (a real scheduler would filter and score candidates here)
    targetNode := nodes.Items[rand.Intn(len(nodes.Items))].Name

    // create the Binding that assigns the Pod to the chosen node
    binding := new(corev1.Binding)
    binding.Name = req.Name
    binding.Namespace = req.Namespace
    binding.Target = corev1.ObjectReference{
        Kind:       "Node",
        APIVersion: "v1",
        Name:       targetNode,
    }

    err = s.Client.Create(ctx, binding)
    if err != nil {
        return ctrl.Result{Requeue: true}, err
    }

    return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (s *MyScheduler) SetupWithManager(mgr ctrl.Manager) error {
    // only handle Pods that request this scheduler and are not yet bound
    filter := predicate.Funcs{
        CreateFunc: func(e event.CreateEvent) bool {
            pod, ok := e.Object.(*corev1.Pod)
            if ok {
                return pod.Spec.SchedulerName == mySchedulerName && pod.Spec.NodeName == ""
            }
            return false
        },
        UpdateFunc: func(e event.UpdateEvent) bool {
            return false
        },
        DeleteFunc: func(e event.DeleteEvent) bool {
            return false
        },
    }
    return ctrl.NewControllerManagedBy(mgr).
        For(&corev1.Pod{}).
        WithEventFilter(filter).
        Complete(s)
}

Run the custom scheduler:

go run main.go

You can also package it as an image and deploy it into the cluster, following the deployment approach shown earlier.
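If you do deploy it in-cluster, the controller's ServiceAccount needs RBAC permissions to watch Pods, list Nodes, and create Bindings. A minimal ClusterRole sketch (the exact rule set is an assumption; adjust it to your needs):

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: my-scheduler
rules:
- apiGroups: [""]
  resources: ["pods", "nodes"]
  verbs: ["get", "list", "watch"]
- apiGroups: [""]
  resources: ["bindings", "pods/binding"]
  verbs: ["create"]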

Run a Pod that specifies my-scheduler as its scheduler:

kubectl run nginx-by-my-scheduler --image=nginx --overrides='{"spec":{"schedulerName":"my-scheduler"}}'

If everything works, you will see the Pod scheduled onto a node.

Scheduler Extender

A Scheduler Extender extends the Kubernetes scheduler in the form of a webhook that intervenes at specific phases of the scheduling framework:

| Phase | Description |
| --- | --- |
| Filter | The scheduling framework calls the filter function to rule out nodes the Pod cannot be scheduled to. |
| Priority | The scheduling framework calls the prioritize function to compute a score for each node; the higher the score, the better the candidate. |
| Bind | The scheduling framework calls the bind function to bind the Pod to a node. |

The Scheduler Extender delegates the decisions of these phases to an external scheduler over HTTP and feeds the results back to the scheduling framework. All we need to do is implement an HTTP service and register it with the scheduler (see the configuration sketch below); in that service we can implement any one or more of the phase interfaces above to customize scheduling.
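Registration is done in the KubeSchedulerConfiguration via the extenders field, which tells kube-scheduler where to send each phase's request. This is a fragment of the configuration used in the full manifests later in this section:

apiVersion: kubescheduler.config.k8s.io/v1
kind: KubeSchedulerConfiguration
profiles:
  - schedulerName: my-scheduler-with-extender
extenders:
- urlPrefix: "http://my-scheduler-extender.kube-system.svc:8080"
  filterVerb: "filter"
  prioritizeVerb: "prioritize"
  bindVerb: "bind"
  weight: 1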

The interfaces:

Filter interface

HTTP method: POST

Request body:

type ExtenderArgs struct {
    Pod *v1.Pod
    Nodes *v1.NodeList
    NodeNames *[]string
}

Response body:

type ExtenderFilterResult struct {
    Nodes *v1.NodeList
    NodeNames *[]string
    FailedNodes FailedNodesMap
    FailedAndUnresolvableNodes FailedNodesMap
    Error string
}

Priority interface

HTTP method: POST

Request body: same as the Filter interface.

Response body (each node's score is multiplied by the extender's weight and added to that node's total score):

type HostPriorityList []HostPriority

type HostPriority struct {
    Host string
    Score int64
}

Bind interface

HTTP method: POST

Request body:

type ExtenderBindingArgs struct {
    PodName string
    PodNamespace string
    PodUID types.UID
    Node string
}

Response body:

type ExtenderBindingResult struct {
    Error string
}

Implementation

Below is a reference implementation of a custom scheduler using the Scheduler Extender approach.

main.go

package main

import (
    "encoding/json"
    "log"
    "net/http"

    extenderv1 "k8s.io/kube-scheduler/extender/v1"
)

func Filter(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }
    var args extenderv1.ExtenderArgs
    var result *extenderv1.ExtenderFilterResult

    err := json.NewDecoder(r.Body).Decode(&args)
    if err != nil {
        result = &extenderv1.ExtenderFilterResult{
            Error: err.Error(),
        }
    } else {
        result = filter(args)
    }

    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusOK)
    if err := json.NewEncoder(w).Encode(result); err != nil {
        log.Printf("failed to encode result: %v", err)
    }
}

func Prioritize(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }
    var args extenderv1.ExtenderArgs
    var result *extenderv1.HostPriorityList

    err := json.NewDecoder(r.Body).Decode(&args)
    if err != nil {
        // HostPriorityList has no Error field, so log the decode failure
        // and fall back to an empty list
        log.Printf("failed to decode prioritize args: %v", err)
        result = &extenderv1.HostPriorityList{}
    } else {
        result = prioritize(args)
    }

    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusOK)
    if err := json.NewEncoder(w).Encode(result); err != nil {
        log.Printf("failed to encode result: %v", err)
    }
}

func Bind(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }
    var args extenderv1.ExtenderBindingArgs
    var result *extenderv1.ExtenderBindingResult

    err := json.NewDecoder(r.Body).Decode(&args)
    if err != nil {
        result = &extenderv1.ExtenderBindingResult{
            Error: err.Error(),
        }
    } else {
        result = bind(args)
    }

    w.Header().Set("Content-Type", "application/json")
    w.WriteHeader(http.StatusOK)
    if err := json.NewEncoder(w).Encode(result); err != nil {
        log.Printf("failed to encode result: %v", err)
    }
}

func main() {
    http.HandleFunc("/filter", Filter)
    // the path must match prioritizeVerb in the KubeSchedulerConfiguration
    http.HandleFunc("/prioritize", Prioritize)
    http.HandleFunc("/bind", Bind)
    log.Fatal(http.ListenAndServe(":8080", nil))
}

filter.go: no real filtering logic is implemented; it simply returns all candidate nodes.

package main

import (
    "log"

    extenderv1 "k8s.io/kube-scheduler/extender/v1"
)

func filter(args extenderv1.ExtenderArgs) *extenderv1.ExtenderFilterResult {
    log.Println("my-scheduler-extender filter called.")

    return &extenderv1.ExtenderFilterResult{
        Nodes:     args.Nodes,
        NodeNames: args.NodeNames,
    }
}

prioritize.go: mock scoring that assigns each node a score equal to its position in the node list.

package main

import (
    "log"

    extenderv1 "k8s.io/kube-scheduler/extender/v1"
)

func prioritize(args extenderv1.ExtenderArgs) *extenderv1.HostPriorityList {
    log.Println("my-scheduler-extender prioritize called.")

    var result extenderv1.HostPriorityList
    if args.Nodes == nil {
        // Nodes is only populated when nodeCacheCapable is false, as in our config
        return &result
    }
    // mock scoring: each node's score is its index in the candidate list
    for i, node := range args.Nodes.Items {
        result = append(result, extenderv1.HostPriority{
            Host:  node.Name,
            Score: int64(i),
        })
    }

    return &result
}

bind.go: performs the actual binding by creating a Binding object through the Kubernetes API.

package main

import (
    "context"
    "log"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/rest"
    extenderv1 "k8s.io/kube-scheduler/extender/v1"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
)

var kconfig *rest.Config
var kruntimeclient client.Client

func init() {
    kconfig = ctrl.GetConfigOrDie()
    var err error
    kruntimeclient, err = client.New(kconfig, client.Options{
        Scheme: scheme.Scheme,
    })
    if err != nil {
        log.Fatalf("failed to create k8s runtime client: %v", err)
    }
}

func bind(args extenderv1.ExtenderBindingArgs) *extenderv1.ExtenderBindingResult {
    log.Println("my-scheduler-extender bind called.")
    log.Printf("pod %s/%s is bind to %s", args.PodNamespace, args.PodName, args.Node)

    // create the Binding that assigns the Pod to the chosen node
    binding := new(corev1.Binding)
    binding.Name = args.PodName
    binding.Namespace = args.PodNamespace
    binding.Target = corev1.ObjectReference{
        Kind:       "Node",
        APIVersion: "v1",
        Name:       args.Node,
    }

    result := new(extenderv1.ExtenderBindingResult)

    err := kruntimeclient.Create(context.Background(), binding)
    if err != nil {
        result.Error = err.Error()
    }

    return result
}
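Before packaging the extender, you can smoke-test it locally (the init in bind.go calls ctrl.GetConfigOrDie, so a reachable kubeconfig is required). The request body below is a hand-written minimal ExtenderArgs, relying on Go's case-insensitive JSON field matching:

go run .

# in another terminal, exercise the prioritize endpoint
curl -s -X POST http://localhost:8080/prioritize \
  -H 'Content-Type: application/json' \
  -d '{"pod":{"metadata":{"name":"nginx"}},"nodes":{"items":[{"metadata":{"name":"node-1"}},{"metadata":{"name":"node-2"}}]}}'

This should print a JSON-encoded HostPriorityList with scores for node-1 and node-2.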

Write a Dockerfile. It copies the binary from a bin/$TARGETOS/$TARGETARCH layout, matching the cross-compilation commands below:

FROM alpine
ARG TARGETOS TARGETARCH
COPY ./bin/$TARGETOS/$TARGETARCH/my-scheduler-extender /my-scheduler-extender
ENTRYPOINT ["/my-scheduler-extender"]

Cross-compile the binaries, then build and push a multi-arch image:

GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o bin/linux/amd64/my-scheduler-extender
GOOS=linux GOARCH=arm64 CGO_ENABLED=0 go build -o bin/linux/arm64/my-scheduler-extender

docker buildx build --push --platform linux/amd64,linux/arm64 -t poneding/my-kube-scheduler-extender:v1.0 .

Write the deployment manifests. They deploy an additional scheduler (following the custom-scheduler-image approach above) together with our Scheduler Extender:

Note: to keep the manifests simple, the my-scheduler-extender and my-scheduler-with-extender containers are bound to cluster-admin; in practice nowhere near that much privilege is required.

deploy-manifests.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
  name: my-scheduler-with-extender
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: my-scheduler-with-extender
subjects:
- kind: ServiceAccount
  name: my-scheduler-with-extender
  namespace: kube-system
roleRef:
  kind: ClusterRole
  name: cluster-admin
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-scheduler-with-extender-config
  namespace: kube-system
data:
  my-scheduler-with-extender-config.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1
    kind: KubeSchedulerConfiguration
    profiles:
      - schedulerName: my-scheduler-with-extender
    leaderElection:
      leaderElect: false
    extenders:
    - urlPrefix: "http://my-scheduler-extender.kube-system.svc:8080"
      enableHTTPS: false
      filterVerb: "filter"
      prioritizeVerb: "prioritize"
      bindVerb: "bind"
      weight: 1
      nodeCacheCapable: false
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: my-scheduler-with-extender
    tier: control-plane
  name: my-scheduler-with-extender
  namespace: kube-system
spec:
  selector:
    matchLabels:
      component: my-scheduler-with-extender
      tier: control-plane
  replicas: 1
  template:
    metadata:
      labels:
        component: my-scheduler-with-extender
        tier: control-plane
    spec:
      serviceAccountName: my-scheduler-with-extender
      containers:
      - command:
        - kube-scheduler
        - --config=/etc/kubernetes/my-scheduler-with-extender/my-scheduler-with-extender-config.yaml
        image: registry.k8s.io/kube-scheduler:v1.29.0
        livenessProbe:
          httpGet:
            path: /healthz
            port: 10259
            scheme: HTTPS
          initialDelaySeconds: 15
        name: my-scheduler-with-extender
        readinessProbe:
          httpGet:
            path: /healthz
            port: 10259
            scheme: HTTPS
        resources:
          requests:
            cpu: '0.1'
        securityContext:
          privileged: false
        volumeMounts:
          - name: config-volume
            mountPath: /etc/kubernetes/my-scheduler-with-extender
      hostNetwork: false
      hostPID: false
      volumes:
        - name: config-volume
          configMap:
            name: my-scheduler-with-extender-config
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: my-scheduler-extender
    tier: control-plane
  name: my-scheduler-extender
  namespace: kube-system
spec:
  selector:
    matchLabels:
      component: my-scheduler-extender
      tier: control-plane
  replicas: 1
  template:
    metadata:
      labels:
        component: my-scheduler-extender
        tier: control-plane
    spec:
      serviceAccountName: my-scheduler-with-extender
      containers:
      - image: poneding/my-kube-scheduler-extender:v1.0
        name: my-scheduler-extender
        imagePullPolicy: Always
---
apiVersion: v1
kind: Service
metadata:
  name: my-scheduler-extender
  namespace: kube-system
spec:
  selector:
    component: my-scheduler-extender
    tier: control-plane
  ports:
  - port: 8080
    targetPort: 8080

Deploy:

kubectl apply -f deploy-manifests.yaml

Run a test Pod and watch the logs of the my-scheduler-extender container:

kubectl run nginx-by-my-scheduler-extender --image=nginx --overrides='{"spec":{"schedulerName":"my-scheduler-with-extender"}}'

# follow the my-scheduler-extender logs
kubectl logs deploy/my-scheduler-extender -n kube-system -f
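If the extender is wired in correctly, the logs show the filter, prioritize, and bind handlers being called, and the Pod ends up bound to a node:

kubectl get pod nginx-by-my-scheduler-extender -o wide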

Source code: my-scheduler-extender
