本文概述
- 介绍
- PostgreSQL数据源
- 使用Sqoop导入HDFS
- 问题或没有问题:你应该考虑使用HDFS
HDFS是一种可扩展的开源解决方案, 用于存储和处理大量数据。 HDFS已被证明在许多现代数据中心中都是可靠且高效的。
HDFS利用商用硬件以及开源软件来降低每字节存储的总成本。
凭借其内置的复制功能和对磁盘故障的恢复能力, HDFS是存储和处理数据以进行分析的理想系统。它不需要基础和开销来支持事务原子性, 一致性, 隔离性和持久性(ACID), 这是传统关系数据库系统所必需的。
此外, 与企业和商业数据库(例如Oracle)相比, 利用Hadoop作为分析平台可避免任何额外的许可成本。
【HDFS教程,适用于陷入关系数据库的数据分析师】许多人在初次了解HDFS时会提出的问题之一是:如何将现有数据放入HDFS?
在本文中, 我们将研究如何将数据从PostgreSQL数据库导入HDFS。我们将使用Apache Sqoop, 它是目前最有效的开源解决方案, 可在HDFS和关系数据库系统之间传输数据。 Apache Sqoop旨在将数据从关系数据库批量加载到HDFS(导入), 并将数据从HDFS批量写入关系数据库(导出)。
文章图片
通过将数据迁移到HDFS来加快分析速度。
鸣叫
本教程中的步骤是为具有执行SQL查询的基本知识和HDFS命令的基本知识的人员编写的。
使用的数据库系统是Windows的PostgreSQL 9.5, HDFS版本是Centos 6.4 Linux虚拟机上的Cloudera Hadoop 2.5.0-cdh5.2.0。
Apache Sqoop依赖于关系数据库供应商和数据库版本专用的JDBC驱动程序JAR文件。
要执行本文中显示的步骤, 用户将需要权限来远程连接到PostgreSQL数据库, 关系数据库上的SELECT权限, HDFS上的写权限以及Sqoop可执行文件上的执行权限。
在本教程中, 我们创建了一个名为srcmini的PostgreSQL数据库, 并使其可通过端口5432访问。
PostgreSQL数据源 首先, 在我们的PostgreSQL srcmini数据库中, 我们将创建一个名为sales的测试数据表。我们将假定PostgreSQL服务器上已经存在OpenSSL证书和私钥文件。
Server [localhost]:
Database [postgres]: srcmini
Port [5432]:
Username [postgres]:
Password for user postgres:
psql (9.5.3)
srcmini=# create table sales
srcmini-# (
srcmini(#pkSales integer constraint salesKey primary key, srcmini(#saleDate date, srcmini(#saleAmount money, srcmini(#orderID int not null, srcmini(#itemID int not null
srcmini(# );
CREATE TABLE
接下来, 我们将在表中插入20行:
srcmini=# insert into sales values (1, '2016-09-27', 1.23, 1, 1);
INSERT 0 1
srcmini=# insert into sales values (2, '2016-09-27', 2.34, 1, 2);
INSERT 0 1
srcmini=# insert into sales values (3, '2016-09-27', 1.23, 2, 1);
INSERT 0 1
srcmini=# insert into sales values (4, '2016-09-27', 2.34, 2, 2);
INSERT 0 1
srcmini=# insert into sales values (5, '2016-09-27', 3.45, 2, 3);
INSERT 0 1
srcmini=# insert into sales values (6, '2016-09-28', 3.45, 3, 3);
INSERT 0 1
srcmini=# insert into sales values (7, '2016-09-28', 4.56, 3, 4);
INSERT 0 1
srcmini=# insert into sales values (8, '2016-09-28', 5.67, 3, 5);
INSERT 0 1
srcmini=# insert into sales values (9, '2016-09-28', 1.23, 4, 1);
INSERT 0 1
srcmini=# insert into sales values (10, '2016-09-28', 1.23, 5, 1);
INSERT 0 1
srcmini=# insert into sales values (11, '2016-09-28', 1.23, 6, 1);
INSERT 0 1
srcmini=# insert into sales values (12, '2016-09-29', 1.23, 7, 1);
INSERT 0 1
srcmini=# insert into sales values (13, '2016-09-29', 2.34, 7, 2);
INSERT 0 1
srcmini=# insert into sales values (14, '2016-09-29', 3.45, 7, 3);
INSERT 0 1
srcmini=# insert into sales values (15, '2016-09-29', 4.56, 7, 4);
INSERT 0 1
srcmini=# insert into sales values (16, '2016-09-29', 5.67, 7, 5);
INSERT 0 1
srcmini=# insert into sales values (17, '2016-09-29', 6.78, 7, 6);
INSERT 0 1
srcmini=# insert into sales values (18, '2016-09-29', 7.89, 7, 7);
INSERT 0 1
srcmini=# insert into sales values (19, '2016-09-29', 7.89, 8, 7);
INSERT 0 1
srcmini=# insert into sales values (20, '2016-09-30', 1.23, 9, 1);
INSERT 0 1
让我们选择数据以验证数据看起来正确无误:
srcmini=# select * from sales;
pksales |saledate| saleamount | orderid | itemid
---------+------------+------------+---------+--------
1 | 2016-09-27 |$1.23 |1 |1
2 | 2016-09-27 |$2.34 |1 |2
3 | 2016-09-27 |$1.23 |2 |1
4 | 2016-09-27 |$2.34 |2 |2
5 | 2016-09-27 |$3.45 |2 |3
6 | 2016-09-28 |$3.45 |3 |3
7 | 2016-09-28 |$4.56 |3 |4
8 | 2016-09-28 |$5.67 |3 |5
9 | 2016-09-28 |$1.23 |4 |1
10 | 2016-09-28 |$1.23 |5 |1
11 | 2016-09-28 |$1.23 |6 |1
12 | 2016-09-29 |$1.23 |7 |1
13 | 2016-09-29 |$2.34 |7 |2
14 | 2016-09-29 |$3.45 |7 |3
15 | 2016-09-29 |$4.56 |7 |4
16 | 2016-09-29 |$5.67 |7 |5
17 | 2016-09-29 |$6.78 |7 |6
18 | 2016-09-29 |$7.89 |7 |7
19 | 2016-09-29 |$7.89 |8 |7
20 | 2016-09-30 |$1.23 |9 |1
(20 rows)
数据看起来不错, 让我们继续。
使用Sqoop导入HDFS 定义了数据源后, 我们现在准备将数据导入HDFS。我们将检查的sqoop命令在下面列出, 我们将在随后的要点中分解每个参数。请注意, 该命令应该位于完整的一行上, 或者如下图所示, 在每行的末尾加上反斜杠(Linux命令行继续字符), 最后一行除外。
sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/srcmini?ssl=true&
sslfactory=org.postgresql.ssl.NonValidatingFactory' \
--username 'postgres' -P \
--table 'sales' \
--target-dir 'sales' \
--split-by 'pksales'
- sqoop导入-可执行文件名为sqoop, 我们指示其将表中的数据或视图从数据库中导入到HDFS。
- – connect-使用– connect参数, 我们传入PostgreSQL的JDBC连接字符串。在这种情况下, 我们使用IP地址, 端口号和数据库名称。我们还需要指定正在使用SSL, 并且需要提供要使用的SSLSocketFactory类。
- – username-在此示例中, 用户名是PostgreSQL登录名, 而不是Windows登录名。用户必须具有连接到指定数据库并从指定表中进行选择的权限。
- -P-这将提示命令行用户输入密码。如果很少执行Sqoop, 这可能是一个不错的选择。有多种其他方式可以将密码自动传递给命令, 但是本文尝试使密码保持简单。
- – table-这是我们传递PostgreSQL表名称的地方。
- – target-dir-此参数指定要在其中存储数据的HDFS目录。
- – split-by-我们必须为Sqoop提供唯一的标识符, 以帮助其分配工作量。在作业输出的后面, 我们将看到Sqoop在何处选择最小值和最大值以帮助设置分割边界。
[[email
protected]:/sqoop]$ cat sqoopCommand.sh
sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/srcmini?ssl=true&
sslfactory=org.postgresql.ssl.NonValidatingFactory' \
--username 'postgres' -P \
--table 'sales' \
--target-dir 'sales' \
--split-by 'pksales'
[[email
protected]:/sqoop]$
现在, 该执行上面的Sqoop命令脚本了。 Sqoop命令的输出如下所示。
[[email
protected]:/sqoop]$ ./sqoopCommand.sh
16/10/02 18:58:34 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.2.0
Enter password:
16/10/02 18:58:40 INFO manager.SqlManager: Using default fetchSize of 1000
16/10/02 18:58:40 INFO tool.CodeGenTool: Beginning code generation
16/10/02 18:58:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM "sales" AS t LIMIT 1
16/10/02 18:58:41 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-0.20-mapreduce
Note: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
16/10/02 18:58:43 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.jar
16/10/02 18:58:43 WARN manager.PostgresqlManager: It looks like you are importing from postgresql.
16/10/02 18:58:43 WARN manager.PostgresqlManager: This transfer can be faster! Use the --direct
16/10/02 18:58:43 WARN manager.PostgresqlManager: option to exercise a postgresql-specific fast path.
16/10/02 18:58:43 INFO mapreduce.ImportJobBase: Beginning import of sales
16/10/02 18:58:45 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
16/10/02 18:58:46 INFO db.DBInputFormat: Using read commited transaction isolation
16/10/02 18:58:46 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN("pksales"), MAX("pksales") FROM "sales"
16/10/02 18:58:47 INFO mapred.JobClient: Running job: job_201609280401_0005
16/10/02 18:58:48 INFO mapred.JobClient:map 0% reduce 0%
16/10/02 18:59:04 INFO mapred.JobClient:map 50% reduce 0%
16/10/02 18:59:14 INFO mapred.JobClient:map 75% reduce 0%
16/10/02 18:59:15 INFO mapred.JobClient:map 100% reduce 0%
16/10/02 18:59:18 INFO mapred.JobClient: Job complete: job_201609280401_0005
16/10/02 18:59:18 INFO mapred.JobClient: Counters: 23
16/10/02 18:59:18 INFO mapred.JobClient:File System Counters
16/10/02 18:59:18 INFO mapred.JobClient:FILE: Number of bytes read=0
16/10/02 18:59:18 INFO mapred.JobClient:FILE: Number of bytes written=1190344
16/10/02 18:59:18 INFO mapred.JobClient:FILE: Number of read operations=0
16/10/02 18:59:18 INFO mapred.JobClient:FILE: Number of large read operations=0
16/10/02 18:59:18 INFO mapred.JobClient:FILE: Number of write operations=0
16/10/02 18:59:18 INFO mapred.JobClient:HDFS: Number of bytes read=438
16/10/02 18:59:18 INFO mapred.JobClient:HDFS: Number of bytes written=451
16/10/02 18:59:18 INFO mapred.JobClient:HDFS: Number of read operations=4
16/10/02 18:59:18 INFO mapred.JobClient:HDFS: Number of large read operations=0
16/10/02 18:59:18 INFO mapred.JobClient:HDFS: Number of write operations=4
16/10/02 18:59:18 INFO mapred.JobClient:Job Counters
16/10/02 18:59:18 INFO mapred.JobClient:Launched map tasks=4
16/10/02 18:59:18 INFO mapred.JobClient:Total time spent by all maps in occupied slots (ms)=48877
16/10/02 18:59:18 INFO mapred.JobClient:Total time spent by all reduces in occupied slots (ms)=0
16/10/02 18:59:18 INFO mapred.JobClient:Total time spent by all maps waiting after reserving slots (ms)=0
16/10/02 18:59:18 INFO mapred.JobClient:Total time spent by all reduces waiting after reserving slots (ms)=0
16/10/02 18:59:18 INFO mapred.JobClient:Map-Reduce Framework
16/10/02 18:59:18 INFO mapred.JobClient:Map input records=20
16/10/02 18:59:18 INFO mapred.JobClient:Map output records=20
16/10/02 18:59:18 INFO mapred.JobClient:Input split bytes=438
16/10/02 18:59:18 INFO mapred.JobClient:Spilled Records=0
16/10/02 18:59:18 INFO mapred.JobClient:CPU time spent (ms)=3980
16/10/02 18:59:18 INFO mapred.JobClient:Physical memory (bytes) snapshot=481574912
16/10/02 18:59:18 INFO mapred.JobClient:Virtual memory (bytes) snapshot=2949685248
16/10/02 18:59:18 INFO mapred.JobClient:Total committed heap usage (bytes)=127401984
16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Transferred 451 bytes in 33.7555 seconds (13.3608 bytes/sec)
16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Retrieved 20 records.
[[email
protected]:/sqoop]$
注意, 上面输出的最后一行显示已检索到20条记录, 这与PostgreSQL数据库表中的20条记录相对应。
执行Sqoop命令后, 我们可以执行hdfs dfs -ls命令来查看默认情况下使用HDFS上的表名创建的目录。
[[email
protected]:/sqoop]$ hdfs dfs -ls
Found 1 items
drwxrwxrwx- srcmini data0 2016-10-02 18:59 sales
[[email
protected]:/sqoop]$
我们可以再次使用hdfs dfs -ls命令列出sales目录的内容。如果查看HDFS, 默认情况下会发现数据已分区并分布在四个文件中, 而不仅仅是包含在一个文件中。
[[email
protected]:/sqoop]$ hdfs dfs -ls sales
Found 6 items
-rw-rw-rw-1 srcmini data0 2016-10-02 18:59 sales/_SUCCESS
drwxrwxrwx- srcmini data0 2016-10-02 18:58 sales/_logs
-rw-rw-rw-1 srcmini data110 2016-10-02 18:59 sales/part-m-00000
-rw-rw-rw-1 srcmini data111 2016-10-02 18:59 sales/part-m-00001
-rw-rw-rw-1 srcmini data115 2016-10-02 18:59 sales/part-m-00002
-rw-rw-rw-1 srcmini data115 2016-10-02 18:59 sales/part-m-00003
[[email
protected]:/sqoop]$
hdfs dfs -cat命令将在HDFS上的销售数据的第一个分区中显示所有记录。
[[email
protected]:/sqoop]$ hdfs dfs -cat sales/part-m-00000
1, 2016-09-27, 1.23, 1, 1
2, 2016-09-27, 2.34, 1, 2
3, 2016-09-27, 1.23, 2, 1
4, 2016-09-27, 2.34, 2, 2
5, 2016-09-27, 3.45, 2, 3
[[email
protected]:/sqoop]$
请注意, 默认文件分隔符是逗号。此外, 请注意, 每个分区中只有五行, 因为源中的20行已均匀分布在四个分区中。
为了限制输出到屏幕的行数, 我们可以将cat命令的输出通过管道传递到head命令, 如下所示, 以检查其他三个分区的内容。
head命令的-n 5参数将屏幕输出限制为前五行。
(请注意, 在我们的例子中, 这是不必要的, 因为每个分区开始时只有五行。但是, 实际上, 每个分区中的行可能要比这多得多, 并且只需要检查前几行即可。确保它们看起来正确, 因此向你展示了如何操作。)
[[email
protected]:/sqoop]$ hdfs dfs -cat sales/part-m-00001 |head -n 5
6, 2016-09-28, 3.45, 3, 3
7, 2016-09-28, 4.56, 3, 4
8, 2016-09-28, 5.67, 3, 5
9, 2016-09-28, 1.23, 4, 1
10, 2016-09-28, 1.23, 5, 1
[[email
protected]:/sqoop]$ hdfs dfs -cat sales/part-m-00002 |head -n 5
11, 2016-09-28, 1.23, 6, 1
12, 2016-09-29, 1.23, 7, 1
13, 2016-09-29, 2.34, 7, 2
14, 2016-09-29, 3.45, 7, 3
15, 2016-09-29, 4.56, 7, 4
[[email
protected]:/sqoop]$ hdfs dfs -cat sales/part-m-00003 |head -n 5
16, 2016-09-29, 5.67, 7, 5
17, 2016-09-29, 6.78, 7, 6
18, 2016-09-29, 7.89, 7, 7
19, 2016-09-29, 7.89, 8, 7
20, 2016-09-30, 1.23, 9, 1
[[email
protected]:/sqoop]$
如果你需要运行查询以从PostgreSQL数据库中的多个表中提取数据, 则可以使用以下命令完成此操作:
[[email
protected]:/sqoop]$ cat sqoopCommand.sh
sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/srcmini?ssl=true&
sslfactory=org.postgresql.ssl.NonValidatingFactory' \
--username 'postgres' -P \
--target-dir 'creditCardOrders' \
--split-by 'pksales' \
--query "select s.pksales, s.saledate, s.saleamount, o.shippingtype, o.methodofpayment from sales s inner join orders o on s.orderid=o.orderid where o.methodofpayment='credit card' and \$CONDITIONS"
[[email
protected]:/sqoop]$
在上面的命令中, 我们对Sqoop命令使用了一些相同的参数, 但是当与SQL命令一起使用时, 它们具有不同的重要性。
- – target-dir-目标目录告诉Sqoop在HDFS上的哪个目录中存储所选数据。使用自由格式查询时, Sqoop需要此参数。
- – split-by-即使我们选择销售表的主键, 我们仍然必须为Sqoop提供唯一的标识符, 以帮助其分配工作量。
- – query-这是我们提供SQL查询的参数。上面的查询用双引号引起来。请注意, 包含查询的多行中没有反斜杠(行继续符)。还要注意WHERE子句末尾的和\ $ CONDITIONS。这是Sqoop所需的, 因为Sqoop将自动用唯一表达式替换$ CONDITIONS令牌。
借助此处学习的技能, 将数据从关系数据库系统导入HDFS是一个简单明了的过程, 只需一个命令即可完成。尽管这些示例的行数很少, 但是从PostgreSQL数据库表将大量数据导入HDFS的机制仍然相同。
你甚至可以尝试导入大型表和不同的存储定界符。使用Apache Sqoop比将数据库数据导出到文件, 将文件从数据库服务器传输到HDFS, 然后再将文件加载到HDFS更为有效。
相关:使用R提高数据处理能力
推荐阅读
- 迁移旧数据而不用担心
- 一致哈希指南
- 权威的NoSQL数据库指南
- 非传统数据存储的数据工程师指南
- SQL Server 2016始终加密(易于实现,难以破解)
- Apache Spark流教程(识别流行的Twitter Hashtags)
- SRVB密码系统入门
- 微服务入门(Dropwizard教程)
- 如何修复Windows更新错误0x8007000d(解决办法介绍)