自己实现一个Controller——精简型

写在最前

controller-manager作为K8S master的其中一个组件,负责众多controller的启动和终止,这些controller负责监控着k8s中各种资源,执行调谐,使他们的实际状态能不断趋近与期望状态。这些controller包括servercontroller,nodecontroller,deploymentcontroller等。对于自定义资源(CRD)也需要为之配备controller,CRD的controller也需要有controller-manager启动之,停止它。整个过程篇幅较长,故鄙人将其拆分成多篇,通过本系列,首篇将介绍如何定义一个controller-manager去启动它所管辖的controller,并实现一个最简单的controller。第二篇再实现一个较为标准的controller,并介绍informer的结构;最后一篇将介绍不借助脚手架如何实现一个CRD的controller。

介绍一下整个项目的结构

controller-demo
|---api         //用于放定义CRD各个属性的struct
    |---v1
|---client
    |---versiond
         |----scheme    //用于存放CRD的scheme
         |----typed //用于存放CRD对应的client
|---controller      //用于存放各个controller
|---informers       //用于存放informer,包含各个apiGroup各个version及一个factory
    |---ecsbind/v1  //其中一个apiGroup,其中一个version的informer,当然也是唯一一个
    |---internalinterfaces  //informer的interface接口
|---listers     //用于存放lister,包含各个apiGroup各个version
    |---ecsbind/v1  //其中一个apiGroup,其中一个version的informer,同样也是唯一一个

controller-manager

controller有两个函数,一个负责供main函数调用启动controller-manager,作为controller-manager的入口;另一个是用于启动他所管理的所有controller。

供main函数调用的Run函数定义如下

func Run(stopCh

上述函数传入一个通道,用于传递给各个controller一个终止的信号,函数里定义了一个run的函数,用于调用StartController,之所以需要定义一个run函数,是因为一般这类的组件虽然为了高可用会运行多个副本,但是仅有一个副本是真正运行,其他的副本是作为待命状态运行,而这个真正运行的副本称为leader,从普通副本中通过资源争夺称为leader的过程称为leader选举,仅有leader挂掉了,剩余的副本再进行一次leader选举成为新leader。当然也可进行leader选举模式运行。因此Run函数中应该包含是否进行leader选举,若是则执行leader选举的逻辑,当选成leader才执行run函数;如果不进行leader选举则直接执行run。不过这段逻辑被省略了。

controller-manager的另一个函数是真正启动各个controller。StartController同样接收了从Run函数传过来的通道,这个通道最终转给各个controller,传递停止的信号。在函数中会构造各个controller,通过开辟一个协程调用controller的Run方法将controller启动,代码如下所示

unc StartController(stopCh

controller.NewPodController是构造了一个PodController,构造PodController时所需要的kubeClient,informer需要预先构造。对于k8s原有的资源,其informer都可以通过SharedInformerFactory获得,通过协程执行 pc.Run(stopCh)后,也需要执行factory.Start(stopCh),factroy.Start需要等各个controller Run了之后方可执行,否则对应Controlle则会没有运行效果。

一个精简的Controller

这个controller的作用是统计集群中所有pod的数量,然后将pod的总数写到master的某个label上,且被统计过的pod都会在它的event中产生一条新记录来表明此pod被统计过。

罗列一下这个podcontroller结构的字段

type PodController struct {

    kubeClient       kubernetes.Interface    //用于给master打label
    clusterName      string
    podLister       corelisters.PodLister    //用于获取被监控的pod资源
    podListerSynced cache.InformerSynced     //用于同步cache
    broadcaster      record.EventBroadcaster //用于广播事件
    recorder         record.EventRecorder   //用于记录pod的event
}

Controller的构造函数如下

func NewPodController(kubeClient kubernetes.Interface,podInformer coreinformers.PodInformer,clusterName string)*PodController  {

    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(glog.Infof)
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "pod_controller"})

    rc:=&PodController{
        kubeClient:kubeClient,
        clusterName:clusterName,
        podLister:podInformer.Lister(),
        podListerSynced:podInformer.Informer().HasSynced,
        broadcaster:eventBroadcaster,
        recorder:recorder,
    }
    return rc
}

