Golang kafka简述和操作(sarama同步异步和消费组)一、Kafka简述
1. 为什么需要用到消息队列
异步GO语言的异步:对比以前的串行同步方式来说GO语言的异步,可以在同一时间做更多的事情 , 提高效率GO语言的异步;
解耦:在耦合太高的场景,多个任务要对同一个数据进行操作消费的时候,会导致一个任务的处理因为另一个任务对数据的操作变得及其复杂 。
缓冲:当遇到突发大流量的时候,消息队列可以先把所有消息有序保存起来,避免直接作用于系统主体,系统主题始终以一个平稳的速率去消费这些消息 。
2.为什么选择kafka呢GO语言的异步?
这没有绝对的好坏 , 看个人需求来选择 , 我这里就抄GO语言的异步了一段他人总结的的优缺点,可见原文
kafka的优点:
1.支持多个生产者和消费者2.支持broker的横向拓展3.副本集机制,实现数据冗余 , 保证数据不丢失4.通过topic将数据进行分类5.通过分批发送压缩数据的方式,减少数据传输开销,提高吞高量6.支持多种模式的消息7.基于磁盘实现数据的持久化8.高性能的处理信息,在大数据的情况下,可以保证亚秒级的消息延迟9.一个消费者可以支持多种topic的消息10.对CPU和内存的消耗比较小11.对网络开销也比较小12.支持跨数据中心的数据复制13.支持镜像集群
kafka的缺点:
【GO语言的异步 golang 异步】 1.由于是批量发送,所以数据达不到真正的实时2.对于mqtt协议不支持3.不支持物联网传感数据直接接入4.只能支持统一分区内消息有序,无法实现全局消息有序5.监控不完善,需要安装插件6.需要配合zookeeper进行元数据管理7.会丢失数据,并且不支持事务8.可能会重复消费数据 , 消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高
3. Golang 操作kafka
3.1. kafka的环境
网上有很多搭建kafka环境教程,这里就不再搭建,就展示一下kafka的环境,在kubernetes上进行的搭建 , 有需要的私我,可以发yaml文件
3.2. 第三方库
github.com/Shopify/sarama // kafka主要的库*github.com/bsm/sarama-cluster // kafka消费组
3.3. 消费者
单个消费者
funcconsumer(){varwg sync.WaitGroupconsumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{fmt.Println("Failed to start consumer: %s", err)return}partitionList, err := consumer.Partitions("test0")//获得该topic所有的分区iferr !=nil{fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList {pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}wg.Add(1)gofunc(sarama.PartitionConsumer){//为每个分区开一个go协程去取值formsg :=rangepc.Messages() {//阻塞直到有值发送过来 , 然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))}deferpc.AsyncClose()wg.Done()}(pc)}wg.Wait()}funcmain(){consumer()}
消费组
funcconsumerCluster(){groupID :="group-1"config := cluster.NewConfig()config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Secondconfig.Consumer.Offsets.Initial = sarama.OffsetNewest//初始从最新的offset开始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){errors := c.Errors()noti := c.Notifications()for{select{caseerr := -errors:glog.Errorln(err)case-noti:}}}(c)formsg :=rangec.Messages() {fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))c.MarkOffset(msg,"")//MarkOffset 并不是实时写入kafka , 有可能在程序crash时丢掉未提交的offset}}funcmain(){goconsumerCluster()}
3.4. 生产者
同步生产者
packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成 。config.Producer.Partitioner = sarama.NewRandomPartitioner//写到随机分区中,默认设置8个分区config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{}msg.Topic =`test0`msg.Value = https://www.04ip.com/post/sarama.StringEncoder("Hello World!")client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{fmt.Println("producer close err, ", err)return}deferclient.Close()pid, offset, err := client.SendMessage(msg)iferr !=nil{fmt.Println("send message failed, ", err)return}fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}
异步生产者
funcasyncProducer(){config := sarama.NewConfig()config.Producer.Return.Successes =true//必须有这个选项config.Producer.Timeout =5* time.Secondp, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//这个部分一定要写,不然通道会被堵塞gofunc(p sarama.AsyncProducer){errors := p.Errors()success := p.Successes()for{select{caseerr := -errors:iferr !=nil{glog.Errorln(err)}case-success:}}}(p)for{v :="async: "strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))fmt.Fprintln(os.Stdout, v)msg := sarama.ProducerMessage{Topic: topics,Value: sarama.ByteEncoder(v),}p.Input() - msgtime.Sleep(time.Second *1)}}funcmain(){goasyncProducer()select{}}
3.5. 结果展示-
同步生产打?。?
分区ID:0,offset:90
消费打?。?
Partition:0,Offset:90,key:,value:Hello World!
异步生产打?。?
async:7272async:7616async:998
消费打?。?
Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998
GO语言商业案例(十八):stream切换到新语言始终是一大步,尤其是当您的团队成员只有一个时有该语言的先前经验 。现在 , Stream 的主要编程语言从 Python 切换到了 Go 。这篇文章将解释stream决定放弃 Python 并转向 Go 的一些原因 。
Go 非常快 。性能类似于 Java 或 C。对于用例,Go 通常比 Python 快 40 倍 。
对于许多应用程序来说,编程语言只是应用程序和数据库之间的粘合剂 。语言本身的性能通常并不重要 。然而,Stream 是一个API 提供商 , 为 700 家公司和超过 5 亿最终用户提供提要和聊天平台 。多年来,我们一直在优化 Cassandra、PostgreSQL、Redis 等,但最终,您会达到所使用语言的极限 。Python 是一门很棒的语言,但对于序列化/反序列化、排名和聚合等用例,它的性能相当缓慢 。我们经常遇到性能问题,Cassandra 需要 1 毫秒来检索数据,而 Python 会花费接下来的 10 毫秒将其转换为对象 。
看看我如何开始 Go 教程中的一小段 Go 代码 。(这是一个很棒的教程 , 也是学习 Go 的一个很好的起点 。)
如果您是 Go 新手,那么在阅读那个小代码片段时不会有太多让您感到惊讶的事情 。它展示了多个赋值、数据结构、指针、格式和一个内置的 HTTP 库 。当我第一次开始编程时,我一直喜欢使用 Python 更高级的功能 。Python 允许您在编写代码时获得相当的创意 。例如,您可以:
这些功能玩起来很有趣 , 但是,正如大多数程序员会同意的那样,在阅读别人的作品时,它们通常会使代码更难理解 。Go 迫使你坚持基础 。这使得阅读任何人的代码并立即了解发生了什么变得非常容易 。注意:当然 , 它实际上有多“容易”取决于您的用例 。如果你想创建一个基本的 CRUD API , 我仍然推荐 DjangoDRF或 Rails 。
作为一门语言,Go 试图让事情变得简单 。它没有引入许多新概念 。重点是创建一种非常快速且易于使用的简单语言 。它唯一具有创新性的领域是 goroutine 和通道 。(100% 正确CSP的概念始于 1977 年,所以这项创新更多是对旧思想的一种新方法 。)Goroutines 是 Go 的轻量级线程方法 , 通道是 goroutines 之间通信的首选方式 。Goroutines 的创建非常便宜 , 并且只需要几 KB 的额外内存 。因为 Goroutine 非常轻量,所以有可能同时运行数百甚至数千个 。您可以使用通道在 goroutine 之间进行通信 。Go 运行时处理所有复杂性 。goroutines 和基于通道的并发方法使得使用所有可用的 CPU 内核和处理并发 IO 变得非常容易——所有这些都不会使开发复杂化 。与 Python/Java 相比 , 在 goroutine 上运行函数需要最少的样板代码 。您只需在函数调用前加上关键字“go”:
Go 的并发方法很容易使用 。与 Node 相比,这是一种有趣的方法,开发人员必须密切关注异步代码的处理方式 。Go 中并发的另一个重要方面是竞争检测器 。这样可以很容易地确定异步代码中是否存在任何竞争条件 。
我们目前用 Go 编写的最大的微服务编译需要 4 秒 。与以编译速度慢而闻名的 Java 和 C等语言相比 , Go 的快速编译时间是一项重大的生产力胜利 。我喜欢在程序编译的时候摸鱼,但在我还记得代码应该做什么的同时完成事情会更好 。
首先,让我们从显而易见的开始:与 C和 Java 等旧语言相比 , Go 开发人员的数量并不多 。根据StackOverflow的数据,38%的开发人员知道 Java,19.3%的人知道 C,只有4.6%的人知道 Go 。GitHub 数据显示了类似的趋势:Go 比 Erlang、Scala 和 Elixir 等语言使用更广泛,但不如 Java 和 C流行 。幸运的是,Go 是一种非常简单易学的语言 。它提供了您需要的基本功能,仅此而已 。它引入的新概念是“延迟”声明和内置的并发管理与“goroutines”和通道 。(对于纯粹主义者来说:Go 并不是第一种实现这些概念的语言,只是第一种使它们流行起来的语言 。)任何加入团队的 Python、Elixir、C、Scala 或 Java 开发人员都可以在一个月内在 Go 上发挥作用,因为它的简单性 。与许多其他语言相比,我们发现组建 Go 开发人员团队更容易 。如果您在博尔德和阿姆斯特丹等竞争激烈的生态系统中招聘人员,这是一项重要的优势 。
对于我们这样规模的团队(约 20 人)来说,生态系统很重要 。如果您必须重新发明每一个小功能,您根本无法为您的客户创造价值 。Go 对我们使用的工具有很好的支持 。实体库已经可用于 Redis、RabbitMQ、PostgreSQL、模板解析、任务调度、表达式解析和 RocksDB 。与 Rust 或 Elixir 等其他较新的语言相比,Go 的生态系统是一个重大胜利 。它当然不如 Java、Python 或 Node 之类的语言好,但它很可靠,而且对于许多基本需求,你会发现已经有高质量的包可用 。
Gofmt 是一个很棒的命令行实用程序,内置在 Go 编译器中,用于格式化代码 。就功能而言,它与 Python 的 autopep8 非常相似 。我们大多数人并不真正喜欢争论制表符与空格 。格式的一致性很重要,但实际的格式标准并不那么重要 。Gofmt 通过使用一种正式的方式来格式化您的代码来避免所有这些讨论 。
Go 对协议缓冲区和 gRPC 具有一流的支持 。这两个工具非常适合构建需要通过 RPC 通信的微服务 。您只需要编写一个清单 , 在其中定义可以进行的 RPC 调用以及它们采用的参数 。然后从这个清单中自动生成服务器和客户端代码 。生成的代码既快速又具有非常小的网络占用空间并且易于使用 。从同一个清单中,您甚至可以为许多不同的语言生成客户端代码,例如 C、Java、Python 和 Ruby 。因此,内部流量不再有模棱两可的 REST 端点,您每次都必须编写几乎相同的客户端和服务器代码 。.
Go 没有像 Rails 用于 Ruby、Django 用于 Python 或 Laravel 用于 PHP 那样的单一主导框架 。这是 Go 社区内激烈争论的话题,因为许多人主张你不应该一开始就使用框架 。我完全同意这对于某些用例是正确的 。但是,如果有人想构建一个简单的 CRUD API,他们将更容易使用 Django/DJRF、Rails Laravel 或Phoenix 。对于 Stream 的用例,我们更喜欢不使用框架 。然而,对于许多希望提供简单 CRUD API 的新项目来说,缺乏主导框架将是一个严重的劣势 。
Go 通过简单地从函数返回错误并期望调用代码来处理错误(或将其返回到调用堆栈)来处理错误 。虽然这种方法有效,但很容易失去问题的范围 , 以确保您可以向用户提供有意义的错误 。错误包通过允许您向错误添加上下文和堆栈跟踪来解决此问题 。另一个问题是很容易忘记处理错误 。像 errcheck 和 megacheck 这样的静态分析工具可以方便地避免犯这些错误 。虽然这些变通办法效果很好,但感觉不太对劲 。您希望该语言支持正确的错误处理 。
Go 的包管理绝不是完美的 。默认情况下,它无法指定特定版本的依赖项,也无法创建可重现的构建 。Python、Node 和 Ruby 都有更好的包管理系统 。但是,使用正确的工具,Go 的包管理工作得很好 。您可以使用Dep来管理您的依赖项,以允许指定和固定版本 。除此之外,我们还贡献了一个名为的开源工具VirtualGo,它可以更轻松地处理用 Go 编写的多个项目 。
我们进行的一个有趣的实验是在 Python 中使用我们的排名提要功能并在 Go 中重写它 。看看这个排名方法的例子:
Python 和 Go 代码都需要执行以下操作来支持这种排名方法:
开发 Python 版本的排名代码大约花了 3 天时间 。这包括编写代码、单元测试和文档 。接下来,我们花了大约 2 周的时间优化代码 。其中一项优化是将分数表达式 (simple_gauss(time)*popularity) 转换为抽象语法树. 我们还实现了缓存逻辑,可以在未来的特定时间预先计算分数 。相比之下,开发此代码的 Go 版本大约需要 4 天时间 。性能不需要任何进一步的优化 。因此,虽然 Python 的最初开发速度更快 , 但基于 Go 的版本最终需要我们团队的工作量大大减少 。另外一个好处是,Go 代码的执行速度比我们高度优化的 Python 代码快大约 40 倍 。现在,这只是我们通过切换到 Go 体验到的性能提升的一个示例 。
与 Python 相比,我们系统的其他一些组件在 Go 中构建所需的时间要多得多 。作为一个总体趋势,我们看到开发Go 代码需要更多的努力 。但是,我们花更少的时间优化代码以提高性能 。
我们评估的另一种语言是Elixir. 。Elixir 建立在 Erlang 虚拟机之上 。这是一种迷人的语言,我们之所以考虑它,是因为我们的一名团队成员在 Erlang 方面拥有丰富的经验 。对于我们的用例 , 我们注意到 Go 的原始性能要好得多 。Go 和 Elixir 都可以很好地服务数千个并发请求 。但是,如果您查看单个请求的性能,Go 对于我们的用例来说要快得多 。我们选择 Go 而不是 Elixir 的另一个原因是生态系统 。对于我们需要的组件,Go 有更成熟的库,而在许多情况下 , Elixir 库还没有准备好用于生产环境 。培训/寻找开发人员使用 Elixir 也更加困难 。这些原因使天平向 Go 倾斜 。Elixir 的 Phoenix 框架看起来很棒,绝对值得一看 。
Go 是一种非常高性能的语言,对并发有很好的支持 。它几乎与 C和 Java 等语言一样快 。虽然与 Python 或 Ruby 相比 , 使用 Go 构建东西确实需要更多时间,但您将节省大量用于优化代码的时间 。我们在Stream有一个小型开发团队,为超过 5 亿最终用户提供动力和聊天 。Go 结合了强大的生态系统、新开发人员的轻松入门、快速的性能、对并发的可靠支持和高效的编程环境 , 使其成为一个不错的选择 。Stream 仍然在我们的仪表板、站点和机器学习中利用 Python 来提供个性化的订阅源. 我们不会很快与 Python 说再见 , 但今后所有性能密集型代码都将使用 Go 编写 。我们新的聊天 API也完全用 Go 编写 。
Golang入门到项目实战 | golang并发变成之通道channel Go提供了一种称为通道的机制 , 用于在goroutine之间共享数据 。当您作为goroutine执行并发活动时 , 需要在goroutine之间共享资源或数据,通道充当goroutine之间的管道(管道)并提供一种机制来保证同步交换 。
根据数据交换的行为 , 有两种类型的通道:无缓冲通道和缓冲通道 。无缓冲通道用于执行goroutine之间的同步通信,而缓冲通道用于执行异步通信 。无缓冲通道保证在发送和接收发生的瞬间两个goroutine之间的交换 。缓冲通道没有这样的保证 。
通道由make函数创建,该函数指定chan关键字和通道的元素类型 。
这是创建无缓冲和缓冲通道的代码块:
语法
使用内置函数make创建无缓冲和缓冲通道 。make的第一个参数需要关键字chan,然后是通道允许交换的数据类型 。
这是将值发送到通道的代码块需要使用-运算符:
语法
一个包含5个值的缓冲区的字符串类型的goroutine1通道 。然后我们通过通道发送字符串“Australia” 。
这是从通道接收值的代码块:
语法
- 运算符附加到通道变量(goroutine1)的左侧,以接收来自通道的值 。
在无缓冲通道中,在接收到任何值之前没有能力保存它 。在这种类型的通道中,发送和接收goroutine在任何发送或接收操作完成之前的同一时刻都准备就绪 。如果两个goroutine没有在同一时刻准备好,则通道会让执行其各自发送或接收操作的goroutine首先等待 。同步是通道上发送和接收之间交互的基础 。没有另一个就不可能发生 。
在缓冲通道中,有能力在接收到一个或多个值之前保存它们 。在这种类型的通道中,不要强制goroutine在同一时刻准备好执行发送和接收 。当发送和接收阻塞时也有不同的条件 。只有当通道中没有要接收的值时,接收才会阻塞 。仅当没有可用缓冲区来放置正在发送的值时,发送才会阻塞 。
实例
运行结果
协程与异步IO协程GO语言的异步,又称微线程 , 纤程 。英文名 Coroutine。Python对协程的支持是通过 generator 实现的 。在generator中,GO语言的异步我们不但可以通过for循环来迭代,还可以不断调用 next()函数 获取由 yield 语句返回的下一个值 。但是Python的yield不但可以返回一个值,它还可以接收调用者发出的参数 。yield其实是终端当前的函数,返回给调用方 。python3中使用yield来实现range,节省内存,提高性能,懒加载的模式 。
asyncio是Python3.4 版本引入的 标准库,直接内置了对异步IO的支持 。
从Python3.5 开始引入了新的语法 async 和 await ,用来简化yield的语法:
import asyncio
import threading
async def compute(x, y):
print("Compute %s%s ..." % (x, y))
print(threading.current_thread().name)
await asyncio.sleep(xy)
return xy
async def print_sum(x, y):
result = await compute(x, y)
print("%s%s = %s" % (x, y, result))
print(threading.current_thread().name)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
tasks = [print_sum(1, 2), print_sum(3, 4)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
线程是内核进行抢占式的调度的,这样就确保了每个线程都有执行的机会 。而 coroutine 运行在同一个线程中,由语言的运行时中的EventLoop(事件循环) 来进行调度 。和大多数语言一样,在 Python 中,协程的调度是非抢占式的,也就是说一个协程必须主动让出执行机会,其他协程才有机会运行 。
让出执行的关键字就是 await 。也就是说一个协程如果阻塞了,持续不让出 CPU,那么整个线程就卡住了,没有任何并发 。
PS: 作为服务端,event loop最核心的就是IO多路复用技术 , 所有来自客户端的请求都由IO多路复用函数来处理;作为客户端,event loop的核心在于利用Future对象延迟执行,并使用send函数激发协程,挂起,等待服务端处理完成返回后再调用CallBack函数继续下面的流程
Go语言的协程是 语言本身特性 ,erlang和golang都是采用了CSP(Communicating Sequential Processes)模式(Python中的协程是eventloop模型),但是erlang是基于进程的消息通信 , go是基于goroutine和channel的通信 。
Python和Go都引入了消息调度系统模型,来避免锁的影响和进程/线程开销大的问题 。
协程从本质上来说是一种用户态的线程,不需要系统来执行抢占式调度,而是在语言层面实现线程的调度。因为协程 不再使用共享内存/数据 ,而是使用 通信 来共享内存/锁,因为在一个超级大系统里具有无数的锁,共享变量等等会使得整个系统变得无比的臃肿 , 而通过消息机制来交流,可以使得每个并发的单元都成为一个独立的个体,拥有自己的变量,单元之间变量并不共享,对于单元的输入输出只有消息 。开发者只需要关心在一个并发单元的输入与输出的影响,而不需要再考虑类似于修改共享内存/数据对其它程序的影响 。
关于GO语言的异步和golang 异步的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站 。
推荐阅读
- ios12已删的照片恢复,iphone12彻底删除的照片找回
- 鸡西app小程序开发,APP小程序开发
- jquery选择当前的元素,jquery选择dom元素
- 鸿蒙系统手机多屏互动在哪,鸿蒙如何开启多屏协同
- 关于windows和凤凰系统的信息
- 非著名程序员python创始人的简单介绍
- 益智休闲3d类游戏推荐,休闲益智类的游戏
- 奇迹游戏角色扮演,奇迹角色介绍
- 三角波函数python 三角波函数在一个周期内的表达式如下