记一次调优过程—Spark读取OBS文件入ES

青春须早为,岂能长少年。这篇文章主要讲述记一次调优过程—Spark读取OBS文件入ES相关的知识,希望能为你提供帮助。
需求描述:读取OBS中的文件(文件格式为snappy压缩格式),解析数据,并将数据插入ES的过程。
数据规模:单个目录下的文件个数最大上限为500个,每个文件压缩后为115MB,压缩比为5.5,每条数据大小为600B-1KB。
目录格式:/data/xxx/userId/yyyyMMdd
存入ES对应索引:xxx-userId-yyyyMMdd
实现过程 1. 初始化
3. 存入ES
具体插入操作:

public void batchPutJson(String index, List< String> jsonList) throws EsOperateException try RestHighLevelClient highClient = getHighLevelClient(); BulkRequest bulkRequest = new BulkRequest(); if (jsonList == null) return; int num = 0; for (String data: jsonList) num++; if (num % BATCH_SIZE < = 0) highClient.bulk(bulkRequest, RequestOptions.DEFAULT); bulkRequest = new BulkRequest(); else bulkRequest.add(new IndexRequest(index.toLowerCase(Locale.ROOT)).source(data, XContentType.JSON)); // 如果刚好bulkRequest没有请求,则不需要执行插入操作 if (bulkRequest.numberOfActions() > 0) // 避免异常:ActionRequestValidationException: Validation Failed: 1: no requests added highClient.bulk(bulkRequest, RequestOptions.DEFAULT); catch (IOException | EsConnectException ex) log.error("batch put json failed: ", ex); throw new EsOperateException("batch put exception: " + ex.getMessage());

测试过程 1. ES模板事先已创建好
记一次调优过程—Spark读取OBS文件入ES

文章图片

副本数为1,分片数为5,刷新间隔为10s
2. 索引为插入数据时自动创建
3. OBS文件
文件个数6个,单个大小115MB,数据量大约为666万条
记一次调优过程—Spark读取OBS文件入ES

文章图片

4. 测试结果
耗时为整个任务的耗时,任务包含其他初始化过程和任务停止过程
  1. 不设置Repartition,1个Executor,ExecutorCPU为1U,Executor内存为2G,ES单次插入1w条,总耗时14min;
  2. 不设置Repartition,1个Executor,ExecutorCPU为1U,Executor内存为2G,ES单次插入4w条,总耗时17min;
  3. 不设置Repartition,2个Executor,ExecutorCPU为1U,Executor内存为2G,ES单次插入1w条,总耗时10min;
  4. 不设置Repartition,5个Executor,ExecutorCPU为1U,Executor内存为1G,ES单次插入2w条,总耗时6.5min; 优化过程看着这个耗时时间,不禁想到,如果按照最大规格,那不是得等死。。。
    开始了优化之路
    1. 任务耗时分析
    记一次调优过程—Spark读取OBS文件入ES

    文章图片

    发现任务的最大耗时之处在于这一句:
    dataset.toJSON().tojavaRDD() .mapPartitions(new ColdToHotFunction(this.owner, date)) .foreach((VoidFunction< String> ) str -> log.info("data to es result is: ", str));

    都知道,文件读取结果dataset,真正有结果是在这一句,所以这一句包含两个步骤,1是读取文件,2是存入ES,那到底是哪一步耗时呢?
    记一次调优过程—Spark读取OBS文件入ES

    文章图片

    通过进入任务详情查看,还是不太确定耗时大的真正原因。
    2. 尝试
    认为是读文件和数据转化过程慢:
    将Repartition设置为6,3个Executor,ExecutorCPU为2U,Executor内存为1G,ES单次插入2w条,最后总耗时还是需要6min左右,并没有实质性提高。
    Executor数量 * Executor CPU > = Repartition时才能保证单次任务可以充分执行。
既然不是文件读取,那就是插入ES慢?(枉我一开始这么信任ES)
增加插入耗时打印:
记一次调优过程—Spark读取OBS文件入ES

文章图片

emmm... 原来就是你啊,每次插入2w条,这么慢的嘛!!!
3. 优化
ES服务端已经没法改变了,那只能从业务侧进行优化了
我已经采用了批量插入,还有什么优化策略呢???
最终通过以下两点:
  1. 一开始刷新间隔时间为10s,改为刷新间隔60s,因为没有必要刷新那么频繁;
  2. 原来是在插入数据的时候自动创建的索引,改为插入数据前,先手动创建索引,且设置副本数为0,在整个过程结束之后,再将副本数更新为1。因为副本数会导致每次插入数据会插多遍,很影响速率。 优化结果检查经过上面的优化后,再测试一下性能:
  3. Repartition设为10,3个Executor,ExecutorCPU为2U,Executor内存为1G,ES单次插入2w条,总耗时4min;
  4. Repartition设为6,3个Executor,ExecutorCPU为2U,Executor内存为1G,ES单次插入2w条,总耗时3.6min;
    记一次调优过程—Spark读取OBS文件入ES

    文章图片

    记一次调优过程—Spark读取OBS文件入ES

    文章图片

    此种方式,在数据量越大的时候,效果应该就越明显了。
    出于公司资源有限的情况考虑,Executor数量 * Executor CPU > = Repartition时能最大发挥多线程优势,所以采用了第二种方式。
心得【记一次调优过程—Spark读取OBS文件入ES】此次调优过程看了很多优化方面的博客,因为任务所做的事情简单,没有涉及到太多的可优化点,但是感觉还有很大的优化空间。
作为一个小菜鸡,能通过自己慢慢摸索,测试完成效率提升4倍,也是很开心的事情!!!
如果大佬们有好的建议,希望留言,我好继续进行优化,感谢!!!

    推荐阅读