mongodb数据迁移到hbase

mongodb数据迁移到hbase

  • 导入包
# encoding: utf-8 ''' @author: zcc @license: (C) Copyright 2013-2017, Node Supply Chain Manager Corporation Limited. @software: pycharm @file: ggsn_to_hbase.py @time: 9/1/17 2:43 PM @desc: ''' from thrift.transport import TSocket, TTransport from thrift.transport import TTransport from thrift.protocol import TBinaryProtocol from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation, TRegionInfo from hbase.ttypes import IOError, AlreadyExists from hbase import Hbase from hbase.ttypes import *

  • 操作hbase的类
import struct


def encode(n):
    """Pack the integer ``n`` using the ``struct`` format ``"i"``.

    No byte-order prefix is given, so the platform's native int layout is
    used.

    :param n: integer to pack
    :return: the packed bytes
    """
    return struct.pack("i", n)


class HbaseControl(object):
    """Wrapper around an HBase Thrift client bound to a single table.

    Creating an instance opens the Thrift connection and creates the table
    with the given column families when it is not present yet.
    """

    def __init__(self, table, col_name, host='192.168.1.10', port=9090):
        """Connect to the HBase Thrift server and make sure ``table`` exists.

        :param table: name of the HBase table this object works on
        :param col_name: list of column family names used if the table has
            to be created
        :param host: Thrift server host
        :param port: Thrift server port
        """
        self.table = table
        self.host = host
        self.port = port

        # Connect to HBase Thrift server
        self.transport = TTransport.TBufferedTransport(
            TSocket.TSocket(host, port))
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)

        # Create and open the client connection
        self.client = Hbase.Client(self.protocol)
        self.transport.open()

        # Remember the column families, then create the table if needed
        self.set_column_families(col_name)
        self._build_column_families()

    def set_column_families(self, col_list=None):
        """Store the column family names used when the table is created.

        :param col_list: iterable of column family names; defaults to
            ``['name', 'sex', 'age']`` when omitted
        :return: None
        """
        if col_list is None:
            # Build the default per call so instances never share one list.
            col_list = ['name', 'sex', 'age']
        self.columnFamilies = list(col_list)

    def _build_column_families(self):
        """Create the table in HBase when it is not listed by the server.

        :return: None
        """
        tables = self.client.getTableNames()
        if self.table not in tables:
            self.__create_table(self.table)

    def __create_table(self, table):
        """Create ``table`` with one column descriptor per stored family.

        :param table: name of the table to create
        :return: None
        """
        descriptors = [Hbase.ColumnDescriptor(name=family)
                       for family in self.columnFamilies]
        self.client.createTable(table, descriptors)

    def del_row(self, row_key):
        """Delete the whole row identified by ``row_key``.

        :param row_key: key of the row to delete
        :return: None
        """
        self.client.deleteAllRow(self.table, row_key, {})

    def __del__(self):
        """Close the Thrift transport before the object is destroyed.

        :return: None
        """
        # The attribute is missing when __init__ failed early or when the
        # instance was created without running __init__.
        transport = getattr(self, 'transport', None)
        if transport is not None:
            transport.close()

    def _del_table(self, table):
        """Disable and then delete ``table`` in HBase.

        :param table: name of the table to drop
        :return: None
        """
        self.client.disableTable(table)
        self.client.deleteTable(table)

    def getColumnDescriptors(self):
        """Return the column family descriptors of this object's table.

        :return: whatever the Thrift client returns for the table
        """
        return self.client.getColumnDescriptors(self.table)

    @staticmethod
    def _row_mutations(record, day):
        """Build the row key and the mutation list for one record.

        :param record: mapping providing the ``'tel'`` and ``'suminfo'`` keys
        :param day: day value stored with the record
        :return: tuple ``(row_key, mutations)``
        """
        # The phone number and the day together form the HBase row key.
        row_key = '{0}_{1}'.format(record['tel'], day)
        mutations = [
            Hbase.Mutation(column='baseinfo:tel', value=str(record['tel'])),
            Hbase.Mutation(column='baseinfo:day', value=str(day)),
            Hbase.Mutation(column='suminfo:context',
                           value=str(record['suminfo'])),
        ]
        return row_key, mutations

    def put(self, record, day):
        """Insert a single record into HBase.

        :param record: dict providing the ``'tel'`` and ``'suminfo'`` keys
        :param day: day value; stored in ``baseinfo:day`` and used as the
            second half of the row key
        :return: None
        """
        assert isinstance(record, dict)
        row_key, mutations = self._row_mutations(record, day)
        self.client.mutateRow(self.table, row_key, mutations, {})

    def puts(self, records, day):
        """Insert a list of records into HBase with one batch call.

        :param records: list of dict-like records, each providing the
            ``'tel'`` and ``'suminfo'`` keys
        :param day: day value shared by every record of the batch
        :return: None
        """
        assert isinstance(records, list)

        batch = []
        for record in records:
            row_key, mutations = self._row_mutations(record, day)
            batch.append(Hbase.BatchMutation(row=row_key, mutations=mutations))

        if not batch:
            # Nothing to write, so skip the round trip to the server.
            return
        self.client.mutateRows(self.table, batch, {})

  • 操作mongodb并将数据导入到hbase的类
from pymongo import MongoClient


class MongDBControl(object):
    """Reads one MongoDB collection and copies its documents into HBase."""

    def __init__(self, table_name, host='192.168.1.20', port=27017):
        """Connect to MongoDB and select the collection to migrate.

        :param table_name: name of the collection to read; it is also passed
            to HBase as the ``day`` value during migration
        :param host: MongoDB host
        :param port: MongoDB port
        """
        self.client = MongoClient(host, port)
        # Attribute access selects the database literally named "table".
        db = self.client.table
        self.collect = db[table_name]
        self.table = table_name

    def __del__(self):
        """Close the MongoDB connection before the object is destroyed."""
        # The attribute is missing when __init__ failed before assigning it.
        client = getattr(self, 'client', None)
        if client is not None:
            client.close()

    def record_to_hbase(self, hc, batch_size=1000):
        """Copy every document of the collection into HBase, page by page.

        :param hc: ``HbaseControl`` instance that receives the records
        :param batch_size: number of documents read and written per batch
        :return: None
        """
        assert isinstance(hc, HbaseControl)

        page = 0
        total = 0
        while True:
            cursor = self.collect.find().skip(batch_size * page).limit(batch_size)
            # Materialise the page before testing it: the emptiness check has
            # to be made on the list, because the cursor object itself is not
            # falsy when the query matches nothing.
            records = list(cursor)
            if not records:
                break
            hc.puts(records, self.table)
            page += 1
            total += len(records)
            # Report the number of records actually written so far.
            # Single-argument print(...) behaves the same on Python 2 and 3.
            print('已经从mongodb向hbase导入{0}条数据!!'.format(total))
        print('数据迁移完毕!!!')

  • 主函数
if __name__ == '__main__':
    # Column families requested when the HBase table has to be created.
    column_families = ['baseinfo', 'count', 'suminfo', 'nodebll', 'nodebzl']

    hc = HbaseControl(table='table', col_name=column_families)
    mc = MongDBControl('20170806')
    # mc.record_to_hbase(hc)
    # As written, the script only drops the HBase table again.
    hc._del_table('table')

【mongodb数据迁移到hbase】转载于:https://www.cnblogs.com/crazysquirrel/p/7471641.html

    推荐阅读