typesharedIndexInformerstruct { indexer Indexer//informer中的底层缓存cache controller Controller //持有reflector和deltaFIFO对象,reflector对象将会listWatch对象添加到deltaFIFO,同时更新indexer cahce,更新成功则通过sharedProcessor触发用户配置的Eventhandler
processor *sharedProcessor//持有一系列的listener,每个listener对应用户的EventHandler cacheMutationDetector MutationDetector//可以先忽略,这个对象可以用来监测local cache是否被外部直接修改// This block is tracked to handle late initialization of the controller listerWatcher ListerWatcher//deployment的listWatch方法 objectType runtime.Object// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call// shouldResync to check if any of our listeners need a resync. resyncCheckPeriod time.Duration// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default// value). defaultEventHandlerResyncPeriod time.Duration// clock allows for testability clock clock.Clock started, stopped bool startedLock sync.Mutex// blockDeltas gives a way to stop all event distribution so that a late event handler// can safely join the shared informer. blockDeltas sync.Mutex}
// processLoop keeps popping items from the controller's queue until the queue
// reports that it has been closed.
//
// Every iteration calls Queue.Pop with c.config.Process converted to a
// PopProcessFunc. The outcome of the pop decides what happens next:
//   - no error: move on to the next item;
//   - ErrFIFOClosed: leave the loop and return;
//   - any other error: hand the popped object back through AddIfNotPresent,
//     but only when c.config.RetryOnError is set. The result of
//     AddIfNotPresent is not inspected.
func (c *controller) processLoop() {
	for {
		popped, popErr := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		switch {
		case popErr == nil:
			// Nothing to recover from; fetch the next item.
		case popErr == ErrFIFOClosed:
			return
		case c.config.RetryOnError:
			// This is the safe way to re-enqueue.
			c.config.Queue.AddIfNotPresent(popped)
		}
	}
}
func (f *DeltaFIFO) syncKeyLocked(key string) error { obj, exists, err := f.knownObjects.GetByKey(key)if err !=nil { klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)returnnil } elseif!exists { klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)returnnil }// If we are doing Resync() and there is already an event queued for that object,// we ignore the Resync for it. This is to avoid the race, in which the resync// comes with the previous value of object (since queueing an event for the object// doesn't trigger changing the underlying store <knownObjects>. id, err := f.KeyOf(obj)if err !=nil {returnKeyError{obj, err} }// 如果deltaFIFO中该对象还有增量没有处理,则忽略以避免冲突,原因如上面注释:在同一个对象的增量列表中,排在后面的增量的object相比前面的增量应该更新才是合理的iflen(f.items[id]) >0 {returnnil }// 跟deltaFIFO的Replace方法一样,都是添加一个Sync类型的增量if err := f.queueActionLocked(Sync, obj); err !=nil {return fmt.Errorf("couldn't queue object: %v", err) }returnnil}
typethreadSafeMapstruct { lock sync.RWMutex items map[string]interface{}// indexers maps a name to an IndexFunc indexers Indexers// indices maps a name to an Index indices Indices}// Indexers maps a name to a IndexFunctypeIndexersmap[string]IndexFunc// Indices maps a name to an IndextypeIndicesmap[string]IndextypeIndexmap[string]sets.String
func (c *threadSafeMap) Add(key string, obj interface{}) { c.lock.Lock()defer c.lock.Unlock() oldObject := c.items[key]//存储对象 c.items[key] = obj//更新索引 c.updateIndices(oldObject, obj, key)}// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj// updateIndices must be called from a function that already has a lock on the cachefunc (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {// if we got an old object, we need to remove it before we add it againif oldObj !=nil {// 这是一个更新操作,先删除原对象的索引记录 c.deleteFromIndices(oldObj, key) }// 枚举所有添加的索引函数for name, indexFunc :=range c.indexers {//根据索引函数计算obj对应的 indexValues, err := indexFunc(newObj)if err !=nil {panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) } index := c.indices[name]if index ==nil { index =Index{} c.indices[name] = index }//索引函数计算出多个value,也可能是一个,比如pod的ns就只有一个值,pod的label可能就有多个值for _, indexValue :=range indexValues {//比如namespace索引,根据indexValue=default,获取default对应的ji he再把当前对象插入 set := index[indexValue]if set ==nil { set =sets.String{} index[indexValue] = set } set.Insert(key) } }}
// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespacefuncMetaNamespaceIndexFunc(obj interface{}) ([]string, error) { meta, err := meta.Accessor(obj)if err !=nil {return []string{""}, fmt.Errorf("object has no meta: %v", err) }return []string{meta.GetNamespace()}, nil}
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) { c.lock.RLock()defer c.lock.RUnlock() indexFunc := c.indexers[indexName]if indexFunc ==nil {returnnil, fmt.Errorf("Index with name %s does not exist", indexName) } indexKeys, err := indexFunc(obj)if err !=nil {returnnil, err } index := c.indices[indexName]var returnKeySet sets.String//例如namespace索引iflen(indexKeys) ==1 {// In majority of cases, there is exactly one value matching.// Optimize the most common path - deduping is not needed here. returnKeySet = index[indexKeys[0]]//例如label索引 } else {// Need to de-dupe the return list.// Since multiple keys are allowed, this can happen. returnKeySet =sets.String{}for _, indexKey :=range indexKeys {for key :=range index[indexKey] { returnKeySet.Insert(key) } } } list :=make([]interface{}, 0, returnKeySet.Len())for absoluteKey :=range returnKeySet { list =append(list, c.items[absoluteKey]) }return list, nil}
// NewDeltaFIFO方法在前面分析的sharedIndexInformer的Run方法中调用// fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)funcNewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { f :=&DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, keyFunc: keyFunc, knownObjects: knownObjects, } f.cond.L =&f.lockreturn f}typeDeltaFIFOstruct {// lock/cond protects access to 'items' and 'queue'. lock sync.RWMutex cond sync.Cond// We depend on the property that items in the set are in// the queue and vice versa, and that all Deltas in this// map have at least one Delta.// 这里的Deltas是[]Delta类型 items map[string]Deltas queue []string// populated is true if the first batch of items inserted by Replace() has been populated// or Delete/Add/Update was called first. populated bool// initialPopulationCount is the number of items inserted by the first call of Replace() initialPopulationCount int// keyFunc is used to make the key used for queued item// insertion and retrieval, and should be deterministic. keyFunc KeyFunc// knownObjects list keys that are "known", for the// purpose of figuring out which items have been deleted// when Replace() or Delete() is called.// 这个其实就是shareIndexInformer中的indexer底层缓存的引用 knownObjects KeyListerGetter// Indication the queue is closed.// Used to indicate a queue is closed so a control loop can exit when a queue is empty.// Currently, not used to gate any of CRED operations. closed bool closedLock sync.Mutex}typeDeltastruct { Type DeltaType Object interface{}}// Deltas is a list of one or more 'Delta's to an individual object.// The oldest delta is at index 0, the newest delta is the last one.typeDeltas []Delta
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock()defer f.lock.Unlock()for {forlen(f.queue) ==0 {// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.// When Close() is called, the f.closed is set and the condition is broadcasted.// Which causes this loop to continue and return from the Pop().if f.IsClosed() {returnnil, ErrFIFOClosed } f.cond.Wait() }//取出队首元素 id := f.queue[0]//去掉队首元素 f.queue = f.queue[1:]//首次填充的对象数减一if f.initialPopulationCount >0 { f.initialPopulationCount-- } item, ok := f.items[id]if!ok {// Item may have been deleted subsequently.continue }delete(f.items, id)//处理增量对象 err := process(item)// 如果没有处理成功,那么就会重新加到deltaFIFO队列中if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err }// Don't need to copyDeltas here, because we're transferring// ownership to the caller.return item, err }}
func (f *DeltaFIFO) Resync() error { f.lock.Lock()defer f.lock.Unlock()if f.knownObjects ==nil {returnnil } keys := f.knownObjects.ListKeys()// 把local store中的对象都以Sync类型增量的形式重新放回到deltaFIFOfor _, k :=range keys {if err := f.syncKeyLocked(k); err !=nil {return err } }returnnil}func (f *DeltaFIFO) syncKeyLocked(key string) error { obj, exists, err := f.knownObjects.GetByKey(key)// If we are doing Resync() and there is already an event queued for that object,// we ignore the Resync for it. This is to avoid the race, in which the resync// comes with the previous value of object (since queueing an event for the object// doesn't trigger changing the underlying store <knownObjects>. id, err := f.KeyOf(obj)if err !=nil {returnKeyError{obj, err} }// 如上述注释,在resync时,如果deltaFIFO中该对象还存在其他delta没处理,那么忽略这次的resync// 因为调用queueActionLocked是增加delta是通过append的,且处理对象的增量delta时,是从oldest到newdest的// 所以如果某个对象还存在增量没处理,再append就可能导致后处理的delta是旧的对象iflen(f.items[id]) >0 {returnnil }// 可以看到这里跟list一样,增加到deltaFIFO的是一个Sync类型的增量if err := f.queueActionLocked(Sync, obj); err !=nil {return fmt.Errorf("couldn't queue object: %v", err) }returnnil}