HBase|Hadoop: Points to Watch When Using LZO Compression

Environment

  • postgres
  • cdh6.3.2
  • Data is exported from Postgres and written to HDFS, compressed with LZO on write.
Problem

The write itself succeeds, but creating an index with the hadoop-lzo library fails with Invalid LZO header. However, a file compressed with the lzo command on Linux and then uploaded to HDFS can be indexed just fine.
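The Invalid LZO header error means the reader did not find the lzop file header at the start of the file. A file produced by the lzo command-line tool begins with the 9-byte lzop magic sequence, which can be checked with plain JDK code (a minimal sketch; the magic bytes come from the lzop file format, the class and method names are illustrative):

```java
import java.util.Arrays;

public class LzopHeaderCheck {
    // The 9-byte magic that begins every lzop-format file:
    // 0x89 'L' 'Z' 'O' '\0' '\r' '\n' 0x1a '\n'
    static final byte[] LZOP_MAGIC = {
            (byte) 0x89, 'L', 'Z', 'O', 0x00, 0x0d, 0x0a, 0x1a, 0x0a
    };

    // Returns true if the given file prefix starts with the lzop magic.
    static boolean hasLzopHeader(byte[] fileStart) {
        if (fileStart.length < LZOP_MAGIC.length) {
            return false;
        }
        return Arrays.equals(Arrays.copyOf(fileStart, LZOP_MAGIC.length), LZOP_MAGIC);
    }
}
```

A stream that fails this check is exactly what the indexer rejects, which is the clue followed below.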
Thinking

My first guess was that LZO simply does not support streaming compression.
Solution

Reading Hadoop's compression class CompressionOutputStream, I found that its subclass BlockCompressorStream is documented as working with "block-based" compression algorithms, as opposed to stream-based ones. I then looked at com.hadoop.compression.lzo.LzoCodec#createOutputStream(java.io.OutputStream, org.apache.hadoop.io.compress.Compressor):
public CompressionOutputStream createOutputStream(OutputStream out, Compressor compressor) throws IOException {
    if (!isNativeLzoLoaded(this.conf)) {
        throw new RuntimeException("native-lzo library not available");
    } else {
        CompressionStrategy strategy = getCompressionStrategy(this.conf);
        int bufferSize = getBufferSize(this.conf);
        int compressionOverhead = strategy.name().contains("LZO1")
                ? (bufferSize >> 4) + 64 + 3
                : (bufferSize >> 3) + 128 + 3;
        return new BlockCompressorStream(out, compressor, bufferSize, compressionOverhead);
    }
}

So the BlockCompressorStream it creates does not work for streaming writes.
Next I looked at com.hadoop.compression.lzo.LzopCodec:
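The layout BlockCompressorStream emits can be sketched with plain JDK code. This is only an illustration of the framing idea (length-prefixed compressed blocks, and no file-level header at all), using java.util.zip.Deflater as a stand-in for the native LZO compressor:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

public class BlockFramingSketch {
    // Writes one block as [uncompressedLen][compressedLen][payload],
    // mirroring the framing idea of Hadoop's BlockCompressorStream.
    // Note that no file header is ever written.
    static byte[] writeBlock(byte[] raw) throws IOException {
        Deflater deflater = new Deflater(); // stand-in for native LZO
        deflater.setInput(raw);
        deflater.finish();
        byte[] buf = new byte[raw.length + 64];
        int compressedLen = deflater.deflate(buf);
        deflater.end();

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        dos.writeInt(raw.length);         // uncompressed length
        dos.writeInt(compressedLen);      // compressed length
        dos.write(buf, 0, compressedLen); // compressed payload
        return bos.toByteArray();
    }

    // Reads a block written by writeBlock back into its raw bytes.
    static byte[] readBlock(byte[] block) throws IOException, DataFormatException {
        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(block));
        int rawLen = dis.readInt();
        int compressedLen = dis.readInt();
        byte[] compressed = new byte[compressedLen];
        dis.readFully(compressed);
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        byte[] raw = new byte[rawLen];
        inflater.inflate(raw);
        inflater.end();
        return raw;
    }
}
```

Because such a stream begins directly with the first block's length fields, any tool expecting an lzop file header, such as the indexer, rejects it as Invalid LZO header.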
public CompressionOutputStream createIndexedOutputStream(OutputStream out, DataOutputStream indexOut, Compressor compressor) throws IOException {
    if (!isNativeLzoLoaded(this.getConf())) {
        throw new RuntimeException("native-lzo library not available");
    } else {
        CompressionStrategy strategy = CompressionStrategy.valueOf(this.getConf().get("io.compression.codec.lzo.compressor", CompressionStrategy.LZO1X_1.name()));
        int bufferSize = this.getConf().getInt("io.compression.codec.lzo.buffersize", 262144);
        return new LzopOutputStream(out, indexOut, compressor, bufferSize, strategy);
    }
}

It creates an LzopOutputStream, which does support streaming compression. Concretely:
public HDFSFileApplier(Table table, SyncConfig syncConfig) {
    super(table, syncConfig.getSync().getApplier());
    this.fullName = table.getFullName();
    // initialize Configuration
    // initialize outputStream
    LzopCodec lzoCodec = new LzopCodec();
    lzoCodec.setConf(config);
    try {
        FileSystem fs = FileSystem.get(config);
        Path path = new Path("/user/xxx/" + table.getSchema() + "/" + table.getName() + ".lzo");
        FSDataOutputStream fos;
        if (fs.exists(path)) {
            fos = fs.append(path);
        } else {
            fos = fs.create(path);
        }
        CompressionOutputStream cos = lzoCodec.createOutputStream(fos);
    } catch (IOException e) {
        log.error("Failed to get FileSystem.", e);
    }
}
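For context, the .index file that makes an .lzo file splittable is conceptually just a list of byte offsets, one per compressed block, which is why LzopOutputStream can emit it on the fly through the indexOut argument of createIndexedOutputStream. A minimal stdlib sketch of that idea (the class and method names here are illustrative, not hadoop-lzo APIs):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class BlockIndexSketch {
    private final ByteArrayOutputStream data = new ByteArrayOutputStream();
    private final DataOutputStream indexOut;

    public BlockIndexSketch(DataOutputStream indexOut) {
        this.indexOut = indexOut;
    }

    // Appends one already-compressed block to the data stream and records
    // its starting byte offset in the side-channel index stream, so a
    // reader can later seek straight to any block boundary.
    public void writeBlock(byte[] compressedBlock) throws IOException {
        indexOut.writeLong(data.size()); // offset where this block begins
        data.write(compressedBlock);
    }

    public byte[] dataBytes() {
        return data.toByteArray();
    }
}
```

Writing the index alongside the data avoids a second pass over the file with a separate indexer job.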

Files written this way can be indexed.
References
  • http://www.51niux.com/?id=178
  • https://wzktravel.github.io/2016/09/19/hadoop-lzo/
