Go语言教程 Go语言操作Kafka( 三 )


3. Golang 操作kafka
3.1. kafka的环境
网上有很多搭建kafka环境教程,这里就不再搭建,就展示一下kafka的环境 , 在kubernetes上进行的搭建,有需要的私我,可以发yaml文件
3.2. 第三方库
github.com/Shopify/sarama // kafka主要的库
github.com/bsm/sarama-cluster // kafka消费组
3.3. 消费者
单个消费者
funcconsumer(){varwg sync.WaitGroupconsumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{fmt.Println("Failed to start consumer: %s", err)return}partitionList, err := consumer.Partitions("test0")//获得该topic所有的分区iferr !=nil{fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList {pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}wg.Add(1)gofunc(sarama.PartitionConsumer){//为每个分区开一个go协程去取值formsg :=rangepc.Messages() {//阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))}deferpc.AsyncClose()wg.Done()}(pc)}wg.Wait()}funcmain(){consumer()}
消费组
funcconsumerCluster(){groupID :="group-1"config := cluster.NewConfig()config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Secondconfig.Consumer.Offsets.Initial = sarama.OffsetNewest//初始从最新的offset开始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){errors := c.Errors()noti := c.Notifications()for{select{caseerr := -errors:glog.Errorln(err)case-noti:}}}(c)formsg :=rangec.Messages() {fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))c.MarkOffset(msg,"")//MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset}}funcmain(){goconsumerCluster()}
3.4. 生产者
同步生产者
packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成 。config.Producer.Partitioner = sarama.NewRandomPartitioner//写到随机分区中 , 默认设置8个分区config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{}msg.Topic =`test0`msg.Value = https://www.04ip.com/post/sarama.StringEncoder("Hello World!")client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{fmt.Println("producer close err, ", err)return}deferclient.Close()pid, offset, err := client.SendMessage(msg)iferr !=nil{fmt.Println("send message failed, ", err)return}fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}
异步生产者
funcasyncProducer(){config := sarama.NewConfig()config.Producer.Return.Successes =true//必须有这个选项config.Producer.Timeout =5* time.Secondp, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//这个部分一定要写 , 不然通道会被堵塞gofunc(p sarama.AsyncProducer){errors := p.Errors()success := p.Successes()for{select{caseerr := -errors:iferr !=nil{glog.Errorln(err)}case-success:}}}(p)for{v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))fmt.Fprintln(os.Stdout, v)msg := sarama.ProducerMessage{Topic: topics,Value: sarama.ByteEncoder(v),}p.Input() - msgtime.Sleep(time.Second *1)}}funcmain(){goasyncProducer()select{}}
3.5. 结果展示
同步生产打印:
分区ID:0,offset:90
消费打印:
Partition:0,Offset:90,key:,value:Hello World!
异步生产打印:
async:7272async:7616async:998

推荐阅读