# textFile
1.1 textFile, which actually calls hadoopFile
```scala
/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  // actually delegates to hadoopFile
  hadoopFile(path,
    classOf[TextInputFormat], classOf[LongWritable], classOf[Text], // Hadoop key/value types
    minPartitions)
    .map(pair => pair._2.toString).setName(path)
}
```
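For orientation, here is a minimal usage sketch; the running SparkContext `sc`, the file path, and the partition count are assumptions for illustration. It shows that textFile keeps only the value half of each (LongWritable, Text) pair, i.e. the line text, and returns an RDD[String].

```scala
// Minimal sketch (assumes a running SparkContext `sc` and a file at this path).
// textFile keeps only pair._2 (the line text); the byte offsets (pair._1) are dropped.
val lines = sc.textFile("/tmp/words.txt", minPartitions = 4)
lines.take(3).foreach(println)   // first three lines as plain Strings
```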
1.1.1 hadoopFile
- Broadcast the Hadoop configuration via the BroadcastManager
- Define the function `(jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)`, used later to set the job's input paths
- Construct a HadoopRDD
```scala
/** Get an RDD for a Hadoop file with an arbitrary InputFormat
 *
 * @note Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
 * operation will create many references to the same object.
 * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
 * copy them using a `map` function.
 */
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()

  // This is a hack to enforce loading hdfs-site.xml.
  // See SPARK-11227 for details.
  FileSystem.getLocal(hadoopConfiguration)

  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}
```
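The @note above matters in practice. Below is a hedged sketch (the SparkContext `sc` and the path are assumptions): because the RecordReader re-uses the same LongWritable/Text instances, copy the data out of the Writables (e.g. with map) before caching or shuffling.

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Sketch illustrating the @note: the same Writable objects are reused for every record.
val raw = sc.hadoopFile("/tmp/words.txt",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])

// Risky: caching the reused Writable objects directly creates many references to the same instance.
// raw.cache()

// Safe: materialize plain JVM values first, then cache.
val safe = raw.map { case (offset, line) => (offset.get(), line.toString) }
safe.cache()
```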
1.1.2 Calling map to turn the HadoopRDD into a MapPartitionsRDD
```scala
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
```
- It calls SparkContext's clean method, which actually delegates to ClosureCleaner's clean method; this strips the closure of variables that cannot be serialized, so the function does not fail to deserialize when the RDD's tasks are shipped across the network
- It then constructs a MapPartitionsRDD
- This calls the auxiliary constructor of the parent class RDD, which wraps oneParent in a OneToOneDependency, an implementation of NarrowDependency (see the sketch after the constructor below)
```scala
/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
  this(oneParent.context, List(new OneToOneDependency(oneParent)))
```
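A minimal sketch of the resulting lineage (the SparkContext `sc` and the input path are assumptions): the RDD returned by textFile is the MapPartitionsRDD, and its single dependency is a OneToOneDependency on the parent HadoopRDD.

```scala
import org.apache.spark.OneToOneDependency

// Inspect the lineage of the RDD produced by textFile (run in spark-shell or an app).
val lines = sc.textFile("/tmp/words.txt")          // the MapPartitionsRDD
val dep = lines.dependencies.head                  // the dependency created by the constructor above
println(dep.isInstanceOf[OneToOneDependency[_]])   // true: a NarrowDependency
println(dep.rdd)                                   // the parent HadoopRDD
```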