
All the Cases in Which a Pod Is Re-queued for Scheduling After a Scheduling Failure

2023-12-07 14:01:18


In Kubernetes, when a Pod fails to schedule for some reason, it is placed into the unschedulable queue. What happens to the Pods in that queue afterwards?

How do they earn "a second chance" at being scheduled? In this article we trace the whole story through the source code.

The scheduler starts two goroutines that periodically move Pods from the backoffQ and the unschedulableQ into the activeQ:

func (p *PriorityQueue) Run() {
   go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
   go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}

flushUnschedulablePodsLeftover

func (p *PriorityQueue) flushUnschedulablePodsLeftover() {
   p.lock.Lock()
   defer p.lock.Unlock()
   var podsToMove []*framework.QueuedPodInfo
   currentTime := p.clock.Now()
   for _, pInfo := range p.unschedulablePods.podInfoMap {
      lastScheduleTime := pInfo.Timestamp
      if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration {
         podsToMove = append(podsToMove, pInfo)
      }
   }
   if len(podsToMove) > 0 {
      p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
   }
}
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent) {
   activated := false
   for _, pInfo := range podInfoList {
      // If the event doesn't help making the Pod schedulable, continue.
      // Note: we don't run the check if pInfo.UnschedulablePlugins is nil, which denotes
      // either there is some abnormal error, or scheduling the pod failed by plugins other than PreFilter, Filter and Permit.
      // In that case, it's desired to move it anyways.
      if len(pInfo.UnschedulablePlugins) != 0 && !p.podMatchesEvent(pInfo, event) {
         continue
      }
      pod := pInfo.Pod
      if p.isPodBackingoff(pInfo) {
         if err := p.podBackoffQ.Add(pInfo); err != nil {
            klog.ErrorS(err, "Error adding pod to the backoff queue", "pod", klog.KObj(pod))
         } else {
            metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc()
            p.unschedulablePods.delete(pod)
         }
      } else {
         if err := p.activeQ.Add(pInfo); err != nil {
            klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod))
         } else {
            // Record that at least one Pod reached the activeQ so the
            // waiting scheduling goroutine is woken up below.
            activated = true
            metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc()
            p.unschedulablePods.delete(pod)
         }
      }
   }
   p.moveRequestCycle = p.schedulingCycle
   if activated {
      p.cond.Broadcast()
   }
}

Pods that have stayed in the unschedulableQ longer than podMaxInUnschedulablePodsDuration (5 minutes by default) are moved into the activeQ or the backoffQ. Which queue a Pod lands in is decided by the following rules:

  • Based on how many times the Pod has already been attempted, compute how long it should wait before the next scheduling attempt. The wait grows exponentially (1s, 2s, 4s, 8s, ...), but not without bound: it is capped by podMaxBackoffDuration (10s by default), the maximum time a Pod may spend in backoff. If the computed wait exceeds podMaxBackoffDuration, the Pod waits only podMaxBackoffDuration before being scheduled again.
  • If (current time - last scheduling time) > the wait computed in rule 1, the Pod is put into the activeQ and will be scheduled; otherwise it is put into the backoff queue to keep waiting, from which flushBackoffQCompleted will later pick it up.

So a Pod that meets the time condition is guaranteed to move from the unschedulableQ into either the backoffQ or the activeQ.

flushBackoffQCompleted

This routine takes Pods whose backoff has expired out of the backoff queue (a priority queue ordered by backoff expiry time) and puts them into the activeQ:

func (p *PriorityQueue) flushBackoffQCompleted() {
   p.lock.Lock()
   defer p.lock.Unlock()
   activated := false
   for {
      rawPodInfo := p.podBackoffQ.Peek()
      if rawPodInfo == nil {
         break
      }
      pod := rawPodInfo.(*framework.QueuedPodInfo).Pod
      boTime := p.getBackoffTime(rawPodInfo.(*framework.QueuedPodInfo))
      if boTime.After(p.clock.Now()) {
         break
      }
      _, err := p.podBackoffQ.Pop()
      if err != nil {
         klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
         break
      }
      p.activeQ.Add(rawPodInfo)
      metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
      activated = true
   }
   if activated {
      p.cond.Broadcast()
   }
}

Besides these periodic sweeps that decide whether Pods in the unschedulableQ or backoffQ can be scheduled again, are there other triggers?

There are.

Four more kinds of events cause the Pods in these two queues to be re-evaluated for scheduling:

  • A new node joins the cluster
  • A node's configuration or status changes
  • An existing Pod in the cluster changes
  • A Pod in the cluster is deleted
informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
   cache.ResourceEventHandlerFuncs{
      AddFunc:    sched.addNodeToCache,
      UpdateFunc: sched.updateNodeInCache,
      DeleteFunc: sched.deleteNodeFromCache,
   },
)

A new node joins the cluster

func (sched *Scheduler) addNodeToCache(obj interface{}) {
   node, ok := obj.(*v1.Node)
   if !ok {
      klog.ErrorS(nil, "Cannot convert to *v1.Node", "obj", obj)
      return
   }
   nodeInfo := sched.Cache.AddNode(node)
   klog.V(3).InfoS("Add event for node", "node", klog.KObj(node))
   sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd, preCheckForNode(nodeInfo))
}
func preCheckForNode(nodeInfo *framework.NodeInfo) queue.PreEnqueueCheck {
   // Note: the following checks doesn't take preemption into considerations, in very rare
   // cases (e.g., node resizing), "pod" may still fail a check but preemption helps. We deliberately
   // chose to ignore those cases as unschedulable pods will be re-queued eventually.
   return func(pod *v1.Pod) bool {
      admissionResults := AdmissionCheck(pod, nodeInfo, false)
      if len(admissionResults) != 0 {
         return false
      }
      _, isUntolerated := corev1helpers.FindMatchingUntoleratedTaint(nodeInfo.Node().Spec.Taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
         return t.Effect == v1.TaintEffectNoSchedule
      })
      return !isUntolerated
   }
}

As you can see, when a node joins the cluster, each Pod in the unschedulableQ is taken out and checked against the following conditions:

  • The Pod's node affinity matches the new node
  • If the Pod's NodeName is set, it equals the new node's name
  • The ports requested by the Pod's containers do not conflict with ports already in use on the new node
  • The Pod tolerates the new node's NoSchedule taints

Only when all four conditions hold does the node-add event move the unscheduled Pod into the backoffQ or activeQ; which of the two it lands in follows the rules analyzed above.
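The four conditions can be sketched as a single predicate. This is a hypothetical, self-contained simplification: the types below are stand-ins for the real API objects, and the real checks (AdmissionCheck plus FindMatchingUntoleratedTaint) are considerably richer:

```go
package main

import "fmt"

// Node and Pod are simplified stand-ins for the real v1.Node / v1.Pod types.
type Node struct {
	Name      string
	Labels    map[string]string
	UsedPorts map[int]bool
	Taints    []string // simplified: keys of taints with effect NoSchedule
}

type Pod struct {
	NodeName     string
	NodeSelector map[string]string
	Ports        []int
	Tolerations  map[string]bool // simplified: tolerated taint keys
}

// preCheck sketches the four admission conditions a new node must pass
// before an unschedulable Pod is re-queued because of it.
func preCheck(pod Pod, node Node) bool {
	// 1. Node affinity / node selector must match the node's labels.
	for k, v := range pod.NodeSelector {
		if node.Labels[k] != v {
			return false
		}
	}
	// 2. If the Pod pins a node name, it must be this node.
	if pod.NodeName != "" && pod.NodeName != node.Name {
		return false
	}
	// 3. Requested host ports must not collide with ports already in use.
	for _, p := range pod.Ports {
		if node.UsedPorts[p] {
			return false
		}
	}
	// 4. Every NoSchedule taint on the node must be tolerated.
	for _, t := range node.Taints {
		if !pod.Tolerations[t] {
			return false
		}
	}
	return true
}

func main() {
	node := Node{Name: "node-2", Labels: map[string]string{"zone": "a"}, UsedPorts: map[int]bool{8080: true}}
	pod := Pod{NodeSelector: map[string]string{"zone": "a"}, Ports: []int{9090}}
	fmt.Println(preCheck(pod, node)) // true: all four conditions hold
}
```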

A node is updated

func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
   oldNode, ok := oldObj.(*v1.Node)
   if !ok {
      klog.ErrorS(nil, "Cannot convert oldObj to *v1.Node", "oldObj", oldObj)
      return
   }
   newNode, ok := newObj.(*v1.Node)
   if !ok {
      klog.ErrorS(nil, "Cannot convert newObj to *v1.Node", "newObj", newObj)
      return
   }
   nodeInfo := sched.Cache.UpdateNode(oldNode, newNode)
   // Only requeue unschedulable pods if the node became more schedulable.
   if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != nil {
      sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(*event, preCheckForNode(nodeInfo))
   }
}
func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
   if nodeSpecUnschedulableChanged(newNode, oldNode) {
      return &queue.NodeSpecUnschedulableChange
   }
   if nodeAllocatableChanged(newNode, oldNode) {
      return &queue.NodeAllocatableChange
   }
   if nodeLabelsChanged(newNode, oldNode) {
      return &queue.NodeLabelChange
   }
   if nodeTaintsChanged(newNode, oldNode) {
      return &queue.NodeTaintChange
   }
   if nodeConditionsChanged(newNode, oldNode) {
      return &queue.NodeConditionChange
   }
   return nil
}

First, the scheduler determines which node property changed; the possibilities are:

  • The node's schedulable/unschedulable flag changed
  • The node's allocatable resources changed
  • The node's labels changed
  • The node's taints changed
  • The node's conditions changed

Only if the reason the Pod failed scheduling matches one of these changes does the node-update event move the unscheduled Pod into the backoffQ or activeQ.
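Each of these helpers is essentially a field-by-field comparison of the old and new node. As a simplified, self-contained sketch of the allocatable-resources check (the type here is an illustrative stand-in for v1.ResourceList):

```go
package main

import (
	"fmt"
	"reflect"
)

// Resources is a stand-in for v1.ResourceList: resource name -> quantity.
type Resources map[string]string

// allocatableChanged sketches nodeAllocatableChanged: the node-update event
// counts as "allocatable changed" when the allocatable sets differ at all.
func allocatableChanged(oldRes, newRes Resources) bool {
	return !reflect.DeepEqual(oldRes, newRes)
}

func main() {
	oldRes := Resources{"cpu": "4", "memory": "8Gi"}
	newRes := Resources{"cpu": "8", "memory": "8Gi"}
	fmt.Println(allocatableChanged(oldRes, newRes)) // true: CPU allocatable grew
}
```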

informerFactory.Core().V1().Pods().Informer().AddEventHandler(
   cache.FilteringResourceEventHandler{
      FilterFunc: func(obj interface{}) bool {
         switch t := obj.(type) {
         case *v1.Pod:
            return assignedPod(t)
         case cache.DeletedFinalStateUnknown:
            if _, ok := t.Obj.(*v1.Pod); ok {
               // The carried object may be stale, so we don't use it to check if
               // it's assigned or not. Attempting to cleanup anyways.
               return true
            }
            utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
            return false
         default:
            utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
            return false
         }
      },
      Handler: cache.ResourceEventHandlerFuncs{
         AddFunc:    sched.addPodToCache,
         UpdateFunc: sched.updatePodInCache,
         DeleteFunc: sched.deletePodFromCache,
      },
   },
)

An existing Pod changes

func (sched *Scheduler) addPodToCache(obj interface{}) {
   pod, ok := obj.(*v1.Pod)
   if !ok {
      klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", obj)
      return
   }
   klog.V(3).InfoS("Add event for scheduled pod", "pod", klog.KObj(pod))
   if err := sched.Cache.AddPod(pod); err != nil {
      klog.ErrorS(err, "Scheduler cache AddPod failed", "pod", klog.KObj(pod))
   }
   sched.SchedulingQueue.AssignedPodAdded(pod)
}
func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
   p.lock.Lock()
   p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodAdd)
   p.lock.Unlock()
}
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.QueuedPodInfo {
   var nsLabels labels.Set
   nsLabels = interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister)
   var podsToMove []*framework.QueuedPodInfo
   for _, pInfo := range p.unschedulablePods.podInfoMap {
      for _, term := range pInfo.RequiredAffinityTerms {
         if term.Matches(pod, nsLabels) {
            podsToMove = append(podsToMove, pInfo)
            break
         }
      }
   }
   return podsToMove
}

As you can see, when an already-assigned Pod changes, it is matched in turn against the required inter-pod affinity terms of each Pod in the unschedulableQ; only for the Pods whose affinity terms match does this event move them into the backoffQ or activeQ.
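The per-term match above boils down to a label-selector check. A minimal, self-contained sketch, assuming an equality-only selector (the real term.Matches also handles namespaces and full selector operators):

```go
package main

import "fmt"

// matchesTerm sketches one required affinity-term match: an unschedulable
// Pod's term matches the changed Pod when every key/value in the term's
// selector is present on the changed Pod's labels.
func matchesTerm(selector, podLabels map[string]string) bool {
	for k, v := range selector {
		if podLabels[k] != v {
			return false
		}
	}
	return true
}

func main() {
	term := map[string]string{"app": "db"}
	changedPod := map[string]string{"app": "db", "tier": "backend"}
	fmt.Println(matchesTerm(term, changedPod)) // true: the changed Pod satisfies the term
}
```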

A Pod in the cluster is deleted

func (sched *Scheduler) deletePodFromCache(obj interface{}) {
   var pod *v1.Pod
   switch t := obj.(type) {
   case *v1.Pod:
      pod = t
   case cache.DeletedFinalStateUnknown:
      var ok bool
      pod, ok = t.Obj.(*v1.Pod)
      if !ok {
         klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t.Obj)
         return
      }
   default:
      klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t)
      return
   }
   klog.V(3).InfoS("Delete event for scheduled pod", "pod", klog.KObj(pod))
   if err := sched.Cache.RemovePod(pod); err != nil {
      klog.ErrorS(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod))
   }
   sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete, nil)
}

As you can see, the Pod-delete event, unlike the other events, needs no extra checks: the preCheck function passed here is nil, so every Pod in the unschedulableQ is moved into the activeQ or backoffQ.
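The nil-check convention can be sketched as follows. This is a simplified stand-in for MoveAllToActiveOrBackoffQueue (pods are reduced to names here): a nil PreEnqueueCheck means every Pod passes:

```go
package main

import "fmt"

// PreEnqueueCheck mirrors queue.PreEnqueueCheck, reduced to a predicate
// over pod names for illustration.
type PreEnqueueCheck func(pod string) bool

// moveAll sketches the move: with a nil check every pod is moved,
// otherwise only the pods that pass the check are moved.
func moveAll(pods []string, check PreEnqueueCheck) []string {
	var moved []string
	for _, p := range pods {
		if check == nil || check(p) {
			moved = append(moved, p)
		}
	}
	return moved
}

func main() {
	pods := []string{"a", "b", "c"}
	fmt.Println(moveAll(pods, nil)) // [a b c]
}
```

This is why the node-add and node-update handlers pass preCheckForNode(...) while the pod-delete handler passes nil.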

From the cases above we can see that cluster events accelerate the rescheduling of Pods that failed to schedule. Normally a failed Pod has to wait up to 5 minutes before it is moved back into the backoffQ or activeQ, and Pods in the backoffQ must also wait out their backoff before being rescheduled. This is why, when you modify a node's configuration, you can see Pods being rescheduled immediately.

These are all the situations in which a Pod, having failed scheduling, is re-queued for scheduling.
