client-go 中的 informer 源码分析
本文将以图文并茂的方式对 client-go 中的 informer 的源码分析,其整体流程图如下所示。

前言
Kubernetes作为新一代的基础设施系统,其重要性已经不言而喻了。基于控制器模型实现的声明式API支持着集群中各类型的工作负载稳定高效的按照期望状态运转,随着越来越多的用户选择kubernetes,无论是为了深入了解kubernetes这一云原生操作系统的工作逻辑,还是期待能够根据自己的特定业务需求对kubernetes进行二次开发,了解控制器模型的实现机制都是非常重要的。kubernetes提供了client-go以方便使用go语言进行二次快发,本文试图讲述client-go各模块如informer、reflector、cache等实现细节。
当我们需要利用client-go来实现自定义控制器时,通常会使用informerFactory来管理控制器需要的多个资源对象的informer实例
SharedInformerFactory结构
使用sharedInformerFactory可以统一管理控制器中需要的各资源对象的informer实例,避免同一个资源创建多个实例,这里的informer实现是shareIndexInformer NewSharedInformerFactory调用了NewSharedInformerFactoryWithOptions,将返回一个sharedInformerFactory对象
client: clientset,支持直接请求api中各内置资源对象的restful group客户端集合 namespace: factory关注的namespace(默认All Namespace),informer中的reflector将只会listAndWatch指定namespace的资源 defaultResync: 用于初始化持有的shareIndexInformer的resyncCheckPeriod和defaultEventHandlerResyncPeriod字段,用于定时的将local store同步到deltaFIFO customResync:支持针对每一个informer来配置resync时间,通过WithCustomResyncConfig这个Option配置,否则就用指定的defaultResync informers:factory管理的informer集合 startedInformers:记录已经启动的informer集合
sharedInformerFactory对象的关键方法:
创建一个sharedInformerFactory
启动factory下的所有informer
等待informer的cache被同步
等待每一个ShareIndexInformer的cache被同步,具体怎么算同步完成?
sharedInformerFactory的WaitForCacheSync将会不断调用factory持有的所有informer的HasSynced方法,直到返回true
而informer的HasSynced方法调用的自己持有的controller的HasSynced方法(informer结构持有controller对象,下文会分析informer的结构)
informer中的controller的HasSynced方法则调用的是controller持有的deltaFIFO对象的HasSynced方法
也就说sharedInformerFactory的WaitForCacheSync方法判断informer的cache是否同步,最终看的是informer中的deltaFIFO是否同步了,deltaFIFO的结构下文将会分析
factory为自己添加informer
只有向factory中添加informer,factory才有意义,添加完成之后,上面factory的start方法就可以启动了
obj: informer关注的资源如deployment{} newFunc: 一个知道如何创建指定informer的方法,k8s为每一个内置的对象都实现了这个方法,比如创建deployment的ShareIndexInformer的方法
shareIndexInformer对应的newFunc的实现
client-go中已经为所有内置对象都提供了NewInformerFunc
以deployment为例,通过调用factory.Apps().V1().Deployments()即可为factory添加一个deployment对应的shareIndexInformer的实现,具体过程如下:
调用factory.Apps().V1().Deployments()即会调用以下Deployments方法创建deploymentInformer对象
只要调用了factory.Apps().V1().Deployments()返回的deploymentInformer的Informer或Lister方法,就完成了向factory中添加deployment informer
deploymentInformer的defaultInformer方法将会创建出一个shareIndexInformer
shareIndexInformer结构
indexer:底层缓存,其实就是一个map记录对象,再通过一些其他map在插入删除对象是根据索引函数维护索引key如ns与对象pod的关系 controller:informer内部的一个controller,这个controller包含reflector:根据用户定义的ListWatch方法获取对象并更新增量队列DeltaFIFO processor:知道如何处理DeltaFIFO队列中的对象,实现是sharedProcessor{} listerWatcher:知道如何list对象和watch对象的方法 objectType:deployment{} resyncCheckPeriod: 给自己的controller的reflector每隔多少s<尝试>调用listener的shouldResync方法 defaultEventHandlerResyncPeriod:通过AddEventHandler方法给informer配置回调时如果没有配置的默认值,这个值用在processor的listener中判断是否需要进行resync,最小1s
两个字段的默认值都是来自创建factory时指定的defaultResync,当resyncPeriod < s.resyncCheckPeriod时,如果informer已经启动了才添加的EventHandler,那么调整resyncPeriod为resyncCheckPeriod,否则调整resyncCheckPeriod为resyncPeriod
sharedIndexInformer对象的关键方法:
sharedIndexInformer的Run方法
前面factory的start方法就是调用了这个Run方法
该方法初始化了controller对象并启动,同时调用processor.run启动所有的listener,用于回调用户配置的EventHandler
具体sharedIndexInformer中的processor中的listener是怎么添加的,看下文shareProcessor的分析
为shareIndexInformer创建controller
创建Controller的New方法会生成一个controller对象,只初始化controller的config成员,controller的reflector成员是在Run的时候初始化:
通过执行reflector.Run方法启动reflector,开启对指定对象的listAndWatch过程,获取的对象将添加到reflector的deltaFIFO中
通过不断执行processLoop方法,从DeltaFIFO pop出对象,再调用reflector的Process(就是shareIndexInformer的HandleDeltas方法)处理
controller的processLoop方法
不断执行processLoop,这个方法其实就是从DeltaFIFO pop出对象,再调用reflector的Process(其实是shareIndexInformer的HandleDeltas方法)处理
deltaFIFO pop出来的对象处理逻辑
先看看controller怎么处理DeltaFIFO中的对象,需要注意DeltaFIFO中的Deltas的结构,是一个slice,保存同一个对象的所有增量事件

