03 May 2021 05:21 +0000

本文分 4 部分,分别是:

(1). cache 包源码解析与 Informer 的使用;

(2). informers 包源码解析与 SharedInformerFactory 的使用,以及 Informer 在实际使用中的最佳实践;

(3). 实现自定义资源 (CRD) Informer;

(4). dynamic 包源码解析与 DynamicSharedInformerFactory 的使用;

这里是第 1 部分。

本文使用的所有代码都可以在 articles/archive/dive-into-kubernetes-informer at main · wbsnail/articles · GitHub 找到。

控制器 (controller)

如果你对 Kubernetes 的架构有一定的了解,那么你就会知道,Kubernetes 通过控制器 (controller) 来维护集群的状态。

比如 ReplicaSetController 负责维护 ReplicaSet 对应的 Pod 数量,无论是 Pod 崩溃,被删除,还是 ReplicaSet 的 Replicas 配置发生变更,ReplicaSetController 都会采取必要的操作以维持各个 ReplicaSet 的 Selector 对应的 Pod 数量与 Replicas 一致。

除了 ReplicaSetController 外,Kubernetes 还提供了很多内置的控制器,包括 EndpointController, NamespaceController, ServiceAccountController 等等,为了方便管理,这些内置的控制器都打包在一个二进制文件当中,也就是我们熟悉的 Kubernetes 核心组件之一: kube-controller-manager

除了内置控制器,扩展的控制器也应用广泛,比如 Traefik Kubernetes provider, cert-manager controller

当然我们也可以开发自己的控制器实现自己的控制逻辑。

要深入理解 Kubernetes,除了熟悉已有控制器的角色与控制逻辑外,了解控制器的工作原理和实现也必不可少,今天我们就来看看,控制器到底是如何工作的 🕵️‍♂️

控制器的工作原理:Kubernetes API

Kubernetes 控制面 (control plane) 的核心是 API 服务器 (API server)。API 服务器负责提供 HTTP API,以供用户,集群中的不同部分和集群外部组件相互通信。控制器也不例外,所有控制器都通过 API 获取集群的当前状态,也通过 API 对集群状态进行修改。

值得一提的是,Kubernetes 提供了 watch 机制方便客户端实时获取集群状态,有了这个接口,控制器才得以无延迟(准确地说是低延迟)地对状态变更作出响应。这里指的 "状态变更",就是我们常说的事件 (event)

Watch 接口演示:

Error (9)

控制器的实现:Informer

有了接口,我们就足以对集群状态进行观察和控制,但是,控制回路是一个无限循环的过程,每一次向服务器发送的请求,和每一个与服务器保持的连接都会给客户端和服务器端造成额外的资源开销,必须引入缓存机制,减少不必要的资源开销。Informer 就是 Kubernetes 开发者们设计的一套缓存机制。

Kubernetes Informer 相关的代码包含在 kubernetes/client-go 仓库中,主要的包有 cache, informersdynamic,其中 informers 和 dynamic 包主要实现 Informer 的使用,Informer 本身逻辑主要聚合在 cache 包中,那我们就来研究一下这几个包。

如开头所说,本文分为 4 个部分,以下是第 1 部分: cache 包源码解析与 Informer 的使用。

cache 包源码解析

顶层设计 ☝️

单词 "inform" 的含义是 "通知,告知",Informer 实际上就是实现了在状态发生变更时通知到处理程序 (handler) 的类,同时需要包含缓存机制,以减少额外的资源开销。

为了实现上述逻辑,cache 包设计了以下几个核心接口或类:

  • ListerWatcher
  • Store
  • Queue
  • Reflector
  • Indexer
  • Controller (Informer)
  • SharedInformer 和 SharedIndexInformer

简单概括起来:

(a). ListerWatcher 是对原始接口的封装,包括 List 和 Watch 两个方法,是数据的源头。

(b). Store 是缓存用的对象存储,包括对对象进行增删改查的若干方法,像 Add, Update, Delete, List, Get 等等,Store 有 Queue 和 Indexer 两个扩展接口,这两个接口的作用完全不同,所以我们后面再分别看这两个接口。

