Consul 入门-gRPC 服务注册与发现

前言

假如我有钱,我想买一个降噪耳机,我应该哪里买? 答案很简单,可以去京东或者线下实体店。
那如果把这个问题映射到微服务架构中:我打开京东,选中某款耳机进入详情页浏览,我可以看到这款耳机的价格、库存、规格、评价等。以我的理解,这个链路应该是这样的:

 Consul 入门-gRPC 服务注册与发现
暂定这个系统由3个微服务组成:商品详情服务、库存服务、评价服务。
  • 商品详情服务:聚合端上用户看到的所有信息
  • 库存服务:维护商品的库存信息、规格信息、价格信息
  • 评价服务:维护用户对商品的评价

微服务的目的是为了基于松耦合高内聚将单体服务进行拆分,然后将个服务进行多副本部署(我们甚至不知道它会被部署到哪里,实体机?虚拟机?容器?云上?)以达到高可用的目的。这也要付出点代价,商品详情服务需要知道:库存服务和评价服务在哪里?

由此,我们将继续学习 Consul 这款不错的服务发现工具,通前面的学习,我们已经对 Consul 的原理、使用、搭建有了认知。本次将学习:Consul 如何在 gRPC 构建的微服务网络环境中做一名合格的”指路人”。

编写一个 Go gRPC 服务

gRPC 是由 Google 开发并开源的RPC框架,详见官网。我们将通过官网的指导来编写一个简单的 go gRPC 服务

 Consul 入门-gRPC 服务注册与发现

获取样例代码

  1. 克隆 grpc-go仓库
$ git clone -b v1.29.1.0 https://github.com/grpc/grpc-go
  1. 切换到样例代码目录
$ cd cd grpc-go/examples/helloworld

目录结构如下:

├── greeter_client
│   └── main.go
├── helloword
│   └── helloword.proto
└── greeter_server
|   └── main.go

运行样例代码

  1. 编译执行 server 代码:
$ go run greeter_server/main.go
  1. 在新开一个终端,编译执行 client 代码,可以看到输出:
$ go run greeter_client/main.go
2021/09/11 16:28:29 Greeting: Hello world

gRPC 的 Banlancer

greeter_client/main.go中,是通过指定 server 地址的方式来实现访问到目标服务的

...

const (
    address = "localhost:50051"
)

func main () {
    conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
    ...

}

但这种方式在生产环境是不可行的,因为我们并不知道目标服务的地址(目标服务的地址也有可能不只一个)。实际上,gRPC 已经为我们提供来解决方案:Balancer。

首先,看一下 gRPC 客户端负载均衡实现的官方架构图:

 Consul 入门-gRPC 服务注册与发现

从图中,可以看到 Balancer 均衡器位于架构的最右方,内置一个 Picker 模块,Balancer 主要完成下面几个功能:

  • 与 Rersovler 通信(维持通信机制),接收 Resovler 通知的服务端列表更新,维护 Connection Pool 及每个连接的状态
  • 对上一步获取的服务端列表,调用 newSubConn异步建立长连接(每个 Backend 一个长连接),同时,监控连接的状态,及时更新 Connection Pool
  • 创建 Picker,Picker 执行的算法就是真正的 LB 逻辑,当客户端使用 conn初始化 PRC 方法时,通过 Picker 选择一个存活的连接,返回给客户端,然后调用 UpdatePicker 更新 LB 算法的内置状态,为下一次调用做准备
  • Balaner 是 gRPC 负载均衡最核心的模块

据此,我们可用通过自定义的 Balancer,在 Balaner 基础上通过实现自定义的 naming.Resolver来达到使用 Consul 看发现服务的功能。

大概流程是:

  1. grpc 在 Dial 的时候通过 WithBalancer 传入 Balancer
  2. Balaner 会通过 naming.Resolver 去解析 (Resovle)Dial 传入的 target 得到一个 nameing.Watcher
  3. naming.Watcher持续监视 target 解析到地址列表的变更并通过 Next 返回给 Balancer

实现 Consul Resolver

grpc-go/naming/naming.go中可以看到 Resolver接口的声明

