# textFile

1.1. textFile, which actually calls hadoopFile

```scala
/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  // actually delegates to hadoopFile
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], // Hadoop types
    minPartitions).map(pair => pair._2.toString).setName(path)
}
```
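As a quick usage sketch (the local master and input path below are made up purely for illustration), the RDD returned by textFile can be consumed like any other RDD of strings:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TextFileDemo {
  def main(args: Array[String]): Unit = {
    // Local mode and the input path are hypothetical, chosen only for this demo.
    val sc = new SparkContext(new SparkConf().setAppName("textFile-demo").setMaster("local[*]"))
    // textFile is hadoopFile(...) plus a map that keeps only the line text (pair._2).
    val lines = sc.textFile("/tmp/example.txt", minPartitions = 4)
    println(lines.count())   // each element of the RDD is one line of the file
    sc.stop()
  }
}
```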

1.1.1. hadoopFile
  • Broadcast the Hadoop configuration via the BroadcastManager
  • Define the function (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path), used later to set the input paths on the JobConf
  • Construct the HadoopRDD
```scala
/**
 * Get an RDD for a Hadoop file with an arbitrary InputFormat
 *
 * @note Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
 * operation will create many references to the same object.
 * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
 * copy them using a `map` function.
 */
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()

  // This is a hack to enforce loading hdfs-site.xml.
  // See SPARK-11227 for details.
  FileSystem.getLocal(hadoopConfiguration)

  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}
```
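For illustration, the call that textFile performs internally can also be issued directly. The sketch below (hypothetical path, local master) uses the same TextInputFormat/LongWritable/Text triple and shows the Writable-copy caveat from the @note above:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object HadoopFileDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hadoopFile-demo").setMaster("local[*]"))
    // Equivalent to what textFile does internally: keys are byte offsets, values are lines.
    val kv = sc.hadoopFile("/tmp/example.txt", classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text])
    // Hadoop re-uses the same Writable instance for each record, so materialize a copy
    // (here: convert the Text value to a String) before caching or shuffling.
    val lines = kv.map(pair => pair._2.toString)
    lines.take(3).foreach(println)
    sc.stop()
  }
}
```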

1.1.2. Calling map to transform the HadoopRDD into a MapPartitionsRDD
```scala
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
```
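As a small spark-shell sketch (sc is the shell's pre-created SparkContext, the path is hypothetical), the per-partition iterator form above means that map(f) and a hand-written mapPartitions compute the same thing:

```scala
// Runs in spark-shell; sc is the shell's SparkContext and the path is hypothetical.
val lines = sc.textFile("/tmp/example.txt")
val viaMap           = lines.map(_.length)                              // MapPartitionsRDD
val viaMapPartitions = lines.mapPartitions(iter => iter.map(_.length))  // same computation, spelled out
```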

  • Call SparkContext's clean method, which actually delegates to ClosureCleaner's clean method; its purpose is to strip references in the closure that cannot be serialized, so that serialization and deserialization do not fail when the closure is shipped over the network
  • Construct the MapPartitionsRDD
    • This calls the auxiliary constructor of the parent class RDD, which first wraps oneParent in a OneToOneDependency, an implementation of NarrowDependency (see the sketch after the constructor below)
```scala
/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
  this(oneParent.context, List(new OneToOneDependency(oneParent)))
```
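A spark-shell sketch (again using the shell's sc and a hypothetical path) makes this dependency chain visible: the MapPartitionsRDD built by textFile's internal map reports a single OneToOneDependency on the underlying HadoopRDD:

```scala
// Runs in spark-shell; sc is the shell's SparkContext and the path is hypothetical.
val rdd = sc.textFile("/tmp/example.txt")
println(rdd.toDebugString)                                        // MapPartitionsRDD <- HadoopRDD
rdd.dependencies.foreach(d => println(d.getClass.getSimpleName))  // OneToOneDependency
```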
