go语言异步压缩 golang异步gin( 五 )


消费组
funcconsumerCluster(){groupID :="group-1"config := cluster.NewConfig()config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Secondconfig.Consumer.Offsets.Initial = sarama.OffsetNewest//初始从最新的offset开始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){errors := c.Errors()noti := c.Notifications()for{select{caseerr := -errors:glog.Errorln(err)case-noti:}}}(c)formsg :=rangec.Messages() {fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))c.MarkOffset(msg,"")//MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset}}funcmain(){goconsumerCluster()}
3.4. 生产者
同步生产者
packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成 。config.Producer.Partitioner = sarama.NewRandomPartitioner//写到随机分区中,默认设置8个分区config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{}msg.Topic =`test0`msg.Value = https://www.04ip.com/post/sarama.StringEncoder("Hello World!")client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{fmt.Println("producer close err, ", err)return}deferclient.Close()pid, offset, err := client.SendMessage(msg)iferr !=nil{fmt.Println("send message failed, ", err)return}fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}
异步生产者
funcasyncProducer(){config := sarama.NewConfig()config.Producer.Return.Successes =true//必须有这个选项config.Producer.Timeout =5* time.Secondp, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//这个部分一定要写 , 不然通道会被堵塞gofunc(p sarama.AsyncProducer){errors := p.Errors()success := p.Successes()for{select{caseerr := -errors:iferr !=nil{glog.Errorln(err)}case-success:}}}(p)for{v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))fmt.Fprintln(os.Stdout, v)msg := sarama.ProducerMessage{Topic: topics,Value: sarama.ByteEncoder(v),}p.Input() - msgtime.Sleep(time.Second *1)}}funcmain(){goasyncProducer()select{}}
3.5. 结果展示-
同步生产打?。?
分区ID:0,offset:90
消费打?。?
Partition:0,Offset:90,key:,value:Hello World!
异步生产打?。?
async:7272async:7616async:998
消费打?。?
Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998
golang中compress/flate包 官方标准库对flate包的定义是:flate包实现了deflate压缩数据格式 , 参见 RFC 1951。gzip包和zlib包实现了对基于deflate的文件格式的访问 。
这边什么是deflate?
维基百科给出的解释是: DEFLATE 是同时使用了 LZ77 算法与 哈夫曼编码 (Huffman Coding)的一个 无损数据压缩算法。它最初是由 菲尔·卡茨 (Phil Katz)为他的 PKZIP 软件第二版所定义的 , 后来被 RFC 1951 标准化 。
1)func NewReader(r io.Reader) io.ReadCloser
2)func NewReaderDict(r io.Reader, dict []byte) io.ReadCloser
3)func NewWrite(w io.Write, level int) (*Write, error)
4)func NewWriteDict(w io.Writer, level int, dict []byte) (*Writer, error)
5)func (e InternalError) Error() string
6)func (e *ReadError) Error() string
7)func (e *WriteError) Error() string
8)func (w *Writer) Close() error
9)func (w *Writer) Flush() error
9)func (w *Writer) Reset(dst io.Writer)
10)func (w *Writer) Write(data []byte) (n int, err error)

推荐阅读