##|## Spark实现Canopy聚类算法
Spark实现Canopy聚类算法
为什么需要Canopy算法
Canopy算法一般是为其他聚类算法的一种聚类方法,常用的聚类如 K-means 等聚类算法都需要事先k值(即聚类个数),并且会随机选择k个初始聚类中心。这种实现指定的k和随机初始化的聚类中心不仅会降低聚类算法的效率,而且得到的结果也可能是局部最优的。Canopy算法就可以解决以上问题。
Canopy算法的步骤
Canopy算法的核心步骤如下:
- 给定一组待聚类的数据集D;
- 给定两个阈值 T1,T2 且T1 > T2;
- 随机选择D中的一个数据d作为中心,并将d从D中移除;
- 计算D中所有点到d的距离distance;
- 将所有distance < T1的点归到以d为中心的canopy1类中;
- 将所有distance < T2的点从D中移除;
- 重复步骤4-6,直到D为空,形成多个Canopy类。
Canopy聚类过程如图所示:
文章图片
Canopy聚类过程 代码实现 本案例以鸢尾花数据集为例
// 数据加载
val irisData = https://www.it610.com/article/spark.read
.option("header", true)
.option("inferSchema", true)
.csv(inputFile) ////////////////////////数据预处理//////////////////////////////
val schema = irisData.schema
val fts = schema.filterNot(_.name == """class""").map(_.name).toArray// 特征处理
val amountVectorAssembler: VectorAssembler = new VectorAssembler()
.setInputCols(fts)
.setOutputCol("features")val ftsDF = amountVectorAssembler
.transform(irisData)
.select("features")// 计算欧式距离
def Euclidean(p1: Seq[Double], p2: Seq[Double]) = {
var distant = 0d
for (i <- 0 until p1.length) {
distant += pow(p1(i) - p2(i), 2)
}
sqrt(distant)
}// Canopy聚类算法过程
def ConapyRunner(df: DataFrame, t1: Double, t2: Double) = {var flag = true
var cluster = 0 // ClusterId Canopy类的Id// 结果表的Schema信息
val schemaOfResult: StructType = df.schema
.add("conapyId", IntegerType) //增加一列表示类id的字段// 创建一个空DF 用于接收结果
var resultDF =
spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schemaOfResult)var dfcopy = dfwhile (flag) {cluster += 1// 随机选择一个初始点p 为了方便计算 在这里选择第一个点作为初始点
val d = dfcopy.head.getAs[DenseVector]("features")//定义距离计算的自定义函数(默认为欧式距离)
val distanceUDF = udf((vec: DenseVector) => {
val distance = Euclidean(vec.toArray, d.toArray)
//输出结果模式匹配
(distance < t1, distance < t2)
})val distanceDF = dfcopy
.withColumn("distance", distanceUDF($"features"))
.withColumn("t2threshold", $"distance".getField("_2"))
.withColumn("t1threshold", $"distance".getField("_1"))// 将距离小于t1的输出 同时剔除的还有初始点d
resultDF = resultDF.union(
distanceDF
.where($"t1threshold")
.select($"features", lit(cluster) as "cluster")
)// 将距离小于T2的点从数据集移除
dfcopy = distanceDF
.where(!$"t2threshold")
.select($"features")// 判断df是否已经为空 df为空时 结束循环
flag = !dfcopy.isEmpty}// 返回聚类结果集
resultDF}// 聚类方法调用
val dd = ConapyRunner(ftsDF, 4, 1.5)dd.show()
结果查看(已省略部分展示结果)
+-----------------+--------+
|features|conapyId|
+-----------------+--------+
|[5.1,3.5,1.4,0.2]|1|
|[4.9,3.0,1.4,0.2]|1|
|[5.6,2.5,3.9,1.1]|2|
|[5.9,3.2,4.8,1.8]|2|
+-----------------+--------+
从算法可以得知,使用Canopy聚类时,产生的各类之间可能是有交集的,数据集的Canopy划分完成后,类似于下图:
文章图片
Canopy聚类结果 参考资料:
https://www.deeplearn.me/1765.html
https://www.cnblogs.com/jamesf/p/4751565.html
推荐阅读
- 关于QueryWrapper|关于QueryWrapper,实现MybatisPlus多表关联查询方式
- MybatisPlus使用queryWrapper如何实现复杂查询
- python学习之|python学习之 实现QQ自动发送消息
- 孩子不是实现父母欲望的工具——林哈夫
- opencv|opencv C++模板匹配的简单实现
- Node.js中readline模块实现终端输入
- java中如何实现重建二叉树
- 人脸识别|【人脸识别系列】| 实现自动化妆
- paddle|动手从头实现LSTM
- pytorch|使用pytorch从头实现多层LSTM