typesharedIndexInformerstruct { indexer Indexer//informer中的底层缓存cache controller Controller//持有reflector和deltaFIFO对象,reflector对象将会listWatch对象添加到deltaFIFO,同时更新indexer cahce,更新成功则通过sharedProcessor触发用户配置的Eventhandler processor *sharedProcessor//持有一系列的listener,每个listener对应用户的EventHandler cacheMutationDetector MutationDetector//可以先忽略,这个对象可以用来监测local cache是否被外部直接修改// This block is tracked to handle late initialization of the controller listerWatcher ListerWatcher//deployment的listWatch方法 objectType runtime.Object// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call// shouldResync to check if any of our listeners need a resync. resyncCheckPeriod time.Duration// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default// value). defaultEventHandlerResyncPeriod time.Duration// clock allows for testability clock clock.Clock started, stopped bool startedLock sync.Mutex// blockDeltas gives a way to stop all event distribution so that a late event handler// can safely join the shared informer. blockDeltas sync.Mutex}
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
func (f *DeltaFIFO) syncKeyLocked(key string) error { obj, exists, err := f.knownObjects.GetByKey(key)if err !=nil { klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)returnnil } elseif!exists { klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)returnnil }// If we are doing Resync() and there is already an event queued for that object,// we ignore the Resync for it. This is to avoid the race, in which the resync// comes with the previous value of object (since queueing an event for the object// doesn't trigger changing the underlying store <knownObjects>. id, err := f.KeyOf(obj)if err !=nil {returnKeyError{obj, err} }// 如果deltaFIFO中该对象还有增量没有处理,则忽略以避免冲突,原因如上面注释:在同一个对象的增量列表中,排在后面的增量的object相比前面的增量应该更新才是合理的iflen(f.items[id]) >0 {returnnil }// 跟deltaFIFO的Replace方法一样,都是添加一个Sync类型的增量if err := f.queueActionLocked(Sync, obj); err !=nil {return fmt.Errorf("couldn't queue object: %v", err) }returnnil}
typethreadSafeMapstruct { lock sync.RWMutex items map[string]interface{}// indexers maps a name to an IndexFunc indexers Indexers// indices maps a name to an Index indices Indices}// Indexers maps a name to a IndexFunctypeIndexersmap[string]IndexFunc// Indices maps a name to an IndextypeIndicesmap[string]IndextypeIndexmap[string]sets.String
func (c *threadSafeMap) Add(key string, obj interface{}) { c.lock.Lock()defer c.lock.Unlock() oldObject := c.items[key]//存储对象 c.items[key] = obj//更新索引 c.updateIndices(oldObject, obj, key)}// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj// updateIndices must be called from a function that already has a lock on the cachefunc (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {// if we got an old object, we need to remove it before we add it againif oldObj !=nil {// 这是一个更新操作,先删除原对象的索引记录 c.deleteFromIndices(oldObj, key) }// 枚举所有添加的索引函数for name, indexFunc :=range c.indexers {//根据索引函数计算obj对应的 indexValues, err :=indexFunc(newObj)if err !=nil {panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) } index := c.indices[name]if index ==nil { index =Index{} c.indices[name] = index }//索引函数计算出多个value,也可能是一个,比如pod的ns就只有一个值,pod的label可能就有多个值for _, indexValue :=range indexValues {//比如namespace索引,根据indexValue=default,获取default对应的ji he再把当前对象插入 set := index[indexValue]if set ==nil { set =sets.String{} index[indexValue] = set } set.Insert(key) } }}
// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespacefuncMetaNamespaceIndexFunc(obj interface{}) ([]string, error) { meta, err := meta.Accessor(obj)if err !=nil {return []string{""}, fmt.Errorf("object has no meta: %v", err) }return []string{meta.GetNamespace()}, nil}
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) { c.lock.RLock()defer c.lock.RUnlock() indexFunc := c.indexers[indexName]if indexFunc ==nil {returnnil, fmt.Errorf("Index with name %s does not exist", indexName) } indexKeys, err :=indexFunc(obj)if err !=nil {returnnil, err } index := c.indices[indexName]var returnKeySet sets.String//例如namespace索引iflen(indexKeys) ==1 {// In majority of cases, there is exactly one value matching.// Optimize the most common path - deduping is not needed here. returnKeySet = index[indexKeys[0]]//例如label索引 } else {// Need to de-dupe the return list.// Since multiple keys are allowed, this can happen. returnKeySet =sets.String{}for _, indexKey :=range indexKeys {for key :=range index[indexKey] { returnKeySet.Insert(key) } } } list :=make([]interface{}, 0, returnKeySet.Len())for absoluteKey :=range returnKeySet { list =append(list, c.items[absoluteKey]) }return list, nil}
// NewDeltaFIFO方法在前面分析的sharedIndexInformer的Run方法中调用// fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)funcNewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { f :=&DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, keyFunc: keyFunc, knownObjects: knownObjects, } f.cond.L =&f.lockreturn f}typeDeltaFIFOstruct {// lock/cond protects access to 'items' and 'queue'. lock sync.RWMutex cond sync.Cond// We depend on the property that items in the set are in// the queue and vice versa, and that all Deltas in this// map have at least one Delta.// 这里的Deltas是[]Delta类型 items map[string]Deltas queue []string// populated is true if the first batch of items inserted by Replace() has been populated// or Delete/Add/Update was called first. populated bool// initialPopulationCount is the number of items inserted by the first call of Replace() initialPopulationCount int// keyFunc is used to make the key used for queued item// insertion and retrieval, and should be deterministic. keyFunc KeyFunc// knownObjects list keys that are "known", for the// purpose of figuring out which items have been deleted// when Replace() or Delete() is called.// 这个其实就是shareIndexInformer中的indexer底层缓存的引用 knownObjects KeyListerGetter// Indication the queue is closed.// Used to indicate a queue is closed so a control loop can exit when a queue is empty.// Currently, not used to gate any of CRED operations. closed bool closedLock sync.Mutex}typeDeltastruct { Type DeltaType Object interface{}}// Deltas is a list of one or more 'Delta's to an individual object.// The oldest delta is at index 0, the newest delta is the last one.typeDeltas []Delta
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock()defer f.lock.Unlock()for {forlen(f.queue) ==0 {// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.// When Close() is called, the f.closed is set and the condition is broadcasted.// Which causes this loop to continue and return from the Pop().if f.IsClosed() {returnnil, ErrFIFOClosed } f.cond.Wait() }//取出队首元素 id := f.queue[0]//去掉队首元素 f.queue = f.queue[1:]//首次填充的对象数减一if f.initialPopulationCount >0 { f.initialPopulationCount-- } item, ok := f.items[id]if!ok {// Item may have been deleted subsequently.continue }delete(f.items, id)//处理增量对象 err :=process(item)// 如果没有处理成功,那么就会重新加到deltaFIFO队列中if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err }// Don't need to copyDeltas here, because we're transferring// ownership to the caller.return item, err }}
func (f *DeltaFIFO) Resync() error { f.lock.Lock()defer f.lock.Unlock()if f.knownObjects ==nil {returnnil } keys := f.knownObjects.ListKeys()// 把local store中的对象都以Sync类型增量的形式重新放回到deltaFIFOfor _, k :=range keys {if err := f.syncKeyLocked(k); err !=nil {return err } }returnnil}func (f *DeltaFIFO) syncKeyLocked(key string) error { obj, exists, err := f.knownObjects.GetByKey(key)// If we are doing Resync() and there is already an event queued for that object,// we ignore the Resync for it. This is to avoid the race, in which the resync// comes with the previous value of object (since queueing an event for the object// doesn't trigger changing the underlying store <knownObjects>. id, err := f.KeyOf(obj)if err !=nil {returnKeyError{obj, err} }// 如上述注释,在resync时,如果deltaFIFO中该对象还存在其他delta没处理,那么忽略这次的resync// 因为调用queueActionLocked是增加delta是通过append的,且处理对象的增量delta时,是从oldest到newdest的// 所以如果某个对象还存在增量没处理,再append就可能导致后处理的delta是旧的对象iflen(f.items[id]) >0 {returnnil }// 可以看到这里跟list一样,增加到deltaFIFO的是一个Sync类型的增量if err := f.queueActionLocked(Sync, obj); err !=nil {return fmt.Errorf("couldn't queue object: %v", err) }returnnil}
//在队列中给指定的对象append一个Deltafunc (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { id, err := f.KeyOf(obj)if err !=nil {returnKeyError{obj, err} }// 把增量append到slice的后面 newDeltas :=append(f.items[id], Delta{actionType, obj})// 连续的两个Deleted delta将会去掉一个 newDeltas =dedupDeltas(newDeltas)iflen(newDeltas) >0 {// 维护queue队列if _, exists := f.items[id]; !exists { f.queue =append(f.queue, id) } f.items[id] = newDeltas f.cond.Broadcast() } else {// We need to remove this from our map (extra items in the queue are// ignored if they are not in the map).delete(f.items, id) }returnnil}
当前认为只有连续的两个Delete delta才有必要去重
funcdedupDeltas(deltas Deltas) Deltas { n :=len(deltas)if n <2 {return deltas }// 每次取最后两个delta来判断 a :=&deltas[n-1] b :=&deltas[n-2]if out :=isDup(a, b); out !=nil { d :=append(Deltas{}, deltas[:n-2]...)returnappend(d, *out) }return deltas}funcisDup(a, b *Delta) *Delta {// 当前认为只有连续的两个Delete delta才有必要去重if out :=isDeletionDup(a, b); out !=nil {return out }// TODO: Detect other duplicate situations? Are there any?returnnil}// keep the one with the most information if both are deletions.funcisDeletionDup(a, b *Delta) *Delta {if b.Type != Deleted || a.Type != Deleted {returnnil }// Do more sophisticated checks, or is this sufficient?// 优先去重DeletedFinalStateUnknown类型的Deleted deltaif _, ok := b.Object.(DeletedFinalStateUnknown); ok {return a }return b}
typeprocessorListenerstruct { nextCh chaninterface{} addCh chaninterface{} handler ResourceEventHandler// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications// added until we OOM.// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but// we should try to do something better. pendingNotifications buffer.RingGrowing// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer requestedResyncPeriod time.Duration// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the// informer's overall resync check period. resyncPeriod time.Duration// nextResync is the earliest time the listener should get a full resync nextResync time.Time// resyncLock guards access to resyncPeriod and nextResync resyncLock sync.Mutex}
// shouldResync queries every listener to determine if any of them need a resync, based on each// listener's resyncPeriod.func (p *sharedProcessor) shouldResync() bool { p.listenersLock.Lock()defer p.listenersLock.Unlock()// 这里每次都会先置空列表,保证里面记录了当前需要resync的listener p.syncingListeners = []*processorListener{} resyncNeeded :=false now := p.clock.Now()for _, listener :=range p.listeners {// need to loop through all the listeners to see if they need to resync so we can prepare any// listeners that are going to be resyncing.if listener.shouldResync(now) { resyncNeeded =true// 达到resync条件的listener被加入syncingListeners p.syncingListeners =append(p.syncingListeners, listener) listener.determineNextResync(now) } }return resyncNeeded}
func (p *processorListener) run() {// this call blocks until the channel is closed. When a panic happens during the notification// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)// the next notification will be attempted. This is usually better than the alternative of never// delivering again. stopCh :=make(chanstruct{}) wait.Until(func() {// this gives us a few quick retries before a long pause and then a few more quick retries err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {for next :=range p.nextCh {switch notification := next.(type) {caseupdateNotification:// 回调用户配置的handler p.handler.OnUpdate(notification.oldObj, notification.newObj)caseaddNotification: p.handler.OnAdd(notification.newObj)casedeleteNotification: p.handler.OnDelete(notification.oldObj)default: utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) } }// the only way to get here is if the p.nextCh is empty and closedreturntrue, nil })// the only way to get here is if the p.nextCh is empty and closedif err ==nil {close(stopCh) } }, 1*time.Minute, stopCh)}
func (p *processorListener) pop() {defer utilruntime.HandleCrash()deferclose(p.nextCh) // Tell .run() to stop//nextCh没有利用make初始化,将阻塞在读和写上var nextCh chan<-interface{}//notification初始值为nilvar notification interface{}for {select {// 执行这个case,相当于给p.nextCh添加来自p.addCh的内容case nextCh <- notification:// Notification dispatchedvar ok bool//前面的notification已经加到p.nextCh了, 为下一次这个case再次ready做准备 notification, ok = p.pendingNotifications.ReadOne()if!ok { // Nothing to pop nextCh =nil// Disable this select case }//第一次select只有这个case readycase notificationToAdd, ok :=<-p.addCh:if!ok {return }if notification ==nil { // No notification to pop (and pendingNotifications is empty)// Optimize the case - skip adding to pendingNotifications//为notification赋值 notification = notificationToAdd//唤醒第一个case nextCh = p.nextCh } else { // There is already a notification waiting to be dispatched//select没有命中第一个case,那么notification就没有被消耗,那么把从p.addCh获取的对象加到缓存中 p.pendingNotifications.WriteOne(notificationToAdd) } } }}