(c). Queue 扩展自 Store,在 Store 的基础上增加了 Pop 等几个方法,顾名思义是用于实现先进先出 (FIFO) 队列用的。在 cache 包中 Queue 被设计用于缓存变更 (Delta),变更就是引发状态改变的事件 (Event)。通过 Queue 将事件产生和事件处理解耦。

(d). Reflector 是连接上述 ListerWatcher 和 Queue 的桥梁,将原始数据转换为统一的队列,方便控制器处理。

(e). Indexer 也扩展自 Store,在 Store 的基础上增加了多索引设计,即允许对对象采用多种方式建立索引。在 cache 包中 Store 的作用之一是用于保存当前的资源状态,Indexer 继承了 Store 的这部分功能,在重新同步 (resync) 过程中起重要作用。

(f). Controller 接口是对某一组控制逻辑的封装,最主要的一个方法是 Run(stopCh <-chan struct{}),一般设计为无限的循环,不断消费队列,直到 stopCh 被关闭。

(g). Informer 指的是一类专门用于实现 Informer 机制的 Controller。

(h). SharedInformer 是对 Controller 的再次封装,目的是实现多个处理程序对同一个资源的事件响应,从 AddEventHandler 这个方法就可以大致明白 SharedInformer 和 Informer 的区别。

(i). SharedIndexInformer 和 SharedInformer 作用是一样的,唯一区别在于 SharedIndexInformer 使用了 Indexer。

Error (9)

client-go

看接下来的内容前,你需要知道如何基本使用 client-go,所以请先熟悉以下代码后再继续阅读:

package main

import (
	"context"
	"fmt"
	"github.com/spongeprojects/magicconch"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"os"
)

func mustClientset() kubernetes.Interface {
	kubeconfig := os.Getenv("KUBECONFIG")

	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	magicconch.Must(err)

	clientset, err := kubernetes.NewForConfig(config)
	magicconch.Must(err)

	return clientset
}

func main() {
	clientset := mustClientset()

	namespaces, err := clientset.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
	magicconch.Must(err)

	for _, namespace := range namespaces.Items {
		fmt.Println(namespace.Name)
	}
}

对以上代码的说明不在本文范畴内 🌚 🙅‍♂️

在接下来的代码中,我们把 mustClientset 部分代码移到单独的文件中复用。

本文使用的所有代码都可以在 articles/archive/dive-into-kubernetes-informer at main · wbsnail/articles · GitHub 找到。

ListerWatcher

根据前面分析,ListerWatcher 是数据的源头,我们先看这个接口。

// Lister is any object that knows how to perform an initial list.
type Lister interface {
	// List should return a list type object; the Items field will be extracted, and the
	// ResourceVersion field will be used to start the watch in the right place.
	List(options metav1.ListOptions) (runtime.Object, error)
}

// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
	// Watch should begin a watch at the specified version.
	Watch(options metav1.ListOptions) (watch.Interface, error)
}

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
	Lister
	Watcher
}

可以看到 ListerWatcher 接口包含 List 和 Watch 两个方法,这两个方法都是与资源类型无关的,操作的都是 runtime.Object 对象,在使用的时候一般需要进行反射或转换为具体的类型。我们使用 cache 包暴露的 NewListWatchFromClient 方法创建一个实例,看看它的使用方法:

package main

import (
	"fmt"
	"github.com/spongeprojects/magicconch"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/tools/cache"
	"os"
	"os/signal"
)

// newConfigMapsListerWatcher 用于创建 tmp namespace 下 configmaps 资源的 ListerWatcher 实例
func newConfigMapsListerWatcher() cache.ListerWatcher {
	clientset := mustClientset()              // 前面有说明
	client := clientset.CoreV1().RESTClient() // 客户端,请求器
	resource := "configmaps"                  // GET 请求参数之一
	namespace := "tmp"                        // GET 请求参数之一
	selector := fields.Everything()           // GET 请求参数之一
	lw := cache.NewListWatchFromClient(client, resource, namespace, selector)
	return lw
}