sharedIndexInformer的HandleDeltas处理从deltaFIFO pod出来的增量时,先尝试更新到本地缓存cache,更新成功的话会调用processor.distribute方法向processor中的listener添加notification,listener启动之后会不断获取notification回调用户的EventHandler方法
Sync: reflector list到对象时Replace到deltaFIFO时daltaType为Sync或者resync把localstrore中的对象加回到deltaFIFO
Added、Updated: reflector watch到对象时根据watch event type是Add还是Modify对应deltaType为Added或者Updated
Deleted: reflector watch到对象的watch event type是Delete或者re-list Replace到deltaFIFO时local store多出的对象以Delete的方式加入deltaFIFO
前面描述了shareIndexInformer内部如何从deltaFIFO取出对象更新缓存并通过processor回调用户的EventHandler,那deltaFIFO中的增量事件是怎么加进入的呢?先看看shareIndexInformer中controller中的reflector实现
reflector.run发起ListWatch
reflector.run将会调用指定资源的ListAndWatch方法,注意这里什么时候可能发生re-list或者re-watch:因为是通过wait.Util不断调用ListAndWatch方法,所以只要该方法return了,那么就会发生re-list,watch过程则被嵌套在for循环中
以ResourceVersion=0开始首次的List操作获取指定资源的全量对象,并通过reflector的syncWith方法将所有对象批量插入deltaFIFO
List完成之后将会更新ResourceVersion用户Watch操作,通过reflector的watchHandler方法把watch到的增量对象加入到deltaFIFO
list出的对象批量插入deltaFIFO
可以看到是syncWith方法是通过调用deltaFIFO的Replace实现批量插入,具体实现见下文中deltaFIFO的实现描述
watch出的增量对象插入到deltaFIFO
watch到的对象直接根据watch到的事件类型eventType更新store(即deltaFIFO),注意这个event是api直接返回的,watch event type可能是Added、Modifyed、Deleted
定时触发resync
在ListAndWatch中还起了一个gorouting定时的进行resync动作
调用deltaFIFO的Resync方法,把底层缓存的对象全部重新添加到deltaFIFO中
需要注意的是,在添加对象到deltaFIFO时会检查该队列中有没有增量没有处理完的,如果有则忽略这个对象的此次resync
底层缓存的实现
shareIndexInformer中带有一个缓存indexer,是一个支持索引的map,优点是支持快速查询:
Indexer、Queue接口和cache结构体都实现了顶层的Store接口
cache结构体持有threadSafeStore对象,threadSafeStore是线程安全的,并且具备自定义索引查找的能力
threadSafeMap的结构如下:
items:存储具体的对象,比如key为ns/podName,value为pod{} Indexers:一个map[string]IndexFunc结构,其中key为索引的名称,如’namespace’字符串,value则是一个具体的索引函数 Indices:一个map[string]Index结构,其中key也是索引的名称,value是一个map[string]sets.String结构,其中key是具体的namespace,如default这个ns,vlaue则是这个ns下的按照索引函数求出来的值的集合,比如default这个ns下的所有pod对象名称
索引的维护
通过在向items插入对象的过程中,遍历所有的Indexers中的索引函数,根据索引函数存储索引key到value的集合关系,以下图式结构可以很好的说明:

