Hadoop-MongoDB Usage

I. Overview

  • hadoop-mongodb (mongo-hadoop) is used to move and store data between HDFS and MongoDB in either direction. Maven dependency:
<dependency>
    <groupId>org.mongodb.mongo-hadoop</groupId>
    <artifactId>mongo-hadoop-core</artifactId>
    <version>1.5.1</version>
</dependency>

II. Examples
1. HDFS to MongoDB
  • HDFS data:
0000002215ca80c9adb0e1abab81630b|6bf3272e49c5481c3ce059311ecef91c

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.log4j.BasicConfigurator;
import org.bson.BasicBSONObject;

import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class HadoopToMongoJob {

    public static void main(String[] args) throws Exception {
        fromHdfsToMongodb();
    }

    public static void fromHdfsToMongodb() throws Exception {
        BasicConfigurator.configure();
        String inputPath = "/user/test/input";

        Configuration conf = new Configuration();
        MongoConfigUtil.setOutputURI(conf, "mongodb://192.168.30.131:27017/test.test2");

        String jobName = HadoopToMongoJob.class.getSimpleName();
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(HadoopToMongoJob.class);
        job.setMapperClass(HdfsToMongoMapper.class);
        // If the output key class is NullWritable.class, MongoDB auto-generates the _id on insert
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BSONWritable.class);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(MongoOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));

        int jobStatus = job.waitForCompletion(true) ? 0 : 1;
        System.out.println(jobStatus);
    }

    public static class HdfsToMongoMapper extends Mapper<Object, Text, Text, BSONWritable> {
        private Text k = new Text();
        private BasicBSONObject v = new BasicBSONObject();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is "id|value": the first field becomes the _id, the second the "v" attribute
            String[] splits = value.toString().split("\\|");
            k.set(splits[0]);
            v.put("v", splits[1]);
            context.write(k, new BSONWritable(v));
        }
    }
}

  • Result:
> db.test2.find()
{ "_id" : "0000002215ca80c9adb0e1abab81630b", "v" : "6bf3272e49c5481c3ce059311ecef91c" }
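
The comment in the job above points out that with NullWritable as the output key class, MongoDB generates the _id itself. A minimal sketch of that variant is shown below purely as an illustration; the AutoIdMapper name and the "k" field name are not from the original post, and only the mapper and the changed job settings are shown.

// Variant: keep both fields as ordinary attributes and let MongoDB assign the _id.
// Requires org.apache.hadoop.io.NullWritable in addition to the imports above.
public static class AutoIdMapper extends Mapper<Object, Text, NullWritable, BSONWritable> {
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] splits = value.toString().split("\\|");
        BasicBSONObject doc = new BasicBSONObject();
        doc.put("k", splits[0]);   // first field stored as a normal attribute instead of _id
        doc.put("v", splits[1]);
        // Writing NullWritable as the key leaves _id generation to MongoDB
        context.write(NullWritable.get(), new BSONWritable(doc));
    }
}

// Matching job configuration:
// job.setMapperClass(AutoIdMapper.class);
// job.setOutputKeyClass(NullWritable.class);
// job.setOutputValueClass(BSONWritable.class);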


2. MongoDB to HDFS
  • MongoDB data:
{ "_id" : "d1ec3fe3aa3b0ebdb37f2bf7cddf27dc", "v" : "a4ab0bfe090e016d67feb80676d826a8" }

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import org.bson.BSONObject;

import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class MongoToHadoopJob {

    public static void main(String[] args) throws Exception {
        fromMongodbToHdfs();
    }

    public static void fromMongodbToHdfs() throws Exception {
        BasicConfigurator.configure();
        // HDFSUtil is a small helper class of the author's (not part of mongo-hadoop);
        // a sketch of it is given after this listing
        HDFSUtil.init("hdfs://192.168.30.150:8020");
        String outputPath = "/user/test/output";
        HDFSUtil.deleteIfExists(outputPath, true);

        Configuration conf = new Configuration();
        MongoConfigUtil.setInputURI(conf, "mongodb://192.168.30.131:27017/test.test");
        // true: the input collection may be split and read by several map tasks
        // false: no splits are created and a single map task reads the whole collection
        MongoConfigUtil.setCreateInputSplits(conf, true);
        // maximum split size, in MB
        MongoConfigUtil.setSplitSize(conf, 1024);

        String jobName = MongoToHadoopJob.class.getSimpleName();
        Job job = Job.getInstance(conf, jobName);
        job.setJarByClass(MongoToHadoopJob.class);
        job.setMapperClass(MongoToHdfsMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(MongoInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        int jobStatus = job.waitForCompletion(true) ? 0 : 1;
        System.out.println(jobStatus);
    }

    public static class MongoToHdfsMapper extends Mapper<Object, BSONObject, Text, NullWritable> {
        private Text k = new Text();

        @Override
        protected void map(Object key, BSONObject value, Context context)
                throws IOException, InterruptedException {
            // Flatten each document back into the "id|value" line format
            k.set(value.get("_id") + "|" + value.get("v"));
            context.write(k, NullWritable.get());
        }
    }
}
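
The job above relies on an HDFSUtil helper that the post does not show. Below is a minimal sketch of what such a helper could look like, assuming it simply wraps Hadoop's FileSystem API; the author's real class may differ.

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSUtil {
    private static FileSystem fs;

    // Connect to the NameNode given by the HDFS URI, e.g. "hdfs://192.168.30.150:8020"
    public static void init(String hdfsUri) throws IOException {
        fs = FileSystem.get(URI.create(hdfsUri), new Configuration());
    }

    // Delete the path if it exists; 'recursive' controls whether directories are removed with their contents
    public static void deleteIfExists(String path, boolean recursive) throws IOException {
        Path p = new Path(path);
        if (fs.exists(p)) {
            fs.delete(p, recursive);
        }
    }
}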

  • Result:
d1ec3fe3aa3b0ebdb37f2bf7cddf27dc|a4ab0bfe090e016d67feb80676d826a8
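
To verify the export from Java rather than from the shell, the output can be read back through the same FileSystem API. Below is a small sketch, assuming the default part-m-00000 file name that a map-only job produces.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadExportedFile {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.30.150:8020"), new Configuration());
        // A map-only job writes its output as part-m-XXXXX files under the output directory
        Path part = new Path("/user/test/output/part-m-00000");
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(part), "UTF-8"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // e.g. d1ec3fe3aa3b0ebdb37f2bf7cddf27dc|a4ab0bfe090e016d67feb80676d826a8
                System.out.println(line);
            }
        }
    }
}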