func main() {
	fmt.Println("----- 1-list-watcher -----")

	lw := newConfigMapsListerWatcher()

	// list 的类型为 runtime.Object, 需要经过反射或类型转换才能使用,
	// 传入的 ListOptions 中的 FieldSelector 始终会被替换为前面的 selector
	list, err := lw.List(metav1.ListOptions{})
	magicconch.Must(err)

	// meta 包封装了一些处理 runtime.Object 对象的方法,屏蔽了反射和类型转换的过程,
	// 提取出的 items 类型为 []runtime.Object
	items, err := meta.ExtractList(list)
	magicconch.Must(err)

	fmt.Println("Initial list:")

	for _, item := range items {
		configMap, ok := item.(*corev1.ConfigMap)
		if !ok {
			return
		}
		fmt.Println(configMap.Name)

		// 如果只关注 meta 信息,也可以不进行类型转换,而是使用 meta.Accessor 方法
		// accessor, err := meta.Accessor(item)
		// magicconch.Must(err)
		// fmt.Println(accessor.GetName())
	}

	listMetaInterface, err := meta.ListAccessor(list)
	magicconch.Must(err)

	// resourceVersion 在同步过程中非常重要,看下面它在 Watch 接口中的使用
	resourceVersion := listMetaInterface.GetResourceVersion()

	// w 的类型为 watch.Interface,提供 ResultChan 方法读取事件,
	// 和 List 一样,传入的 ListOptions 中的 FieldSelector 始终会被替换为前面的 selector,
	// ResourceVersion 是 Watch 时非常重要的参数,
	// 它代表一次客户端与服务器进行交互时对应的资源版本,
	// 结合另一个参数 ResourceVersionMatch,表示本次请求对 ResourceVersion 的筛选,
	// 比如以下请求表示:获取版本新于 resourceVersion 的事件。
	// 在考虑连接中断和定期重新同步 (resync) 的情况下,
	// 对 ResourceVersion 的管理就变得更为复杂,我们先不考虑这些情况。
	w, err := lw.Watch(metav1.ListOptions{
		ResourceVersion: resourceVersion,
	})
	magicconch.Must(err)

	stopCh := make(chan os.Signal)
	signal.Notify(stopCh, os.Interrupt)

	fmt.Println("Start watching...")

loop:
	for {
		select {
		case <-stopCh:
			fmt.Println("Interrupted")
			break loop
		case event, ok := <-w.ResultChan():
			if !ok {
				fmt.Println("Broken channel")
				break loop
			}
			configMap, ok := event.Object.(*corev1.ConfigMap)
			if !ok {
				return
			}
			fmt.Printf("%s: %s\n", event.Type, configMap.Name)
		}
	}
}

对 ResourceVersion 的详细说明可以参考官方文档:Kubernetes API Concepts | Kubernetes

输出类似于:

----- 1-list-watcher -----
Initial list:
demo
demo1
Start watching...
DELETED: demo
ADDED: demo

以上代码的说明都在注释中,只说明两点:(1). 以上对事件响应是同步的,如果执行复杂的操作会引起阻塞,需要引入队列; (2). 以上代码缺少重新连接和重新同步机制,有可能出现数据不一致问题。

在接下来的代码中,我们把 newConfigMapsListerWatcher 部分代码移到单独的文件中复用。

Reflector

Reflector 被设计来解决上面提到的两个问题:

package main

import (
	"fmt"
	"github.com/spongeprojects/magicconch"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"time"
)

// newStore 用于创建一个 cache.Store 对象,作为当前资源状态的对象存储
func newStore() cache.Store {
	return cache.NewStore(cache.MetaNamespaceKeyFunc)
}

// newQueue 用于创建一个 cache.Queue 对象,这里实现为 FIFO 先进先出队列,
// 注意在初始化时 store 作为 KnownObjects 参数传入其中,
// 因为在重新同步 (resync) 操作中 Reflector 需要知道当前的资源状态,
// 另外在计算变更 (Delta) 时,也需要对比当前的资源状态。
// 这个 KnownObjects 对队列,以及对 Reflector 都是只读的,用户需要自己维护好 store 的状态。
func newQueue(store cache.Store) cache.Queue {
	return cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
		KnownObjects:          store,
		EmitDeltaTypeReplaced: true,
	})
}