type Resolver interface {
    // Resolve creates a Watcher for target.
    Resolve(target string) (Watcher, error)
}

需要实现一个 Consul Resolver,在里面返回可用的服务端地址列表,在 examples目录下新建 grpcresolver文件夹,在该文件夹下新建 consul.go文件:

package grpcresolver

import (
    "fmt"
    "net"
    "strconv"
    "sync"
    "sync/atomic"

    "github.com/hashicorp/consul/api"
    "google.golang.org/grpc/naming"
)

type watchEntry struct {
    addr string
    modi uint64
    last uint64
}

type consulWatcher struct {
    down      int32
    c         *api.Client
    service   string
    mu        sync.Mutex
    watched   map[string]*watchEntry
    lastIndex uint64
}

func (w *consulWatcher) Close() {
    atomic.StoreInt32(&w.down, 1)
}

func (w *consulWatcher) Next() ([]*naming.Update, error) {
    w.mu.Lock()
    defer w.mu.Unlock()
    watched := w.watched
    lastIndex := w.lastIndex
retry:
        // 访问 Consul, 获取可用的服务列表
    services, meta, err := w.c.Catalog().Service(w.service, "", &api.QueryOptions{
        WaitIndex: lastIndex,
    })
    if err != nil {
        return nil, err
    }
    if lastIndex == meta.LastIndex {
        if atomic.LoadInt32(&w.down) != 0 {
            return nil, nil
        }
        goto retry
    }
    lastIndex = meta.LastIndex
    var updating []*naming.Update
    for _, s := range services {
        ws := watched[s.ServiceID]
        fmt.Println(s.ServiceAddress, s.ServicePort)
        if ws == nil {
                        // 如果是新注册的服务
            ws = &watchEntry{
                addr: net.JoinHostPort(s.ServiceAddress, strconv.Itoa(s.ServicePort)),
                modi: s.ModifyIndex,
            }
            watched[s.ServiceID] = ws

            updating = append(updating, &naming.Update{
                Op:   naming.Add,
                Addr: ws.addr,
            })
        } else if ws.modi != s.ModifyIndex {
                        // 如果是原来的服务
            updating = append(updating, &naming.Update{
                Op:   naming.Delete,
                Addr: ws.addr,
            })
            ws.addr = net.JoinHostPort(s.ServiceAddress, strconv.Itoa(s.ServicePort))
            ws.modi = s.ModifyIndex
            updating = append(updating, &naming.Update{
                Op:   naming.Add,
                Addr: ws.addr,
            })
        }
        ws.last = lastIndex
    }
    for id, ws := range watched {
        if ws.last != lastIndex {
            delete(watched, id)
            updating = append(updating, &naming.Update{
                Op:   naming.Delete,
                Addr: ws.addr,
            })
        }
    }
    w.watched = watched
    w.lastIndex = lastIndex
    return updating, nil
}

type consulResolver api.Client

func (r *consulResolver) Resolve(target string) (naming.Watcher, error) {
    return &consulWatcher{
        c:       (*api.Client)(r),
        service: target,
        watched: make(map[string]*watchEntry),
    }, nil
}

func ForConsul(reg *api.Client) naming.Resolver {
    return (*consulResolver)(reg)
}

server 端通过 Consul 注册服务

修改 examples/helloword/greeter_server/main.go,在启动服务前,将服务的信息注册到 Consul

package main

import (
    "context"
    "encoding/hex"
    "flag"
    "fmt"
    "log"
    "math/rand"
    "net"
    "strconv"
    "time"

    "github.com/hashicorp/consul/api"
    "google.golang.org/grpc"
    pb "google.golang.org/grpc/examples/helloworld/helloworld"
)

const (
    // host = "192.168.10.102"
    // port = 50051
    ttl = 30 * time.Second
)

// server is used to implement helloworld.GreeterServer.

type server struct {
    pb.UnimplementedGreeterServer
    port int
}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    log.Printf("Received: %v", in.GetName())
    return &pb.HelloReply{Message: fmt.Sprintf("Hello %s, from %d", in.GetName(), s.port)}, nil
}