controller的各个属性中,除了broadcaster和recorder是自身构造外,其余都是通过参数传入。由于controller中需要用到事件记录,提供这一功能的是recorder,然而触发了事件需要将其散播出去给订阅者的需要一个broadcaster,这里涉及到k8s的事件机制,并不打算在细述。在构造函数中已经把事件广播绑定到glog.Infof,也就是说recorder触发了事件,会在日志中输出事件的信息。后面在Run controller的时候还会用到这个广播器。

整个Controller的启动方法如下

func (p *PodController)Run(stopCh

WaitForCacheSync用于同步指定资源的缓存,这个缓存后续会使用到,万一同步失败的话controller将不能运行

StartRecordingToSink用于把事件广播到apiserver中,如果此处不执行,即便是recorder触发了事件,apiserver没收到这个事件,最终事件信息没保存到对应pod中,我们通过kubectl describe po时就会看不到相应的event记录

通过启动一个协程去定期执行reconcilePods()方法,NonSlidingUntil函数被调用后马上执行传进去的func,后续每隔10秒就重复执行一次,直到收到stopCh通道传来的终止信号才停止。

最后通过等待接收stopCh传来的信号而阻塞当前协程,从而阻止了完成本函数的调用

reconcilePods方法的大致逻辑如前所述,经过多次调用从lister中获取所有pod,遍历每个pod把pod的命名空间和pod的名称打印出来,然后通过labelselector找出集群中的master节点将pod的数量打到名为hopegi/pod-count的label上,最后给每个pod的event事件添加一条pod count is n(这个n是pod的总数)这样的记录。

func (p *PodController)reconcilePods()error  {
    glog.Infof("reconcilePods ")
    pods,err:= p.podLister.List(labels.Everything())
    if err!=nil{
        return fmt.Errorf("error listing pods: %v", err)
    }
    return p.reconcile(pods)
}

func (p *PodController)reconcile(pods []*v1.Pod)error  {
    glog.Infof("reconcile pods")
    for _,pod :=range pods{
        fmt.Printf("pod name is %s.%s  \n",(*pod).Namespace,(*pod).Name)

    }
    nodes,err:= p.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{LabelSelector:"node-role.kubernetes.io/master"})
    if err!=nil{
        glog.Infof("get master error %v\n",err)
        return err
    }
    for _,n:=range nodes.Items{

        n.Labels["hopegi/pod-count"]=fmt.Sprintf("%d",len(pods))
        _,err= p.kubeClient.CoreV1().Nodes().Update(&n)
        if err!=nil{
            glog.Infof("label node error:%v ",err)
        }
    }
    if p.recorder!=nil {
        msg:=fmt.Sprintf("pod count is  %d",len(pods))
        for _, pod := range pods {
            p.recorder.Eventf(&v1.ObjectReference{
                Kind:"Pod",
                Name:pod.Name,
                UID:pod.UID,
                Namespace:pod.Namespace,
            },v1.EventTypeNormal,"SuccessCalculatePod",msg)
        }
    }
    return nil
}

获取集群里所有的pod用的是lister而不是通过kubeclient去获取差别在于,作为某个K8S资源的informer(本例中是pod),它内部都有一个对应资源的缓存,这个缓存在listAndWatch机制的作用下与集群中存储的pod数据保持一致,这个listAndWatch将在下一篇中介绍;后者是每访问一次都会往apiserver中发一次请求,在众多controller频繁地跟apiserver通讯,apiserver会不堪重负,且消耗大量网络资源,获取效率也低下。

本篇简单的实现了一个controller,虽然它并没有如开篇所说的那样对资源的实际状态与期望状态的差异进行调谐,通过最简单的方式周期性地检查pod的状态,引入了informer,获取了它listAndWatch的结果,后续将介绍这个informer的机制,它如何执行listAndWatch,一个较为常见的标准的Controller是如何实现的。