// newConfigMapsReflector 用于创建一个 cache.Reflector 对象,
// 当 Reflector 开始运行 (Run) 后,队列中就会推入新收到的事件。
func newConfigMapsReflector(queue cache.Queue) *cache.Reflector {
	lw := newConfigMapsListerWatcher() // 前面有说明
	// 第 2 个参数是 expectedType, 用此参数限制进入队列的事件,
	// 当然在 List 和 Watch 操作时返回的数据就只有一种类型,这个参数只起校验的作用;
	// 第 4 个参数是 resyncPeriod,
	// 这里传了 0,表示从不重新同步(除非连接超时或者中断),
	// 如果传了非 0 值,会定期进行全量同步,避免累积和服务器的不一致,
	// 重新同步过程中会产生 SYNC 类型的事件。
	return cache.NewReflector(lw, &corev1.ConfigMap{}, queue, 0)
}

func main() {
	fmt.Println("----- 2-reflector -----")

	store := newStore()
	queue := newQueue(store)
	reflector := newConfigMapsReflector(queue)

	stopCh := make(chan struct{})
	defer close(stopCh)

	// reflector 开始运行后,队列中就会推入新收到的事件
	go reflector.Run(stopCh)

	// 注意处理事件过程中维护好 store 状态,包括 Add, Update, Delete 操作,
	// 否则会出现不同步问题,在 Informer 当中这些逻辑都已经被封装好了,但目前我们还需要关心一下。
	processObj := func(obj interface{}) error {
		// 最先收到的事件会被最先处理
		for _, d := range obj.(cache.Deltas) {
			switch d.Type {
			case cache.Sync, cache.Replaced, cache.Added, cache.Updated:
				if _, exists, err := store.Get(d.Object); err == nil && exists {
					if err := store.Update(d.Object); err != nil {
						return err
					}
				} else {
					if err := store.Add(d.Object); err != nil {
						return err
					}
				}
			case cache.Deleted:
				if err := store.Delete(d.Object); err != nil {
					return err
				}
			}
			configMap, ok := d.Object.(*corev1.ConfigMap)
			if !ok {
				return fmt.Errorf("not config: %T", d.Object)
			}
			fmt.Printf("%s: %s\n", d.Type, configMap.Name)
		}
		return nil
	}

	fmt.Println("Start syncing...")

	// 持续运行直到 stopCh 关闭
	wait.Until(func() {
		for {
			_, err := queue.Pop(processObj)
			magicconch.Must(err)
		}
	}, time.Second, stopCh)
}

输出类似于:

----- 2-reflector -----
Start syncing...
Replaced: demo1
Replaced: demo
Deleted: demo
Added: demo

在以上代码中事件接收后被缓存到队列中,避免了阻塞问题,但对队列的消费仍然是同步的,需要再实现多 worker 才能提高效率,这一部分实现不包含在 cache 包中,在本文的第 2 部分:Informer 在实际使用中的最佳实践中有涉及。

Reflector 是保证 Informer 可靠性的核心组件,在丢失事件,收到异常事件,处理事件失败等多种异常情况下需要考虑的细节很多,感兴趣可以深入阅读源码的实现细节。

在接下来的代码中,我们把 newStore 和 newQueue 部分代码移到单独的文件中复用。

Controller

Controller 并没有什么特殊的作用,只是对上面一段代码的逻辑进行封装:

package main

