编写一个kubernetescontroller

博客 动态
0 124
优雅殿下
优雅殿下 2022-06-19 18:00:44
悬赏:0 积分 收藏

编写一个kubernetes controller

Overview

根据Kuberneter文档对Controller的描述,Controller在kubernetes中是负责协调的组件,根据设计模式可知,controller会不断的你的对象(如Pod)从当前状态与期望状态同步的一个过程。当然Controller会监听你的实际状态与期望状态。

Writing Controllers

package mainimport (	"flag"	"fmt"	"os"	"time"	v1 "k8s.io/api/core/v1"	"k8s.io/apimachinery/pkg/fields"	utilruntime "k8s.io/apimachinery/pkg/util/runtime"	"k8s.io/apimachinery/pkg/util/wait"	"k8s.io/client-go/kubernetes"	"k8s.io/client-go/rest"	"k8s.io/client-go/tools/cache"	"k8s.io/client-go/tools/clientcmd"	"k8s.io/client-go/util/homedir"	"k8s.io/client-go/util/workqueue"	"k8s.io/klog")type Controller struct {	lister     cache.Indexer	controller cache.Controller	queue      workqueue.RateLimitingInterface}func NewController(lister cache.Indexer, controller cache.Controller, queue workqueue.RateLimitingInterface) *Controller {	return &Controller{		lister:     lister,		controller: controller,		queue:      queue,	}}func (c *Controller) processItem() bool {	item, quit := c.queue.Get()	if quit {		return false	}	defer c.queue.Done(item)	fmt.Println(item)	err := c.processWrapper(item.(string))	if err != nil {		c.handleError(item.(string))	}	return true}func (c *Controller) handleError(key string) {	if c.queue.NumRequeues(key) < 3 {		c.queue.AddRateLimited(key)		return	}	c.queue.Forget(key)	klog.Infof("Drop Object %s in queue", key)}func (c *Controller) processWrapper(key string) error {	item, exists, err := c.lister.GetByKey(key)	if err != nil {		klog.Error(err)		return err	}	if !exists {		klog.Info(fmt.Sprintf("item %v not exists in cache.\n", item))	} else {		fmt.Println(item.(*v1.Pod).GetName())	}	return err}func (c *Controller) Run(threadiness int, stopCh chan struct{}) {	defer utilruntime.HandleCrash()	defer c.queue.ShutDown()	klog.Infof("Starting custom controller")	go c.controller.Run(stopCh)	if !cache.WaitForCacheSync(stopCh, c.controller.HasSynced) {		utilruntime.HandleError(fmt.Errorf("sync failed."))		return	}	for i := 0; i < threadiness; i++ {		go wait.Until(func() {			for c.processItem() {			}		}, time.Second, stopCh)	}	<-stopCh	klog.Info("Stopping custom controller")}func main() {	var (		k8sconfig  *string //使用kubeconfig配置文件进行集群权限认证		restConfig *rest.Config		err        error	)	if home := homedir.HomeDir(); home != "" {		k8sconfig = flag.String("kubeconfig", fmt.Sprintf("%s/.kube/config", home), "kubernetes auth config")	}	k8sconfig = k8sconfig	flag.Parse()	if _, err := os.Stat(*k8sconfig); err != nil {		panic(err)	}	if restConfig, err = rest.InClusterConfig(); err != nil {		// 这里是从masterUrl 或者 kubeconfig传入集群的信息,两者选一		restConfig, err = clientcmd.BuildConfigFromFlags("", *k8sconfig)		if err != nil {			panic(err)		}	}	restset, err := kubernetes.NewForConfig(restConfig)	lister := cache.NewListWatchFromClient(restset.CoreV1().RESTClient(), "pods", "default", fields.Everything())	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())	indexer, controller := cache.NewIndexerInformer(lister, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{		AddFunc: func(obj interface{}) {			fmt.Println("add ", obj.(*v1.Pod).GetName())			key, err := cache.MetaNamespaceKeyFunc(obj)			if err == nil {				queue.Add(key)			}		},		UpdateFunc: func(oldObj, newObj interface{}) {			fmt.Println("update", newObj.(*v1.Pod).GetName())			if newObj.(*v1.Pod).Status.Conditions[0].Status == "True" {				fmt.Println("update: the Initialized Status", newObj.(*v1.Pod).Status.Conditions[0].Status)			} else {				fmt.Println("update: the Initialized Status ", newObj.(*v1.Pod).Status.Conditions[0].Status)				fmt.Println("update: the Initialized Reason ", newObj.(*v1.Pod).Status.Conditions[0].Reason)			}			if len(newObj.(*v1.Pod).Status.Conditions) > 1 {				if newObj.(*v1.Pod).Status.Conditions[1].Status == "True" {					fmt.Println("update: the Ready Status", newObj.(*v1.Pod).Status.Conditions[1].Status)				} else {					fmt.Println("update: the Ready Status ", newObj.(*v1.Pod).Status.Conditions[1].Status)					fmt.Println("update: the Ready Reason ", newObj.(*v1.Pod).Status.Conditions[1].Reason)				}				if newObj.(*v1.Pod).Status.Conditions[2].Status == "True" {					fmt.Println("update: the PodCondition Status", newObj.(*v1.Pod).Status.Conditions[2].Status)				} else {					fmt.Println("update: the PodCondition Status ", newObj.(*v1.Pod).Status.Conditions[2].Status)					fmt.Println("update: the PodCondition Reason ", newObj.(*v1.Pod).Status.Conditions[2].Reason)				}				if newObj.(*v1.Pod).Status.Conditions[3].Status == "True" {					fmt.Println("update: the PodScheduled Status", newObj.(*v1.Pod).Status.Conditions[3].Status)				} else {					fmt.Println("update: the PodScheduled Status ", newObj.(*v1.Pod).Status.Conditions[3].Status)					fmt.Println("update: the PodScheduled Reason ", newObj.(*v1.Pod).Status.Conditions[3].Reason)				}			}		},		DeleteFunc: func(obj interface{}) {			fmt.Println("delete ", obj.(*v1.Pod).GetName(), "Status ", obj.(*v1.Pod).Status.Phase)			// 上面是事件函数的处理,下面是对workqueue的操作			key, err := cache.MetaNamespaceKeyFunc(obj)			if err == nil {				queue.Add(key)			}		},	}, cache.Indexers{})	c := NewController(indexer, controller, queue)	stopCh := make(chan struct{})	stopCh1 := make(chan struct{})	c.Run(1, stopCh)	defer close(stopCh)	<-stopCh1}

通过日志可以看出,Pod create后的步骤大概为4步:

  • Initialized:初始化好后状态为Pending
  • PodScheduled:然后调度
  • PodCondition
  • Ready
add  netboxdefault/netboxnetboxupdate netbox status Pending to Pendingupdate: the Initialized Status Trueupdate netbox status Pending to Pendingupdate: the Initialized Status Trueupdate: the Ready Status  Falseupdate: the Ready Reason  ContainersNotReadyupdate: the PodCondition Status  Falseupdate: the PodCondition Reason  ContainersNotReadyupdate: the PodScheduled Status Trueupdate netbox status Pending to Runningupdate: the Initialized Status Trueupdate: the Ready Status Trueupdate: the PodCondition Status Trueupdate: the PodScheduled Status True

大致上与 kubectl describe pod 看到的内容页相似

default-scheduler  Successfully assigned default/netbox to master-machine  Normal  Pulling    85s   kubelet            Pulling image "cylonchau/netbox"  Normal  Pulled     30s   kubelet            Successfully pulled image "cylonchau/netbox"  Normal  Created    30s   kubelet            Created container netbox  Normal  Started    30s   kubelet            Started container netbox

Reference

controllers.md

posted @ 2022-06-19 17:45 Cylon 阅读(0) 评论(0) 编辑 收藏 举报
回帖
    优雅殿下

    优雅殿下 (王者 段位)

    2018 积分 (2)粉丝 (47)源码

    小小码农,大大世界

     

    温馨提示

    亦奇源码

    最新会员