Kubernetes Informer 机制

Kubernetes Informer 机制

文章目录

概述

Kubernetes的其他组件都是通过client-go的Informer机制与Kubernetes API Server进行通信的。

informer

在Informer架构设计中,有多个核心组件,分别介绍如下。
1.Reflector Reflector用于监控(Watch)指定的Kubernetes资源,当监控的资源发生变化时,触发相应的变更事件,例如Added(资源添加)事件、Updated(资源更新)事件、Deleted(资源删除)事件,并将其资源对象存放到本地缓存DeltaFIFO中。
2.DeltaFIFO DeltaFIFO可以分开理解,FIFO是一个先进先出的队列,它拥有队列操作的基本方法,例如Add、Update、Delete、List、Pop、Close等,而Delta是一个资源对象存储,它可以保存资源对象的操作类型,例如Added(添加)操作类型、Updated(更新)操作类型、Deleted(删除)操作类型、Sync(同步)操作类型等。
3.Indexer Indexer是client-go用来存储资源对象并自带索引功能的本地存储,Reflector从DeltaFIFO中将消费出来的资源对象存储至Indexer。Indexer与Etcd集群中的数据完全保持一致。client-go可以很方便地从本地存储中读取相应的资源对象数据,而无须每次从远程Etcd集群中读取,以减轻Kubernetes API Server和Etcd集群的压力。

informer 中支持处理资源的三种回掉方法:

  • AddFunc :当创建资源对象时触发的事件回调方法。
  • UpdateFunc :当更新资源对象时触发的事件回调方法。
  • DeleteFunc :当删除资源对象时触发的事件回调方法。

通过Informer机制可以很容易地监控我们所关心的资源事件.

Reflector

1func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
2   return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
3}

通过NewReflector实例化Reflector对象,实例化过程中须传入ListerWatcher数据接口对象,它拥有List和Watch方法,用于获取及监控资源列表。只要实现了List和Watch方法的对象都可以称为ListerWatcher。 Reflector对象通过Run函数启动监控并处理监控事件。而在Reflector源码实现中,其中最主要的是ListAndWatch函数,它负责获取资源列表(List)和监控(Watch)指定的Kubernetes API Server资源。 ListAndWatch函数实现可分为两部分:第1部分获取资源列表数据,第2部分监控资源对象。

1.获取资源列表数据ListAndWatch List在程序第一次运行时获取该资源下所有的对象数据并将其存储至DeltaFIFO中。

listAndWatch

a. r.listerWatcher.List用于获取资源下的所有对象的数据,例如,获取所有Pod的资源数据。获取资源数据是由options的ResourceVersion(资源版本号)参数控制的,如果ResourceVersion为0,则表示获取所有Pod的资源数据;如果ResourceVersion非0,则表示根据资源版本号继续获取,功能有些类似于文件传输过程中的“断点续传”,当传输过程中遇到网络故障导致中断,下次再连接时,会根据资源版本号继续传输未完成的部分。可以使本地缓存中的数据与Etcd集群中的数据保持一致。
b. listMetaInterface.GetResourceVersion用于获取资源版本号,ResourceVersion (资源版本号)非常重要,Kubernetes中所有的资源都拥有该字段,它标识当前资源对象的版本号。每次修改当前资源对象时, Kubernetes API Server都会更改ResourceVersion,使得client-go执行Watch操作时可以根据ResourceVersion来确定当前资源对象是否发生变化。
c. meta.ExtractList用于将资源数据转换成资源对象列表,将runtime.Object对象转换成[]runtime.Object对象。因为r.listerWatcher.List获取的是资源下的所有对象的数据,例如所有的Pod资源数据, 所以它是一个资源列表。
d. r.syncWith用于将资源对象列表中的资源对象和资源版本号存储至DeltaFIFO中,并会替换已存在的对象。
e. r.setLastSyncResourceVersion用于设置最新的资源版本号。

2.监控资源对象

Watch(监控)操作通过HTTP协议与Kubernetes API Server建立长连接,接收Kubernetes API Server 发来的资源变更事件。Watch操作的实现机制使用HTTP协议的分块传输编码(Chunked Transfer Encoding)。当client-go调用Kubernetes API Server时,Kubernetes API Server在Response的HTTP Header中设置Transfer-Encoding的值为chunked,表示采用分块传输编码,客户端收到该信息后,便与服务端进行连接,并等待下一个数据块(即资源的事件信息)。

当触发Added(资源添加)事件、Updated (资源更新)事件、Deleted(资源删除)事件时,将对应的资源对象更新到本地缓存DeltaFIFO中并更新ResourceVersion资源版本号。

DeltaFIFO

DeltaFIFO 顾名思义 是记录资源对象变化的先进先出队列, 例如资源的 Added(添加)操作类型、Updated(更新)操作类型、Deleted(删除)操作类型、Sync(同步)操作类型等.

 1type DeltaFIFO struct {
 2  
 3    ......
 4
 5   items map[string]Deltas 
 6    
 7   queue []string  
 8  ......
 9
10}

queue字段存储资源对象的key,该key通过KeyOf函数计算得到。items字段通过map数据结构的方式存储,value存储的是对象的Deltas数组。

deltafifo
key 是 / 组合, 如果namespace 为空,则 key是 .

DeltaFIFO 本质上是一个先进先出的队列, 有生产者和消费者, 其中生产者是 Reflector 调用的Add 方法, 消费者是Controller 调用的Pop 方法.

生产者