import (
	"fmt"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

func newController() cache.Controller {
	fmt.Println("----- 3-controller -----")

	lw := newConfigMapsListerWatcher()
	store := newStore()
	queue := newQueue(store)
	cfg := &cache.Config{
		Queue:            queue,
		ListerWatcher:    lw,
		ObjectType:       &corev1.ConfigMap{},
		FullResyncPeriod: 0,
		RetryOnError:     false,
		Process: func(obj interface{}) error {
			for _, d := range obj.(cache.Deltas) {
				switch d.Type {
				case cache.Sync, cache.Replaced, cache.Added, cache.Updated:
					if _, exists, err := store.Get(d.Object); err == nil && exists {
						if err := store.Update(d.Object); err != nil {
							return err
						}
					} else {
						if err := store.Add(d.Object); err != nil {
							return err
						}
					}
				case cache.Deleted:
					if err := store.Delete(d.Object); err != nil {
						return err
					}
				}
				configMap, ok := d.Object.(*corev1.ConfigMap)
				if !ok {
					return fmt.Errorf("not config: %T", d.Object)
				}
				fmt.Printf("%s: %s\n", d.Type, configMap.Name)
			}
			return nil
		},
	}
	return cache.New(cfg)
}

func main() {
	controller := newController()

	stopCh := make(chan struct{})
	defer close(stopCh)

	fmt.Println("Start syncing....")

	go controller.Run(stopCh)

	<-stopCh
}

输出和前一段 Reflector 演示代码相同,代码也有诸多相同之处,区别在于将 Reflector 封装进了 Controller 当中,只暴露 Run 接口(实际上 Reflector 对象在 Run 过程中被创建)。

Informer

到上面的 Controller 为止,我们都是在直接处理事件,需要判断事件类型,还要根据事件类型维护客户端缓存的状态,Informer 就是封装了这些逻辑的 Controller(没错,Informer 就是 Controller)。

package main

import (
	"fmt"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	fmt.Println("----- 4-informer -----")

	lw := newConfigMapsListerWatcher()
	// 第一个返回的参数是 cache.Store,这里暂时用不到所以直接丢弃
	_, controller := cache.NewInformer(lw, &corev1.ConfigMap{}, 0, cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			configMap, ok := obj.(*corev1.ConfigMap)
			if !ok {
				return
			}
			fmt.Printf("created: %s\n", configMap.Name)
		},
		UpdateFunc: func(old, new interface{}) {
			configMap, ok := old.(*corev1.ConfigMap)
			if !ok {
				return
			}
			fmt.Printf("updated: %s\n", configMap.Name)
		},
		DeleteFunc: func(obj interface{}) {
			configMap, ok := obj.(*corev1.ConfigMap)
			if !ok {
				return
			}
			fmt.Printf("deleted: %s\n", configMap.Name)
		},
	})

	stopCh := make(chan struct{})
	defer close(stopCh)

	fmt.Println("Start syncing....")

	go controller.Run(stopCh)

	<-stopCh
}

输出类似于:

----- 4-informer -----
Start syncing....
created: demo
created: demo1
deleted: demo
created: demo

可以看到很重大的一个变化是我们收到的 obj 由 cache.Deltas 变成了 *corev1.ConfigMap,Informer 帮我们封装了转换的逻辑。如果你跟进 NewInformer 方法,你就可以看到前面 Controller 演示中的 newController 那一段类似的代码。

有了 Informer 我们的逻辑一下子就变得清晰起来,只需要关心处理程序 (handler) 就可以了。

有一个细节需要注意,如果设置了非 0 的 resyncPeriod (NewInformer 的第 3 个参数), 那么在定期同步过程中会产生 SYNC 事件,最终触发一连串的 UpdateFunc 调用。

UpdateFunc 被触发并不代表资源发生了改变,有可能资源状态完全没有变化。

SharedInformer

在上面的 Informer 设计当中,处理程序作为一个参数传入 NewInformer,如果有另一个处理程序需要处理相同资源,需要另外创建一个 Informer 对象,而队列是不能复用的,因为队列不支持两个消费者同时消费,为了解决这个问题,cache 包中又设计了 SharedInformer,顾名思义,就是多个处理程序共享的 Informer:

package main

import (
	"fmt"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	fmt.Println("----- 5-shared-informer -----")

	lw := newConfigMapsListerWatcher()
	sharedInformer := cache.NewSharedInformer(lw, &corev1.ConfigMap{}, 0)
	// 添加一个处理程序
	sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			configMap, ok := obj.(*corev1.ConfigMap)
			if !ok {
				return
			}
			fmt.Printf("created, printing namespace: %s\n", configMap.Namespace)
		},
	})
	// 添加另一个处理程序
	sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			configMap, ok := obj.(*corev1.ConfigMap)
			if !ok {
				return
			}
			fmt.Printf("created, printing name: %s\n", configMap.Name)
		},
	})

	stopCh := make(chan struct{})
	defer close(stopCh)

	fmt.Println("Start syncing....")

	go sharedInformer.Run(stopCh)

	<-stopCh
}

