无监督学习[Unsupervised Learning] 输入数据没有被标记,也没有确定的结果。样本数据类别未知,需要根据样本间的相似性对样本集进行分类(聚类,clustering)试图使类内差距最小化,类间差距最大化。通俗点将就是实际应用中,不少情况下无法预先知道样本的标签,也就是说没有训练样本对应的类别,因而只能从原先没有样本标签的样本集开始学习分类器设计。
非监督学习目标不是告诉计算机怎么做,而是让它(计算机)自己去学习怎样做事情。非监督学习有两种思路。第一种思路是在指导Agent时不为其指定明确分类,而是在成功时,采用某种形式的激励制度。需要注意的是,这类训练通常会置于决策问题的框架里,因为它的目标不是为了产生一个分类系统,而是做出最大回报的决定,这种思路很好的概括了现实世界,agent可以对正确的行为做出激励,而对错误行为做出惩罚。
无监督学习的方法分为两大类:
(1) 一类为基于概率密度函数估计的直接方法:指设法找到各类别在特征空间的分布参数,再进行分类。
(2) 另一类是称为基于样本间相似性度量的简洁聚类方法:其原理是设法定出不同类别的核心或初始内核,然后依据样本与核心之间的相似性度量将样本聚集成不同的类别。
利用聚类结果,可以提取数据集中隐藏信息,对未来数据进行分类和预测。应用于数据挖掘,模式识别,图像处理等。PCA和很多deep learning算法都属于无监督学习。
监督学习与无监督学习的不同点
有监督学习方法必须要有训练集与测试样本。在训练集中找规律,而对测试样本使用这种规律。而非监督学习没有训练集,只有一组数据,在该组数据集内寻找规律。
有监督学习的方法就是识别事物,识别的结果表现在给待识别数据加上了标签。因此训练样本集必须由带标签的样本组成。而非监督学习方法只有要分析的数据集的本身,预先没有什么标签。如果发现数据集呈现某种聚集性,则可按自然的聚集性分类,但不予以某种预先分类标签对上号为目的。
非监督学习方法在寻找数据集中的规律性,这种规律性并不一定要达到划分数据集的目的,也就是说不一定要“分类”。
这一点是比有监督学习方法的用途要广。 譬如分析一堆数据的主分量,或分析数据集有什么特点都可以归于非监督学习方法的范畴。
用非监督学习方法分析数据集的主分量与用K-L变换计算数据集的主分量又有区别。后者从方法上讲不是学习方法。因此用K-L变换找主分量不属于无监督学习方法,即方法上不是。而通过学习逐渐找到规律性这体现了学习方法这一点。在人工神经元网络中寻找主分量的方法属于无监督学习方法。
何时采用哪种方法
简单的方法就是从定义入手,有训练样本则考虑采用监督学习方法;无训练样本,则一定不能用监督学习方法。但是,现实问题中,即使没有训练样本,我们也能够凭借自己的双眼,从待分类的数据中,人工标注一些样本,并把它们作为训练样本,这样的话,可以把条件改善,用监督学习方法来做。对于不同的场景,正负样本的分布如果会存在偏移(可能大的偏移,可能比较小),这样的话,监督学习的效果可能就不如用非监督学习了。
K-means Algorithm
概念描述
K-means算法是将散列的数据尝试聚类为K个聚类类别的算法。常数K是算法开始之前就去人工指定的或是通过算法来推导出的。如例图中我们可以直观看出他就是两个聚类,则我们就可以令K=2;而如果是混乱的数据,则需要通过其他算法先推导出K值。
K的人工指定中也可以根据目的来分类,如假如我们要根据一个数据集来区分衣服的大小,而我们能归类的聚类数是有最大值的,比如分为大小中和特大四种,这是我们实际意义上能够分出的类别数,这一点不受制于数据本身,此时就可以人工指定K的值。
左图一眼就可以看出分三类,而右图一眼看上去却不知道到底该分几类,但是我们根据自己的需求(分为大中小三类),即根据目的或需求来指定K值
然后在数据群落里生成两个
随机的质心 ,计算质心与该数据集中所有点的距离,取某数据点到质心的距离相对于到其他质心的距离中的最小值。
如上图所示,用不同颜色代表不同的聚类,根据质心的相对距离归类。归类之后,再来讨论。由于初始质心的点是随机的,也就意味着准确率无法保证,因此从宏观上而言我们需要重复多次该行动来尝试找到一个合理的聚类。
所以,我们将两个聚类隔离,分别计算其新的质心。质心的计算就是其所有数据点的平均值。得到新的质心后,再根据新的质心来聚类,对比之前的聚类,数据集的聚类状态可能会发生改变。
如图所示,能够直观看出两个聚类的分布变化;接下来只要重复上述步骤即可。
重复该工程直到质心不再改变,则获得最终确定的聚类。只要质心改变则有可能导致聚类发生改变,那么如果质心不再改变,则一定不会有聚类的改变。
Ps)在上述情况下,可能会出现原定K类的聚类中,有某一个聚类没有被分配到任何数据点,也就意味着该聚类内无数据,此时我们只需要将其剔除,将总聚类数变为K-1即可
总结:
该算法的特点可以用上图的公式表达:
n表示该数据集所处的维度;m表示数据集的个数,K表示聚类的数目;μ表示质心。
优点:
1、原理简单(靠近中心点) ,实现容易
2、聚类效果中上(依赖K的选择)
3、空间复杂度o(N)时间复杂度o(IKN)
N为样本点个数,K为中心点个数,I为迭代次数
缺点:
1、对离群点, 噪声敏感 (中心点易偏移)
2、很难发现大小差别很大的簇及进行增量计算
3、结果不一定是全局最优,只能保证局部最优(与K的个数及初值选取有关)
K-means Optimization Objective[优化]
接下来我们将质心称为簇心。是求每一个数据点到每个簇心的最短距离,换言之,想要一步求出对应数据点属于哪个聚类。
同理,再找出新的聚类后,簇心会移动,移动后再以此方式寻找新的聚类与新的簇心移动,直到簇心不再移动。
K-means Random Initialization[随机初始化]
既然随机指定两个簇心,则有可能会出现两个簇心过近导致精度极大幅度下降的问题。随机簇心的不同,最终收敛到的聚集也可能不同。
多次数随机,一次的随机结果可能会产生巨大的偏移,但是次数足够多的随机收敛,可以让结果偏向于正确的解集。当k值不大时,该方法有很好的性能;但是如果K很大,则该方法没有什么实际意义。因为如果K的值足够大,其随机分布的簇心集中出现在某区域的概率相对就会很小,换言之能够最终收敛到不错的聚类的可能性就很大,在这种情况下进行多次随机,徒增开销而收益不大。
Choosing the Number of Clusters[甄选聚类]
如上图中所示,对于一个数据集我们想要人工指定K值时,可以指定的值不唯一。由于不是监督学习,没有“标准答案”,也因此不能断言K到底是什么。
那么此时,我们就需要一种算法来解决这个问题。
Elbow方法:对于n个点的数据集,迭代计算k from 1 to n,每次聚类完成后计算每个点到其所属的簇中心的距离的平方和,可以想象到这个平方和是会逐渐变小的,直到k==n时平方和为0,因为每个点都是它所在的簇中心本身。
也就是说,该方法可以给出一个关于K变化的聚类平均距离的函数,能够给出的的指导意义是:如果分的聚类多(K值大),则数据之间平均距离小,也就说区别更细致,换言之意义不大;如果分的聚类小,则平均距离大,区别比较模糊,换言之可以考虑进一步细分。也就说我们应当考虑找到一个合适的点,能让这些个聚类表现出最高的价值且同时不冗余
这里又会遇到一个问题,就是这个elbow点,大多数情况不是那么明显可以区分出的。左图中,可以找到明显的elbow点,然后我们取此时的K值进行算法细分;而大多数情况下,Elbow并不明显,因此我们也只能大概的选一个值来尝试。
总结:这个方法很有意义,但是不必要。因为使用这个方法或许能够明确的找到一个合适的K值,即便不能也能提供一个不错的选择区间;但是,由于大多数情况下提供的K值仍然是不确定的,可能和人工指定预想的区间差距不大。
这里举个例子,分类这个问题已经是个老生常谈的问题了,类可以说是越分越细,没有极限,由于分类越细致,子类别间区别越小,其实际意义也就越小。根据物种分类,可以分出很多动植物以及各种科属,这些大方面的区分无疑是必要的,因为犬科和猫科差别极大;而猫科中老虎和猫和豹子差别也是非常大;但是再细分一点,橘猫和花猫差别就非常小,再细分一点,你的宠物橘猫和我的宠物橘猫总是有这点那点的一些细微差距,而这些差距的意义就是用于区分你我的猫,其区分的价值意义越来越小,当我们带着彼此的橘猫去看兽医时,兽医对你我的橘猫进行区分进行诊断,因为他们都是橘猫;但是兽医对狗和猫的诊断一定会有很大区别。
所以,回到正题,K值综合考虑应该如何区分呢?我认为还是按照目的区分,按照需求区分是最合适的。
使用算法来区分当然能够得到一个不错的结论,但是数据终究是为我们服务,我们分析一个数据时应该明白这个数据我们要用来做什么,只有当遇到确实需要让系统来做出归类时使用方法的效果才会显著,只从精细程度上而言,对于有明确需求的分类,人工指定反而更加节省成本。
还是那个例题,对于衣服生产商,标识衣服的尺寸时,你可以分大中小,也可以分特大大中小特小;具体分K=3还是5,需要用算法和数据比对吗。答案是有意义但是不必要,因为对于市场的把控,数据虽然更加直观,但是商家完全可以结合自身需求和目标来规划,从而节省成本谋取更大的利益。
越精细的区分,所需要的投入越大,换而言之,售价可以对应提高。(现在大部分衣服厂商可能不同尺寸同一价格),所以我们换个例子。
你是一个饮料制造厂商,大多数饮料基准都是500ml,你是否想要将自己的饮料也出1L甚至2L乃至更多。你会发现,目前市场上,有2L以上规格的饮料大多数是水已经知名度非常高的可乐雪碧,而新兴饮料产品,一般规格在300~600ml。毫无疑问,你的容量越大,售价也会越高,制作成本也会提高,那么你分多类售卖投入的成本能否换取对应的价值就是商家们要考虑的问题。过高的售价,以及饱和的需求,会告诉你不可能出现过多的分类,尤其是对于饮料这种东西,几百毫升的体验是一个阈值,过多的容量一定不受喜欢(喝腻了、撑了)等,反而水这种易存储,需求量大,价格低的商品可以出大容量产品。
说这么多,本意就是强调数据分类时,K值可以根据自身需求和目的来指定,算法的介入并不必要;只有当 探求新兴领域时,数据的模拟才显得更加珍贵,越是成熟的体系,不必要的数据分析只会徒增成本。
Code
val sqlC = new org.apache.spark.sql.SQLContext(sc)
import sqlC.implicits._
val data = spark.read.
option( "inferSchema" , true) .
option( "header" , false).
csv( "Data/kdd/kddcup.data_10_percent_corrected" ).
toDF(
"duration","protocol_type" , "service", "flag" ,
"src_bytes" , "dst_bytes" , "land" , "wrong_fragment" , "urgent" ,
"hot" , "num_failed_logins" , "logged_in", "num_compromised" ,
"root_shell", "su_attempted" , "num_root" , "num_file_creations" ,
"num_shells","num_access_files" , "num_out bound_cmds" ,
"is_host_login", "is_guest_login" , "count" , "srv_count",
"serror_rate" , "srv_serror_rate" , "rerror_rate" , "srv_rerror_rate",
"same_srv_rate", "diff_srv_rate", "srv_diff_host_rate" ,
" dst_host_count", "dst_host_srv_count " ,
"dst_host_same_srv_rate" , "dst_host_di ff_srv_rate" ,
"dst_host_same_src_port_rate" , "dst_host_srv_diff_host_rate" ,
"dst_host_serror_rate" , "dst_host_srv_serror_rate" ,
"dst_host_rerror_rate" , "dst_host_srv_rerror_rate",
"label")
data.cache()
data.select("label").groupBy("label").count().orderBy($"count".desc).show(25)
val numericOnly = data.drop("protocol_type" , "service", "flag" ).cache()
import org.apache.spark.ml.{PipelineModel,Pipeline}
import org.apache.spark.ml.clustering.{KMeans,KMeansModel}
import org.apache.spark.ml.feature.{OneHotEncoder,VectorAssembler,StringIndexer,StandardScaler}
import org.apache.spark.ml.linalg.{Vector,Vectors}
import org.apache.spark.sql.{DataFrame,SparkSession}
import scala.util.Random
val assembler = new VectorAssembler().
setInputCols(numericOnly.columns.filter(_!="label")).
setOutputCol( "featureVector" )
val kmeans = new KMeans().
setSeed(Random.nextLong( )).
setPredictionCol( "cluster" ).
setFeaturesCol( "featureVector" )
val pipeline = new Pipeline().setStages(Array(assembler,kmeans))
val pipelineModel = pipeline.fit(numericOnly)
val kmeansModel = pipelineModel.stages.last.asInstanceOf[KMeansModel]
kmeansModel.clusterCenters.foreach(println)
val withCluster = pipelineModel.transform(numericOnly)
withCluster.select( "cluster", "label" ).
groupBy( "cluster", "label" ).count().
orderBy($"cluster", $"count".desc).
show(25)
def clusteringScore0(data:DataFrame,k:Int):Double={
val assembler = new VectorAssembler().
setInputCols(data.columns.filter(_!="label")).
setOutputCol("featureVector")
val kmeans = new KMeans().
setSeed(Random.nextLong()).
setK(k).
setPredictionCol("cluster").
setFeaturesCol("featureVector")
val pipeline = new Pipeline().setStages(Array(assembler,kmeans))
val kmeansModel =pipeline.fit(data).stages.last.asInstanceOf[KMeansModel]
kmeansModel.computeCost(assembler.transform(data)) / data.count()
}
(20 to 100 by 20).map( k=>(k,clusteringScore0(numericOnly,k))).foreach(println)
def clusteringScore1(data:DataFrame,k:Int):Double={
val assembler = new VectorAssembler().
setInputCols(data.columns.filter(_!="label")).
setOutputCol("featureVector")
val kmeans = new KMeans().
setSeed(Random.nextLong()).
setK(k).
setPredictionCol("cluster").
setFeaturesCol("featureVector").
setMaxIter(40).
setTol(1.0e-5)
val pipeline = new Pipeline().setStages(Array(assembler,kmeans))
val kmeansModel =pipeline.fit(data).stages.last.asInstanceOf[KMeansModel]
kmeansModel.computeCost(assembler.transform(data)) / data.count()
}
(20 to 100 by 20).map( k=>(k,clusteringScore1(numericOnly,k))).foreach(println)
def clusteringScore2(data:DataFrame,k:Int):Double={
val assembler = new VectorAssembler().
setInputCols(data.columns.filter(_!="label")).
setOutputCol("featureVector")
val scaler = new StandardScaler().
setInputCol("featureVector").
setOutputCol("scaledFeatureVector").
setWithStd(true).
setWithMean(false)
val kmeans = new KMeans().
setSeed(Random.nextLong()).
setK(k).
setPredictionCol("cluster").
setFeaturesCol("featureVector").
setMaxIter(40).
setTol(1.0e-5)
val pipeline = new Pipeline().setStages(Array(assembler,scaler,kmeans))
val pepelineModel=pipeline.fit(data)
val kmeansModel =pipeline.fit(data).stages.last.asInstanceOf[KMeansModel]
kmeansModel.computeCost(pepelineModel.transform(data)) / data.count()
}
(60 to 270 by 30).map( k=>(k,clusteringScore2(numericOnly,k))).foreach(println)
def oneHotPipeline( inputCol: String): (Pipeline,String)= {
val indexer = new StringIndexer().
setInputCol(inputCol).
setOutputCol(inputCol + "_indexed" )
val encoder = new OneHotEncoder().
setInputCol(inputCol + "_indexed" ).
setOutputCol( inputCol + "_vec" )
val pipeline = new Pipeline( ).setStages(Array(indexer,encoder))
(pipeline,inputCol + "_vec" )
}
def clusteringScore3(data:DataFrame,k:Int):Double={
val (protoTypeEncoder ,protoTypeVecCol) = oneHotPipeline("protocol_type")
val (serviceEncoder,serviceVecCol) = oneHotPipeline("service" )
val (flagEncoder,flagVecCol) = oneHotPipeline( "flag")
val assembleCols = Set(data.columns:_*)--
Seq( "label","protocol_type","service","flag" ) ++
Seq(protoTypeVecCol , serviceVecCol , flagVecCol )
val assembler = new VectorAssembler().
setInputCols(assembleCols.toArray).
setOutputCol("featureVector")
val scaler = new StandardScaler().
setInputCol("featureVector").
setOutputCol("scaledFeatureVector").
setWithStd(true).
setWithMean(false)
val kmeans = new KMeans().
setSeed(Random.nextLong()).
setK(k).
setPredictionCol("cluster").
setFeaturesCol("featureVector").
setMaxIter(40).
setTol(1.0e-5)
val pipeline = new Pipeline().setStages(Array(protoTypeEncoder,serviceEncoder,flagEncoder,assembler,scaler,kmeans))
val pepelineModel=pipeline.fit(data)
val kmeansModel =pipeline.fit(data).stages.last.asInstanceOf[KMeansModel]
kmeansModel.computeCost(pepelineModel.transform(data)) / data.count()
}
(60 to 270 by 30).map( k=>(k,clusteringScore3(data,k))).foreach(println)
def fitPipeline4(data:DataFrame,k:Int):PipelineModel={
val (protoTypeEncoder ,protoTypeVecCol) = oneHotPipeline("protocol_type")
val (serviceEncoder,serviceVecCol) = oneHotPipeline("service" )
val (flagEncoder,flagVecCol) = oneHotPipeline( "flag")
val assembleCols = Set(data.columns:_*)--
Seq( "label","protocol_type","service","flag" ) ++
Seq(protoTypeVecCol , serviceVecCol , flagVecCol )
val assembler = new VectorAssembler().
setInputCols(assembleCols.toArray).
setOutputCol("featureVector")
val scaler = new StandardScaler().
setInputCol("featureVector").
setOutputCol("scaledFeatureVector").
setWithStd(true).
setWithMean(false)
val kmeans = new KMeans().
setSeed(Random.nextLong()).
setK(k).
setPredictionCol("cluster").
setFeaturesCol("featureVector").
setMaxIter(40).
setTol(1.0e-5)
val pipeline = new Pipeline().setStages(Array(protoTypeEncoder,serviceEncoder,flagEncoder,assembler,scaler,kmeans))
pipeline.fit(data)
}
val pipelineModel = fitPipeline4(data,120)
val countByClusterLabel = pipelineModel.transform(data).
select ( "cluster" ,"label" ).
groupBy( "cluster", "label" ).count( ).
orderBy( "cluster" ,"label" )
countByClusterLabel.show ( )
val kMeansModel = pipelineModel.stages.last.asInstanceOf[KMeansModel]
val centroids = kMeansModel.clusterCenters
val clustered = pipelineModel.transform(data)
val threshold = clustered.
select( "cluster", "scaledFeaturevector" ).as[(Int,Vector)].
map { case (cluster, vec) => Vectors.sqdist(centroids(cluster),vec)}.
orderBy($"value".desc).take(100).last
val originalCols = data.columns
val anomalies = clustered.filter{ row =>
val cluster = row.getAs[Int]( "cluster" )
val vec = row.getAs[Vector]( "scaledFeatureVector")
Vectors.sqdist(centroids(cluster), vec) >= threshold
}.select(originalCols.head,originalCols.tail:_*)
println(anomalies.first())