Reflector在收到资源对象的变更事件时, 会通过 Delta FIFO 的 queueActionLocked 函数 向DeltaFIFO 添加事件. 关键代码如下所示:

 1func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {  
 2   id, err := f.KeyOf(obj)  
 3   ....
 4   oldDeltas := f.items[id]  
 5   newDeltas := append(oldDeltas, Delta{actionType, obj})  
 6   newDeltas = dedupDeltas(newDeltas)  
 7
 8   if len(newDeltas) > 0 {  
 9      if _, exists := f.items[id]; !exists {  
10         f.queue = append(f.queue, id)  
11      }
12      f.items[id] = newDeltas  
13      f.cond.Broadcast()  
14   } 
15   ...
16   return nil  
17}  

执行流程解释如下 :
(1)通过 Keyof函数计算出资源对象的key;
(2)将 actiontype和资源对象构造成 Delta 添加到 oldDeltas 中 生成新的 Delta ,并通过 dedupDeltas 函数进行去重操作;
(3)将新生成的 Delta 存储到 DeltaFIFO 的 item和 queue 中, 并通过cond. Broadcast通知所有消费者解除阻塞;

消费者

Controller会通过Delta FIFO的POP函数获取数据, 具体逻辑如下:

 1func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
 2   f.lock.Lock()
 3   defer f.lock.Unlock()
 4   for {
 5      for len(f.queue) == 0 {
 6         .....
 7         f.cond.Wait()
 8      }
 9      id := f.queue[0]
10      f.queue = f.queue[1:]
11      ....
12      item, ok := f.items[id]
13      .....
14
15      delete(f.items, id)
16      err := process(item)
17	  if e, ok := err.(ErrRequeue); ok {
18   		f.addIfNotPresent(id, item)
19   		err = e.Err
20	  }
21
22      return item, err
23   }
24}

当队列中没有数据时,通过f.cond.wait阻塞等待数据,只有收到cond. Broadcast时才说明有数据被添加,解除当前阻塞状态。如果队列中不为空,取出f. queue的头部数据将该对象传入 process回调函数,由上层消费者进行处理。如果 processi回调函数处理出错则将该对象重新存入队列。 proccess 的实现是 sharedIndexInformer.HandleDeltas, 具体代码如下:

 1func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
 2   s.blockDeltas.Lock()
 3   defer s.blockDeltas.Unlock()
 4
 5   // from oldest to newest
 6   for _, d := range obj.(Deltas) {
 7      switch d.Type {
 8      case Sync, Replaced, Added, Updated:
 9         ....
10         if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
11            if err := s.indexer.Update(d.Object); err != nil {
12               return err
13            }
14
15       		....
16            s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
17         } else {
18            if err := s.indexer.Add(d.Object); err != nil {
19               return err
20            }
21            s.processor.distribute(addNotification{newObj: d.Object}, false)
22         }
23      case Deleted:
24         if err := s.indexer.Delete(d.Object); err != nil {
25            return err
26         }
27         s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
28      }
29   }
30   return nil
31}

当资源对象的操作类型为Added、Updated、Deleted时,将该资源对象存储至Indexer(它是并发安全的存储),并通过distribute函数将资源对象分发至SharedInformer。

Indexer

Indexer是client-go用来存储资源对象并自带索引功能的本地存储,Reflector从DeltaFIFO中将消费出来的资源对象存储至Indexer。Indexer中的数据与Etcd集群中的数据保持完全一致。client-go可以很方便地从本地存储中读取相应的资源对象数据,而无须每次都从远程Etcd集群中读取,这样可以减轻Kubernetes API Server和Etcd集群的压力。
index定义如下:

1
2type cache struct {
3   
4   cacheStorage ThreadSafeStore
5 
6   keyFunc KeyFunc
7}

index 主要是对 ThreadSafeStore 的封装, 另外增加了 keyFunc 用来生成资源的ID. ThreadSafeMap是一个内存中的存储,其中的数据并不会写入本地磁盘中,每次的增、删、改、查操作都会加锁,以保证数据的一致性。数据结构定义如下:

1type threadSafeMap struct {
2   lock  sync.RWMutex
3   items map[string]interface{}
4
5   // indexers maps a name to an IndexFunc
6   indexers Indexers
7   // indices maps a name to an Index
8   indices Indices
9}

items字段中存储的是资源对象数据,其中items的key通过keyFunc函数计算得到;
indexers 存储索引函数,

indices 存储缓存器, key 为缓存名称, 和索引名字相同, value为缓存数据; index 为缓存数据, 结构为key/value

indices 结构如下所示:

indices

从index 中获取数据的核心逻辑如下所示:

 1
 2
 3func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
 4   c.lock.RLock()
 5   defer c.lock.RUnlock()
 6
 7   indexFunc := c.indexers[indexName]
 8   if indexFunc == nil {
 9      return nil, fmt.Errorf("Index with name %s does not exist", indexName)
10   }
11
12   index := c.indices[indexName]
13
14   set := index[indexedValue]
15   list := make([]interface{}, 0, set.Len())
16   for key := range set {
17      list = append(list, c.items[key])
18   }
19
20   return list, nil
21}

ByIndex接收两个参数:IndexName(索引器名称)和indexKey(需要检索的key)。首先从c.indexers中查找指定的索引器函数,从c.indices中查找指定的缓存器函数,然后根据需要检索的indexKey从缓存数据中查到并返回数据。

总结

通过本文我们了解到了 informer 核心实现逻辑,informer 通过本地缓存大大减轻了对API的压力。