本文分 4 部分,分别是:
(1). cache 包源码解析与 Informer 的使用;
(2). informers 包源码解析与 SharedInformerFactory 的使用,以及 Informer 在实际使用中的最佳实践;
(3). 实现自定义资源 (CRD) Informer;
(4). dynamic 包源码解析与 DynamicSharedInformerFactory 的使用;
这里是第 2 部分。
本文使用的所有代码都可以在 articles/archive/dive-into-kubernetes-informer at main · wbsnail/articles · GitHub 找到。
前言
在前一部分中,我给大家分析了 kubernetes/client-go 仓库中的 cache 包的主要代码,并梳理了 Informer 的工作原理和实现方式,这一部分我将着重分析 informers 包,并介绍 SharedInformerFactory 的正确打开方式。
👮♂️ 如果你没有看过前一部分,建议先看完它。
informers 包源码解析
工厂模式
在前一部分中我们知道,cache 包暴露的 Informer 创建方法有以下 5 个:
- New
- NewInformer
- NewIndexerInformer
- NewSharedInformer
- NewSharedIndexInformer
它们有着不同程度的抽象和封装,NewSharedIndexInformer 是其中抽象程度最低,封装程度最高的一个,但即使是 NewSharedIndexInformer,也没有封装具体的资源类型,需要接收 ListerWatcher 和 Indexers 作为参数:
func NewSharedIndexInformer(
lw ListerWatcher,
exampleObject runtime.Object,
defaultEventHandlerResyncPeriod time.Duration,
indexers Indexers,
) SharedIndexInformer {}
就是说要构建 Informer,我们还得先构建 ListWatcher 和 Indexers。对于多种多样的资源类型来说,应该如何设计以更好地封装,提高代码的复用性呢?informers 包采用了工厂模式:为每种内置的资源类型创建对应的 Informer 工厂类,要使用某种资源 Informer 的时候直接使用工厂类构建。
package main
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
func main() {
fmt.Println("----- 9-shared-informer-factory -----")
// mustClientset 用于创建 kubernetes.Interface 实例,
// 代码在前一部分中;
// 第 2 个参数是 defaultResync,是构建新 Informer 时默认的 resyncPeriod,
// resyncPeriod 在前一部分中介绍过了;
informerFactory := informers.NewSharedInformerFactoryWithOptions(
mustClientset(), 0, informers.WithNamespace("tmp"))
configMapsInformer := informerFactory.Core().V1().ConfigMaps().Informer()
configMapsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
configMap, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}
fmt.Printf("created: %s\n", configMap.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
configMap, ok := newObj.(*corev1.ConfigMap)
if !ok {
return
}
fmt.Printf("updated: %s\n", configMap.Name)
},
DeleteFunc: func(obj interface{}) {
configMap, ok := obj.(*corev1.ConfigMap)
if !ok {
return
}
fmt.Printf("deleted: %s\n", configMap.Name)
},
})
stopCh := make(chan struct{})
defer close(stopCh)
fmt.Println("Start syncing....")
go informerFactory.Start(stopCh)
<-stopCh
}
输出类似于:
----- 9-shared-informer-factory -----
Start syncing....
created: demo1
created: demo
deleted: demo
created: demo
本文使用的所有代码都可以在 articles/archive/dive-into-kubernetes-informer at main · wbsnail/articles · GitHub 找到。
上面的代码结构应该非常清晰,不用过多解释。值得一提的是 NewSharedInformerFactoryWithOptions 使用了函数式选项 (functional options) 模式:
// SharedInformerOption defines the functional option type for SharedInformerFactory.
type SharedInformerOption func(*sharedInformerFactory) *sharedInformerFactory
func NewSharedInformerFactoryWithOptions(
client kubernetes.Interface,
defaultResync time.Duration,
options ...SharedInformerOption,
) SharedInformerFactory {}
接收的选项 (options) 会被依次执行,通过这种方式对 SharedInformerFactory 实例的参数进行调整。
如果没有调整的需要,可以直接使用默认方法:
func NewSharedInformerFactory(
client kubernetes.Interface,
defaultResync time.Duration,
) SharedInformerFactory {}
👨🔧 默认的 namespace 是所有 namespace。
通过 SharedInformerFactory,我们可以很方便地构建各种资源的 Informer 实例,比如:
configMapsInformer := informerFactory.Core().V1().ConfigMaps().Informer()
注意很重要的一点:对每种资源类型构建的 Informer 会被缓存,对相同资源重复调用 Informer() 返回的是同一个 Informer 实例。
除了构建 Informer 外,SharedInformerFactory 还支持构建 Lister,比如:
configMapLister := informerFactory.Core().V1().ConfigMaps().Lister()
返回的 Lister 是针对具体资源类型的,使用时不需要转换类型,用起来会比 Indexer 方便不少,比如说 ConfigMapLister:
// ConfigMapLister helps list ConfigMaps.
// All objects returned here must be treated as read-only.
type ConfigMapLister interface {
// List lists all ConfigMaps in the indexer.
// Objects returned here must be treated as read-only.
List(selector labels.Selector) (ret []*v1.ConfigMap, err error)
// ConfigMaps returns an object that can list and get ConfigMaps.
ConfigMaps(namespace string) ConfigMapNamespaceLister
}
// ConfigMapNamespaceLister helps list and get ConfigMaps.
// All objects returned here must be treated as read-only.
type ConfigMapNamespaceLister interface {
// List lists all ConfigMaps in the indexer for a given namespace.
// Objects returned here must be treated as read-only.
List(selector labels.Selector) (ret []*v1.ConfigMap, err error)
// Get retrieves the ConfigMap from the indexer for a given namespace and name.
// Objects returned here must be treated as read-only.
Get(name string) (*v1.ConfigMap, error)
}
在所有针对特定资源类型的方法外,SharedInformerFactory 还暴露以下 4 个方法:
InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
Start(stopCh <-chan struct{})
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
InformerFor 用于获取指定资源类型的 Informer。注意!通过此方法构建的 Informer 也会被缓存,下次对相同资源类型的调用就不会调用 newFunc,因此会影响使用默认方法获取的 Informer,大多数情况下只在 informers 包内部调用,还是以 ConfigMap 举例:
func (f *configMapInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.ConfigMap{}, f.defaultInformer)
}
ForResource 用于根据 schema.GroupVersionResource 动态构建 Informer,但它返回的 Informer 和 Lister 是通用类型,使用起来没有具体类型那么方便;并且从实现上来看,就是一个 switch 语句,实际还是根据类型生成了具体资源类型的 Informer 和 Lister,并没有对第三方 CRD 的支持,估计是本来想实现 dynamic 包的功能,但是完成度不高,存在意义暂时不明 🕵️♂️
剩下就是 Start 和 WaitForCacheSync,这两个方法的作用想必你也能猜到,就是统一启动所有 Informers 和等待所有 Informers 首次同步完成。
Informer 最佳实践
上面关于 SharedInformerFactory 的演示代码能够正常运行,但是还存在一些问题:
(1). 处理事件是同步操作,有可能造成阻塞(在上一部分关于 cache 包的源码分析中可知,Informer 收到事件后调用处理程序是同步操作);
(2). 处理事件没有失败重试机制;
(3). 退出不够优雅,有可能造成状态异常;
(4). resyncPeriod 设置为 0,运行过程中有可能累积与服务器的不一致;
对于 (1), (2) 和 (3),都可以通过引入工作队列 (workqueue) 解决。对于 (4),kube-controller-manager (--min-resync-period) 采用的重新同步间隔是 12h - 24h(在此范围内随机,避免所有资源类型同时同步造成激增的压力),可以认为这是一个比较合理的值。
在 client-go 中内置了 workqueue 的实现,它被广泛使用于 kube-controller-manager 中,我们直接使用它来实现工作队列:
考虑上述这些情况,并确定了实现方案后,我设计了一个比较完善的控制器:树懒控制器 (SlothController) 🙉 :
树懒们会在收到更新事件时会报告到日志输出,但是每次执行任务前它们都要先睡一觉,而且有一定概率睡过头导致任务失败。如果多次睡过头导致某个任务失败,树懒们还会选择放弃这次任务。你不可以打断树懒的睡眠,在收到中断 (INTERRUPT) 信号后,需要等树懒们都处理完手头的工作才会退出(即使最终结果是失败)。
代码如下,建议从 main 函数开始阅读:
package main
import (
"flag"
"fmt"
"github.com/spongeprojects/magicconch"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
listerscorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"os"
"os/signal"
"sync"
"time"
)
// SlothController 树懒控制器!
type SlothController struct {
factory informers.SharedInformerFactory
configMapLister listerscorev1.ConfigMapLister
configMapInformer cache.SharedIndexInformer
queue workqueue.RateLimitingInterface
processingItems *sync.WaitGroup
// maxRetries 树懒们需要重试多少次才会放弃
maxRetries int
// chanceOfFailure 树懒们处理任务有多少概率失败(百分比)
chanceOfFailure int
// nap 树懒睡一觉要多久
nap time.Duration
}
func NewController(
factory informers.SharedInformerFactory,
queue workqueue.RateLimitingInterface,
chanceOfFailure int,
nap time.Duration,
) *SlothController {
configMapLister := factory.Core().V1().ConfigMaps().Lister()
configMapInformer := factory.Core().V1().ConfigMaps().Informer()
configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
// 只简单地把 key 放到队列中
klog.Infof("[%s] received", key)
queue.Add(key)
}
},
})
return &SlothController{
factory: factory,
configMapLister: configMapLister,
configMapInformer: configMapInformer,
queue: queue,
maxRetries: 3,
chanceOfFailure: chanceOfFailure,
nap: nap,
processingItems: &sync.WaitGroup{},
}
}
// Run 开始运行控制器直到出错或 stopCh 关闭
func (c *SlothController) Run(sloths int, stopCh chan struct{}) error {
// runtime.HandleCrash 是 Kubernetes 官方提供的 panic recover 方法,
// 提供一个 panic recover 的统一入口,
// 默认只是记录日志,该 panic 还是 panic。
defer runtime.HandleCrash()
// 退出前关闭队列让树懒们不要再接手新任务
defer c.queue.ShutDown()
klog.Info("SlothController starting...")
go c.factory.Start(stopCh)
// 等待首次同步完成
for _, ok := range c.factory.WaitForCacheSync(stopCh) {
if !ok {
return fmt.Errorf("timed out waiting for caches to sync")
}
}
for i := 0; i < sloths; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
// 等待 stopCh 关闭
<-stopCh
// 等待正在执行中的任务完成
klog.Info("waiting for processing items to finish...")
c.processingItems.Wait()
return nil
}
func (c *SlothController) runWorker() {
for c.processNextItem() {
}
}
// processNextItem 用于等待和处理队列中的新任务
func (c *SlothController) processNextItem() bool {
// 阻塞住,等待新任务中...
key, shutdown := c.queue.Get()
if shutdown {
return false // 队列已进入退出状态,不要继续处理
}
c.processingItems.Add(1)
// 任务完成后,无论成功与否,都记得标记完成,因为尽管有多只树懒,
// 但对相同 key 的多个任务是不会并行处理的,
// 如果相同 key 有多个事件,不要阻塞处理。
defer c.queue.Done(key)
result := c.processItem(key)
c.handleErr(key, result)
c.processingItems.Done()
return true
}
// processItem 用于同步处理一个任务
func (c *SlothController) processItem(key interface{}) error {
// 处理任务很慢,因为树懒很懒
time.Sleep(c.nap)
if rand.Intn(100) < c.chanceOfFailure {
// 睡过头啦!
return fmt.Errorf("zzz... ")
}
klog.Infof("[%s] processed.", key)
return nil
}
// handleErr 用于检查任务处理结果,在必要的时候重试
func (c *SlothController) handleErr(key interface{}, result error) {
if result == nil {
// 每次执行成功后清空重试记录。
c.queue.Forget(key)
return
}
if c.queue.NumRequeues(key) < c.maxRetries {
klog.Warningf("error processing %s: %v", key, result)
// 执行失败,重试
c.queue.AddRateLimited(key)
return
}
// 重试次数过多,日志记录错误,同时也别忘了清空重试记录
c.queue.Forget(key)
// runtime.HandleError 是 Kubernetes 官方提供的错误响应方法,
// 提供一个错误响应的统一入口。
runtime.HandleError(fmt.Errorf("max retries exceeded, "+
"dropping item %s out of the queue: %v", key, result))
}
func main() {
fmt.Println("----- 10-sloth-controller -----")
var sloths int
var chanceOfFailure int
var napInSecond int
flag.IntVar(&sloths, "sloths", 1,
"number of sloths")
flag.IntVar(&chanceOfFailure, "chance-of-failure", 0,
"chance of failure in percentage")
flag.IntVar(&napInSecond, "nap", 0,
"how long should the sloth nap (in seconds)")
flag.Parse()
kubeconfig := os.Getenv("KUBECONFIG")
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
magicconch.Must(err)
clientset, err := kubernetes.NewForConfig(config)
magicconch.Must(err)
// 创建 SharedInformerFactory
defaultResyncPeriod := time.Hour * 12
informerFactory := informers.NewSharedInformerFactoryWithOptions(
clientset, defaultResyncPeriod, informers.WithNamespace("tmp"))
// 使用默认配置创建 RateLimitingQueue,这种队列支持重试,同时会记录重试次数
rateLimiter := workqueue.DefaultControllerRateLimiter()
queue := workqueue.NewRateLimitingQueue(rateLimiter)
controller := NewController(informerFactory, queue, chanceOfFailure,
time.Duration(napInSecond)*time.Second)
stopCh := make(chan struct{})
// 响应中断信号 (Ctrl+C)
interrupted := make(chan os.Signal)
signal.Notify(interrupted, os.Interrupt)
go func() {
<-interrupted // 第 1 次收到中断信号时关闭 stopCh
close(stopCh)
<-interrupted // 第 2 次收到中断信号时直接退出
os.Exit(1)
}()
if err := controller.Run(sloths, stopCh); err != nil {
klog.Errorf("SlothController exit with error: %s", err)
} else {
klog.Info("SlothController exit")
}
}
下面演示一下树懒控制器工作的样子,先确认 1 只树懒,不睡觉的情况下能否正常工作:
❯ sloth-controller --sloths=1 --chance-of-failure=0 --nap=0
I0508 00:45:41.142394 49248 main.go:79] SlothController starting...
I0508 00:45:46.813814 49248 main.go:52] [tmp/demo] received
I0508 00:45:46.813852 49248 main.go:141] [tmp/demo] processed.
I0508 00:45:47.333631 49248 main.go:52] [tmp/demo1] received
I0508 00:45:47.333674 49248 main.go:141] [tmp/demo1] processed.
可见在不睡觉的情况下处理任务还是很快的,只用了微秒级的时间。
接下来给树懒们加入一些嗜睡基因 (--nap=5):
❯ sloth-controller --sloths=1 --chance-of-failure=0 --nap=5
I0508 00:49:49.672573 49494 main.go:79] SlothController starting...
I0508 00:50:02.637114 49494 main.go:52] [tmp/demo] received
I0508 00:50:03.123127 49494 main.go:52] [tmp/demo1] received
I0508 00:50:07.637208 49494 main.go:141] [tmp/demo] processed.
I0508 00:50:12.641724 49494 main.go:141] [tmp/demo1] processed.
注意到处理任务的时间变长了,而且任务是排队处理的,总共花费了 10 秒多时间,因为只有 1 只树懒。
加入更多的树懒 (--sloths=3):
❯ sloth-controller --sloths=3 --chance-of-failure=0 --nap=5
I0508 00:51:18.519972 49662 main.go:79] SlothController starting...
I0508 00:51:22.299195 49662 main.go:52] [tmp/demo] received
I0508 00:51:22.827831 49662 main.go:52] [tmp/demo1] received
I0508 00:51:27.302323 49662 main.go:141] [tmp/demo] processed.
I0508 00:51:27.827984 49662 main.go:141] [tmp/demo1] processed.
现在任务处理效率变高了,实现了并行处理任务的功能。
我们尝试打断一下树懒控制器的运行:
❯ sloth-controller --sloths=3 --chance-of-failure=0 --nap=5
I0508 00:52:39.233788 49771 main.go:79] SlothController starting...
I0508 00:52:47.551730 49771 main.go:52] [tmp/demo] received
I0508 00:52:48.184259 49771 main.go:52] [tmp/demo1] received
^C
I0508 00:52:48.817727 49771 main.go:98] waiting for processing items to finish...
I0508 00:52:52.556681 49771 main.go:141] [tmp/demo] processed.
I0508 00:52:53.188094 49771 main.go:141] [tmp/demo1] processed.
I0508 00:52:53.188194 49771 main.go:217] SlothController exit
注意到,在发出中断指令后,程序等到两个进行中的任务都完成后才退出,实现了优雅退出的功能。
接下来给树懒们加入一些更加强效的嗜睡基因,让他们有可能无法处理任务 (--chance-of-failure=50):
❯ sloth-controller --sloths=3 --chance-of-failure=50 --nap=5
I0508 00:58:25.991831 50040 main.go:79] SlothController starting...
I0508 00:58:29.627866 50040 main.go:52] [tmp/demo] received
W0508 00:58:34.630694 50040 main.go:154] error processing tmp/demo: zzz...
I0508 00:58:39.637953 50040 main.go:141] [tmp/demo] processed.
表现和预期的一样,出现了失败的任务,并且失败的任务进行了重试,最终执行成功,只不过花费了双倍的时间…
最后我们给树懒们加入一些极度强效的嗜睡基因 (--chance-of-failure=99) 😰
❯ sloth-controller --sloths=3 --chance-of-failure=99 --nap=5
I0508 01:00:58.172928 50221 main.go:79] SlothController starting...
I0508 01:01:12.565151 50221 main.go:52] [tmp/demo] received
W0508 01:01:17.565633 50221 main.go:154] error processing tmp/demo: zzz...
W0508 01:01:22.574243 50221 main.go:154] error processing tmp/demo: zzz...
W0508 01:01:27.588450 50221 main.go:154] error processing tmp/demo: zzz...
E0508 01:01:32.613327 50221 main.go:164] max retries exceeded, dropping item tmp/demo out of the queue: zzz...
没有办法,在经过 3 次尝试并失败后,任务被放弃不再重试,但没有影响到控制器整体运行。
通过以上的演示,可以看到树懒控制器完全实现了前面提到的功能,是一个非常可靠的控制器(当然前提是树懒不睡觉)。
👩⚕️ 实验过程中没有任何动物受到伤害。
总结
在以上的内容中,我们分析了 informers 包的源代码,并演示了 Informer 在实际使用中的最佳实践。
接下来,在后续章节中,我还是带大家继续解析 Informer 相关源代码,同时介绍自定义资源 (CRD) Informer 的实现和动态 Informer 相关的内容。
下一部分:实现自定义资源 (CRD) Informer。
参考资料
Bitnami Engineering: A deep dive into Kubernetes controllers
Bitnami Engineering: Kubewatch, an example of Kubernetes custom controller
Dynamic Kubernetes Informers | FireHydrant
client-go/main.go at master · kubernetes/client-go · GitHub