func main() {

    host := flag.String("h", "127.0.0.1", "host")
    port := flag.Int("p", 50051, "port")
    flag.Parse()

    lis, err := net.Listen("tcp", net.JoinHostPort(*host, strconv.Itoa(*port)))
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }

    // Consul Client
    registry, err := api.NewClient(api.DefaultConfig())
    if err != nil {
        log.Fatalln(err)
    }

    var h [16]byte
    rand.Read(h[:])
    // 生成一个全局ID
    id := fmt.Sprintf("helloserver-%s-%d", hex.EncodeToString(h[:]), *port)
    fmt.Println(id)
    // 注册到 Consul,包含地址、端口信息,以及健康检查
    err = registry.Agent().ServiceRegister(&api.AgentServiceRegistration{
        ID:      id,
        Name:    "helloserver",
        Port:    *port,
        Address: *host,
        Check: &api.AgentServiceCheck{
            TTL:     (ttl + time.Second).String(),
            Timeout: time.Minute.String(),
        },
    })
    if err != nil {
        log.Fatalln(err)
    }
    go func() {
        checkid := "service:" + id
        for range time.Tick(ttl) {
            err := registry.Agent().PassTTL(checkid, "")
            if err != nil {
                log.Fatalln(err)
            }
        }
    }()

    s := grpc.NewServer()
    pb.RegisterGreeterServer(s, &server{port: *port})
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

client 端通过 Consul 发现服务

package main

import (
    "context"
    "log"
    "time"

    "github.com/hashicorp/consul/api"
    "google.golang.org/grpc"
    "google.golang.org/grpc/examples/grpcresolver"
    pb "google.golang.org/grpc/examples/helloworld/helloworld"
)

const (
    address     = "localhost:50051"
    defaultName = "world"
)

func main() {
    // consul
    registry, err := api.NewClient(api.DefaultConfig())
    if err != nil {
        log.Fatalln(err)
    }

    // 自定义 LB,并使用刚才写的 Consul Resolver
    lbrr := grpc.RoundRobin(grpcresolver.ForConsul(registry))

    // Set up a connection to the server.
    conn, err := grpc.Dial("helloserver", grpc.WithInsecure(), grpc.WithBalancer(lbrr))
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()
    c := pb.NewGreeterClient(conn)

    // 调用 server 端 RPC,通过响应观察负载均衡
    for range time.Tick(time.Second) {
        name := defaultName
        r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: name})
        if err != nil {
            log.Fatalf("could not greet: %v", err)
            continue
        }
        log.Printf("server reply: %s", r.GetMessage())
    }
}

启动 & 查看

  1. 启动两个 Server,设置不同的启动端口
启动 server1
$ go run grpc-go\examples\helloworld\greeter_server\main.go -p 50015
helloserver-52fdfc072182654f163f5f0f9a621d72-50015
启动 server2
$ go run grpc-go\examples\helloworld\greeter_server\main.go -p 50014
helloserver-52fdfc072182654f163f5f0f9a621d72-50014

通过 Consul Web UI 查看,两个 instance 均是健康的

 Consul 入门-gRPC 服务注册与发现
  1. 启动 Client
$ go run grpc-go\examples\helloworld\greeter_client\main.go
2021/09/12 16:42:39 server reply: Hello world, from 50014
2021/09/12 16:42:40 server reply: Hello world, from 50015
2021/09/12 16:42:41 server reply: Hello world, from 50014
2021/09/12 16:42:42 server reply: Hello world, from 50015
2021/09/12 16:42:43 server reply: Hello world, from 50014
2021/09/12 16:42:44 server reply: Hello world, from 50015
2021/09/12 16:42:45 server reply: Hello world, from 50014

可以看到是均匀对两个 server 发起调用,当我们将其中一个 instance server2 关掉(模拟不可用的情况),流量全面全部转移到另一台上了

 Consul 入门-gRPC 服务注册与发现
说明失败转移也是正常的。

