This article walks through the source code of the informer machinery in client-go, with diagrams along the way. The overall flow is shown in the figure below.
Preface
The importance of Kubernetes as a new generation of infrastructure needs no introduction. Its declarative API, built on the controller model, keeps every type of workload in the cluster running stably and efficiently toward its desired state. As more and more users adopt Kubernetes, understanding how the controller model is implemented matters, whether you want to understand the inner workings of this cloud-native operating system or to extend Kubernetes for your own business needs. Kubernetes provides client-go to make extension development in Go convenient; this article tries to explain the implementation details of client-go's modules such as the informer, reflector, and cache.
When building a custom controller with client-go, we usually use an informer factory to manage the informer instances for the multiple resource types the controller needs:
// Create an informer factory
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
// The factory already provides a method to create an informer instance for every
// built-in Kubernetes resource; calling a concrete informer's Lister or Informer
// method registers that informer with the factory
deploymentLister := kubeInformerFactory.Apps().V1().Deployments().Lister()
// Start every informer registered with the factory
kubeInformerFactory.Start(stopCh)
The SharedInformerFactory structure
Using a sharedInformerFactory lets a controller manage the informer instances for all of its resource types in one place and avoids creating multiple instances for the same resource. The informer implementation here is sharedIndexInformer. NewSharedInformerFactory calls NewSharedInformerFactoryWithOptions, which returns a sharedInformerFactory object with the following fields:
client: the clientset, a collection of RESTful group clients for the built-in API resources
namespace: the namespace the factory watches (all namespaces by default); the informers' reflectors will only listAndWatch resources in this namespace
defaultResync: initializes the resyncCheckPeriod and defaultEventHandlerResyncPeriod fields of the sharedIndexInformers the factory holds, used to periodically sync the local store back into the DeltaFIFO
customResync: a per-informer resync interval, configured via the WithCustomResyncConfig option; informers without one fall back to defaultResync
informers: the set of informers managed by the factory
startedInformers: records which informers have already been started
type sharedInformerFactory struct {
	client kubernetes.Interface // the clientset
	namespace string // the watched namespace, configurable via the WithNamespace option
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	lock sync.Mutex
	defaultResync time.Duration // the interval passed in earlier, e.g. 30s
	customResync map[reflect.Type]time.Duration // per-type custom resync intervals
	informers map[reflect.Type]cache.SharedIndexInformer // one informer per resource type; the informer type is SharedIndexInformer
	startedInformers map[reflect.Type]bool // whether each informer has been started
}
Key methods on the sharedInformerFactory object:
Creating a sharedInformerFactory
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
	factory := &sharedInformerFactory{
		client: client, // the clientset; for native resources the kube clientset can be used directly
		namespace: v1.NamespaceAll, // note that the default is to watch the resource in all namespaces
		defaultResync: defaultResync, // e.g. 30s
		// Initialize the map structures below
		informers: make(map[reflect.Type]cache.SharedIndexInformer),
		startedInformers: make(map[reflect.Type]bool),
		customResync: make(map[reflect.Type]time.Duration),
	}
	return factory
}
Starting all informers held by the factory
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			// Call the informer's Run method in its own goroutine and mark the
			// corresponding informer as started
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}
Waiting for the informer caches to sync
Wait for every sharedIndexInformer's cache to be synced. What exactly counts as synced?
sharedInformerFactory's WaitForCacheSync repeatedly calls the HasSynced method of every informer the factory holds, until it returns true.
An informer's HasSynced method delegates to the HasSynced method of the controller it holds (the informer struct holds a controller object; the informer's structure is analyzed below).
The controller's HasSynced method in turn calls the HasSynced method of the DeltaFIFO object the controller holds.
In other words, when sharedInformerFactory's WaitForCacheSync decides whether an informer's cache is synced, what it ultimately checks is whether the informer's DeltaFIFO has synced; the DeltaFIFO structure is analyzed later.
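The delegation chain above can be sketched with a few toy types. All names here are illustrative stand-ins for the real client-go types; the point is only how HasSynced bottoms out at the queue:

```go
package main

import "fmt"

// Hypothetical sketch of the HasSynced chain: informer -> controller -> DeltaFIFO.

type deltaFIFO struct {
	populated    bool // a list result has been Replaced into the queue
	initialCount int  // how many of those initial items are still queued
}

// HasSynced is true once the initial batch inserted by Replace has been popped.
func (f *deltaFIFO) HasSynced() bool { return f.populated && f.initialCount == 0 }

type controller struct{ queue *deltaFIFO }

func (c *controller) HasSynced() bool { return c.queue.HasSynced() }

type informer struct{ ctrl *controller }

func (i *informer) HasSynced() bool { return i.ctrl.HasSynced() }

func main() {
	q := &deltaFIFO{populated: true, initialCount: 2}
	inf := &informer{ctrl: &controller{queue: q}}
	fmt.Println(inf.HasSynced()) // false: two initial items still queued
	q.initialCount = 0
	fmt.Println(inf.HasSynced()) // true: the initial list has been drained
}
```

The real DeltaFIFO tracks exactly this: whether the first Replace happened and whether everything it inserted has been popped.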
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
	// Collect every informer that has already been started
	informers := func() map[reflect.Type]cache.SharedIndexInformer {
		f.lock.Lock()
		defer f.lock.Unlock()
		informers := map[reflect.Type]cache.SharedIndexInformer{}
		for informerType, informer := range f.informers {
			if f.startedInformers[informerType] {
				informers[informerType] = informer
			}
		}
		return informers
	}()
	res := map[reflect.Type]bool{}
	// Wait for their caches to sync, via each informer's HasSynced method
	for informType, informer := range informers {
		res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
	}
	return res
}
The factory registers informers with itself
A factory is only useful once informers are added to it; after that, the factory's Start method above can launch them.
obj: the resource the informer watches, e.g. a deployment{}
newFunc: a function that knows how to create the given informer; Kubernetes implements one for every built-in object, e.g. the one that creates the deployment SharedIndexInformer
// Register the given informer with the factory
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()
	// Check by object type whether the factory already has a matching informer;
	// if it does, don't create another one
	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}
	// Use the per-type resync interval if one was configured, else the default
	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}
	// Otherwise create one with newFunc and remember it in the map
	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer
	return informer
}
The newFunc implementation behind a sharedIndexInformer
client-go already provides a NewInformerFunc for every built-in object.
Taking deployments as an example, calling factory.Apps().V1().Deployments() is enough to add a deployment sharedIndexInformer to the factory. The process is as follows:
calling factory.Apps().V1().Deployments() invokes the Deployments method below, which creates a deploymentInformer object
func (v *version) Deployments() DeploymentInformer {
	return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
Calling the Informer or Lister method of the deploymentInformer returned by factory.Apps().V1().Deployments() completes the registration of the deployment informer with the factory.
// The deploymentInformer object has defaultInformer, Informer, and Lister methods
// Note that the deploymentInformer is created with an indexed cache that carries a
// namespace index; the indexed cache implementation (which supports queries such as
// "all pods in a given namespace") is covered later
// defaultInformer creates the corresponding sharedIndexInformer; it is the function
// handed to the factory's InformerFor method
func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
// Informer adds the deployment sharedIndexInformer to the factory and returns it
func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}
// Lister returns the deployment lister object, which holds a reference to the cache
// of the sharedIndexInformer created above, so objects can be fetched from the cache
func (f *deploymentInformer) Lister() v1.DeploymentLister {
	return v1.NewDeploymentLister(f.Informer().GetIndexer())
}
The deploymentInformer's defaultInformer method creates a sharedIndexInformer
// See the sharedIndexInformer structure below first
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		// Define the object's ListWatch functions, which simply use the clientset's methods
		&cache.ListWatch{
			ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1beta1().Deployments(namespace).List(options)
			},
			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1beta1().Deployments(namespace).Watch(options)
			},
		},
		&appsv1beta1.Deployment{},
		resyncPeriod, // the interval given when the factory was created, e.g. 30s
		indexers,
	)
}
The sharedIndexInformer structure
indexer: the underlying cache; essentially a map of objects, plus additional maps maintained on insert and delete by index functions, mapping an index key such as a namespace to the objects (e.g. pods) in it
controller: the informer's internal controller, which holds the reflector; the reflector uses the user-supplied ListWatch functions to fetch objects and feed the DeltaFIFO incremental queue
processor: knows how to handle objects popped from the DeltaFIFO; implemented by sharedProcessor{}
listerWatcher: the functions that know how to list and watch the object type
objectType: e.g. deployment{}
resyncCheckPeriod: how often the controller's reflector should attempt to call the listeners' shouldResync method
defaultEventHandlerResyncPeriod: the default used when AddEventHandler is called without a resync period; the processor's listeners use it to decide whether a resync is needed (minimum 1s)
Both fields default to the defaultResync specified when the factory was created. If an event handler is added with resyncPeriod < resyncCheckPeriod after the informer has already started, resyncPeriod is raised to resyncCheckPeriod; if the informer has not started yet, resyncCheckPeriod is lowered to resyncPeriod instead.
type sharedIndexInformer struct {
	indexer Indexer // the informer's underlying cache
	controller Controller // holds the reflector and DeltaFIFO; the reflector listWatches objects into the DeltaFIFO while updating the indexer cache, and on a successful update the sharedProcessor triggers the user's event handlers
	processor *sharedProcessor // holds a set of listeners, one per user event handler
	cacheMutationDetector MutationDetector // can be ignored for now; it detects whether the local cache was modified from outside
	// This block is tracked to handle late initialization of the controller
	listerWatcher ListerWatcher // the deployment listWatch functions
	objectType runtime.Object
	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock
	started, stopped bool
	startedLock sync.Mutex
	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex
}
Key methods on the sharedIndexInformer object:
sharedIndexInformer's Run method
The factory's Start method described earlier calls this Run method.
It initializes and starts the controller object, and calls processor.run to start all the listeners, which call back the user-configured event handlers.
How the listeners end up inside the sharedIndexInformer's processor is covered in the sharedProcessor analysis below.
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	// Create a DeltaFIFO for sharedIndexInformer.controller.reflector.
	// Note that the indexer, i.e. the local cache, is passed in to initialize
	// the DeltaFIFO's knownObjects field
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
	// Configuration for the sharedIndexInformer's controller
	cfg := &Config{
		Queue: fifo,
		ListerWatcher: s.listerWatcher,
		ObjectType: s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError: false,
		ShouldResync: s.processor.shouldResync, // used in the reflector's ListAndWatch to decide whether a resync should actually run once the resyncCheckPeriod timer fires
		// A function that knows how to process objects popped from the controller's DeltaFIFO
		Process: s.HandleDeltas,
	}
	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		// Instantiate the concrete controller here
		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()
	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait() // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	// processor.run starts all listeners, which call back the user's event handlers
	wg.StartWithChannel(processorStopCh, s.processor.run)
	// Start the controller
	s.controller.Run(stopCh)
}
Creating the controller for a sharedIndexInformer
The New function that creates the Controller only builds a controller object and sets its config member; the controller's reflector member is initialized later, in Run:
reflector.Run starts the reflector, which kicks off the listAndWatch loop for the target type; fetched objects are added to the reflector's DeltaFIFO
processLoop runs continuously, popping objects from the DeltaFIFO and handing them to the configured Process function (which is sharedIndexInformer's HandleDeltas)
func New(c *Config) Controller {
	ctlr := &controller{
		config: *c,
		clock: &clock.RealClock{},
	}
	return ctlr
}
// The remaining fields are configured in Run
func (c *controller) Run(stopCh <-chan struct{}) {
	// Create a Reflector from the config
	r := NewReflector(
		c.config.ListerWatcher, // the deployment listWatch functions
		c.config.ObjectType, // deployment{}
		c.config.Queue, // the DeltaFIFO
		c.config.FullResyncPeriod, // e.g. 30s
	)
	r.ShouldResync = c.config.ShouldResync // comes from the sharedProcessor
	r.clock = c.clock
	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()
	var wg wait.Group
	defer wg.Wait()
	// Start the reflector, which runs ListAndWatch
	wg.StartWithChannel(stopCh, r.Run)
	// Run processLoop continuously: pop objects from the DeltaFIFO and hand them
	// to the configured Process function (sharedIndexInformer's HandleDeltas)
	wait.Until(c.processLoop, time.Second, stopCh)
}
The controller's processLoop method
processLoop runs in a loop, popping objects from the DeltaFIFO and handing each one to the configured Process function (sharedIndexInformer's HandleDeltas).
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}
The handling of objects popped from the DeltaFIFO
First, look at how the controller handles objects from the DeltaFIFO. Note the structure of Deltas inside the DeltaFIFO: it is a slice holding all pending incremental events for a single object.
When sharedIndexInformer's HandleDeltas processes deltas popped from the DeltaFIFO, it first applies them to the local cache; on success it calls processor.distribute to hand a notification to the processor's listeners, which, once started, keep pulling notifications and invoking the user's event handlers. The delta types are:
Sync: produced when the reflector's list result is Replaced into the DeltaFIFO, or when a resync re-adds the objects from the local store into the DeltaFIFO
Added, Updated: produced when the reflector watches an object; a watch event of type Add maps to deltaType Added, and Modify maps to Updated
Deleted: produced when the reflector sees a watch event of type Delete, or when a re-list Replace finds objects in the local store that no longer exist upstream and adds them as Deleted
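The replay of a Deltas slice against the cache can be modeled with a toy store. All types here are illustrative, not client-go's; the shape of the switch mirrors HandleDeltas below:

```go
package main

import "fmt"

// Toy sketch of HandleDeltas: one object's Deltas slice is replayed from
// oldest to newest against a map-based store, and each delta becomes an
// add/update/delete notification for downstream handlers.

type DeltaType string

const (
	Sync    DeltaType = "Sync"
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

type Delta struct {
	Type DeltaType
	Key  string // e.g. "namespace/name"
	Obj  string // stand-in for the object payload
}

// handleDeltas applies deltas to the store and returns the notifications
// that would be distributed to listeners.
func handleDeltas(store map[string]string, deltas []Delta) []string {
	var notifications []string
	for _, d := range deltas {
		switch d.Type {
		case Sync, Added, Updated:
			// An object already present in the cache counts as an update,
			// regardless of the delta type
			if _, exists := store[d.Key]; exists {
				store[d.Key] = d.Obj
				notifications = append(notifications, "update "+d.Key)
			} else {
				store[d.Key] = d.Obj
				notifications = append(notifications, "add "+d.Key)
			}
		case Deleted:
			delete(store, d.Key)
			notifications = append(notifications, "delete "+d.Key)
		}
	}
	return notifications
}

func main() {
	store := map[string]string{}
	n := handleDeltas(store, []Delta{
		{Added, "default/nginx", "v1"},
		{Updated, "default/nginx", "v2"},
		{Deleted, "default/nginx", "v2"},
	})
	fmt.Println(n) // [add default/nginx update default/nginx delete default/nginx]
}
```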
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()
	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			// First update the cache through the sharedIndexInformer's indexer
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				// If the informer's local cache updated successfully, distribute the object
				// via the sharedProcessor to the user's custom controller.
				// Note that from the event handlers' point of view, an object already
				// present in the local cache counts as an update
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}
The preceding sections described how a sharedIndexInformer pops objects from the DeltaFIFO, updates the cache, and calls back the user's event handlers via the processor. So how do incremental events get into the DeltaFIFO in the first place? Let's look at the reflector implementation inside the sharedIndexInformer's controller.
reflector.Run starts the ListWatch
reflector.Run calls the resource's ListAndWatch method. Note when a re-list or re-watch can happen: ListAndWatch is invoked repeatedly via wait.Until, so whenever the method returns, a re-list occurs; the watch itself is nested inside a for loop.
The initial List starts at ResourceVersion=0 and fetches the full set of objects of the resource, and the reflector's syncWith method inserts them all into the DeltaFIFO in one batch.
After the List completes, the ResourceVersion is updated for the subsequent Watch, and the reflector's watchHandler method adds the watched incremental objects to the DeltaFIFO.
// (abridged excerpt)
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	// Start the initial list at ResourceVersion=0
	options := metav1.ListOptions{ResourceVersion: "0"}
	if err := func() error {
		initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
		var list runtime.Object
		go func() {
			// Fetch the list result
			list, err = pager.List(context.Background(), options)
			close(listCh)
		}()
		listMetaInterface, err := meta.ListAccessor(list)
		// Update the resource version from the result, for the upcoming watch
		resourceVersion = listMetaInterface.GetResourceVersion()
		items, err := meta.ExtractList(list)
		// syncWith adds the initial list result to the DeltaFIFO in one batch via
		// the DeltaFIFO's Replace method. The queue's HasSynced method reports whether
		// everything inserted by Replace has been popped; that is what the
		// factory/informer WaitForCacheSync ultimately checks
		if err := r.syncWith(items, resourceVersion); err != nil {
			return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
		}
		r.setLastSyncResourceVersion(resourceVersion)
	}(); err != nil {
		return err
	}
	// Keep watching from the ResourceVersion obtained from the list
	for {
		start := r.clock.Now()
		w, err := r.listerWatcher.Watch(options)
		// watchHandler processes the watched data, adding objects to the DeltaFIFO
		// according to the watch event type
		if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
			if err != errorStopRequested {
				switch {
				case apierrs.IsResourceExpired(err):
					klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
				default:
					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
				}
			}
			return nil
		}
	}
}
Batch-inserting listed objects into the DeltaFIFO
As shown, syncWith performs the batch insert by calling the DeltaFIFO's Replace method; the concrete implementation is described in the DeltaFIFO section below.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return r.store.Replace(found, resourceVersion)
}
Inserting watched incremental objects into the DeltaFIFO
Watched objects update the store (the DeltaFIFO) directly according to the watch event type. Note that this event comes straight from the API; the watch event type can be Added, Modified, or Deleted.
// watchHandler watches w and keeps *resourceVersion up to date. (abridged excerpt)
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	for {
		select {
		case <-stopCh:
			return errorStopRequested
		case err := <-errc:
			return err
		case event, ok := <-w.ResultChan():
			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
			case watch.Modified:
				err := r.store.Update(event.Object)
			case watch.Deleted:
				err := r.store.Delete(event.Object)
			case watch.Bookmark:
				// A `Bookmark` means watch has synced here, just update the resourceVersion
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
		}
	}
}
Periodically triggering a resync
ListAndWatch also starts a goroutine that performs the resync action periodically
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
	// Get a timer channel; the interval is the resyncPeriod passed in when
	// the informer factory was created
	resyncCh, cleanup := r.resyncChan()
	defer func() {
		cleanup() // Call the last one written into cleanup
	}()
	for {
		select {
		case <-resyncCh:
		case <-stopCh:
			return
		case <-cancelCh:
			return
		}
		if r.ShouldResync == nil || r.ShouldResync() {
			klog.V(4).Infof("%s: forcing resync", r.name)
			if err := r.store.Resync(); err != nil {
				resyncerrc <- err
				return
			}
		}
		cleanup()
		resyncCh, cleanup = r.resyncChan()
	}
}()
The DeltaFIFO's Resync method re-adds every object from the underlying cache into the DeltaFIFO
func (f *DeltaFIFO) Resync() error {
	f.lock.Lock()
	defer f.lock.Unlock()
	if f.knownObjects == nil {
		return nil
	}
	keys := f.knownObjects.ListKeys()
	for _, k := range keys {
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}
Note that when adding an object back into the DeltaFIFO, the queue is checked for unprocessed deltas for that object; if any exist, this round of resync is skipped for that object.
func (f *DeltaFIFO) syncKeyLocked(key string) error {
	obj, exists, err := f.knownObjects.GetByKey(key)
	if err != nil {
		klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
		return nil
	} else if !exists {
		klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
		return nil
	}
	// If we are doing Resync() and there is already an event queued for that object,
	// we ignore the Resync for it. This is to avoid the race, in which the resync
	// comes with the previous value of object (since queueing an event for the object
	// doesn't trigger changing the underlying store <knownObjects>.
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	// If the DeltaFIFO still holds unprocessed deltas for this object, skip it to
	// avoid the race described in the comment above: within one object's delta list,
	// later deltas should carry a newer object than earlier ones
	if len(f.items[id]) > 0 {
		return nil
	}
	// Like the DeltaFIFO's Replace method, this queues a Sync-type delta
	if err := f.queueActionLocked(Sync, obj); err != nil {
		return fmt.Errorf("couldn't queue object: %v", err)
	}
	return nil
}
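The skip rule above boils down to one check on the pending-deltas map. A toy illustration (names are made up for the sketch, not client-go's):

```go
package main

import "fmt"

// Toy model of syncKeyLocked's skip rule: a resync re-queues a known object
// as a Sync delta only when the FIFO holds no pending deltas for that key.

type fifo struct {
	items map[string][]string // key -> pending delta types
}

// syncKey reports whether a Sync delta was queued for key.
func (f *fifo) syncKey(key string) bool {
	if len(f.items[key]) > 0 {
		// Pending deltas exist: skip, so the resync cannot re-introduce a
		// stale version of the object ahead of newer queued events
		return false
	}
	f.items[key] = append(f.items[key], "Sync")
	return true
}

func main() {
	f := &fifo{items: map[string][]string{"default/nginx": {"Updated"}}}
	fmt.Println(f.syncKey("default/nginx")) // false: an Updated delta is pending
	fmt.Println(f.syncKey("default/redis")) // true: a Sync delta was queued
}
```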
The underlying cache implementation
The sharedIndexInformer carries an indexer cache, a map with index support whose strength is fast lookups:
the Indexer and Queue interfaces and the cache struct all implement the top-level Store interface
the cache struct holds a threadSafeStore object; threadSafeStore is thread-safe and supports lookups via custom indexes
The threadSafeMap structure is as follows:
items: stores the actual objects, e.g. key ns/podName, value pod{}
indexers: a map[string]IndexFunc, where the key is an index name such as the string "namespace" and the value is the corresponding index function
indices: a map[string]Index, where the key is again the index name and the value is a map[string]sets.String whose key is a concrete index value (e.g. the default namespace) and whose value is the set of keys computed by the index function, e.g. the names of all pod objects in that namespace
type threadSafeMap struct {
	lock sync.RWMutex
	items map[string]interface{}
	// indexers maps a name to an IndexFunc
	indexers Indexers
	// indices maps a name to an Index
	indices Indices
}
// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc
// Indices maps a name to an Index
type Indices map[string]Index
type Index map[string]sets.String
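The relationship between items, indexers, and indices can be reproduced with plain maps. This is a self-contained toy, not client-go's code; keys are assumed to be "namespace/name" strings, and plain map[string]bool stands in for sets.String:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// IndexFunc computes index values for a stored key.
type IndexFunc func(key string) []string

// namespaceIndex mimics a namespace index: "default/nginx" -> ["default"].
func namespaceIndex(key string) []string {
	return []string{strings.SplitN(key, "/", 2)[0]}
}

type store struct {
	items    map[string]string
	indexers map[string]IndexFunc
	indices  map[string]map[string]map[string]bool // indexName -> indexValue -> set of keys
}

// add stores the object and updates every index, mirroring updateIndices.
func (s *store) add(key, obj string) {
	s.items[key] = obj
	for name, fn := range s.indexers {
		idx := s.indices[name]
		if idx == nil {
			idx = map[string]map[string]bool{}
			s.indices[name] = idx
		}
		for _, v := range fn(key) {
			if idx[v] == nil {
				idx[v] = map[string]bool{}
			}
			idx[v][key] = true
		}
	}
}

// byIndex returns the keys stored under one index value,
// e.g. all keys in namespace "default".
func (s *store) byIndex(name, value string) []string {
	var keys []string
	for k := range s.indices[name][value] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	s := &store{
		items:    map[string]string{},
		indexers: map[string]IndexFunc{"namespace": namespaceIndex},
		indices:  map[string]map[string]map[string]bool{},
	}
	s.add("default/nginx", "pod")
	s.add("default/redis", "pod")
	s.add("kube-system/dns", "pod")
	fmt.Println(s.byIndex("namespace", "default")) // [default/nginx default/redis]
}
```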
Maintaining the indexes
When an object is inserted into items, every index function in Indexers is run, and the mapping from each index key to its set of values is recorded; the diagram below illustrates the structure well:
Adding an object to the cache
After the object is added to threadSafeMap's items map, updateIndices updates the index structures
func (c *threadSafeMap) Add(key string, obj interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()
	oldObject := c.items[key]
	// Store the object
	c.items[key] = obj
	// Update the indexes
	c.updateIndices(oldObject, obj, key)
}
// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
// updateIndices must be called from a function that already has a lock on the cache
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
	// if we got an old object, we need to remove it before we add it again
	if oldObj != nil {
		// This is an update: delete the old object's index records first
		c.deleteFromIndices(oldObj, key)
	}
	// Iterate over all registered index functions
	for name, indexFunc := range c.indexers {
		// Compute the object's index values with the index function
		indexValues, err := indexFunc(newObj)
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}
		index := c.indices[name]
		if index == nil {
			index = Index{}
			c.indices[name] = index
		}
		// An index function may yield one value or several: a pod's namespace
		// has a single value, while its labels may produce several
		for _, indexValue := range indexValues {
			// For the namespace index, for example, indexValue=default selects the set
			// for the default namespace, and the current key is inserted into it
			set := index[indexValue]
			if set == nil {
				set = sets.String{}
				index[indexValue] = set
			}
			set.Insert(key)
		}
	}
}
The IndexFunc index function
A typical index function is MetaNamespaceIndexFunc, which makes it possible to fetch all objects in a given namespace at query time
// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
	meta, err := meta.Accessor(obj)
	if err != nil {
		return []string{""}, fmt.Errorf("object has no meta: %v", err)
	}
	return []string{meta.GetNamespace()}, nil
}
Looking up objects via the Index method
The Index method provides index-based querying: given an index name and an object, it returns all associated objects.
For example, Index("namespace", &metav1.ObjectMeta{Namespace: namespace})
returns all objects in the given namespace; see tools/cache/listers.go#ListAllByNamespace for details.
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	indexFunc := c.indexers[indexName]
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
	}
	indexKeys, err := indexFunc(obj)
	if err != nil {
		return nil, err
	}
	index := c.indices[indexName]
	var returnKeySet sets.String
	// e.g. the namespace index