
No Secrets in Front of the Source Code: Dissecting the Kubernetes Scheduler

This article walks through the source code of kube-scheduler, the default Kubernetes scheduler. Kubernetes code version: v1.18.4-rc.0.

0. Entry Point

The entry point is main() in kubernetes/cmd/kube-scheduler/scheduler.go, shown below:

func main() {
rand.Seed(time.Now().UnixNano())

command := app.NewSchedulerCommand()

// TODO: once we switch everything over to Cobra commands, we can go back to calling
// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
// normalize func and add the go flag set by hand.
pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
// utilflag.InitFlags()
logs.InitLogs()
defer logs.FlushLogs()

if err := command.Execute(); err != nil {
os.Exit(1)
}
}

The core logic is: 1. create a SchedulerCommand (app.NewSchedulerCommand()); 2. hand it the command-line arguments and execute it (command.Execute()). Before looking at how the SchedulerCommand is created, the short sketch below shows how a cobra command dispatches to its Run callback.
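
This is a minimal, hypothetical sketch (the toy-scheduler command below is invented, not kube-scheduler code); it only shows that Execute() parses the arguments and then invokes the Run callback, which is exactly where kube-scheduler wires in runCommand.

package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func main() {
	cmd := &cobra.Command{
		Use: "toy-scheduler",
		Run: func(cmd *cobra.Command, args []string) {
			// kube-scheduler plugs runCommand(...) in at this point.
			fmt.Println("scheduler would start here, args:", args)
		},
	}
	// Execute parses os.Args and invokes the Run callback above.
	if err := cmd.Execute(); err != nil {
		os.Exit(1)
	}
}

With that in mind, let's look at how the SchedulerCommand itself is created.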

// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
opts, err := options.NewOptions()
if err != nil {
klog.Fatalf("unable to initialize command options: %v", err)
}

cmd := &cobra.Command{
Use: "kube-scheduler",
Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary. See [scheduling](https://kubernetes.io/docs/concepts/scheduling/)
for more information about scheduling and the kube-scheduler component.`,
Run: func(cmd *cobra.Command, args []string) {
if err := runCommand(cmd, args, opts, registryOptions...); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
}
fs := cmd.Flags()

...

return cmd
}

First, notice that NewSchedulerCommand takes a variadic parameter, registryOptions. As the name suggests, these options operate on a Registry, which is the structure Kubernetes uses to manage scheduler plugins.

// Registry is a collection of all available plugins. The framework uses a
// registry to enable and initialize configured plugins.
// All plugins must be in the registry before initializing the framework.
type Registry map[string]PluginFactory

The options inside registryOptions are an example of the functional options pattern, which was popularized by Rob Pike. In short, each optional parameter is wrapped in a function that is passed to the target function, and the target function initializes itself by invoking those option functions. We will come back to this when the registryOptions are applied. If you are interested in this pattern, see my earlier article: http://legendtkl.com/2016/11/05/code-scalability/
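
As a quick, self-contained illustration of the pattern (the names ToyRegistry, Option, WithPlugin, and newRegistry below are invented for this sketch and are not the scheduler's API), note how newRegistry applies each option in a loop, the same shape as the loop over outOfTreeRegistryOptions we will see in Run:

package main

import "fmt"

// ToyRegistry stands in for the scheduler's Registry: a map from plugin name to factory.
type ToyRegistry map[string]func() string

// Option mutates the registry; any error is propagated to the caller.
type Option func(ToyRegistry) error

// WithPlugin returns an Option that registers a single plugin factory.
func WithPlugin(name string, factory func() string) Option {
	return func(r ToyRegistry) error {
		if _, ok := r[name]; ok {
			return fmt.Errorf("plugin %q already registered", name)
		}
		r[name] = factory
		return nil
	}
}

// newRegistry applies the options in a loop to build the registry.
func newRegistry(opts ...Option) (ToyRegistry, error) {
	r := make(ToyRegistry)
	for _, opt := range opts {
		if err := opt(r); err != nil {
			return nil, err
		}
	}
	return r, nil
}

func main() {
	r, err := newRegistry(WithPlugin("my-plugin", func() string { return "hello" }))
	if err != nil {
		panic(err)
	}
	fmt.Println(r["my-plugin"]()) // hello
}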

Next is cmd, a CLI handler built with cobra.Command. Command-line input is handled by the anonymous function assigned to the Run field, which calls runCommand to start the scheduler process. With the less important logic stripped away, runCommand mainly builds the scheduler configuration and then starts the scheduler process via the Run function.

// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options, registryOptions ...Option) error {
...

// Build the scheduler configuration from the options
c, err := opts.Config()
if err != nil {
return err
}

// Get the completed config
// i.e. fill in any remaining fields
cc := c.Complete()

// Configz registration.
if cz, err := configz.New("componentconfig"); err == nil {
cz.Set(cc.ComponentConfig)
} else {
return fmt.Errorf("unable to register configz: %s", err)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

return Run(ctx, cc, registryOptions...)
}

The main logic of Run is:

  1. Initialize the Registry; the loop over outOfTreeRegistryOptions at the top of the function is the functional options pattern discussed above.
  2. Create the scheduler instance.
  3. Perform the other initialization: the event broadcaster, health checks, metrics, and so on.
  4. Start the Pod informer to watch Pods.
  5. Run the scheduler (the code branches on whether leader election is enabled, but both branches end up calling sched.Run).
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTreeRegistryOptions ...Option) error {
// To help debugging, immediately log version
klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())

outOfTreeRegistry := make(framework.Registry)
for _, option := range outOfTreeRegistryOptions {
if err := option(outOfTreeRegistry); err != nil {
return err
}
}

recorderFactory := getRecorderFactory(&cc)
// Create the scheduler.
sched, err := scheduler.New(cc.Client,
cc.InformerFactory,
cc.PodInformer,
recorderFactory,
ctx.Done(),
scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
)
if err != nil {
return err
}

// Prepare the event broadcaster.
if cc.Broadcaster != nil && cc.EventClient != nil {
cc.Broadcaster.StartRecordingToSink(ctx.Done())
}
if cc.CoreBroadcaster != nil && cc.CoreEventClient != nil {
cc.CoreBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
}
// Setup healthz checks.
var checks []healthz.HealthChecker
if cc.ComponentConfig.LeaderElection.LeaderElect {
checks = append(checks, cc.LeaderElection.WatchDog)
}

// Start up the healthz server.
if cc.InsecureServing != nil {
separateMetrics := cc.InsecureMetricsServing != nil
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
return fmt.Errorf("failed to start healthz server: %v", err)
}
}
if cc.InsecureMetricsServing != nil {
handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
return fmt.Errorf("failed to start metrics server: %v", err)
}
}
if cc.SecureServing != nil {
handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
// TODO: handle stoppedCh returned by c.SecureServing.Serve
if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
// fail early for secure handlers, removing the old error loop from above
return fmt.Errorf("failed to start secure server: %v", err)
}
}

// Start all informers.
go cc.PodInformer.Informer().Run(ctx.Done())
cc.InformerFactory.Start(ctx.Done())

// Wait for all caches to sync before scheduling.
cc.InformerFactory.WaitForCacheSync(ctx.Done())

// If leader election is enabled, runCommand via LeaderElector until done and exit.
if cc.LeaderElection != nil {
cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: sched.Run,
OnStoppedLeading: func() {
klog.Fatalf("leaderelection lost")
},
}
leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
if err != nil {
return fmt.Errorf("couldn't create leader elector: %v", err)
}

leaderElector.Run(ctx)

return fmt.Errorf("lost lease")
}

// Leader election is disabled, so runCommand inline until done.
sched.Run(ctx)
return fmt.Errorf("finished without leader elect")
}

The scheduler instance

First, let's look at the definition of Scheduler, in pkg/scheduler/scheduler.go:

// Scheduler watches for unscheduled Pods, finds a suitable Node for each of them,
// and writes the binding back to the API server.
type Scheduler struct {
// Scheduler cache
SchedulerCache internalcache.Cache

Algorithm core.ScheduleAlgorithm
// PodConditionUpdater is used only in case of scheduling errors. If we succeed
// with scheduling, PodScheduled condition will be updated in apiserver in /bind
// handler so that binding and setting PodCondition it is atomic.
podConditionUpdater podConditionUpdater
// podPreemptor is used during preemption to evict pods and to update the preemptor's 'NominatedNode' field
podPreemptor podPreemptor

// NextPod returns the next Pod to schedule and blocks when none is pending. A channel is not
// used here because scheduling can take a while and the designers did not want Pods sitting
// in a channel during that time. Note: although not stated upstream, another possible reason
// is that a channel cannot persist its data.
NextPod func() *framework.PodInfo

// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*framework.PodInfo, error)

// StopEverything is a channel of empty structs used to signal shutdown, an idiomatic Go pattern.
StopEverything <-chan struct{}

// VolumeBinder handles PVC/PV binding
VolumeBinder scheduling.SchedulerVolumeBinder

// DisablePreemption indicates whether Pod preemption is disabled
DisablePreemption bool

// SchedulingQueue holds the Pods waiting to be scheduled; internally it is a priority queue
SchedulingQueue internalqueue.SchedulingQueue

// Profiles are the scheduling profiles.
Profiles profile.Map

scheduledPodsHasSynced func() bool
}

Running the scheduler

Next, let's look at the scheduler's Run method.

// Run begins watching and scheduling. It waits for cache to be synced, then starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
return
}
sched.SchedulingQueue.Run()
wait.UntilWithContext(ctx, sched.scheduleOne, 0)
sched.SchedulingQueue.Close()
}

The scheduler's Run function does three things:

  1. Wait for the scheduler cache to sync (the scheduler has just started, so this is effectively a cold start).
  2. Run the scheduling queue's Run function.
  3. Run the scheduler's scheduleOne function in a loop.

The scheduling queue

The scheduling queue's Run function is a bit puzzling at first sight: does a queue really need to be started? It does. If the scheduling queue were just a single priority queue, it would not need a Run function, but the scheduling queue in Kubernetes is composed of three sub-queues (a simplified sketch follows the list):

  • activeQueue: the Pods waiting to be scheduled; the scheduler watches this queue.
  • backoffQueue: in Kubernetes, a failed scheduling attempt counts as a backoff, and backoffQueue holds the backed-off Pods. There is generally a backoffLimit that caps how many backoffs are tolerated, and the interval between consecutive backoffs doubles each time.
  • unschedulableQueue: the Pods whose scheduling attempt was terminated (deemed unschedulable).
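
Before looking at the real code, here is a deliberately simplified, runnable toy model of this three-queue structure (toyQueue and its fields are invented for this sketch; in the real PriorityQueue, activeQ and podBackoffQ are heaps and unschedulableQ is a map of PodInfo):

package main

import (
	"fmt"
	"sync"
	"time"
)

// toyQueue is a simplified model of the scheduler's PriorityQueue: three
// sub-queues guarded by one lock. Strings and plain maps keep the sketch short.
type toyQueue struct {
	lock           sync.Mutex
	activeQ        []string             // Pods ready to be scheduled
	backoffQ       map[string]time.Time // Pod -> time its backoff expires
	unschedulableQ map[string]time.Time // Pod -> time it was marked unschedulable
}

// flushBackoffCompleted moves every Pod whose backoff has expired into activeQ,
// the same idea as flushBackoffQCompleted shown later.
func (q *toyQueue) flushBackoffCompleted(now time.Time) {
	q.lock.Lock()
	defer q.lock.Unlock()
	for pod, expiry := range q.backoffQ {
		if now.After(expiry) {
			delete(q.backoffQ, pod)
			q.activeQ = append(q.activeQ, pod)
		}
	}
}

func main() {
	q := &toyQueue{
		backoffQ:       map[string]time.Time{"pod-a": time.Now().Add(-time.Second)},
		unschedulableQ: map[string]time.Time{},
	}
	q.flushBackoffCompleted(time.Now())
	fmt.Println(q.activeQ) // [pod-a]
}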

What the scheduling queue's Run function does is periodically move Pods from backoffQueue and unschedulableQueue into activeQueue.

// Run starts the goroutine to pump from podBackoffQ to activeQ
func (p *PriorityQueue) Run() {
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

wait.Until is essentially a cron-like periodic runner; we will skip its implementation details here.

// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
JitterUntil(f, period, 0.0, true, stopCh)
}
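
As a small usage example (assuming only the k8s.io/apimachinery/pkg/util/wait package, whose Until signature is shown above), the following runs a function once per second until the stop channel is closed, which is exactly how the two flush goroutines are driven:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stop := make(chan struct{})

	// Close the stop channel after about 3 seconds to end the loop.
	go func() {
		time.Sleep(3 * time.Second)
		close(stop)
	}()

	// Run the function once per second until stop is closed, mirroring
	// go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop).
	wait.Until(func() {
		fmt.Println("tick:", time.Now().Format(time.RFC3339))
	}, 1*time.Second, stop)
}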

Let's now look at the two flush functions. First, flushBackoffQCompleted(); its main logic is:

  1. Peek at the highest-priority Pod in the backoff queue, i.e. the Pod whose next scheduling time is the closest. If you are curious, look at how the backoff queue is initialized: its lessFunc (the comparison operator) reuses the getBackoffTime function shown in the snippet below.
  2. Compute the Pod's backoffTime, which is effectively the next time it should be attempted; if that time has passed, pop the Pod and move it to activeQ.
  3. The backoffTime calculation lives in calculateBackoffDuration(): each backoff duration is double the previous one, capped at a maximum (see the worked example after the code below).
// flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ
func (p *PriorityQueue) flushBackoffQCompleted() {
p.lock.Lock()
defer p.lock.Unlock()
for {
rawPodInfo := p.podBackoffQ.Peek()
if rawPodInfo == nil {
return
}
pod := rawPodInfo.(*framework.PodInfo).Pod
boTime := p.getBackoffTime(rawPodInfo.(*framework.PodInfo))
if boTime.After(p.clock.Now()) {
return
}
_, err := p.podBackoffQ.Pop()
if err != nil {
klog.Errorf("Unable to pop pod %v from backoff queue despite backoff completion.", nsNameForPod(pod))
return
}
p.activeQ.Add(rawPodInfo)
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
defer p.cond.Broadcast()
}
}

// getBackoffTime returns the time that podInfo completes backoff
func (p *PriorityQueue) getBackoffTime(podInfo *framework.PodInfo) time.Time {
duration := p.calculateBackoffDuration(podInfo)
backoffTime := podInfo.Timestamp.Add(duration)
return backoffTime
}

// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.PodInfo) time.Duration {
duration := p.podInitialBackoffDuration
for i := 1; i < podInfo.Attempts; i++ {
duration = duration * 2
if duration > p.podMaxBackoffDuration {
return p.podMaxBackoffDuration
}
}
return duration
}
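
To make the doubling concrete, here is a small self-contained example that re-implements the loop above and prints the backoff progression; the 1s/10s values are only the commonly used defaults behind PodInitialBackoffSeconds/PodMaxBackoffSeconds, not something fixed by this code:

package main

import (
	"fmt"
	"time"
)

// backoffDuration re-implements the loop in calculateBackoffDuration above:
// double the duration for each attempt after the first, capped at max.
func backoffDuration(attempts int, initial, max time.Duration) time.Duration {
	d := initial
	for i := 1; i < attempts; i++ {
		d = d * 2
		if d > max {
			return max
		}
	}
	return d
}

func main() {
	// 1s initial and 10s max are assumed defaults, both configurable.
	for attempts := 1; attempts <= 6; attempts++ {
		fmt.Printf("attempt %d -> backoff %v\n", attempts, backoffDuration(attempts, time.Second, 10*time.Second))
	}
	// Output: 1s, 2s, 4s, 8s, 10s, 10s
}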

Next, let's look at how Pods are flushed out of the unschedulableQueue, i.e. the logic of flushUnschedulableQLeftover. It is very simple: a Pod that has stayed in the unschedulableQueue for more than 60 seconds is moved out to activeQueue (or to the backoff queue, as the name movePodsToActiveOrBackoffQueue suggests).

// flushUnschedulableQLeftover moves pod which stays in unschedulableQ longer than the unschedulableQTimeInterval
// to activeQ.
func (p *PriorityQueue) flushUnschedulableQLeftover() {
p.lock.Lock()
defer p.lock.Unlock()

var podsToMove []*framework.PodInfo
currentTime := p.clock.Now()
for _, pInfo := range p.unschedulableQ.podInfoMap {
lastScheduleTime := pInfo.Timestamp
if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
podsToMove = append(podsToMove, pInfo)
}
}

if len(podsToMove) > 0 {
p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
}
}

const (
// If the pod stays in unschedulableQ longer than the unschedulableQTimeInterval,
// the pod will be moved from unschedulableQ to activeQ.
unschedulableQTimeInterval = 60 * time.Second

queueClosed = "scheduling queue is closed"
)

scheduler cache

// To be continued