Original: https://www.cnblogs.com/Zioyi/p/15255570.html
Author: Zioyi
Title: Consul 入门-gRPC 服务注册与发现

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/575864/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • [spring]spring管理的入门项目快速搭建

    1.spring简介 Spring框架是一个开源的应用程序框架,是针对bean的生命周期进行管理的轻量级容器。 Spring解决了开发者在J2EE开发中遇到的许多常见的问题,提供了…

    数据库 2023年6月16日
    085
  • cv2简单使用(opencv-python)

    404. 抱歉,您访问的资源不存在。 可能是网址有误,或者对应的内容被删除,或者处于私有状态。 代码改变世界,联系邮箱 contact@cnblogs.com 园子的商业化努力-困…

    数据库 2023年6月9日
    063
  • 获取单选按钮组的选中值

    1 var radioGroup = new Ext.form.RadioGroup({ 2 width: 130, 3 id: ‘selected’, 4 layout: ‘fo…

    数据库 2023年6月9日
    068
  • 控制反转(IOC容器)-Autofac入门

    注意:本文为原创文章,任何形式的转载、引用(包括但不限于以上形式)等,须先征得作者同意,否则一切后果自负。 简介 Autofac 是一个令人着迷的.NET IoC 容器。 它管理类…

    数据库 2023年6月14日
    076
  • Linux–>进程管理

    基本介绍 在Linux中, 每个执行程序都称为一个进程。每一个进程都会分配一个ID号(pid,进程号) 每个进程都可能以俩种方式存在的。分别是 前台与 后台,所谓前台进程就是用户目…

    数据库 2023年6月14日
    095
  • Dubbo源码(一)-SPI使用

    Dubbo 的可扩展性是基于 SPI 去实现的,而且Dubbo所有的组件都是通过 SPI 机制加载。 SPI 全称为 (Service Provider Interface) ,是…

    数据库 2023年6月11日
    083
  • markdown笔记

    注:笔记旨在记录 1.1 展示一级标题(在标题紧接的下一行加若干个’=’) ======= 1.2 展示二级标题 (在标题紧接的下一行加若干个’…

    数据库 2023年6月16日
    093
  • 分布式锁的实现

    什么是分布式锁? 为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用Java并发处理相关的API(如ReentrantLo…

    数据库 2023年6月6日
    099
  • 三分钟图解事务隔离级别,看一遍就懂

    前文说过,”锁” 是数据库系统区别于文件系统的一个关键特性,其对象是 事务,用来锁定的是数据库中的对象,如表、页、行等。锁确实提高了并发性,但是却不可避免地…

    数据库 2023年5月24日
    0106
  • dns服务之bind配置内网解析部分子域名,其它子域名转发

    bind配置内网解析部分子域名,其它子域名转发。以下以m.xxx.com和admin.xxx.com由内网dns解析,其它*.xxx.com转发给外网dns解析为例配置。文件/et…

    数据库 2023年6月14日
    094
  • Java并发

    Java并发 JAVA技术交流群:737698533 CAS compare and swap 比较并交换,cas又叫做无锁,自旋锁,乐观锁,轻量级锁 例如下面的代码,如果想在多线…

    数据库 2023年6月16日
    070
  • jdbc-使用java连接mysql

    package com.cqust; import com.mysql.jdbc.Driver; import java.sql.Connection;import java.sq…

    数据库 2023年5月24日
    054
  • Kmp算法

    算法流程: kmp_search(char[] text,char[] pattern) 构建前缀表 prefix[0]默认&#…

    数据库 2023年6月11日
    0104
  • Java学习-第一部分-第二阶段-第六节:泛型

    泛型 笔记目录:(https://www.cnblogs.com/wenjie2000/p/16378441.html) 泛型的理解和好处 看一个需求 请编写程序,在ArrayLi…

    数据库 2023年6月11日
    089
  • Windows 是最安全的操作系统

    建了一个用户交流群,我在群里说:”Windows 是最安全的操作系统。” 立刻引发了很多有意思的观点。我在群里一个人说不过大家,先篇文章把自己的论点罗列一下…

    数据库 2023年6月6日
    0238
  • MYSQL的Java操作器——JDBC

    在学习了Mysql之后,我们就要把Mysql和我们之前所学习的Java所结合起来 而JDBC就是这样一种工具:帮助我们使用Java语言来操作Mysql数据库 JDBC简介 首先我们…

    数据库 2023年5月24日
    067
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球