在以前的文章kafka初探go和C#的实现里面我们用了sarama来消费kafka的消息,但是很遗憾它没有group的概念。没办法 我们只能用sarama-cluster来实现, 注意sarama版本不要太新否则有错误panic: non-positive interval for NewTicker 问题处理,建议大家可以修改go.mod文件如下:
require (
github.com/Shopify/sarama v1.24.1
github.com/bsm/sarama-cluster v2.1.15+incompatible
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
_ "regexp"
"time"
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
)
var Address = []string{"192.168.100.30:9092"}
var Topic = "gavintest"
//panic: non-positive interval for NewTicker
// 修改go.mod
//github.com/Shopify/sarama v1.24.1
// github.com/bsm/sarama-cluster v2.1.15+incompatible
//修改
/**
消费者
*/
func main() {
go syncConsumer("demo1")
go syncConsumer("demo2")
go ConsumerDemo3()
go syncProducer()
select {}
}
func syncConsumer(groupName string) {
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
// init consumer
//可以订阅多个主题
topics := []string{Topic}
consumer, err := cluster.NewConsumer(Address, groupName, topics, config)
if err != nil {
panic(err)
}
//这里需要注意的是defer 一定要在panic 之后才能关闭连接
defer consumer.Close()
// trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
// consume errors
go func() {
for err := range consumer.Errors() {
log.Printf("Error: %s\n", err.Error())
}
}()
// consume notifications
go func() {
for ntf := range consumer.Notifications() {
log.Printf("Rebalanced: %+v\n", ntf)
}
}()
// 循环从通道中获取message
//msg.Topic 消息主题
//msg.Partition 消息分区
//msg.Offset
//msg.Key
//msg.Value 消息值
for {
select {
case msg, ok := consumer.Messages():
if ok {
fmt.Printf("%s receive message %s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", groupName, msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
consumer.MarkOffset(msg, "") // 上报offset
}
case err := consumer.Errors():
{
fmt.Println(fmt.Sprintf("consumer error:%v", err))
}
case signals:
return
}
}
}
//同步消息模式
func syncProducer() {
//指定配置
config := sarama.NewConfig()
// 等待服务器所有副本都保存成功后的响应
config.Producer.RequiredAcks = sarama.WaitForAll
// 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区
config.Producer.Partitioner = sarama.NewRandomPartitioner
// 是否等待成功和失败后的响应
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
producer, err := sarama.NewSyncProducer(Address, config)
if err != nil {
log.Printf("sarama.NewSyncProducer err, message=%s \n", err)
return
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: Topic,
}
var i = 1100
for {
i++
//将字符串转换为字节数组
msg.Value = sarama.ByteEncoder(fmt.Sprintf("this is a message:%d", i))
//SendMessage:该方法是生产者生产给定的消息
//partition, offset, err := producer.SendMessage(msg)
_, _, err := producer.SendMessage(msg)
//生产失败的时候返回error
if err != nil {
fmt.Println(fmt.Sprintf("Send message Fail %v", err))
}
//生产成功的时候返回该消息的分区和所在的偏移量
//fmt.Printf("send message Partition = %d, offset=%d\n", partition, offset)
time.Sleep(time.Second * 5)
}
}
func ConsumerDemo3() {
config := sarama.NewConfig()
// Version 必须大于等于 V0_10_2_0
config.Version = sarama.V0_10_2_1
config.Consumer.Return.Errors = true
fmt.Println("start connect kafka")
// 开始连接kafka服务器
//group, err := sarama.NewConsumerGroup(Address, "demo3", config)
client, err := sarama.NewClient(Address, config)
if err != nil {
fmt.Println("connect kafka failed; err", err)
return
}
defer func() { _ = client.Close() }()
////
group, err := sarama.NewConsumerGroupFromClient("demo3", client)
if err != nil {
fmt.Println("connect kafka failed; err", err)
return
}
// 检查错误
go func() {
for err := range group.Errors() {
fmt.Println("group errors : ", err)
}
}()
ctx := context.Background()
fmt.Println("start get msg")
// for 是应对 consumer rebalance
for {
// 需要监听的主题
topics := []string{Topic}
handler := ConsumerGroupHandler{}
// 启动kafka消费组模式,消费的逻辑在上面的 ConsumeClaim 这个方法里
err := group.Consume(ctx, topics, handler)
if err != nil {
fmt.Println("consume failed; err : ", err)
return
}
}
}
type ConsumerGroupHandler struct{}
func (ConsumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error {
return nil
}
func (ConsumerGroupHandler) Cleanup(sess sarama.ConsumerGroupSession) error {
return nil
}
// 这个方法用来消费消息的
func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// 获取消息
for msg := range claim.Messages() {
fmt.Printf("demo3 receive message %s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
// 将消息标记为已使用
sess.MarkMessage(msg, "")
}
return nil
}
运行结果:
https://github.com/bsm/sarama-cluster
windows技术爱好者
Original: https://www.cnblogs.com/majiang/p/14543566.html
Author: dz45693
Title: go kafka group
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/533845/
转载文章受原作者版权保护。转载请注明原作者出处!