目录
大数据概述
Hadoop大数据开发平台
资源管理YARN
分布式文件系统HDFS
非关系型数据库NOSQL
分布式数据库HBASE
批处理和MapReduce
数据仓库查询分析和Hive
基于内存计算的Spark
流计算和Flink
图计算和PREGEL
Hadoop常用命令总结
大数据概述 大数据的4V:大量化、快速化、多样化、价值密度低。
大数据对思维方式的影响:颠覆了传统的思维方式——全样而非抽样、效率而非精确、相关而非因果
大数据对科学研究的影响:实验、理论、计算、数据
三次信息化浪潮
第一次——1980——个人计算机为标志——解决信息处理——Intel、AMD、IBM
第二次——1995——互联网——信息传输——雅虎、谷歌
第三次——2010——物联网、云计算和大数据——信息爆炸——亚马逊、美团
信息科技为大数据提供的技术
- 存储设备容量增加、成本降低
- CPU性能提升
- 网络带宽增加、终端数目增加
运营式系统阶段、用户原创内容阶段、感知式系统阶段。
大数据发展三个阶段
萌芽期(第一):20世纪90年代至21世纪初——随着数据挖掘理论和数据库技术的逐步成熟,一批商业智能工具和知识管理技术开始被应用。如数据仓库、专家系统、知识管理系统等。
成熟期(第二):21世纪第一个十年——Web2.0应用迅速发展,非结构化数据大量产生,传统处理方法难以应对,带动了大数据技术的快速突破,大数据解决方案逐渐走向成熟,GFS和MapReduce等大数据技术受到追捧,Hadoop平台开始崭露头角。
大规模应用期(第三):2010年以后——大数据应用渗透各行各业,数据驱动决策,信息社会智能化程度大幅提高。
Hadoop大数据开发平台 谷歌2004年“三驾马车”处理海量数据问题:GFS分布式文件系统、MapReduce大数据分布式计算框架、NoSQL数据库系统BigTable
大数据两个核心技术:分布式存储、分布式处理
分布式存储
- 文件系统:HDFS
- NoSQL:HBase、MongoDB
- 消息系统:Kafka
- 批处理计算:MapReduce、Spark
- 流计算:Storm,Flink
- 图计算:Pregel
- 查询分析计算:Hive、Impala
Hadoop****特性:高扩展、高效性、高可靠、高容错、成本低。
Hadoop****生态:
- Zookeeper:分布式协调服务
- Hbase:分布式数据库
- Ambari:安装部署工具
- Oozie:作业流调度系统
- MapReduce:离线计算
- Tez:DAG计算
- Spark:内存计算
- yean:资源调度管理
- HDFS:分布式存储系统
- Sqoop:数据库TEL工具
- Flume:日志收集
单机模式:一台机器上运行。(真正单机)
伪分布式模式:一台机器上模拟一个小集群,依赖SSH,需要初始化文件系统,本地的input文件夹和HDFS的input文件夹都在同一台机器上,并不需要通过网络传输数据。(单机装多机)
完全分布式模式:存储采用分布式文件系统HDFS,而且HDFS的名称结点和数据结点位于不同机器上。(真正多机)
伪分布式安装
- Hadoop进程可以分离的多个Java进程来运行
- 单结点,既作为NameNode也作为DataNode
- Hadoop配置文件位于/uhadoop/etc/hadoop/中,伪分布式需要修改配置文件core-site.xml和hdfs-site.xml
- Hadoop的配置文件是xml格式,每个配置以声明property的name和value来实现
伪分布式用途:常用于调试程序
Hadoop****的版本
Hadoop2.0****三大主要部分:HDFS、MapReduce、yarn。其中HDFS包括NN Federation和HA;MapReduce运行于Yean之上。
1.0****到2.0版本差异:
文章图片
资源管理YARN yarn——2.0的资源调度框架
MapReduce1.0既是一个计算框架,也是一个资源管理调度框架。到了Hadoop2.0后,其资源调度功能被分离形成Yarn,而被剥离了资源调度功能的MapReduce1.0变为2.0,只拥有计算功能。
总结:Yarn是纯粹的资源调度框架,MR2.0是纯粹的计算框架。
yarn的调度策略
- 先进先出——队列
- 容器——多队列——资源使用量小、优先级高的先执行;最大化吞吐量和利用率
- 公平——多队列——公平调度算法,支持资源抢占,确保平均而言所有作业获得等量的资源
yarn好处
- yarn为这些计算框架提供统一的资源调度管理服务,并且能够根据各种计算框架的负载需求,调整各自占用的资源,实现集群资源共享和资源弹性收缩。
- 其可以实现一个集群上的不同应用负载混搭,有效提高了集群的利用率
- 不同计算框架可以共享底层存储,避免了数据集跨集群移动
分布式文件系统指通过网络实现文件在多台主机上进行分布式存储的文件系统,一般采用“客户机/服务器”(CS)模式,客户端以特定的通信协议通过网络与服务器建立连接,提出文件访问请求,如GFS和HDFS。
注:分布式文件系统是大集合,HDFS是子集。
HDFS目标
- 兼容廉价的硬件设备
- 流数据读写
- 大数据集
- 简单的文件模型
- 强大的跨平台兼容性
- 不适合低延迟数据访问
- 无法高效存储大量小文件
- 不支持多用户写入及任意修改文件
- 加快数据传输速度
- 很容易检查数据错误
- 保证数据可靠性
块(HDFS的核心概念):HDFS默认一个块64MB,一个文件被分成多个块,以块作为存储单位。
名称结点(NameNode):负责管理分布式文件系统的命名空间,用两个文件保存了两个核心的数据结构(FSImage和EditLog)。
数据结点(DataNode):负责数据的存储和读取,会根据客户端或者是名称结点的调度来进行数据的存储和检索,并且向名称结点定期发送自己所存储的块的列表
第二名称结点(****SecondaryNamenode):用来保存名称结点对HDFS元数据信息的备份,并减少名称结点重启的时间。
注:一个机架上可以放一个名称节点、多个数据节点。
拓展:
块的默认大小是64MB,但是也可以128MB。HDFS中的块比一般普通文件系统的块大很多。之所以设计成一块一块是因为HDFS是面向大规模数据存储,且降低分布式节点的寻址开销。但是块不是越大越好,如果块过大会导致MapReduce才执行一两个任务,这样牺牲了其并行度,发挥不了分布式并行处理的效果。
名称节点也叫主节点。它是整个HDFS集群的管家,可以理解为是数据库中的数据目录。而数据节点才是存储真实数据即元数据。
FSImage用于保存系统文件树(如文件的复制等级、修改和访问时间、访问权限、块大小以及组成文件的块等)。EditLog用于记录对数据进行的操作。
名称节点若出错则根据第二名称结点备份。
名称节点管家会定期检查数据节点是否坏掉,如坏掉则标志位宕机,然后将坏掉的数据节点中的数据迁移到另外一个数据节点上。这种做法有时也可以解决负载均衡问题。
总结:HDFS用块存文件内容,名称结点做管家只有通知功能不具备亲自上手功能,数据节点相当于工人真正在干活,管家中的FSImage用于存储信息在块的位置,EditLog记录操作,EditLog做记录肯定不断变大,第二名称结点则作为备份工人和垃圾回收工人,定期处理不断变大的EditLog。
HDFS如何减轻中心结点的负担?
当客户端需要访问一个文件时,首先把文件名发送给名称结点,名称结点根据文件名找到对应的数据块(一个文件可能包括多个数据块),再根据每个数据块信息找到实际存储各个数据库的数据节点的位置,并把数据节点位置发送给客户端,最后客户端直接访问这些数据节点获取数据,在整个访问过程中,名称节点并不参与数据的传输。名称节点启动成功并进入正常运行状态以后,HDFS的更新操作都会被写入到EditLog,而不是直接写入FSImage。第二名称结点可以完成EditLog与FSImage的合并操作,减小EditLog文件大小,缩短名称结点重启时间。
HDFS对于冗余数据的保存
HDFS默认的冗余复制因子是3。其中,有两份副本放在同一个机架的不同机器上面,第三个副本放在不同机架的机器上面,这样既可以保证机架发送异常时的数据恢复,也可以提高数据读写性能。一般而言,如果是在集群内发起写操作请求,则把第一个副本放置在发起写操作请求的数据结点上,实现就近写数据。如果是来自集群外部的写操作请求,则从集群内部挑选一台磁盘不太慢,CPU不太忙的数据结点作为第一个副本的存放地。
非关系型数据库NOSQL 关系数据库和NoSQL(非关系数据库)的比较
关系数据库
- 优势:以完善的关系代数理论作为基础,有严格的标准,支持ACID四大特性,借助索引机制可以实现高效的查询,技术成熟,有专业公司的技术支持。
- 劣势:可扩展性差,无法较好支持海量数据存储,数据规模过于死板,无法较好支持Web2.0应用,事务机制影响了系统的整体性能等。
- 优势:可以支持超大规模的数据存储,灵活的数据模型可以很好地支持Web2.0应用,具有强大的横向扩展能力等。
- 劣势:缺乏数据理论支持,复杂查询性能不高,大都不能实现事务强一致性,很难实现数据完整性,技术尚不成熟,缺乏专业团队的技术支持,维护较困难等。
关系数据库应用场景:电信银行等领域的关键业务系统,需要保证强事务一致性。
NOSQL数据库应用场景:互联网企业、传统企业的非关键业务。
采用混合架构
- 亚马逊公司使用不同类型的数据库来支撑它的电子商务应用。
- 对于购物篮这种临时性数据,采用键值存储会更加高效
- 当前的产品和订单信息则适合存储在关系数据库中
- 大量的历史订单信息则适合保存在类似MongoDB这类文档数据库中。
文档数据库:以文档为数据库的最小单位,对文档以某种标准化格式封装,每个文档可能具有完全不同的结构,具有基于文档内容的索引和查询能力。如mongoDB。
图数据库:使用图作为数据模型来存储数据,可以高效地存储不同顶点之间的关系,专门用于处理具有高度相互关联关系的数据,可以高效地处理实体之间的关系。如InfiniteGraph。
键值数据库:使用键定位值,值对数据库而言是透明不可见的,不能对值进行索引和查询,只能通过键进行查询。如Redis。
列族数据库:采用列族数据模型,数据库由多个行构成,每行数据包含多个列族,不同的行可以具有不同数量的列族,属于同一列族的数据会被存放在一起。如HBase。
拓展:MongoDB
MongoDB简介
MongoDB是由C++语言编写的,是一个基于分布式文件存储的开源数据库系统,在高负载的情况下,添加更多的节点,可以保证服务器性能。MongoDB旨在为WEB应用提供可扩展的高性能数据存储解决方案。
MongoDB特点
- 提供了一个面向文档存储,操作起来比较简单和容易
- 可以设置任何属性的索引,实现更快的排序
- 具有较好的水平可扩展性
- 支持丰富的查询表达式,可查询文档中内嵌的对象及数组
- 可体会已完成文档某个指定的数据字段
- 安装非常简单
- MongDB中的MR主要是对数据进行批量处理和聚合操作
三大基石:CAP、BASE、最终一致性
CAP:CAP指的是Consistency一致性、Availability可用性、Partition Tolerance分区容错率。CA最简单的做法是把所有的事务放在同一台机器上,但这种做法会严重影响系统的可扩展性。CP当出现网络分区的情况时,受影响的服务需要等待数据一致,因此在等待期间就无法对外提供服务。AP允许系统返回不一致的数据。
BASE:并非表示“基础”。而是指Basically Available、Soft state、Eventual consistency。其中Basically Available表示基本可用(一个分布式系统的一部分发生问题变得不可用时,其他部分仍然可以使用,允许分区失败的情形出现)。Soft state表示软状态(和一致性相反,状态可以有一段时间不同步,具有一定的滞后性)。Eventual consistency表示最终一致性(后续的访问操作可能暂时读不到更新后的数据,但最终必须能读到)。
拓展:事务的ACID四大特性
Atomicity原子性:事务必须是不可再分的,要么全执行,要么不执行。
Consistency一致性(硬状态):所有的数据都应该在事务执行前后保持一致。
Isolation隔离性:事务之间互不影响
Durability持久性:事务完成之后对系统的影响是持久性的,即使发生故障。
最终一致性
根据更新数据后各进程访问到数据的时间和方式的不同,可以区分为以下几种:
- 因果一致性:如果进程A通知进程B它已经更新了一个数据项,那么进程B的后续访问将获得A写入的最新值。
- “读己之所写”一致性:当进程A自己执行一个更新操作后,它自己总是可以访问自己更新过的值,不会看到旧值。
- 单调读一致性:如果进程已经看到过数据对象的某个值,那么任何后续访问,都不会返回在那个值之前的旧值。
- 会话一致性:它会把访问存储系统的这些进程放到会话的上下文进程当中,这时只要这些会话存储,系统就可以保证读己之所写一致性。
- 单调写一致性:系统需要保证来自同一个进程的写操作按顺序执行。
HBase是一个高可靠、高性能(可以支持PB级别的数据)、面向列、可伸缩的分布式数据库,是谷歌BigTable的开源实现,主要用来存储非结构化和半结构化的松散数据。HBase的目标是处理非常庞大的表,可以通过水平扩展的方式,利用廉价计算机集群处理由超过10亿行数据和数百万列元素组成的数据表,其运行在HDFS或Alluxio(读音:/a’la’so/)之上。
拓展:BigTable
其架构于GFS之上,使用MapReduce作为数据处理,使用Chubby作为协同管理服务。
而HBase架构于HDFS之上,使用Hadoop MapReduce作为数据处理,使用Zookeeper作为协同管理服务。
Hbase和传统关系数据库的对比分析
HBase的出现原因:虽然已经有了HDFS和MapReduce,但是Hadoop主要解决大规模数据离线批量数据,没法满足大数据实时处理。
关系数据库:多种数据类型,使用传统的关系数据模型,非常多的数据操作,支持多表连接,基于行存储,可以构建多个索引提高查询效率,更新操作会覆盖旧值,很难实现横线扩展和纵向扩展。
HBase:只有字符串类型,有多种操作,但是要避免多表连接(表中数据过多,若多表连接时间复杂度很高),基于列存储,只有行键索引,更新时生成新版本保留旧版本,可以轻易在集群中增加或者减少硬件数量来实现性能的压缩。
Hbase数据模型
表:Hbase采用表来组织数据,表由行和列组成,列划分为若干个列族。
行:每个Hbase表由若干行组成,每个行有一个行键。
列族:一个Hbase表被分组成许多列族的集合,它是基本的访问控制单元。
列限定符(列):列族里的数据通过列来定位。
单元格:在Hbase表中,通过行、列族和列限定符确定一个单元格,单元格存储的数据类型被视为字节数组byte[]。
时间戳:每个单元格都保存着同一份数据的多个版本,这些版本用时间戳进行索引。
文章图片
总结:一言蔽之,行键确定行,列族确定大概方位,列确定具体列的位置,上面三者所确定的具体位置即为单元格,单元格可以多版本,确定版本可以用时间戳。
批处理和MapReduce 分布式并行编程
【面试|Hadoop总结】批处理计算:解决针对大规模数据的批量处理需求,MapReduce是最具有代表性和影响力的大数据批处理技术,用于大规模数据集的并行运算。
MR设计理念:计算向数据靠拢而非数据向计算靠拢(要完成一次数据分析时,选择一个计算节点,把运行数据分析的程序放到计算节点上运行,然后把它涉及的数据,全部从各个不同节点上面拉过来,传输到计算发生的地方)。
传统并行计算框架:使用共享内存并行计算模型,容错性差;使用刀片服务器、高速网、SAN、价格贵、扩展性差;编程难度高;适用于实时细粒度计算,属于计算密集型。
MR:使用非共享式并行计算模型,容错性好;普通PC机即可并行,扩展性好;编程简单;适用于非实时批处理计算,属于数据密集型。
扩展:MapReduce策略
其采用分而治之的策略,将非常大的数据集切分为非常多的独立的小分片,然后为每一个分片单独地启动一个map任务,最终通过多个map任务,并行地在多个机器上去处理。
Split
MR基本处理单位为Split。Split是为逻辑概念,只记录数据元信息,划分数据为多少个Split由用户自己决定。
扩展:MapReduce架构
MR采用Master/slave架构。MR中带有一个Master服务器和多个slave服务器,Master服务器带有一个作业跟踪器JobTracker,用于负责整个作业的调度和处理以及失败和恢复,而slave服务器带有负责具体任务执行的组件TaskTracker,TaskTracker主要负责接收JobTracker给它发的作业处理指令完成具体的任务处理。
文章图片
如上,用户可以通过Client用户端提交用户编写的应用程序(也可以查看当前提交作业的运行状态),而后用户端提交作业给作业跟踪器,作业跟踪器指明作业的分配后,将作业交给TaskTracker去落实这个分配计划,而作业跟踪器则监督其是否落实。
Map和Reduce
MapReduce的任务被抽象为两个函数:Map和Reduce。其中Map的功能是将一个键值对输出分为一堆的键值对输出。至于要分为多少由用户决定,这是一个分片split的过程。而Reduce是一个汇总的过程,Map将一个任务分成多个子任务进行处理后,Reduce将结果进行简单求和。
如:输入<行号,”a,b,c”>则map后输出<”a”,1><”b”,1><”c”,1>。
如:输入<”a”,<1,1,1>>则Reduce后输出<”a”,3>
任务的数量
Map任务的数量
Hadoop为每个split创建一个Map任务,split的多少决定了Map任务的数目。大多数情况下,理想的分片大小是一个HDFS块。
Reduce的数量
- 最优的Reduce任务个数取决于急群中可用的reduce任务槽(Slot)的数目
- 通常设置比reduce任务槽数目小一些的Reduce任务个数(这样可以预留一些系统资源处理可能发生的错误)
拓展:MapReduce的执行过程
从HDFS中读取数据-》加载到InputFormat中-》用户指定Split大小进行逻辑分割-》转换为RR数据集-》进行Map,此时变为
Shuffle过程
Shuffle就是指将Map后的数据进行分区、排序、合并、归并的过程,中文叫做洗牌。
文章图片
从图中可以看出,Shuffle分为Map端的Shuffle和Reduce端的Shuffle。
MapShuffle
MapShuffle的过程是这样的:首先将数据转换为key-value的形式后切分为多个Map任务,一个map任务需要分配一定的缓存,一般默认100MB。一旦缓存过多,则启动溢写功能, 将缓存中的数据通过分区、排序、合并后,需要通过归并形成一个大的文件放在本地磁盘。
注:溢写功能并非缓存达到100MB后才启动,否则后续源源不断的数据无处可放。故一般设置溢写比例为0.8。分区时,一般采用哈希函数,分区的作用是适配多个Reduce任务。排序后可以合并,合并就是如<”a”,1>,<”a”,1>变为<”a”,2>的过程,这样一些重复的键值对可以合并为一个,大大减少溢写到磁盘的数据量。需要注意的是,合并不是必须的,也就是说,要视具体问题来看,合并不能改变最终结果。文件归并时,如果溢写的文件数量大于预定值(默认是3)则可以再次启动Combiner合并,少于3则不需要(因为合并也是一个耗时的过程)。
ReduceShuffle
JobTracker作为作业监视器,一直在监视作业的情况。一旦Map过程处理完成,则Reduce端会被其通知来取走属于自己需要处理的一份,取走后进行合并(combine)和归并(merge)。
注:一个Reduce端可能处理来自多个map端的数据,一个map端可能产生多个Reduce端处理的数据。合并和归并也是不一样的,合并时<”a”,1><”a”,1>-><”a”,2>,归并时<”a”,<1,1>>。
MapReduce阶段
- 只有当Map处理全部结束后,Reduce过程才能开始
- Map需要考虑数据局部性,Reduce无需考虑数据局部性
WordCount简单来说就是词频统计,假设我们现在有三个字符串,那么通过map过程后,字符串就会被分割为多个键值对的形式。
文章图片
这个时候Map输出后要经过Shuffle过程,Shuffle后就执行Reduce过程。
文章图片
类序列化(JavaSE的知识补充)
当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流,反之当要接收到或从磁盘读取的字节流转换为对象,就要进行反序列化。
Writable是Hadoop的序列化格式,Hadoop定义了这样一个Writable接口。一个类要支持可序列化只需实现这个接口即可。
数据仓库查询分析和Hive Hive简介
- Hive是一个构建在Hadoop顶层的数据仓库工具
- 依赖分布式文件系统HDFS存储数据
- 依赖分布式并行计算模型MapReduce处理数据
- 借鉴SQL语言设计了新的查询语言HiveQL
- 用户可以通过编写的HiveQL语句运行MapReduce任务
- 支持类似SQL的接口,很容易进行移植
Hive特性
采用批处理方式处理海量数据
Hive提供了一系列对数据进行提取、转换、加载ETL的工具
Hive与传统数据库的对比分析
Hive的用户体验在很多方面和传统的关系数据库相似,但是它底层依赖的是HDFS和MapReduce,所以在很多方面又有别于传统数据库。
文章图片
Hive中SQL查询转换为MR作业的过程
输入SQL-》转换为抽象语法树-》转换为查询块-》转换为逻辑查询计划-》重写逻辑查询计划-》转为物理查询计划-》选择最优查询策略
基于内存计算的Spark Spark简介
2013年,Spark加入Apache孵化器项目后发展迅猛,如今已经成为Apache软件基金会最重要的三大分布式计算系统开源项目之一(Hadoop、Spark、Storm)。
注:Hadoop是离线批处理框架,Spark是基于内存计算的实时数据分析框架,Storm是数据流分析框架。
Spark特点
- 运行速度快 :使用DAG执行引擎以支持循环数据流和内存计算。
- 容易使用:支持使用Scala、java、Python和R语言进行编程,可以通过Spark Shell进行交互式编程。
- 通用性:Spark提供了完整而强大的技术栈,包括SQL查询、流式计算、机器学习和图算法组件。
- 运行模式多样:可运行于独立的集群模式中,可运行于Hadoop中,也可运行于Amazon EC2等。
Scala是一门现代的多范式编程语言,运行于Java平台,并兼容现有的Java
程序。
注:多范式指的是可以支持多种编程风格,如函数式编程、面向对象编程。
Scala特性
- Scala具备强大的并发性,支持函数式编程,可以更好地支持分布式系统
- Scala语法简洁,能提供优雅的API
- Scala兼容Java,运行速度快,且能融合到Hadoop生态圈中
- Scala是Spark的主要编程语言,但Spark还支持Java、Python、R作为编程语言
- Scala的优势是提供了REPL即交互式解释器来提高开发效率
Hadoop的缺点
- 表达能力有限;并非所有的应用都可以使用MapReduce编程范式
- 磁盘IO开销大;在Map的Shuffle过程中需要将数据写入磁盘
- 延迟高;Task以进程的方式维护,需要数秒时间才能启动任务
- 在前一个任务执行完成之前,其他任务就无法开始,难以胜任复杂、多阶段的计算任务
- Spark的计算模式借鉴了MR又不同于MR,除了Map和Reduce之外还提供了多种数据集操作类型,编程模型比Hadoop的MR更灵活
- Spark提供了内存计算,可将中间结果放到内存中,对于迭代运行效率更高
- Spark基于DAG的任务调度执行机制,要由于Hadoop的MR迭代执行机制
- Task以线程的方式维护,对于小数据集读取可以达到亚秒级的延迟
RDD(Resillient Distributed Dataset,弹性分布式数据集):一种高度受限的共享内存模型,是一个分布式对象集合,本质上是一个只读的分区记录集合,不同分区可以被保存到集群中不同的结点上,从而可以进行分布式计算
DAG(Directed Acyclic Graph,有向无环图):反映了RDD之间的依赖关系
Stage:是Job的基本调度单位,一个Job会分为多组Task,每组Task被称为Stage,也可以叫TaskSet,代表了一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集
拓展:其他概念以及概念的关系
Executor:是运行在工作结点的一个进程,负责运行Task
Application:用户编写的Spark应用程序
Task:运行在Executor上的工作单元
Job:一个Job包含多个RDD及作用于相应RDD上的各种操作
当执行一个Application时,Driver会向集群管理器申请资源,启动Executor,并向Executor发送应用程序代码和文件,然后在Executor上执行Task,运行结束后,执行结果会返回给Driver,或者写到HDFS或者其他数据库中
RDD的运行原理
设计背景
许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘工具,共同之处是,不同计算机阶段之间重用中间结果。
目前的MR框架都是把中间结果写入到HDFS中,带来的大量的数据复制、磁盘IO和序列化开销
RDD就是为了满足这种需求而出现的,它提供了一个抽象的数据架构,我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理,不同RDD之间的转换形成依赖关系,可以实现管道化,避免中间数据存储。
RDD概念
- 一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个RDD可以分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算。
- RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据集创建RDD,或者通过在其他RDD上执行确定的转换操作而创建得到新的RDD。
- RDD提供了一组丰富的操作以支持常见的数据运行,分为动作(Action)和转换(Transformation)两种类型
- RDD提供的转换接口都非常简单,都是类似map、filter、groupBy、join等粗粒度的数据转换操作,而不是针对某个数据项的细粒度修改(不适合网页爬虫)。
- 表面上RDD的功能很受限、不够强大,实际上RDD已经被实践证明可以高效地表达许多框架的编程模型(比如MR、SQL、Pregel)。
- Spark用Scala实现了RDD的API,程序员可以通过调用API实现对RDD的各种操作。
- RDD读入外部数据源进行创建
- RDD经过一系列的转换操作,每一次都会产生不同的RDD,并供给下一个转换操作使用
- 最后一个RDD经过动作操作进行转换,并输出到外部数据源
文章图片
以上一系列操作被称为一个Lineage(血缘关系),即DAG拓扑排序的结果。这样做的优点是惰性调用、管道化(流水线化)、避免同步等待、不需要保存中间结果、每次操作变得简单。
拓展:RDD特性
高效的容错性
- 现有容错机制:数据复制或者记录日志
- RDD天生的容错性:可以根据血缘关系重新计算丢失分区、无需回滚
存放的数据可以是Java对象,避免了不必要的对象序列化和反序列化
阶段的划分
- 窄依赖可以实现流水线优化
- 宽依赖无法实现流水线优化
窄依赖就是父RDD分区和子RDD分区表现为一对一或多对一
文章图片
宽依赖就是父RDD分区和子RDD分区表现为一对多
文章图片
Stage的划分方式是:在DAG中进行反向解析,遇到宽依赖就断开,遇见窄依赖就把当前的RDD加入到Stage中,尽量将窄依赖划分在同一个Stage,这样可以实现流水线计算,从而使得数据可以直接在内存中进行交换,避免了磁盘IO开销
文章图片
如A中的分区,由于A到B为宽依赖,故B断开不再Stage1中。而C到D属于窄依赖,故D存在于Stage2中,F亦是,直到F到G为宽依赖,此时G不存在于Stage2中。
Hadoop和Spark的联合部署
由于Hadoop生态系统中的一些组件实现的功能,目前还是无法由Spark取代,比如Storm,现有的Hadoop组件开发的应用,完全转移到Spark上需要一定的成本。
拓展:不同角度的Spark部署方式
角度一
方式一:Standalone
这种方式类似于MR1.0,Slot为资源分配单元。
方式二:Spark on Mesos
Mesos和Spark具有血缘关系,官方推荐部署方式
方式三:Spark on Yarn
角度二
Hadoop+Storm联合部署
部署较繁琐,但是是企业常用的部署方式
文章图片
Spark一站式部署(只用Spark)
- 实现一键式安装和配置、线程级别的任务监控和警告
- 降低硬件集群、软件维护、任务监控和应用开发的难度
- 便于做成统一的硬件、计算平台资源池
- 需要说明的是,Spark Streaming无法实现毫秒级的流计算,因此,对于需要毫秒级实时响应的企业应用而言,仍然需要采用流计算框架(如Storm)
这种部署方式对应角度1的方式3。将多种计算框架统一运行在YARN之上,这种部署方式在新时代下替代了Hadoop+Storm联合部署。这样可以做到如下好处:
- 计算资源按需伸缩
- 不用负载应用混搭,集群利用率高
- 共享底层存储,避免数据跨集群迁移
- 静态数据:一言蔽之就是非实时数据;很多企业为了支持决策分析都会构建数据仓库系统,其中存放的大量历史数据就是静态数据。技术人员可以利用数据挖掘和OLAP分析工具从静态数据中找到对企业有价值的信息
- 流数据:一言蔽之就是实时数据;大数据分析中的重要数据类型,指在时间分布和数量上无限的一系列动态数据集合体,数据的价值随着时间的流逝而降低,因此必须采用实时计算的方式给出秒级响应。
批量计算:充裕时间处理静态数据,如Hadoop
实时计算:即流计算;实时获取来自不同数据源的海量数据经过实时分析处理,获得有价值的信息。
注:流数据不适合采用批量计算,因为流数据不适合用传统关系模型建模。流数据必须采用实时计算,因为实时计算响应时间为毫秒级。数据量少时,不是问题,但是,在大数据时代,数据格式复杂、来源众多、数据量巨大,对实时计算提出了很大的挑战。因此,针对流数据的实时计算——流计算应运而生。
流计算
流计算秉承一个及基本理念,即数据的价值随着时间的流逝而降低,如用户点击流。因此,当事件出现时就应该立即进行处理,而不是缓存起来进行批量处理。为了及时处理流数据,就需要一个低延迟、可扩展、高可靠的处理引擎。也就是说,一个流计算系统应该达到如下要求:
- 高性能:处理大数据的基本要求,如每秒处理几十万条数据
- 海量式:支持TB级甚至是PB级别的数据规模
- 实时性:保证较低的延迟时间,达到秒级别,甚至是毫秒级别
- 分布式:支持大数据的基本架构,必须能够平滑扩展
- 易用性:能够快速进行开发和部署
- 可靠性:能可靠地处理流数据
- Twitter Storm是一个免费、开源的分布式实时计算系统,Storm对于实时计算的意义类似于Hadoop对于批处理的意义,Storm可以简单、高效、可靠地处理流数据,并支持多种编程语言
- Storm框架可以方便地与数据库系统进行整合,从而开发出强大的实时计算系统
Storm主要术语包括Streams、Spouts、Bolts、Topology和StreamGroupings
Streams
- Storm将流数据Stream描述成一个无限的Tuple序列,这些Tuple序列会以分布式的方式并行地创建和处理
- 每个Tuple是一堆值,每个值有一个名字,并且每个值可以是任何类型。
- Tuple本来应该是一个Key-Value的Map,由于各个组件间传递的tuple的字段名称已经事先定义好了,所以Tuple只需要按序列填入各个Value,所以就是一个Value List(值列表)
- Storm认为每个Stream都有一个源头,并把这个源头抽象为Spout
- 通常Spout会从外部数据源读取数据,然后封装成Tuple形式,发送到Stream中。Spout是一个主动的角色,在接口内部有个nextTuple函数,Storm框架会不停地调用该函数
- Storm将Streams的状态转换过程抽象为Bolt。Bolt即可以处理Tuple,也可以处理后的Tuple作为新的Streams发送给其他Bolt
- Bolt可以执行过滤、函数操作、Join、操作数据库等任何操作
- Bolt是一个被动的角色,其接口中有一个execute方法,在接收到消息之后会调用此函数,用户可以在此方法中执行自己的处理逻辑
- Storm将Spouts和Bolts组成的网络抽象为Topology,它可以被提交到Storm集群执行。Topology可视为流转换图,图中结点是一个Spout或Bolt,边则表示Bolt订阅了哪个Stream。当Spout或者Bolt发送元组时,它会把元组发送到每个订阅了该Stream的Bolt上进行处理。
- Topology里面的每个处理组件都包含处理逻辑,而组件之间的连接则表示数据流动的方向。
- Topology里面的每一个组件都是并行运行的
- 在Topology里面可以指定每个组件的并行度,Storm会在集群里面分配那么多的线程来同时计算
- 在Topology的具体实现上,Storm中的Topology定义仅仅是一些Thrift结构体(二进制高性能的通信中间件),支持各种编程语言进行定义。
一个Streams(流数据)输入进来就像小溪一样,被Storm这个框架所处理。其中小溪的源头我们叫做Spout,当然,Storm可以同时处理多条小溪。
Streams在我们看起来像是小溪,而在Storm看来实际上是一个Tuple传送带,或者说,Streams是一条装有无限个tuple的小溪,所有的小tuple构成大Tuple。
文章图片
Tuple本来可以看做是许多个
文章图片
也就是说,Tuple不是
Spout不仅是源头,你更可以看做是一个主动吸数据的源头(使用nextTuple来主动吸入),它将元数据不断地吸入,然后将它们转换为能够在Stream上流动的Tuple形式,发送到Stream上。
Blot可以看做是小溪的分支处,明显此时小溪分出支流是被动的。被分开的小溪形成了n条新的Stream。
文章图片
在Blot所在的分支处还可以对Tuple做各种操作。
Topology可以理解为是Storm流程的一个思维导图。也就是说它不去考虑细节,其把整个处理的流程抽象为一张图,提交给Storm框架。
文章图片
Spark Streaming设计
Spark Streaming最主要的抽象是DStream,即将连续的数据流按照时间片(如一段一秒)拆分为离散的数据流,每一段数据转换为Spark的RDD,并且对DStream的操作都最终转变为相应的RDD操作。(这实际上借鉴了操作系统中的时间片轮转法)。
Spark Streaming和Storm的对比
两者最大的区别在于,Spark Streaming无法实现毫秒级的流计算,而Storm可以实现。
Spark Streaming构建在Spark上,一方面是因为Spark的低延迟执行引擎可以用于实时计算,另一方面,相比于Storm,RDD数据集更容易做高效的容错处理
Spark Streaming采用的小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法,因此,方便了一些需要历史数据和实时数据联合分析的特定应用场合。
Spark Streaming和Storm的应用场景
从编程的灵活性来讲,Storm是比较理想的选择,它使用Apache Thrift,可以用任何编程语言来编写拓扑结构。
当需要在一个集群中把流计算和图计算、机器学习、SQL查询分析等进行结合时,可以选择Spark Streaming,因为在Spark上可以统一部署SparkSQL,Spark Streaming、MLlib、GraphX等组件,提供便捷的一体化编程模型
大部分应用场景都不需要毫秒级的响应,因此SparkStreaming在企业还是比较流行,Spark Streaming无法实现毫秒级的流计算,当需要使用毫秒级的流计算时,人们也会选择流行的Flink而非过时的Storm。
Spark流计算组件的演进和Structured Streaming
- Spark2.0之前,使用Spark Streaming,基于RDD的数据抽象
- Spark2.0之后,新增了Structured Streaming,基于DataFrame的数据抽象,采用“微批次模式”
- Structured Streaming在Spark2.0中只是测试版本,2.2版本时才正式发布
- 2018年2月28日,Spark2.3重磅发布,新版本Structured Streaming引入了持续流式处理模式,可以将流处理延迟降低至毫秒级别,与Flink一较高下
- 重新抽象了流式计算
- 易于实现数据的exactly。2.0之前的Spark Streaming只能做到at-least once,框架层次很难帮你做到exactly-once。现在在通过重新设计流式计算框架,使得实现exactly-once变得容易了
- Flink是Apache软件基金会的一个顶级项目,是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架,并且可以同时支持实时计算和批量计算。
- Flink具有十分强大的功能,可以支持不同类型的应用程序。Flink的主要特性包括:批流一体化、精密的状态管理、事件时间支持以及精确一次的状态一致性保障等。
- Flink不仅可以运行包括yarn、Mesos、Kubernetes等在内的多种资源管理框架上,还支持在裸机集群上独立部署。在启用高可用选项的情况下,它不存在单点失效的问题。
- 事实证明,Flink已经可以扩展到数千核心,其状态可以达到TB级别,且仍能保持高吞吐、低延迟的特性。世界各地有很多要求严苛的流处理应用都运行在Flink之上。
- 流处理架构需要具备低延迟、高吞吐和高性能的特性,而目前从市场上已有的产品来看,只有Flink可以满足需求。
- Storm虽然可以做到低延迟,但是无法实现高吞吐,也不能在故障发生时准确地处理计算状态
- Spark Streaming通过采用微批处理方法实现了高吞吐和容错性,但是牺牲了低延迟和实时处理能力
- Flink实现了Google DataFlow流计算模型,是一种兼具高吞吐、低延迟和高性能的实时流计算框架,并且同时支持批处理和流处理。此外,Flink支持高度容错的状态管理,防止状态在计算过程中因为系统异常而出现丢失。因此,Flink就成为了能够满足流处理架构要求的理想的流计算框架。
- Kafka(读音:/kfuke/)是一种高吞吐量的分布式发布订阅消息系统,用户通过Kafka系统可以发布大量消息,同时也能实时订阅消费消息。
- Kafka可以同时满足在线实时处理和批量离线处理。
- 在公司的大数据生态系统中,可以把Kafka作为数据交换枢纽,不同类型的分布式系统,可以统一接入到Kafka,实现和Hadoop各个组件之间的不同类型数据的实施高效交换。
- 许多大数据都是以大规模图或网络的形式呈现,如社交网络、传染病传播途径、交通事故对路网的影响。
- 许多非图结构的大数据,也常常会被转换为图模型后进行分析
- 图数据结构很好地表达了数据之间的关联性
- 关联性计算时大数据计算的核心——通过获得数据的关联性,可以从噪声很多的海量数据中抽取有用的信息
一次BSP(Bulk Synchronous Parallel Computing Model,“大同步”模型)计算过程包括一系列全局超步(所谓的超步就是计算中的一次迭代),每个超步主要包括三个组件:
局部计算:每个参与的处理器都有自身的计算任务,它们只读取存储在本地内存中的值,不同处理器的计算任务都是异步并且独立的。
通讯:处理器群相互交换数据,交换的方式是,由一方发起推送(put)和获取(get)操作。
栅栏同步:当一个处理器遇到“路障”,会等到其他所有处理器完成它们的计算步骤;每一次同步也是一个超步的完成和下一个超步的开始。
Pregel简介
- 谷歌公司在2003年到2004年公布的GFS、MR、BigTable,称为后来云计算和Hadoop项目的重要基石。
- 谷歌在后Hadoop时代的新三驾马车——Caffeine(帮助谷歌快速实现大规模网页索引的构建)、Dremel(实时交互分析产品,支持分析PB级别的数据)、Pregel再一次影响着圈子与大数据技术的发展潮流。
- Pregel是一种基于BSP模型实现的并行图处理系统。为了解决大型图的分布式计算问题,Pregel搭建了一套可扩张的、有容错机制的平台,该平台提供了一套非常灵活的API,可以描述各种各样的图计算。Pregel作为分布式图计算的计算框架,主要用于图遍历、最短路径、PageRank计算。
- Pregel计算模型以有向图作为输入
- 有向图的每个顶点都有一个String类型的顶点ID
- 每个顶点都有一个可修改的用户自定义值与之关联
- 每条有向边都和其源顶点关联,并记录了其目标顶点ID
- 边上有一个可修改的用户自定义值与之关联
采用消息传递模型主要基于以下两个原因:
消息传递具有足够的表达能力,没有必要使用远程读取或共享内存的方式
有助于提升系统整体性能。大型图计算通常是由一个集群完成的,集群环境中执行远程数据读取会有较高的延迟;Pregel的消息模式采用异步和批量的方式传递消息,因此可以缓解远程读取的延迟。
Hadoop常用命令总结 启动Hadoop所有进程
start-all.sh #等价于下列两条指令
start-dfs.sh #启动分布式文件系统
start-yarn.sh #启动资源管理系统
注:一般不推荐使用start-all.sh来启动,因为开源框架中内部命令有很多问题。
对HDFS的文件操作
注:由于是对Hdfs操作,故命令基本上前面都带有hdfs dfs,后面跟的基本上都是一些linux指令,大同小异不过多赘述,以下列举一些常用的。
查看指定目录下内容
hdfs dfs -cat [file_path]
将本地文件或文件夹存储到hadoop
hdfs dfs -put [本地地址/目录] [hadoop目录]
将hadoop上某个文件down至本地已有目录下
Hadoop dfs -get [文件目录] [本地目录]
删除hadoop上指定文件或文件夹
hdfs dfs -rm [文件地址/文件夹地址]
在hadoop指定目录内创建新目录
hdfs dfs -mkdir -p /user/[目录名]
在hadoop指定目录下新建一个空文件
hdfs dfs -touchz /user/[文件名]
重命名Hadoop上某个文件
hdfs dfs -mv /user/[文件地址]
杀死hadoop作业
Hadoop job -kill [job-id]
查看帮助
hdfs dfs -help
查看HDFS支持的所有命令
hdfs dfs
Hadoop框架控制
节点添加
添加一个新的DataNode节点,先在新加节点上安装好Hadoop,要和NameNode使用相同的配置(可以直接从NameNode复制),修改HADOOPHOME/conf/master文件,加入NameNode主机名。然后在NameNode节点上修改HADOOP_HOME/conf/slaves文件,加入新节点名,再建立新加节点无密码的SSH连接,运行启动命令为:/usr/local/hadoop$bin/start-all.sh
负载均衡
HDFS的数据在各个DataNode中的分布可能很不均匀,尤其是在DataNode节点出现故障或新增DataNode节点时。新增数据块时NameNode对DataNode节点的选择策略也有可能导致数据块分布不均匀。用户可以使用命令重新平衡DataNode上的数据块的分布:/usr/local/hadoop$bin/start-balancer.sh
退出安全模式
NameNode在启动时会自动进入安全模式。安全模式是NameNode的一种状态,在这个阶段,文件系统不允许有任何修改。
系统显示Name node in safe mode,说明系统正处于安全模式,这时只需要等待几十秒即可,也可通过下面的命令退出安全模式:/usr/local/hadoop$bin/hadoop dfsadmin -safemode leave
进入安全模式
在必要情况下,可以通过以下命令把HDFS置于安全模式:/usr/local/hadoop$bin/hadoop dfsadmin -safemode enter
推荐阅读
- 面试|springboot启动报错(Failed to start bean ‘documentationPluginsBootstrapper‘)
- java|Docker 文件映射
- java|Linux给目录及目录下所有文件赋予最高权限
- java|Linux卸载RabbitMQ
- java|mybatis idea 好用的插件
- java|redis批量删除key值
- vue|Vuex——Mutation传递参数
- 前端|Vuex —— 组件之间数据共享
- nginx|业务前端界面报错504排查思路和解决办法