##|## Spark实现Canopy聚类算法

Spark实现Canopy聚类算法 为什么需要Canopy算法 Canopy算法一般是为其他聚类算法的一种聚类方法,常用的聚类如 K-means 等聚类算法都需要事先k值(即聚类个数),并且会随机选择k个初始聚类中心。这种实现指定的k和随机初始化的聚类中心不仅会降低聚类算法的效率,而且得到的结果也可能是局部最优的。Canopy算法就可以解决以上问题。
Canopy算法的步骤 Canopy算法的核心步骤如下:

  1. 给定一组待聚类的数据集D;
  2. 给定两个阈值 T1,T2 且T1 > T2;
  3. 随机选择D中的一个数据d作为中心,并将d从D中移除;
  4. 计算D中所有点到d的距离distance;
  5. 将所有distance < T1的点归到以d为中心的canopy1类中;
  6. 将所有distance < T2的点从D中移除;
  7. 重复步骤4-6,直到D为空,形成多个Canopy类。
【##|## Spark实现Canopy聚类算法】从Canopy算法实现的步骤来看,Canopy算法的优点明显,同时也存在一定的缺点:就是要事先指定合适的T1 和 T2
Canopy聚类过程如图所示:

##|## Spark实现Canopy聚类算法
文章图片
Canopy聚类过程 代码实现 本案例以鸢尾花数据集为例
// 数据加载 val irisData = https://www.it610.com/article/spark.read .option("header", true) .option("inferSchema", true) .csv(inputFile) ////////////////////////数据预处理////////////////////////////// val schema = irisData.schema val fts = schema.filterNot(_.name == """class""").map(_.name).toArray// 特征处理 val amountVectorAssembler: VectorAssembler = new VectorAssembler() .setInputCols(fts) .setOutputCol("features")val ftsDF = amountVectorAssembler .transform(irisData) .select("features")// 计算欧式距离 def Euclidean(p1: Seq[Double], p2: Seq[Double]) = { var distant = 0d for (i <- 0 until p1.length) { distant += pow(p1(i) - p2(i), 2) } sqrt(distant) }// Canopy聚类算法过程 def ConapyRunner(df: DataFrame, t1: Double, t2: Double) = {var flag = true var cluster = 0 // ClusterId Canopy类的Id// 结果表的Schema信息 val schemaOfResult: StructType = df.schema .add("conapyId", IntegerType) //增加一列表示类id的字段// 创建一个空DF 用于接收结果 var resultDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schemaOfResult)var dfcopy = dfwhile (flag) {cluster += 1// 随机选择一个初始点p 为了方便计算 在这里选择第一个点作为初始点 val d = dfcopy.head.getAs[DenseVector]("features")//定义距离计算的自定义函数(默认为欧式距离) val distanceUDF = udf((vec: DenseVector) => { val distance = Euclidean(vec.toArray, d.toArray) //输出结果模式匹配 (distance < t1, distance < t2) })val distanceDF = dfcopy .withColumn("distance", distanceUDF($"features")) .withColumn("t2threshold", $"distance".getField("_2")) .withColumn("t1threshold", $"distance".getField("_1"))// 将距离小于t1的输出 同时剔除的还有初始点d resultDF = resultDF.union( distanceDF .where($"t1threshold") .select($"features", lit(cluster) as "cluster") )// 将距离小于T2的点从数据集移除 dfcopy = distanceDF .where(!$"t2threshold") .select($"features")// 判断df是否已经为空 df为空时 结束循环 flag = !dfcopy.isEmpty}// 返回聚类结果集 resultDF}// 聚类方法调用 val dd = ConapyRunner(ftsDF, 4, 1.5)dd.show()

结果查看(已省略部分展示结果)
+-----------------+--------+ |features|conapyId| +-----------------+--------+ |[5.1,3.5,1.4,0.2]|1| |[4.9,3.0,1.4,0.2]|1| |[5.6,2.5,3.9,1.1]|2| |[5.9,3.2,4.8,1.8]|2| +-----------------+--------+

从算法可以得知,使用Canopy聚类时,产生的各类之间可能是有交集的,数据集的Canopy划分完成后,类似于下图:

##|## Spark实现Canopy聚类算法
文章图片
Canopy聚类结果 参考资料:
https://www.deeplearn.me/1765.html
https://www.cnblogs.com/jamesf/p/4751565.html

    推荐阅读