client-go源码分析(三) - WorkQueue
| 阅读 | 共 1416 字,阅读约
Overview
WorkQueue
WorkQueue称为工作队列,比普通的FIFO负责,主要功能在于标记和去重,并支持如下特性:
- 有序
- 去重
- 并发性
- 标记机制:标记是否被处理过
- 通知机制
- 延迟
- 限速
- Metric:支持监控指标
WorkQueue支持三种队列,并提供了3种接口,不同的实现应对不同的场景
- Interface:FIFO队列,去重队列
- DelayingInterface:延时队列
- RateLimitingInterface:限速队列
FIFO队列
源码位置:k8s.io/client-go/util/workqueue/quque.go
1type Interface interface {
2 // 添加元素
3 Add(item interface{})
4 // 队列长度
5 Len() int
6 // 获取队列头部第一个元素
7 Get() (item interface{}, shutdown bool)
8 // 标记队列中该元素已被处理
9 Done(item interface{})
10 // 关闭队列
11 ShutDown()
12 // 查询队列是否关闭
13 ShuttingDown() bool
14}
15
16// FIFO数据结构
17type Type struct {
18 // 实际存储数据的地方
19 queue []t
20
21 // 保证去重
22 dirty set
23 // 标记元素是否正在被处理
24 processing set
25
26 cond *sync.Cond
27
28 shuttingDown bool
29
30 metrics queueMetrics
31
32 unfinishedWorkUpdatePeriod time.Duration
33 clock clock.Clock
34}
延迟队列
延迟队列基于FIFO队列,增加了AddAfter方法,原理是:延迟一段时间后再将元素插入FIFO。
AddAfter方法会插入一个参数,并附带一个时间,指定插入元素的延迟时间
源码位置:k8s.io/client-go/util/workqueue/delaying_queue.go
1type DelayingInterface interface {
2 Interface
3 // AddAfter adds an item to the workqueue after the indicated duration has passed
4 AddAfter(item interface{}, duration time.Duration)
5}
6
7type delayingType struct {
8 Interface
9 clock clock.Clock
10 stopCh chan struct{}
11 stopOnce sync.Once
12 heartbeat clock.Ticker
13
14 // 插入的元素数量阈值,超过该值延迟队列会阻塞
15 waitingForAddCh chan *waitFor
16 metrics retryMetrics
17}
限速队列
限速队列基于延迟队列和FIFO队列接口封装,新增了以下方法:
- AddRateLimited
- Forget
- NumRequeues
源码位置:k8s.io/client-go/util/workqueue/rate_limiting_queue.go
1type RateLimitingInterface interface {
2 DelayingInterface
3
4 // AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
5 AddRateLimited(item interface{})
6
7 // Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
8 // or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
9 // still have to call `Done` on the queue.
10 Forget(item interface{})
11
12 // NumRequeues returns back how many times the item was requeued
13 NumRequeues(item interface{}) int
14}
限速队列提供了4中限速算法接口,利用延迟队列的特性达到限速目的:
- 令牌桶算法
- 排队指数算法
- 计数器算法
- 混合模式
源码位置:k8s.io/client-go/util/workqueue/default_rate_limiters.go
1type RateLimiter interface {
2 // 获取指定元素应该等待的时间
3 When(item interface{}) time.Duration
4 // 释放指定元素,清空该元素的排队数
5 Forget(item interface{})
6 // 获取指定元素的排队数
7 NumRequeues(item interface{}) int
8}
事件管理器
事件是一种资源管理器,用于展示集群内发生的情况。各组件会将事件上报给api-serverer。
源码位置:k8s.io/api/core/v1/types.go
1type Event struct {
2 metav1.TypeMeta `json:",inline"`
3 // Standard object's metadata.
4 // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
5 metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`
6
7 // The object that this event is about.
8 InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`
9
10 // This should be a short, machine understandable string that gives the reason
11 // for the transition into the object's current status.
12 // TODO: provide exact specification for format.
13 // +optional
14 Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`
15
16 // A human-readable description of the status of this operation.
17 // TODO: decide on maximum length.
18 // +optional
19 Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`
20
21 // The component reporting this event. Should be a short machine understandable string.
22 // +optional
23 Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`
24
25 // The time at which the event was first recorded. (Time of server receipt is in TypeMeta.)
26 // +optional
27 FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`
28
29 // The time at which the most recent occurrence of this event was recorded.
30 // +optional
31 LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`
32
33 // The number of times this event has occurred.
34 // +optional
35 Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`
36
37 // Type of this event (Normal, Warning), new types could be added in the future
38 // +optional
39 Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`
40
41 // Time when this Event was first observed.
42 // +optional
43 EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`
44
45 // Data about the Event series this event represents or nil if it's a singleton Event.
46 // +optional
47 Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`
48
49 // What action was taken/failed regarding to the Regarding object.
50 // +optional
51 Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`
52
53 // Optional secondary object for more complex actions.
54 // +optional
55 Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`
56
57 // Name of the controller that emitted this Event, e.g. `kubernetes.io/kubelet`.
58 // +optional
59 ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`
60
61 // ID of the controller instance, e.g. `kubelet-xyzf`.
62 // +optional
63 ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
64}