输出类似于:

----- 5-shared-informer -----
Start syncing....
created, printing namespace: tmp
created, printing namespace: tmp
created, printing name: demo
created, printing name: demo1

如上所示,所有事件会被广播给每一个注册的处理程序。

IndexInformer 和 SharedIndexInformer

在前面使用的 Reflector, Informer 和 SharedInformer 中,在实现事件响应的同时,我们还维护了一份当前最新的资源状态,就是代码中用到的 Store。在 Reflector 演示代码中 Store 由我们自己构建并传入 NewReflector,在 Informer 演示代码中 Store 对象由 NewInformer 返回,在 SharedInformer 演示代码中 Store 对象也可以通过 GetStore 方法获取。

这份最新的资源状态是非常有用的,在很多控制逻辑中,除了事件本身涉及的资源外,还需要关心其他资源的状态,就需要用到 Store 对象。

但是,Store 对象只实现了基本的增删改查功能,就 "查" 而言,只实现了 Get, GetByKey, List, ListKeys 4 个方法,对象的 "key" 是用构建 Store 时指定的 KeyFunc 计算得出的,绝大多数情况下使用的都是 cache.MetaNamespaceKeyFunc 这个方法:

// MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make
// keys for API objects which implement meta.Interface.
// The key uses the format <namespace>/<name> unless <namespace> is empty, then
// it's just <name>.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
	if key, ok := obj.(ExplicitKey); ok {
		return string(key), nil
	}
	meta, err := meta.Accessor(obj)
	if err != nil {
		return "", fmt.Errorf("object has no meta: %v", err)
	}
	if len(meta.GetNamespace()) > 0 {
		return meta.GetNamespace() + "/" + meta.GetName(), nil
	}
	return meta.GetName(), nil
}

这意味着我们可以 (1). 根据 namespace 加 name 查找某一个资源; (2). 列出所有资源。那么问题来了,如果我想 (1). 根据 MetaNamespace 之外的某个字段查找资源,比如 label, annotation, status 等等; (2). 根据条件列出部分资源,比如某个 namespace 下的资源,就没有办法通过 Store 做到,只能用 ListerWatcher 直接调用接口,没有本地缓存。

Indexer 就是设计来解决这个问题,功能如其名,就是用来建立索引:

package main

import (
	"fmt"
	"github.com/spongeprojects/magicconch"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	fmt.Println("----- 6-indexer -----")

	lw := newConfigMapsListerWatcher()
	indexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
	// 仅演示用,只关心 indexer,不处理事件,所以传一个空的 HandlerFunc,
	// 实际使用中一般不会这样做
	indexer, informer := cache.NewIndexerInformer(
		lw, &corev1.ConfigMap{}, 0, cache.ResourceEventHandlerFuncs{}, indexers)

	stopCh := make(chan struct{})
	defer close(stopCh)

	fmt.Println("Start syncing....")

	go informer.Run(stopCh)

	// 在 informer 首次同步完成后再操作
	if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
		panic("timed out waiting for caches to sync")
	}

	// 获取 cache.NamespaceIndex 索引下,索引值为 "tmp" 中的所有键
	keys, err := indexer.IndexKeys(cache.NamespaceIndex, "tmp")
	magicconch.Must(err)
	for _, k := range keys {
		fmt.Println(k)
	}
}

输出类似于:

----- 6-indexer -----
Start syncing....
tmp/demo
tmp/demo1

上面用到的 cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc} 是 informers 包中默认的索引,通过这个索引可以根据 namespace 列出资源:

// MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
	meta, err := meta.Accessor(obj)
	if err != nil {
		return []string{""}, fmt.Errorf("object has no meta: %v", err)
	}
	return []string{meta.GetNamespace()}, nil
}

