Environment
- PostgreSQL
- CDH 6.3.2

Data is exported from PostgreSQL and written to HDFS, compressed with LZO on write. Reading those files back fails with:

Invalid LZO header

However, compressing a file with the lzo command on Linux and then uploading it to HDFS works fine. This suggests that the codec being used does not support stream-style compression.
Solution

Looking through Hadoop's compression classes: CompressionOutputStream has a subclass BlockCompressorStream, whose Javadoc notes that it works with "block-based" compression algorithms, as opposed to "stream-based" ones.
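The block/stream distinction can be illustrated with the JDK's DEFLATE support (an assumption: DEFLATE stands in for LZO here only because hadoop-lzo is not a standard library). A stream-based codec can flush at arbitrary points so that everything written so far is immediately decodable, whereas a block-based codec buffers a whole block before emitting anything:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

public class StreamCompressionDemo {

    // Stream-style compression: write arbitrary-sized chunks and flush()
    // after each one. With syncFlush=true, every flush() emits a complete,
    // immediately decodable unit -- the property a streaming writer needs.
    public static byte[] compressInChunks(byte[][] chunks) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        Deflater deflater = new Deflater();
        try (DeflaterOutputStream dos = new DeflaterOutputStream(bos, deflater, 8192, true)) {
            for (byte[] chunk : chunks) {
                dos.write(chunk);
                dos.flush(); // SYNC_FLUSH: a reader can decode up to this point
            }
        } finally {
            deflater.end(); // release native zlib resources
        }
        return bos.toByteArray();
    }

    public static byte[] decompress(byte[] data) throws IOException {
        try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data))) {
            return iis.readAllBytes();
        }
    }
}
```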
Then, looking at com.hadoop.compression.lzo.LzoCodec#createOutputStream(java.io.OutputStream, org.apache.hadoop.io.compress.Compressor):
public CompressionOutputStream createOutputStream(OutputStream out, Compressor compressor) throws IOException {
    if (!isNativeLzoLoaded(this.conf)) {
        throw new RuntimeException("native-lzo library not available");
    } else {
        CompressionStrategy strategy = getCompressionStrategy(this.conf);
        int bufferSize = getBufferSize(this.conf);
        int compressionOverhead = strategy.name().contains("LZO1") ? (bufferSize >> 4) + 64 + 3 : (bufferSize >> 3) + 128 + 3;
        return new BlockCompressorStream(out, compressor, bufferSize, compressionOverhead);
    }
}
It creates a BlockCompressorStream, which cannot handle stream-style writes. Next, com.hadoop.compression.lzo.LzopCodec#createIndexedOutputStream:
public CompressionOutputStream createIndexedOutputStream(OutputStream out, DataOutputStream indexOut, Compressor compressor) throws IOException {
    if (!isNativeLzoLoaded(this.getConf())) {
        throw new RuntimeException("native-lzo library not available");
    } else {
        CompressionStrategy strategy = CompressionStrategy.valueOf(this.getConf().get("io.compression.codec.lzo.compressor", CompressionStrategy.LZO1X_1.name()));
        int bufferSize = this.getConf().getInt("io.compression.codec.lzo.buffersize", 262144);
        return new LzopOutputStream(out, indexOut, compressor, bufferSize, strategy);
    }
}
It creates a LzopOutputStream, which does support stream-style compression. Concretely, the writer is set up as:

public HDFSFileApplier(Table table, SyncConfig syncConfig) {
    super(table, syncConfig.getSync().getApplier());
    this.fullName = table.getFullName();
    // initialize the Configuration and the output stream
    LzopCodec lzoCodec = new LzopCodec();
    lzoCodec.setConf(config);
    try {
        FileSystem fs = FileSystem.get(config);
        Path path = new Path("/user/xxx/" + table.getSchema() + "/" + table.getName() + ".lzo");
        FSDataOutputStream fos;
        if (fs.exists(path)) {
            fos = fs.append(path);
        } else {
            fos = fs.create(path);
        }
        CompressionOutputStream cos = lzoCodec.createOutputStream(fos);
    } catch (IOException e) {
        log.error("Failed to get FileSystem.", e);
    }
}
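One caveat with a writer like the one above: a container format such as lzop writes a header up front and a trailer when the stream is finished, so the CompressionOutputStream must be closed (or finished) once writing completes, or readers will reject the file. A minimal sketch of the correct pattern, using the JDK's gzip classes as a stand-in (an assumption: gzip is shown only because hadoop-lzo is not a standard library; both are header/trailer container formats):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CloseMattersDemo {

    // close() finishes the final compressed block and writes the container
    // trailer; skipping it leaves a file that decompressors reject.
    public static byte[] compress(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gos = new GZIPOutputStream(bos)) {
            gos.write(data);
        } // try-with-resources guarantees the trailer is written
        return bos.toByteArray();
    }

    public static byte[] decompress(byte[] data) throws IOException {
        try (GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return gis.readAllBytes();
        }
    }
}
```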
Files written this way can be indexed.
References
- http://www.51niux.com/?id=178
- https://wzktravel.github.io/2016/09/19/hadoop-lzo/