缓存中增加对象
在向threadSafeMap的items map中增加完对象后,再通过updateIndices更新索引结构
IndexFunc索引函数
一个典型的索引函数MetaNamespaceIndexFunc,方便查询时可以根据namespace获取该namespace下的所有对象
Index方法利用索引查找对象
提供利用索引来查询的能力,Index方法可以根据索引名称和对象,查询所有的关联对象
例如通过
Index(“namespace”, &metav1.ObjectMeta{Namespace: namespace})获取指定ns下的所有对象,具体可以参考tools/cache/listers.go#ListAllByNamespace
deltaFIFO实现
shareIndexInformer.controller.reflector中的deltaFIFO实现
items:记录deltaFIFO中的对象,注意map的value是一个delta slice queue:记录上面items中的key,维护对象的fifo顺序 populated:队列中是否填充过数据,LIST时调用Replace或调用Delete/Add/Update都会置为true initialPopulationCount:首次List的时候获取到的数据就会调用Replace批量增加到队列,同时设置initialPopulationCount为List到的对象数量,每次Pop出来会减一,用于判断是否把首次批量插入的数据都POP出去了 keyFunc:知道怎么从对象中解析出对应key的函数,如MetaNamespaceKeyFunc可以解析出namespace/name的形式 knownObjects:这个其实就是shareIndexInformer中的indexer底层缓存的引用,可以认为和etcd中的数据一致
DeltaFIFO关键的方法:
向deltaFIFO批量插入对象
批量向队列插入数据的方法,注意knownObjects是informer中本地缓存indexer的引用
这里会更新deltaFIFO的initialPopulationCount为Replace list的对象总数加上list中相比knownObjects多出的对象数量。
因为Replace方法可能是reflector发生re-list的时候再次调用,这个时候就会出现knownObjects中存在的对象不在Replace list的情况(比如watch的delete事件丢失了),这个时候是把这些对象筛选出来,封装成DeletedFinalStateUnknown对象以Delete type类型再次加入到deltaFIFO中,这样最终从detaFIFO处理这个DeletedFinalStateUnknown 增量时就可以更新本地缓存并且触发reconcile。 因为这个对象最终的结构确实找不到了,所以只能用knownObjects里面的记录来封装delta,所以叫做FinalStateUnknown。
从deltaFIFO pop出对象
从队列中Pop出一个方法,并由函数process来处理,其实就是shareIndexInformer的HandleDeltas
每次从DeltaFIFO Pop出一个对象,f.initialPopulationCount会减一,初始值为List时的对象数量 前面的Informer的WaitForCacheSync最终就是调用了这个HasSynced方法
deltaFIFO是否同步完成
串连前面的问题:factory的WaitForCacheSync是如何等待缓存同步完成
factory的WaitForCacheSync方法调用informer的HasSync方法,继而调用deltaFIFO的HasSync方法,也就是判断从reflector list到的数据是否pop完
同步local store到deltaFIFO
所谓的resync,其实就是把knownObjects即缓存中的对象全部再通过queueActionLocked(Sync, obj)加到队列
在deltaFIFO增加一个对象
注意这里在append增量时的去重逻辑:如果连续的两个增量类型都是Deleted,那么就去掉一个(正常情况确实不会出现这样,且没必要),优先去掉前面所说的因为re-list可能导致的api与local store不一致而增加的DeletedFinalStateUnknown类型的增量
当前认为只有连续的两个Delete delta才有必要去重
sharedProcessor的实现
shareIndexInformer中的sharedProcess结构,用于分发deltaFIFO的对象,回调用户配置的EventHandler方法
可以看到shareIndexInformer中的process直接通过&sharedProcessor{clock: realClock}初始化
如下为sharedProcessor结构:
listenersStarted:listeners中包含的listener是否都已经启动了 listeners:已添加的listener列表,用来处理watch到的数据 syncingListeners:已添加的listener列表,用来处理list或者resync的数据
理解listeners和syncingListeners的区别
processor可以支持listener的维度配置是否需要resync:一个informer可以配置多个EventHandler,而一个EventHandler对应processor中的一个listener,每个listener可以配置需不需要resync,如果某个listener需要resync,那么添加到deltaFIFO的Sync增量最终也只会回到对应的listener
reflector中会定时判断每一个listener是否需要进行resync,判断的依据是看配置EventHandler的时候指定的resyncPeriod,0代表该listener不需要resync,否则就每隔resyncPeriod看看是否到时间了
listeners:记录了informer添加的所有listener
syncingListeners:记录了informer中哪些listener处于sync状态
syncingListeners是listeners的子集,syncingListeners记录那些开启了resync且时间已经到达了的listener,把它们放在一个独立的slice是避免下面分析的distribute方法中把obj增加到了还不需要resync的listener中
为sharedProcessor添加listener
在sharedProcessor中添加一个listener
启动sharedProcessor中的listener
sharedProcessor启动所有的listener 是通过调用listener.run和listener.pop来启动一个listener,两个方法具体作用看下文processorListener说明
sharedProcessor分发对象
distribute方法是在前面介绍[deltaFIFO pop出来的对象处理逻辑]时提到的,把notification事件添加到listener中,listener如何pop出notification回调EventHandler见下文listener部分分析
当通过distribute分发从deltaFIFO获取的对象时,如果delta type是Sync,那么就会把对象交给sync listener来处理,而Sync类型的delta只能来源于下面两种情况:
reflector list Replace到deltaFIFO的对象:因为首次在sharedProcessor增加一个listener的时候是同时加在listeners和syncingListeners中的
reflector定时触发resync local store到deltaFIFO的对象:因为每次reflector调用processor的shouldResync时,都会把达到resync条件的listener筛选出来重新放到p.syncingListeners
上面两种情况都可以在p.syncingListeners中准备好listener
processorListener结构
sharedProcessor中的listener具体的类型:运转逻辑就是把用户通过addCh增加的事件发送到nextCh供run方法取出回调Eventhandler,因为addCh和nectCh都是无缓冲channel,所以中间引入ringBuffer做缓存
processorListener是sharedIndexInformer调用AddEventHandler时创建并添加到sharedProcessor,对于一个Informer,可以多次调用AddEventHandler来添加多个listener
addCh:无缓冲的chan,listener的pod方法不断从addCh取出对象丢给nextCh。addCh中的对象来源于listener的add方法,如果nextCh不能及时消费,则放入缓冲区pendingNotifications nextCh:无缓冲的chan,listener的run方法不断从nextCh取出对象回调用户handler。nextCh的对象来源于addCh或者缓冲区 pendingNotifications:一个无容量限制的环形缓冲区,可以理解为可以无限存储的队列,用来存储deltaFIFO分发过来的消息 nextResync:由resyncPeriod和requestedResyncPeriod计算得出,与当前时间now比较判断listener是否该进行resync了 resyncPeriod:listener自身期待多长时间进行resync requestedResyncPeriod:informer希望listener多长时间进行resync
在listener中添加事件
shareProcessor中的distribute方法调用的是listener的add来向addCh增加消息,注意addCh是无缓冲的chan,依赖pop不断从addCh取出数据
判断是否需要resync
如果resyncPeriod为0表示不需要resync,否则判断当前时间now是否已经超过了nextResync,是的话则返回true表示需要resync。其中nextResync在每次调用listener的shouldResync方法成功时更新
listener的run方法回调EventHandler
listener的run方法不断的从nextCh中获取notification,并根据notification的类型来调用用户自定的EventHandler
addCh到nextCh的对象传递
listener中pop方法的逻辑相对比较绕,最终目的就是把分发到addCh的数据从nextCh或者pendingNotifications取出来
notification变量记录下一次要被放到p.nextCh供pop方法取出的对象 开始seletct时必然只有case2可能ready Case2做的事可以描述为:从p.addCh获取对象,如果临时变量notification还是nil,说明需要往notification赋值,供case1推送到p.nextCh 如果notification已经有值了,那个当前从p.addCh取出的值要先放到环形缓冲区中
Case1做的事可以描述为:看看能不能把临时变量notification推送到nextCh(nil chan会阻塞在读写操作上),可以写的话,说明这个nextCh是p.nextCh,写成功之后,需要从缓存中取出一个对象放到notification为下次执行这个case做准备,如果缓存是空的,通过把nextCh chan设置为nil来禁用case1,以便case2位notification赋值
最后更新于