go语言导出百万级数据 golang导出csv( 二 )


// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
本地Go routines方法
刚开始 , 我们采用了一个非常本地化的POST处理实现,仅仅尝试把发到简单go routine的job并行化:
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3()// ----- DON'T DO THIS
}
w.WriteHeader(http.StatusOK)
}
对于中小负载 , 这会对大多数的人适用,但是大规模下,这个方案会很快被证明不是很好用 。我们期望的请求数,不在我们刚开始计划的数量级,当我们把第一个版本部署到生产环境上 。我们完全低估了流量 。
上面的方案在很多地方很不好 。没有办法控制我们产生的go routine的数量 。由于我们收到了每分钟1百万的POST请求,这段代码很快就崩溃了 。
再次尝试
我们需要找一个不同的方式 。自开始我们就讨论过 ,  我们需要保持请求处理程序的生命周期很短,并且进程在后台产生 。当然,这是你在Ruby on Rails的世界里必须要做的事情,否则你会阻塞在所有可用的工作 web处理器上 , 不管你是使用puma、unicore还是passenger(我们不要讨论JRuby这个话题) 。然后我们需要利用常用的处理方案来做这些,比如Resque、 Sidekiq、 SQS等 。这个列表会继续保留,因为有很多的方案可以实现这些 。
所以,第二次迭代 , 我们创建了一个缓冲channel,我们可以把job排队,然后把它们上传到S3 。因为我们可以控制我们队列中的item最大值,我们有大量的内存来排列job , 我们认为只要把job在channel里面缓冲就可以了 。
var Queue chan Payload
func init() {
Queue = make(chan Payload, MAX_QUEUE)
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue - payload
}
...
}
接下来,我们再从队列中取job,然后处理它们 。我们使用类似于下面的代码:
func StartProcessor() {
for {
select {
case job := -Queue:
job.payload.UploadToS3()// -- STILL NOT GOOD
}
}
}
说实话,我不知道我们在想什么 。这肯定是一个满是Red-Bulls的夜晚 。这个方法不会带来什么改善 , 我们用了一个 有缺陷的缓冲队列并发,仅仅是把问题推迟了 。我们的同步处理器同时仅仅会上传一个数据到S3 , 因为来到的请求远远大于单核处理器上传到S3的能力,我们的带缓冲channel很快达到了它的极限,然后阻塞了请求处理逻辑的queue更多item的能力 。
我们仅仅避免了问题 , 同时开始了我们的系统挂掉的倒计时 。当部署了这个有缺陷的版本后 , 我们的延时保持在每分钟以常量增长 。

推荐阅读