Ingesting Oracle data into ClickHouse
There are two broad approaches to ingesting Oracle data. One is CDC, which captures data changes (inserts, updates, deletes) from the logs. The other is near-real-time incremental import, which relies on an auto-increment id or a timestamp column; compared with CDC it is simpler to deploy, but it only covers incremental data. This article covers only the incremental approach.
1. Standalone ClickHouse installation
- Upgrade OpenSSL
- rpm -Uvh openssl-1.0.2k-12.el7.x86_64.rpm
- Install unixODBC
- rpm -ivh unixODBC-2.3.1-11.el7.x86_64.rpm
- Install ClickHouse
- Download the RPMs from http://repo.yandex.ru/clickhouse/rpm/stable/x86_64/
- rpm -ivh clickhouse*
- Install Flume
- (omitted) uses Flume 1.5.2 from HDP
- Install Kafka
- (omitted) uses Kafka 0.10.1 from HDP
- flume-ng-sql-source
- Download: https://github.com/keedio/flume-ng-sql-source.git , http://repo.red-soft.biz/repos/clickhouse/stable/el7/
- Build:
- To make ClickHouse ingestion easier, change the default field delimiter from ',' to '\t';
- A JSON output format could be added later.
- Copy flume-ng-sql-source-1.4.3.jar into Flume's lib directory
- Oracle
- Create a table
create table flume_ng_sql_source (
id varchar2(32) primary key,
msg varchar2(32),
createTime date not null
);
- Insert test data
insert into flume_ng_sql_source(id,msg,createTime) values('1','Test increment Data',to_date('2017-08-01 07:06:20','yyyy-mm-dd hh24:mi:ss'));
insert into flume_ng_sql_source(id,msg,createTime) values('2','Test increment Data',to_date('2017-08-02 07:06:20','yyyy-mm-dd hh24:mi:ss'));
insert into flume_ng_sql_source(id,msg,createTime) values('3','Test increment Data',to_date('2017-08-03 07:06:20','yyyy-mm-dd hh24:mi:ss'));
insert into flume_ng_sql_source(id,msg,createTime) values('4','Test increment Data',to_date('2017-08-04 07:06:20','yyyy-mm-dd hh24:mi:ss'));
insert into flume_ng_sql_source(id,msg,createTime) values('5','Test increment Data',to_date('2017-08-05 07:06:20','yyyy-mm-dd hh24:mi:ss'));
insert into flume_ng_sql_source(id,msg,createTime) values('6','Test increment Data',to_date('2017-08-06 07:06:20','yyyy-mm-dd hh24:mi:ss'));
commit;
- Copy ojdbc6.jar into Flume's lib directory
- Create flume-sql.conf
- Create flume-sql.conf in the /usr/local/flume directory
agentTest.channels = channelTest
agentTest.sources = sourceTest
agentTest.sinks = sinkTest

########### sql source ##########
# For each source, the type is defined
agentTest.sources.sourceTest.type = org.keedio.flume.source.SQLSource
agentTest.sources.sourceTest.hibernate.connection.url = jdbc:oracle:thin:@10.8.7.96:1521/ora11g

# Hibernate database connection properties
agentTest.sources.sourceTest.hibernate.connection.user = taizhou
agentTest.sources.sourceTest.hibernate.connection.password = 123456
agentTest.sources.sourceTest.hibernate.connection.autocommit = true
agentTest.sources.sourceTest.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agentTest.sources.sourceTest.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
agentTest.sources.sourceTest.run.query.delay=10000
agentTest.sources.sourceTest.enclose.by.quotes = false
agentTest.sources.sourceTest.status.file.path = /usr/local/flume
agentTest.sources.sourceTest.status.file.name = agentTest.sqlSource.status

# Custom query
agentTest.sources.sourceTest.start.from = 2017-07-31 07:06:20
agentTest.sources.sourceTest.custom.query = SELECT TO_CHAR(CREATETIME,'YYYY-MM-DD HH24:MI:SS'),ID,MSG FROM FLUME_NG_SQL_SOURCE WHERE CREATETIME > TO_DATE('$@$','YYYY-MM-DD HH24:MI:SS') ORDER BY CREATETIME ASC
agentTest.sources.sourceTest.batch.size = 1000
agentTest.sources.sourceTest.max.rows = 1000
agentTest.sources.sourceTest.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agentTest.sources.sourceTest.hibernate.c3p0.min_size=1
agentTest.sources.sourceTest.hibernate.c3p0.max_size=10

##############################
agentTest.channels.channelTest.type = memory
agentTest.channels.channelTest.capacity = 1000
agentTest.channels.channelTest.transactionCapacity = 1000
agentTest.channels.channelTest.byteCapacityBufferPercentage = 20
agentTest.channels.channelTest.byteCapacity = 1600000

agentTest.sinks.sinkTest.type = org.apache.flume.sink.kafka.KafkaSink
agentTest.sinks.sinkTest.topic = test13
agentTest.sinks.sinkTest.brokerList = 10.8.7.85:6667
agentTest.sinks.sinkTest.requiredAcks = 1
agentTest.sinks.sinkTest.batchSize = 20

agentTest.sinks.sinkTest.channel = channelTest
agentTest.sources.sourceTest.channels = channelTest
- Note that the time column must come first;
- $@$ is already wrapped in quotes inside custom.query, so the start.from value must not be quoted: after it is substituted for $@$, the quotes are already in place.
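The substitution works roughly as follows (a Python sketch only; the function name is illustrative and not taken from the flume-ng-sql-source code):

```python
# Sketch (assumption): the source keeps a "last index" value and splices it
# into custom.query wherever the $@$ placeholder appears. Since the query
# already wraps $@$ in single quotes, the start.from value itself is unquoted.
def render_query(custom_query: str, last_index: str) -> str:
    """Replace the $@$ placeholder with the last stored index value."""
    return custom_query.replace("$@$", last_index)

query = ("SELECT TO_CHAR(CREATETIME,'YYYY-MM-DD HH24:MI:SS'),ID,MSG "
         "FROM FLUME_NG_SQL_SOURCE "
         "WHERE CREATETIME > TO_DATE('$@$','YYYY-MM-DD HH24:MI:SS') "
         "ORDER BY CREATETIME ASC")

print(render_query(query, "2017-07-31 07:06:20"))
```

If start.from were quoted, the rendered query would contain ''2017-07-31 07:06:20'' and fail to parse.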
- Start Flume
- From the flume/bin directory:
- flume-ng agent --conf conf --conf-file /usr/local/flume/flume-sql.conf --name agentTest -Dflume.root.logger=INFO,console
- Check that data is being written to Kafka
- kafka-console-consumer.sh --zookeeper localhost:2181 --topic test13
- Fields are tab-separated, without quotes
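With the delimiter changed to '\t' (see the build step above), each Kafka message is one row serialized like this (a sketch; the real serialization happens in the modified Java source):

```python
# Sketch: a row from the custom query, serialized tab-separated with no
# quoting -- exactly the shape the console consumer shows, and the shape
# ClickHouse's TabSeparated format expects later in the pipeline.
def to_tsv(row) -> str:
    return "\t".join(str(v) for v in row)

row = ("2017-08-01 07:06:20", 1, "Test increment Data")
print(to_tsv(row))
```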
- Check the status file
- /usr/local/flume/agentTest.sqlSource.status
- Its LastIndex value is the largest time value imported so far
- To re-import everything from the beginning, delete this file
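Reading the status file looks roughly like this (the JSON content below is hypothetical; the exact keys are written by flume-ng-sql-source and may differ in your version):

```python
import json

# Hypothetical status-file content (assumption: a small JSON document with a
# LastIndex key). Deleting the file resets the source back to start.from.
status_text = '{"SourceName": "sourceTest", "LastIndex": "2017-08-06 07:06:20"}'
status = json.loads(status_text)
print(status["LastIndex"])
```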
- flume-clickhouse-sink
- https://reviews.apache.org/r/50692/diff/1#2
- Build
- Copy flume-clickhouse-sink-1.5.2.jar into Flume's lib directory
- Create the ClickHouse table
- Enable remote access
- In /etc/clickhouse-server/config.xml, change listen_host from the default ::1 (local connections only) to the machine's IP
clickhouse-client -m
CREATE TABLE flume_ng_sql_source (
    createtime DateTime,
    id UInt32,
    msg String
) ENGINE = MergeTree
PARTITION BY toYYYYMMDD(createtime)
ORDER BY id
SETTINGS index_granularity = 8192;
- Note that the column order must match the column order in custom.query
- Mind the difference between DateTime and Date
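DateTime stores date plus time of day (second precision), while Date stores only the day. The partition expression toYYYYMMDD(createtime) maps each row's timestamp to a numeric day key, so each day becomes one partition. A Python sketch of that mapping:

```python
from datetime import datetime

# Sketch of what ClickHouse's toYYYYMMDD does: collapse a DateTime value
# down to a numeric day key used as the partition identifier.
def to_yyyymmdd(dt: datetime) -> int:
    return dt.year * 10000 + dt.month * 100 + dt.day

print(to_yyyymmdd(datetime(2017, 8, 1, 7, 6, 20)))  # 20170801
```

All six test rows above have distinct days, so they land in six partitions.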
- Create ch.conf
- Create ch.conf in the /usr/local/flume directory
agentTest.channels = channelTest
agentTest.sources = sourceTest
agentTest.sinks = sinkTest

########### sql source ##########
# For each source, the type is defined
agentTest.sources.sourceTest.type = org.keedio.flume.source.SQLSource
agentTest.sources.sourceTest.hibernate.connection.url = jdbc:oracle:thin:@10.8.7.96:1521/ora11g

# Hibernate database connection properties
agentTest.sources.sourceTest.hibernate.connection.user = taizhou
agentTest.sources.sourceTest.hibernate.connection.password = 123456
agentTest.sources.sourceTest.hibernate.connection.autocommit = true
agentTest.sources.sourceTest.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
agentTest.sources.sourceTest.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
agentTest.sources.sourceTest.run.query.delay=10000
agentTest.sources.sourceTest.enclose.by.quotes = false
agentTest.sources.sourceTest.status.file.path = /usr/local/flume
agentTest.sources.sourceTest.status.file.name = agentTest.sqlSource.status

# Custom query
agentTest.sources.sourceTest.start.from = 2017-07-31 07:06:20
agentTest.sources.sourceTest.custom.query = SELECT TO_CHAR(CREATETIME,'YYYY-MM-DD HH24:MI:SS'),ID,MSG FROM FLUME_NG_SQL_SOURCE WHERE CREATETIME > TO_DATE('$@$','YYYY-MM-DD HH24:MI:SS') ORDER BY CREATETIME ASC
agentTest.sources.sourceTest.batch.size = 1000
agentTest.sources.sourceTest.max.rows = 1000
agentTest.sources.sourceTest.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agentTest.sources.sourceTest.hibernate.c3p0.min_size=1
agentTest.sources.sourceTest.hibernate.c3p0.max_size=10

##############################
agentTest.channels.channelTest.type = memory
agentTest.channels.channelTest.capacity = 1000
agentTest.channels.channelTest.transactionCapacity = 1000
agentTest.channels.channelTest.byteCapacityBufferPercentage = 20
agentTest.channels.channelTest.byteCapacity = 1600000

agentTest.sinks.sinkTest.type = org.apache.flume.sink.clickhouse.ClickHouseSink
agentTest.sinks.sinkTest.host = http://10.8.7.96
agentTest.sinks.sinkTest.port = 8123
agentTest.sinks.sinkTest.database = default
agentTest.sinks.sinkTest.table = flume_ng_sql_source
agentTest.sinks.sinkTest.batchSize = 3000
agentTest.sinks.sinkTest.format = TabSeparated

agentTest.sinks.sinkTest.channel = channelTest
agentTest.sources.sourceTest.channels = channelTest
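The sink delivers batches over ClickHouse's HTTP interface on port 8123. What it effectively sends is an INSERT ... FORMAT TabSeparated request with the tab-separated rows as the body; the sketch below just builds such a request from the config values, without sending it:

```python
from urllib.parse import urlencode

# Sketch: construct the HTTP request the ClickHouse sink effectively issues
# (assumption about the sink's internals; values taken from ch.conf above).
host, port = "http://10.8.7.96", 8123
database, table = "default", "flume_ng_sql_source"

query = f"INSERT INTO {database}.{table} FORMAT TabSeparated"
url = f"{host}:{port}/?" + urlencode({"query": query})
body = "2017-08-01 07:06:20\t1\tTest increment Data\n"

print(url)
```

The same request can be reproduced by hand with curl to debug the sink independently of Flume.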
- Start Flume
- From the flume/bin directory:
- flume-ng agent --conf conf --conf-file /usr/local/flume/ch.conf --name agentTest -Dflume.root.logger=INFO,console
- Inspect the data in ClickHouse
- Query the data:
- select * from flume_ng_sql_source order by id;
- Inspect the data directory:
- /var/lib/clickhouse/data/default/flume_ng_sql_source/
- One directory per partition
- Open issues
- When rows are read from Oracle successfully but the write to ClickHouse fails, the LastIndex value in the status file still advances, so those rows are silently skipped on the next poll.
- Should the pipeline therefore be changed to oracle -> flume -> kafka -> flume -> clickhouse?
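A small simulation of that failure mode (all names are illustrative, not taken from the real source code): the index is advanced as soon as rows are read, before the sink confirms the write, so a failed ClickHouse write loses the batch.

```python
# Simulate the source: read rows newer than last_index and advance the
# index immediately -- which is the problem, since the sink has not yet
# confirmed the write.
rows = [("2017-08-05 07:06:20", 5), ("2017-08-06 07:06:20", 6)]

def poll(rows, last_index):
    batch = [r for r in rows if r[0] > last_index]       # timestamps sort lexicographically
    new_index = max(r[0] for r in batch)                 # advanced before sink confirms
    return batch, new_index

batch, last_index = poll(rows, "2017-08-04 07:06:20")
sink_ok = False  # simulate a failed ClickHouse write

# Next poll starts after the already-advanced index, so rows 5 and 6
# are never retried -- only the newly arrived row 7 is picked up.
next_batch, _ = poll(rows + [("2017-08-07 07:06:20", 7)], last_index)
print([r[1] for r in next_batch])  # [7]
```

Routing through Kafka decouples the two halves: the source-to-Kafka leg can fail and retry independently of the Kafka-to-ClickHouse leg, which is the motivation for the oracle -> flume -> kafka -> flume -> clickhouse variant.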