go语言集群之间同步文件 go语言通信

Golang kafka简述和操作(sarama同步异步和消费组)一、Kafka简述
1. 为什么需要用到消息队列
异步go语言集群之间同步文件:对比以前go语言集群之间同步文件的串行同步方式来说go语言集群之间同步文件,可以在同一时间做更多的事情,提高效率go语言集群之间同步文件;
解耦:在耦合太高的场景,多个任务要对同一个数据进行操作消费的时候,会导致一个任务的处理因为另一个任务对数据的操作变得及其复杂 。
缓冲:当遇到突发大流量的时候,消息队列可以先把所有消息有序保存起来,避免直接作用于系统主体 , 系统主题始终以一个平稳的速率去消费这些消息 。
2.为什么选择kafka呢go语言集群之间同步文件?
这没有绝对的好坏 , 看个人需求来选择,我这里就抄了一段他人总结的的优缺点,可见原文
kafka的优点:
1.支持多个生产者和消费者2.支持broker的横向拓展3.副本集机制,实现数据冗余,保证数据不丢失4.通过topic将数据进行分类5.通过分批发送压缩数据的方式 , 减少数据传输开销,提高吞高量6.支持多种模式的消息7.基于磁盘实现数据的持久化8.高性能的处理信息 , 在大数据的情况下,可以保证亚秒级的消息延迟9.一个消费者可以支持多种topic的消息10.对CPU和内存的消耗比较小11.对网络开销也比较小12.支持跨数据中心的数据复制13.支持镜像集群
kafka的缺点:
1.由于是批量发送,所以数据达不到真正的实时2.对于mqtt协议不支持3.不支持物联网传感数据直接接入4.只能支持统一分区内消息有序,无法实现全局消息有序5.监控不完善,需要安装插件6.需要配合zookeeper进行元数据管理7.会丢失数据 , 并且不支持事务8.可能会重复消费数据,消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高
3. Golang 操作kafka
3.1. kafka的环境
网上有很多搭建kafka环境教程 , 这里就不再搭建,就展示一下kafka的环境 , 在kubernetes上进行的搭建,有需要的私我,可以发yaml文件
3.2. 第三方库
github.com/Shopify/sarama // kafka主要的库*github.com/bsm/sarama-cluster // kafka消费组
3.3. 消费者
单个消费者
funcconsumer(){varwg sync.WaitGroupconsumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{fmt.Println("Failed to start consumer: %s", err)return}partitionList, err := consumer.Partitions("test0")//获得该topic所有的分区iferr !=nil{fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList {pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}wg.Add(1)gofunc(sarama.PartitionConsumer){//为每个分区开一个go协程去取值formsg :=rangepc.Messages() {//阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))}deferpc.AsyncClose()wg.Done()}(pc)}wg.Wait()}funcmain(){consumer()}
消费组
funcconsumerCluster(){groupID :="group-1"config := cluster.NewConfig()config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Secondconfig.Consumer.Offsets.Initial = sarama.OffsetNewest//初始从最新的offset开始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){errors := c.Errors()noti := c.Notifications()for{select{caseerr := -errors:glog.Errorln(err)case-noti:}}}(c)formsg :=rangec.Messages() {fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))c.MarkOffset(msg,"")//MarkOffset 并不是实时写入kafka , 有可能在程序crash时丢掉未提交的offset}}funcmain(){goconsumerCluster()}
3.4. 生产者
同步生产者
packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成 。config.Producer.Partitioner = sarama.NewRandomPartitioner//写到随机分区中,默认设置8个分区config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{}msg.Topic =`test0`msg.Value = https://www.04ip.com/post/sarama.StringEncoder("Hello World!")client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{fmt.Println("producer close err, ", err)return}deferclient.Close()pid, offset, err := client.SendMessage(msg)iferr !=nil{fmt.Println("send message failed, ", err)return}fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}
异步生产者
funcasyncProducer(){config := sarama.NewConfig()config.Producer.Return.Successes =true//必须有这个选项config.Producer.Timeout =5* time.Secondp, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//这个部分一定要写,不然通道会被堵塞gofunc(p sarama.AsyncProducer){errors := p.Errors()success := p.Successes()for{select{caseerr := -errors:iferr !=nil{glog.Errorln(err)}case-success:}}}(p)for{v :="async: "strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))fmt.Fprintln(os.Stdout, v)msg := sarama.ProducerMessage{Topic: topics,Value: sarama.ByteEncoder(v),}p.Input() - msgtime.Sleep(time.Second *1)}}funcmain(){goasyncProducer()select{}}
3.5. 结果展示-
同步生产打?。?
分区ID:0,offset:90
消费打?。?
Partition:0,Offset:90,key:,value:Hello World!
异步生产打?。?
async:7272async:7616async:998
消费打?。?
Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998
基于go的websocket消息推送的集群实现目前websocket技术已经很成熟,选型Go语言,当然是为了节省成本以及它强大的高并发性能 。我使用的是第三方开源的websocket库即gorilla/websocket 。
由于我们线上推送的量不小,推送后端需要部署多节点保持高可用,所以需要自己做集群 , 具体架构方案如图:
Auth Service:鉴权服务 , 根据Token验证用户权限 。
Collect Service:消息采集服务,负责收集业务系统消息,存入MongoDB后,发送给消息分发服务 。
Dispatch Service:消息分发服务,根据路由规则分发至对应消息推送服务节点上 。
Push Service:消息推送服务 , 通过websocket将消息推送给用户 。
集群推送的关键点在于 , web端与服务端建立长连接之后,具体跟哪个推送节点保持长连接的,如果我们能够找到对应的连接节点 , 那么我们就可以将消息推送出去 。下面讲解一下集群的大致流程:
1. web端用户登录之后,带上token与后端推送服务(Push Service)保持长连接 。
2. 推送服务收到连接请求之后,携带token去鉴权服务(Auth Service)验证此token权限,并返回用户ID 。
3. 把返回的用户ID与长连接存入本地缓存,保持用户ID与长连接绑定关系 。
4. 再将用户ID与本推送节点IP存入redis,建立用户(即长连接)与节点绑定关系,并设置失效时间 。
5. 采集服务(Collect Service)收集业务消息,首先存入mongodb,然后将消息透传给分发服务(Dispatch Service) 。
6. 分发服务收到消息之后,根据消息体中的用户ID,从redis中获取对应的推送服务节点IP,然后转发给对应的推送节点 。
7. 推送服务节点收到消息之后,根据用户ID,从本地缓存中取出对应的长连接,将消息推送给客户端 。
其他注意事项:
go语言无缓冲的channel无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道 。
这种类型的通道要求发送goroutine和接收goroutine同时准备好,才能完成发送和接收操作 。否则 , 通道会导致先执行发送或接收操作的 goroutine 阻塞等待 。
这种对通道进行发送和接收的交互行为本身就是同步的 。其中任意一个操作都无法离开另一个操作单独存在 。
阻塞:由于某种原因数据没有到达 , 当前协程(线程)持续处于等待状态,直到条件满足,才接触阻塞 。
同步:在两个或多个协程(线程)间,保持数据内容一致性的机制 。
下图展示两个 goroutine 如何利用无缓冲的通道来共享一个值:
在第 1 步,两个 goroutine 都到达通道,但哪个都没有开始执行发送或者接收 。
在第 2 步 , 左侧的 goroutine 将它的手伸进了通道,这模拟了向通道发送数据的行为 。这时,这个 goroutine 会在通道中被锁住,直到交换完成 。
在第 3 步,右侧的 goroutine 将它的手放入通道 , 这模拟了从通道里接收数据 。这个 goroutine 一样也会在通道中被锁住 , 直到交换完成 。
在第 4 步和第 5 步,进行交换,并最终,在第 6 步,两个 goroutine 都将它们的手从通道里拿出来 , 这模拟了被锁住的 goroutine 得到释放 。两个 goroutine 现在都可以去做别的事情了 。
如果没有指定缓冲区容量 , 那么该通道就是同步的,因此会阻塞到发送者准备好发送和接收者准备好接收 。
无缓冲channel: —— 同步通信
golang使用Nsq1. 介绍
最近在研究一些消息中间件,常用的MQ如RabbitMQ,ActiveMQ,Kafka等 。NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布 , 由bitly公司开源出来的一款简单易用的消息中间件 。
官方和第三方还为NSQ开发了众多客户端功能库,如官方提供的基于HTTP的nsqd、Go客户端go-nsq、Python客户端pynsq、基于Node.js的JavaScript客户端nsqjs、异步C客户端libnsq、Java客户端nsq-java以及基于各种语言的众多第三方客户端功能库 。
1.1 Features
1). Distributed
NSQ提供了分布式的,去中心化,且没有单点故障的拓扑结构 , 稳定的消息传输发布保障,能够具有高容错和HA(高可用)特性 。
2). Scalable易于扩展
NSQ支持水平扩展,没有中心化的brokers 。内置的发现服务简化了在集群中增加节点 。同时支持pub-sub和load-balanced 的消息分发 。
3). Ops Friendly
NSQ非常容易配置和部署,生来就绑定了一个管理界面 。二进制包没有运行时依赖 。官方有Docker image 。
4.Integrated高度集成
官方的 Go 和 Python库都有提供 。而且为大多数语言提供了库 。
1.2 组件
1.3 拓扑结构
NSQ推荐通过他们相应的nsqd实例使用协同定位发布者 , 这意味着即使面对网络分区,消息也会被保存在本地,直到它们被一个消费者读取 。更重要的是 , 发布者不必去发现其他的nsqd节点,他们总是可以向本地实例发布消息 。
NSQ
首先,一个发布者向它的本地nsqd发送消息,要做到这点,首先要先打开一个连接,然后发送一个包含topic和消息主体的发布命令 , 在这种情况下,我们将消息发布到事件topic上以分散到我们不同的worker中 。
事件topic会复制这些消息并且在每一个连接topic的channel上进行排队,在我们的案例中,有三个channel , 它们其中之一作为档案channel 。消费者会获取这些消息并且上传到S3 。
nsqd
每个channel的消息都会进行排队,直到一个worker把他们消费,如果此队列超出了内存限制 , 消息将会被写入到磁盘中 。Nsqd节点首先会向nsqlookup广播他们的位置信息,一旦它们注册成功,worker将会从nsqlookup服务器节点上发现所有包含事件topic的nsqd节点 。
nsqlookupd
2. Internals
2.1 消息传递担保
1)客户表示已经准备好接收消息
2)NSQ 发送一条消息,并暂时将数据存储在本地(在 re-queue 或 timeout)
3)客户端回复 FIN(结束)或 REQ(重新排队)分别指示成功或失败 。如果客户端没有回复, NSQ 会在设定的时间超时,自动重新排队消息
这确保了消息丢失唯一可能的情况是不正常结束 nsqd 进程 。在这种情况下,这是在内存中的任何信息(或任何缓冲未刷新到磁盘)都将丢失 。
如何防止消息丢失是最重要的 , 即使是这个意外情况可以得到缓解 。一种解决方案是构成冗余 nsqd对(在不同的主机上)接收消息的相同部分的副本 。因为你实现的消费者是幂等的,以两倍时间处理这些消息不会对下游造成影响,并使得系统能够承受任何单一节点故障而不会丢失信息 。
2.2 简化配置和管理
单个 nsqd 实例被设计成可以同时处理多个数据流 。流被称为“话题”和话题有 1 个或多个“通道” 。每个通道都接收到一个话题中所有消息的拷贝 。在实践中,一个通道映射到下行服务消费一个话题 。
在更底的层面,每个 nsqd 有一个与 nsqlookupd 的长期 TCP 连接,定期推动其状态 。这个数据被 nsqlookupd 用于给消费者通知 nsqd 地址 。对于消费者来说,一个暴露的 HTTP /lookup 接口用于轮询 。为话题引入一个新的消费者,只需启动一个配置了 nsqlookup 实例地址的 NSQ 客户端 。无需为添加任何新的消费者或生产者更改配置,大大降低了开销和复杂性 。
2.3 消除单点故障
【go语言集群之间同步文件 go语言通信】 NSQ被设计以分布的方式被使用 。nsqd 客户端(通过 TCP )连接到指定话题的所有生产者实例 。没有中间人,没有消息代理,也没有单点故障 。
这种拓扑结构消除单链 , 聚合,反馈 。相反 , 你的消费者直接访问所有生产者 。从技术上讲,哪个客户端连接到哪个 NSQ 不重要,只要有足够的消费者连接到所有生产者,以满足大量的消息 , 保证所有东西最终将被处理 。对于 nsqlookupd,高可用性是通过运行多个实例来实现 。他们不直接相互通信和数据被认为是最终一致 。消费者轮询所有的配置的 nsqlookupd 实例和合并 response 。失败的,无法访问的 , 或以其他方式故障的节点不会让系统陷于停顿 。
2.4 效率
对于数据的协议,通过推送数据到客户端最大限度地提高性能和吞吐量的,而不是等待客户端拉数据 。这个概念,称之为 RDY 状态 , 基本上是客户端流量控制的一种形式 。
efficiency
2.5 心跳和超时
组合应用级别的心跳和 RDY 状态,避免头阻塞现象,也可能使心跳无用(即,如果消费者是在后面的处理消息流的接收缓冲区中,操作系统将被填满,堵心跳)为了保证进度,所有的网络 IO 时间上限势必与配置的心跳间隔相关联 。这意味着,你可以从字面上拔掉之间的网络连接 nsqd 和消费者,它会检测并正确处理错误 。当检测到一个致命错误,客户端连接被强制关闭 。在传输中的消息会超时而重新排队等待传递到另一个消费者 。最后,错误会被记录并累计到各种内部指标 。
2.6 分布式
因为NSQ没有在守护程序之间共享信息,所以它从一开始就是为了分布式操作而生 。个别的机器可以随便宕机随便启动而不会影响到系统的其余部分,消息发布者可以在本地发布 , 即使面对网络分区 。
这种“分布式优先”的设计理念意味着NSQ基本上可以永远不断地扩展,需要更高的吞吐量?那就添加更多的nsqd吧 。唯一的共享状态就是保存在lookup节点上,甚至它们不需要全局视图,配置某些nsqd注册到某些lookup节点上这是很简单的配置,唯一关键的地方就是消费者可以通过lookup节点获取所有完整的节点集 。清晰的故障事件——NSQ在组件内建立了一套明确关于可能导致故障的的故障权衡机制,这对消息传递和恢复都有意义 。虽然它们可能不像Kafka系统那样提供严格的保证级别,但NSQ简单的操作使故障情况非常明显 。
2.7 no replication
不像其他的队列组件,NSQ并没有提供任何形式的复制和集群,也正是这点让它能够如此简单地运行,但它确实对于一些高保证性高可靠性的消息发布没有足够的保证 。我们可以通过降低文件同步的时间来部分避免,只需通过一个标志配置,通过EBS支持我们的队列 。但是这样仍然存在一个消息被发布后马上死亡 , 丢失了有效的写入的情况 。
2.8 没有严格的顺序
虽然Kafka由一个有序的日志构成 , 但NSQ不是 。消息可以在任何时间以任何顺序进入队列 。在我们使用的案例中,这通常没有关系,因为所有的数据都被加上了时间戳,但它并不适合需要严格顺序的情况 。
2.9 无数据重复删除功能
NSQ对于超时系统,它使用了心跳检测机制去测试消费者是否存活还是死亡 。很多原因会导致我们的consumer无法完成心跳检测,所以在consumer中必须有一个单独的步骤确保幂等性 。
3. 实践安装过程
本文将nsq集群具体的安装过程略去,大家可以自行参考官网,比较简单 。这部分介绍下笔者实验的拓扑 , 以及nsqadmin的相关信息 。
3.1 拓扑结构
topology
实验采用3台NSQD服务,2台LOOKUPD服务 。
采用官方推荐的拓扑 , 消息发布的服务和NSQD在一台主机 。一共5台机器 。
NSQ基本没有配置文件,配置通过命令行指定参数 。
主要命令如下:
LOOKUPD命令
NSQD命令
工具类,消费后存储到本地文件 。
发布一条消息
3.2 nsqadmin
对Streams的详细信息进行查看,包括NSQD节点,具体的channel,队列中的消息数,连接数等信息 。
nsqadmin
channel
列出所有的NSQD节点:
nodes
消息的统计:
msgs
lookup主机的列表:
hosts
4. 总结
NSQ基本核心就是简单性 , 是一个简单的队列 , 这意味着它很容易进行故障推理和很容易发现bug 。消费者可以自行处理故障事件而不会影响系统剩下的其余部分 。
事实上,简单性是我们决定使用NSQ的首要因素,这方便与我们的许多其他软件一起维护,通过引入队列使我们得到了堪称完美的表现,通过队列甚至让我们增加了几个数量级的吞吐量 。越来越多的consumer需要一套严格可靠性和顺序性保障,这已经超过了NSQ提供的简单功能 。
结合我们的业务系统来看 , 对于我们所需要传输的发票消息,相对比较敏感,无法容忍某个nsqd宕机,或者磁盘无法使用的情况,该节点堆积的消息无法找回 。这是我们没有选择该消息中间件的主要原因 。简单性和可靠性似乎并不能完全满足 。相比Kafka , ops肩负起更多负责的运营 。另一方面 , 它拥有一个可复制的、有序的日志可以提供给我们更好的服务 。但对于其他适合NSQ的consumer,它为我们服务的相当好,我们期待着继续巩固它的坚实的基础 。
关于go语言集群之间同步文件和go语言通信的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站 。

    推荐阅读