package com.kingbase.kmeansDemo
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.mllib.clustering.KMeansModel
object Kmeans_result {
  /**
   * KMeans clustering driver over a VSM (term-weight) result file.
   *
   * args(0) — HDFS base URI (e.g. "hdfs://node42:8020")
   * args(1) — input path of the VSM result, appended to args(0)
   * args(2) — number of clusters (k)
   *
   * Input line format (tab-separated):
   *   <filename>\t<term1,weight1> <term2,weight2> ...
   *
   * Outputs (under args(0)):
   *   /user/data/kmeans/output/pathWC — "k --> WSSSE" cost record
   *   /user/data/kmeans/output/test   — "<filename>\t<terms>\t<clusterId>"
   *   /user/data/kmeans/output/count  — documents per cluster
   *   /user/data/kmeans/output/key    — top-10 terms per cluster by summed weight
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    // (filename, "term,weight term,weight ...") pairs.
    val data = sc.textFile(args(0) + args(1))
    val parsedData1 = data.map { line =>
      val cols = line.split("\t")
      (cols(0), cols(1))
    }

    // Keep only the weight of every "term,weight" pair, space-joined.
    // (The original built this with a var + foreach accumulating into a String.)
    val parsedData2 = parsedData1.map { case (_, terms) =>
      terms.split(" ").map(_.split(",")(1)).mkString(" ")
    }

    // Dense vectors for KMeans; cached because train() makes many passes.
    val parsedData4 = parsedData2
      .map(s => Vectors.dense(s.split(' ').map(_.trim.toDouble)))
      .cache()

    val numClusters = args(2).toInt
    val numIterations = 20 // max iterations per run
    val runs = 10          // independent restarts; the lowest-cost model wins
    val clusters = KMeans.train(parsedData4, numClusters, numIterations, runs)

    // Within Set Sum of Squared Errors — lower is better for a fixed k.
    val WSSSE = clusters.computeCost(parsedData4)
    val pathWC = args(0) + "/user/data/kmeans/output/pathWC"
    sc.makeRDD(List(s"$numClusters --> $WSSSE")).saveAsTextFile(pathWC)

    val pathRes = args(0) + "/user/data/kmeans/output/test"
    val pathCount = args(0) + "/user/data/kmeans/output/count"
    val pathKey = args(0) + "/user/data/kmeans/output/key"

    // Predict the cluster of every document and persist it as
    // "<filename>\t<original terms>\t<clusterId>".
    // BUG FIX: the original joined the fields with spaces but read them back
    // below with split("\t"), which threw ArrayIndexOutOfBoundsException.
    parsedData1.map { case (filename, terms) =>
      val weights = terms.split(" ").map(_.split(",")(1).trim.toDouble)
      val cluster = clusters.predict(Vectors.dense(weights))
      filename + "\t" + terms + "\t" + cluster
    }.saveAsTextFile(pathRes)

    // Re-read the prediction file: (filename, terms, clusterId).
    val data2 = sc.textFile(pathRes)
    val count1 = data2.map { line =>
      val cols = line.split("\t")
      (cols(0), cols(1), cols(2))
    }

    // Number of documents assigned to each cluster.
    count1.map(x => (x._3, 1)).reduceByKey(_ + _).saveAsTextFile(pathCount)

    // Top-10 terms per cluster, ranked by summed weight across documents.
    count1.map(x => (x._3, x._2))
      .reduceByKey(_ + " " + _) // concatenate all term lists of a cluster
      .map { case (cluster, allTerms) =>
        // Sum the weight of each term across the cluster's documents.
        var weightByTerm: Map[String, Double] = Map()
        allTerms.split(" ").foreach { kvStr =>
          val kv = kvStr.split(",")
          if (kv.size == 2) { // skip malformed fragments
            val term = kv(0)
            val w = kv(1).trim.toDouble
            weightByTerm += (term -> (weightByTerm.getOrElse(term, 0.0) + w))
          }
        }
        // Descending by weight; keep the ten heaviest terms.
        // BUG FIX: the original stopped after ten items with a nonlocal
        // `return` inside the lambda, which throws NonLocalReturnControl
        // when run in a Spark executor closure; take(10) is the safe form.
        val top10 = weightByTerm.toSeq.sortWith(_._2 > _._2).take(10)
        (cluster, top10.map { case (t, w) => t + "," + w }.mkString(" "))
      }
      .saveAsTextFile(pathKey)
  }
}
// (removed stray scrape residue "联系客服" / "contact customer service" — non-code text after the closing brace broke compilation)