client-go源码分析(三) - WorkQueue


| 阅读 |,阅读约 3 分钟
| 复制链接:

Overview

WorkQueue

WorkQueue称为工作队列,比普通的FIFO负责,主要功能在于标记和去重,并支持如下特性:

  • 有序
  • 去重
  • 并发性
  • 标记机制:标记是否被处理过
  • 通知机制
  • 延迟
  • 限速
  • Metric:支持监控指标

WorkQueue支持三种队列,并提供了3种接口,不同的实现应对不同的场景

  • Interface:FIFO队列,去重队列
  • DelayingInterface:延时队列
  • RateLimitingInterface:限速队列

FIFO队列

源码位置:k8s.io/client-go/util/workqueue/quque.go

 1type Interface interface {
 2  // 添加元素
 3  Add(item interface{})
 4  // 队列长度
 5  Len() int
 6  // 获取队列头部第一个元素
 7  Get() (item interface{}, shutdown bool)
 8  // 标记队列中该元素已被处理
 9  Done(item interface{})
10  // 关闭队列
11  ShutDown()
12  // 查询队列是否关闭
13  ShuttingDown() bool
14}
15
16// FIFO数据结构
17type Type struct {
18  // 实际存储数据的地方
19  queue []t
20
21  // 保证去重
22  dirty set
23  // 标记元素是否正在被处理
24  processing set
25
26  cond *sync.Cond
27
28  shuttingDown bool
29
30  metrics queueMetrics
31
32  unfinishedWorkUpdatePeriod time.Duration
33  clock                      clock.Clock
34}

延迟队列

延迟队列基于FIFO队列,增加了AddAfter方法,原理是:延迟一段时间后再将元素插入FIFO。

AddAfter方法会插入一个参数,并附带一个时间,指定插入元素的延迟时间

源码位置:k8s.io/client-go/util/workqueue/delaying_queue.go

 1type DelayingInterface interface {
 2  Interface
 3  // AddAfter adds an item to the workqueue after the indicated duration has passed
 4  AddAfter(item interface{}, duration time.Duration)
 5}
 6
 7type delayingType struct {
 8  Interface
 9  clock clock.Clock
10  stopCh chan struct{}
11  stopOnce sync.Once
12  heartbeat clock.Ticker
13
14  // 插入的元素数量阈值,超过该值延迟队列会阻塞
15  waitingForAddCh chan *waitFor
16  metrics retryMetrics
17}

限速队列

限速队列基于延迟队列和FIFO队列接口封装,新增了以下方法:

  • AddRateLimited
  • Forget
  • NumRequeues

源码位置:k8s.io/client-go/util/workqueue/rate_limiting_queue.go

 1type RateLimitingInterface interface {
 2  DelayingInterface
 3
 4  // AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
 5  AddRateLimited(item interface{})
 6
 7  // Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
 8  // or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
 9  // still have to call `Done` on the queue.
10  Forget(item interface{})
11
12  // NumRequeues returns back how many times the item was requeued
13  NumRequeues(item interface{}) int
14}

限速队列提供了4中限速算法接口,利用延迟队列的特性达到限速目的:

  • 令牌桶算法
  • 排队指数算法
  • 计数器算法
  • 混合模式

源码位置:k8s.io/client-go/util/workqueue/default_rate_limiters.go

1type RateLimiter interface {
2  // 获取指定元素应该等待的时间
3  When(item interface{}) time.Duration
4  // 释放指定元素,清空该元素的排队数
5  Forget(item interface{})
6  // 获取指定元素的排队数
7  NumRequeues(item interface{}) int
8}

事件管理器

事件是一种资源管理器,用于展示集群内发生的情况。各组件会将事件上报给api-serverer。

源码位置:k8s.io/api/core/v1/types.go

 1type Event struct {
 2  metav1.TypeMeta `json:",inline"`
 3  // Standard object's metadata.
 4  // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
 5  metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`
 6
 7  // The object that this event is about.
 8  InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`
 9
10  // This should be a short, machine understandable string that gives the reason
11  // for the transition into the object's current status.
12  // TODO: provide exact specification for format.
13  // +optional
14  Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`
15
16  // A human-readable description of the status of this operation.
17  // TODO: decide on maximum length.
18  // +optional
19  Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`
20
21  // The component reporting this event. Should be a short machine understandable string.
22  // +optional
23  Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`
24
25  // The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
26  // +optional
27  FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`
28
29  // The time at which the most recent occurrence of this event was recorded.
30  // +optional
31  LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`
32
33  // The number of times this event has occurred.
34  // +optional
35  Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`
36
37  // Type of this event (Normal, Warning), new types could be added in the future
38  // +optional
39  Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`
40
41  // Time when this Event was first observed.
42  // +optional
43  EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`
44
45  // Data about the Event series this event represents or nil if it's a singleton Event.
46  // +optional
47  Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`
48
49  // What action was taken/failed regarding to the Regarding object.
50  // +optional
51  Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`
52
53  // Optional secondary object for more complex actions.
54  // +optional
55  Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`
56
57  // Name of the controller that emitted this Event, e.g. `kubernetes.io/kubelet`.
58  // +optional
59  ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`
60
61  // ID of the controller instance, e.g. `kubelet-xyzf`.
62  // +optional
63  ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
64}