Original: https://www.cnblogs.com/HopeGi/p/15313510.html
Author: 猴健居士
Title: 自己实现一个Controller——精简型

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/516590/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • [原创]Golang一行代码给钉钉群推送消息

    [原创]Golang一行代码给钉钉群推送消息 钉钉本来就是工具,只是boss把你变成了工具. — 麦·卡隆 今天朋友扔给我个某签到脚本,让我做推送功能. 我迅速从吃灰收藏夹里掏出S…

    Go语言 2023年5月25日
    050
  • Go编译过程

    一、 Go编译流程 二、过程说明 词法解析 读取Go源文件,将字符序列转换为符号(token)序列,比如将”:=”转换为_Define 代码中的标识符、关键…

    Go语言 2023年5月25日
    065
  • Golang实现set

    Golang语言本身未实现set,但是实现了map golang的map是一种无序的键值对的集合,其中键是唯一的 而set是键的不重复的集合,因此可以用map来实现set 由于ma…

    Go语言 2023年5月25日
    061
  • Golang接口型函数使用技巧

    什么是接口型函数?顾名思义接口函数指的是用函数实现接口,这样在调用的时候就会非常简便,这种方式适用于只有一个函数的接口。 这里以迭代一个map为例,演示这一实现的技巧。 常规接口实…

    Go语言 2023年5月25日
    059
  • Go select 死锁引发的思考

    上文总结 package main import ( "fmt" ) func main() { ch := make(chan int) go func() …

    Go语言 2023年5月25日
    042
  • golang低级编程:一.unsafe包

    go语言在设计上确保了一些安全的属性,限制了程序可能出错的途径。例如严格的类型转换规则。但也使得很多实现的细节无法通过go程序来访问,例如对于聚合类型(如结构体)的内存布局,或者一…

    Go语言 2023年5月25日
    049
  • HTTP 尝试获取 Client IP

    HTTP 中获取 Client IP 相关策略需求, 在当下网络环境中多数只能提供建议作用. 更多的是 通过其它唯一标识来挖掘更多潜在价值. 本文主要就一个内容, 如何最大可能尝试…

    Go语言 2023年5月25日
    063
  • Go语言基础之并发

    并发是编程里面一个非常重要的概念,Go语言在语言层面天生支持并发,这也是Go语言流行的一个很重要的原因。 Go语言中的并发编程 并发与并行 并发:同一时间段内执行多个任务(你在用微…

    Go语言 2023年5月29日
    059
  • Go Error 嵌套到底是怎么实现的?

    原文链接: Go Error 嵌套到底是怎么实现的? Go Error 的设计哲学是 「Errors Are Values」。 这句话应该怎么理解呢?翻译起来挺难的。不过从源码的角…

    Go语言 2023年5月25日
    072
  • Go语言之高级篇beego框架之日志收集系统

    一、日志收集系统架构设计 图1 图2 二、开发环境 1、安装jdk jdk-8u51-windows-x64.exe 安装目录:C:\Program Files\jdk8 2、安装…

    Go语言 2023年5月29日
    055
  • beego下让swagger按照更新了controllers的接口信息自动更新commentsRouter_controllers.go

    beego下让swagger按照更新了controllers的接口信息自动更新commentsRouter_controllers.go (1)在beego环境中,当更新了cont…

    Go语言 2023年5月25日
    045
  • 【golang】pprof性能调优工具的具体使用(带案例)

    前言 大晚上的,老是刷到有关pprof的文章,忍不住看了几篇文章…写个学习笔记记录下~ 正文: 1.pprof是什么? pprof是go内置的性能调优工具,可以借助一些…

    Go语言 2023年5月25日
    080
  • muduo源码分析之TcpServer模块

    这次我们开始 muduo源代码的实际编写,首先我们知道 muduo是 LT模式, Reactor模式,下图为 Reactor模式的流程图[来源1] 然后我们来看下 muduo的整体…

    Go语言 2023年5月25日
    052
  • Go语言的goroutine

    Go世界里,每一个并发执行的活动成为goroutine。 通过创建goroutine,就可以实现并行运算,十分方便。 如果有函数f(),那么: f():调用函数f(),并且等待它返…

    Go语言 2023年5月29日
    069
  • 在开发中将git运用自如

    写这篇文章的初衷是记录自己在开发中使用git遇到的问题和如何利用git进行高效的开发。个人理解来看,很多人对git运用不自如主要是两方面的原因:1、死记硬背命令,这个其实可以通过h…

    Go语言 2023年5月25日
    053
  • 十分钟学会Golang开发gRPC服务

    gRPC是Google发起的一个开源RPC框架,使用HTTP/2传输协议,使用Protocol Buffers编码协议,相比RESTful框架的程序性能提高不少,而且当前流行的编程…

    Go语言 2023年5月25日
    077
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球