这个 IndexFunc 为什么是返回 []string 呢?根据 namespace 查找资源,不是只需要 namespace string 就够了吗?为什么不是返回 string 呢? 因为 Indexer 支持的索引比较复杂,除了对资源支持多种方式索引外,每一种索引方式还允许多路查找,用伪代码演示如下:

索引规则 IndexFunc 定义为:

func (obj) []string

比如上面提到的:

func MetaNamespaceIndexFunc(obj) { return []string{obj.Namespace} }

为了演示多路查找,再定义一个 IndexFunc,根据多个关键词进行索引:

func MetaLabelKeywordIndexFunc(obj) { return strings.Split(obj.Labels["keyword"], ",") }

索引器 Indexer 的定义是: map[string]IndexFunc

比如我们使用以下 Indexer:

{
    "namespace": MetaNamespaceIndexFunc,
    "keyword": MetaLabelKeywordIndexFunc,
}

现在有两个资源对象 obj:

ConfigMap{Namespace: "tmp", Name: "demo", Labels: {"keyword": "demo,golang,informer"}}
ConfigMap{Namespace: "tmp1", Name: "demo1", Labels: {"keyword": "demo1,golang,informer"}}

对以上对象进行索引后形成的状态和索引分别是:

// Store:
{
     "tmp/demo": ConfigMap{Namespace: "tmp", Name: "demo", Labels: {"keyword": "demo,golang,informer"}},
     "tmp1/demo1": ConfigMap{Namespace: "tmp1", Name: "demo1", Labels: {"keyword": "demo1,golang,informer"}},
}

// Index:
{
    "namespace": {
        "tmp": [
            "tmp/demo",
        ],
        "tmp1": [
            "tmp1/demo1",
        ],
    },
    "keyword": {
        "demo": [
            "tmp/demo",
        ],
        "demo1": [
            "tmp1/demo1",
        ],
        "golang": [
            "tmp/demo",
            "tmp1/demo1",
        ],
        "informer": [
            "tmp/demo",
            "tmp1/demo1",
        ],
    }
}

这样一来我们既可以根据 namespace 查找资源(一个资源只有一个 namespace),也可以根据关键词查找资源(一个资源可能出现在多个关键词索引中)。

回到 IndexerInformer。

上面演示了 IndexerInformer 的使用,SharedIndexInformer 和 IndexerInformer 是类似的,只是在 SharedInformer 基础上集成了 Indexer,就不单独贴出演示代码了(但是在 Github 上是有的)。

总结

以上,我们自底向上,从 ListerWatcher, Reflector, Controller, Informer, SharedInformer, IndexerInformer 一路分析到了 SharedIndexInformer 的实现与使用方式。cache 包暴露的 Informer 创建方法有以下 5 个:

  • New
  • NewInformer
  • NewIndexerInformer
  • NewSharedInformer
  • NewSharedIndexInformer

在其中,NewSharedIndexInformer 是封装程度最高的一个,也是大部分情况下我们会选择使用的一个,同时也是 informers 包中 SharedInformerFactory 给所有内置资源创建 Informer 时所使用的方法。

接下来,在后续章节中,我会带大家继续解析 Informer 相关源代码,同时介绍 SharedInformerFactory 的使用,Informer 在实际应用中的最佳实践,以及实现自定义资源 (CRD) Informer 和动态 Informer 相关的内容。

下一部分:informers 包源码解析与 SharedInformerFactory 的使用,以及 Informer 在实际使用中的最佳实践

参考资料

Bitnami Engineering: A deep dive into Kubernetes controllers

Bitnami Engineering: Kubewatch, an example of Kubernetes custom controller

Dynamic Kubernetes Informers | FireHydrant

client-go/main.go at master · kubernetes/client-go · GitHub

GitHub - kubernetes/sample-controller: Repository for sample controller. Complements sample-apiserver

Kubernetes Deep Dive: Code Generation for CustomResources

How to generate client codes for Kubernetes Custom Resource Definitions (CRD) | by Roger Liang | ITNEXT


Loading comments...