本章的内容是对自然语言的分析,并非单纯数与字符的统计,而是尝试去理解人类的语言描述概念。自然语言通常是指一种自然地随文化演化的语言。例如,汉语、英语、日语为自然语言的例子,这一种用法可见于自然语言处理一词中。
LSA(Latent Semantic Analysis) LSA:一种分析自然语言并根据潜在本意进行统计性处理的算法。其核心思想如下:
如果我们能制作一个表格,这个表格统计着每个单词对于每个文档的重要程度,当我们试图去分析某一篇具体的文章我们就可以通过表格来直观看到关于这篇文章的核心论点,我们就能挑选出想要阅读的文章。
举例:当我们从搜索引擎上键入某些关键词,搜索引擎根据这些关键词返回一个链接页,然后用户从链接页进一步选择自己想要的。这个搜索引擎返回的过程,就需要使用潜在语义分析。
这个算法最初诞生的目的,并不仅仅是为了高效检索出对应核心关键词的内容,还会根据词库中的 相似度来返回虽然不是一个词但是却具有相同意义的词组来,尽量不漏掉关键信息。
LSA统计了词库中单词对于每个文档的关联级别,词库中不同术语之间的关联程度,反映有用程度的重要等级等。通过只选择最重要的概念,LSA可以丢弃一些不相关的噪声,并合并共同发生的线,从而得到更简单的数据表示;LSA可以提供术语与其他术语之间、文档与其他文档之间以及术语与文档之间的相似性得分。这些相似性度量对于查找与查询术语相关的文档集、将文档分组为主题以及查找相关单词等任务非常理想。
那么问题来了,这个表格中的这个值,应该用什么来表示呢?最直观的想法可能是应用统计次数来反馈这个信息,但是真的出现次数最多频率最高的词就是最重要的词吗?
现在很多学术性文献大多是英语制成,如果你试着去统计一篇英语文献中的单词出现频率;并比对其他文献,我相信统计所有文献整体出现频率最高的是冠词介词the/a/of这一类的,但是很明显,这一类词绝对不可能是一篇文献的核心内容。再加上自然语言中有一些修辞手法,【你的内心像花朵一样,花朵是这世界上最单纯、最美丽又最动人的事物】在这句话中,出现频率最高的是最,而这个词只是个程度词,并不能表达这个句子的核心意义。
因此很明显,单单使用出现频率这一变量不能表达这个词就是关于这篇文章最重要的词。
那么如何表达重要程度呢?
LSA中的表达方式是:如果一个词在谋篇文章中大量出现,且在其他文档中也大量出现,则说明这个词一定是不重要的词。就像我们刚刚说到英语文献中的of/the/a之类的冠词介词。而如果一个词,在本篇文档中大量出现而在其他文献中出现频率并没有这么高,说明这个词确实是用来形容当前文章的最简单的表达词。
TF-IDF (Term Frequency * Inverse Document Frequency)
LSA算法将表格变化成矩阵,其行为文档n,列为词库中的词。通过上述式子来计算他们各自的值,我们来一一讨论一下这个公式。
术语出现频率:这篇文章中出现的频率与所有待分析文档中该词出现的总量的比值。这个值很好理解,分母是一个定值不会改变,该式子只会随着分子变化而变化 。这个式子值越大,说明这个术语在这篇文档中出现的次数越多。。也就是说如果这个数值大,这说明该术语在本篇文档中出现频率高
转置·文档频率。分子是所有待分析文档的总数,分母是出现了当前关键词的文档数量,也就说这个式子的分子是定值,而分母是可变的。分母越小,说明这个术语在整体文档中出现的频率越低,而这个式子的值越大。
OK,然后我们来整理一下,两项相乘,两个大的值相乘值一定也相对更大,两个值较小的相乘结果也相对较小。一个大值乘以一个小值,其结果未必是大值。这就反映了LSA的特点。
两个大的值相乘值一定也相对更大:这两个因式分别代表了某术语在当前文档中出现频率高和这个术语在所有文档中并不是高频词。换言之,这个术语对这篇文档有着代表意义但是对其他文档没有关联性。
两个值较小的相乘结果也相对较小:这个术语在当前文档出现频率不算高且在整体所有文档中有大部分文章它出现了,换言之这个词本身是一个高频词,且在当前文档也没有代表性,是一个不重要的词。
一个大值乘以一个小值,其结果未必是大值:这个属于在当前文档出现频率很高但是在整体文档出现频率也很高,换言之很有可能是副词介词冠词等一些偏修饰和结构性的词语,被认定为不重要;这个词在当前文档出现频率比较低,在整体中出现的频率也很低,换言之这个词本身出现就很少,可能是非常少见的表达,自然不会对当前文档有代表意义。当然因为是未必,所以也存在恰巧的情况。
那么,理解了上面两个因式以后,为什么要用log函数呢?
语料库中词语的频率呈指数分布
一个普通单词的出现频率是普通单词的十倍,而普通单词的出现频率可能是罕见单词的十倍或一百倍基于原始IDF的度量标准会给稀有词太大的权重,实际上忽略了所有其他词的影响。为了捕捉这个分布,该方案使用IDF的日志。通过将文档之间的乘法间隙转换为加法间隙,来弱化文档频率的差异
那是因为,高频词与低频词相比,会出现比较极端的情况。高频的频率太高,低频的频率太低,甚至高频词与中频词的差距都非常大。假设低频词出现了1次,中频词出现了100次,而高频词出现了一百万次,你就会发现相对于高频词而言,中频低频其实都是小到可以忽视的部分,不需要把他们两类细分,这就会导致数据不够精确。而追加log函数可以让原本的近指数函数更接近线性函数,更方便对中低频数据与高频数据的对比分析。
TF-IDF 的弊端:
他将文档中的单词作为词,以词为单位分离出来并进行统计,这就会忽略原本的句意以及出现一词多义的情况。
否定是一个很重要的语义成分,但是如果把所有的否定词单独拿出,并进行统计,那么否定的低频率注定被忽视,此后得出的分析结果往往不尽人意
忽略了句子结构和顺序。在某些倒装句,修辞语法中表达的潜在含义会随着词语拆分和词语统计变成单纯的线性单词统计。也会对分析的结果造成影响
一词多义。可能会有一个词具有多种意思的情况。比如band有乐队和波段和带状物的意思,“Radiohead is the best band ever” and “I broke a rubber band”中两个band不是同一个意思,但是他会被统计为同一个词语。
这是TF-IDF的弊端。
SVD分解
SVD分解:
首先我们要明确的事,可能出现的能够代表一篇文档的高频词汇的判定方法有一条是在其他文档出现频率不高。换言之,可能不在其他文档中出现,假如当前文档是LSA的论文,那么其他没有出现LSA相关知识的文章,自然就以0表示了。也就说,一个偌大的矩阵中有大量的0,是一个稀疏矩阵。一个稀疏矩阵的利用效率低开销大,不是我们想要的,所以我们想要通过SVD分解得到一个浓密矩阵。
这里,我们将m设为文档数量,n设为词库总量。并自定义一个k,令k作为浓密矩阵的参数,且必然k≤n,因为k是过渡矩阵且最大值=n
当k与n相等,因子矩阵的乘积完全重构了原始矩阵;否则乘法结果是原始矩阵的低秩近似。k的选择一般比n小得多。SVD分解的近似往往也会过滤很多噪声,分解过程中失真的部分很大可能是噪音部分,因此不用担心近似值对结果造成很大的失真影响,失真的部分很大概率是噪音;
术语、文档或概念的每个向量在其各自的空间中定义了一个轴(因为它是标准正交基),赋予术语、文档或概念的权重意味着沿着该轴的长度。在对应的向量空间内,每个向量都有对应的权重。
我们来看每一个子矩阵代表的意义:
U : m ∗ k U:m*k U:m∗k
每行对应一个文档,每列对应一个概念
定义了文档空间和概念空间之间的映射
S : k ∗ k S:k*k S:k∗k
具有奇异值
每个????中的对角元素对应于一个概念
这些奇异值的大小对应于该概念的重要性
V : n ∗ k V:n*k V:n∗k
每行对应一个术语,每列对应一个概念
它定义了term空间(每个点是一个n维向量,对每个项都有权重)和概念空间(每个点是一个k维向量,对每个概念都有权重)之间的映射。每个位置上的值可以解释为该术语与该概念的相关性
简单提一下计算方法:中间的被称为奇异值,是将某一个矩阵与其转职相乘后得到的对称矩阵进行对角化后,把对角化的值开根号得到对应的奇异值。而左右两边的向量分别是原矩阵对角化(正反)两种形式的对应矩阵。
Practice
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable,Text}
import org.apache.spark.ml.feature.{CountVectorizer,IDF}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame,Dataset,SparkSession}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import java.util.Properties
//import java.io.ByteArrayInputStream;
import edu.umd.cloud9.collection.XMLInputFormat
import edu.stanford.nlp.ling.CoreAnnotations.{LemmaAnnotation,SentencesAnnotation,TokensAnnotation}
import edu.stanford.nlp.pipeline.{Annotation,StanfordCoreNLP}
//import info.bliki.wiki.dump ._
//import org.xml.sax.SAXException
//import spark.implicits._
val path = "Data/Wiki/Wikipedia-Geometry.xml"
@transient val conf = new Configuration()
conf.set(XMLInputFormat.START_TAG_KEY,"<page>")
conf.set(XMLInputFormat.END_TAG_KEY,"</page>");
val kvs = spark.sparkContext.newAPIHadoopFile(path,classOf[XMLInputFormat],classOf[LongWritable],classOf[Text],conf)
val rawXmls = kvs.map(_._2.toString).toDS()
import info.bliki.wiki.dump._
import org.xml.sax.SAXException
import spark.implicits._
import java.io.ByteArrayInputStream;
case class Page(var page: WikiArticle = new WikiArticle) {}
class ArticleFilter(val Page: Page) extends IArticleFilter {
@throws(classOf[SAXException])
def process(page: WikiArticle,siteinfo: Siteinfo){
Page.page = page
}
}
def WikiXmlToPlainText (pageXml: String): Option[(String,String)]= {
val Page = new Page
try {
val parser = new WikiXMLParser(new ByteArrayInputStream(pageXml.getBytes),new ArticleFilter(Page))
parser.parse()
} catch {
case e: Exception =>
}
val page = Page.page
if (page.getText != null && page.getTitle != null
&& page.getId != null && page.getRevisionId != null
&& page.getTimeStamp != null && !page.isTemplate) {
Some((page.getTitle,page.getText ))
}else {
None
}
}
val docTexts = rawXmls.filter(_ != null ).flatMap(WikiXmlToPlainText)
def createNLPPipeline():StanfordCoreNLP = {
val props = new Properties()
props.put("annotators","tokenize,ssplit,pos,lemma")
new StanfordCoreNLP(props)
}
def isOnlyLetters(str: String): Boolean = {
str.forall(c => Character.isLetter(c))
}
def plainTextToLemmas(text:String,stopWords:Set[String],pipeline:StanfordCoreNLP): Seq[String] = {
val doc = new Annotation(text)
pipeline.annotate(doc)
val lemmas = new ArrayBuffer[String]()
val sentences = doc.get(classOf[SentencesAnnotation])
for (sentence <- sentences.asScala;
token <- sentence.get(classOf[TokensAnnotation]).asScala){
val lemma = token.get(classOf[LemmaAnnotation])
if ( lemma.length > 2 && !stopWords.contains(lemma) && isOnlyLetters(lemma)){
lemmas += lemma.toLowerCase
}
}
lemmas
}
val stopWords = scala.io.Source.fromFile("stopwords.txt" ).getLines().toSet
val bStopWords = spark.sparkContext.broadcast(stopWords)
val terms: Dataset[(String, Seq[String])]=
docTexts.mapPartitions { iter =>
val pipeline = createNLPPipeline()
iter.map { case(title, contents) =>
(title,plainTextToLemmas(contents,bStopWords.value,pipeline))
}
}
val termsDF=terms.toDF("title","terms")
val filtered = termsDF.where(size($"terms")>1)
filtered.show
val numTerms=20000
val countVectorizer=new CountVectorizer().
setInputCol("terms").
setOutputCol("termFreqs").
setVocabSize(numTerms)
val vocabModel=countVectorizer.fit(filtered)
val docTermFreqs=vocabModel.transform(filtered)
docTermFreqs.cache()
val idf=new IDF().
setInputCol("termFreqs").
setOutputCol("tfidfVec")
val idfModel = idf.fit(docTermFreqs)
val docTermMatrix=idfModel.transform(docTermFreqs).select("title","tfidfVec")
val termIds:Array[String]=vocabModel.vocabulary
val docIds=docTermFreqs.rdd.map(_.getString(0)).zipWithUniqueId().map(_.swap).collect().toMap
import org.apache.spark.mllib.linalg.{Vectors,Vector => MLLibVector}
import org.apache.spark.ml.linalg.{Vector =>MLVector}
val vecRdd = docTermMatrix.select("tfidfVec").rdd.map { row =>
Vectors.fromML( row .getAs[MLVector]("tfidfVec"))
}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
vecRdd.cache()
val mat = new RowMatrix(vecRdd)
val k = 500
val svd = mat.computeSVD(k,computeU=true)
import org.apache.spark.mllib.linalg.{Matrix,SingularValueDecomposition}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
def topTermsInTopConcepts(
svd: SingularValueDecomposition[RowMatrix,Matrix],
numConcepts: Int ,
numTerms: Int , termIds: Array[String])
: Seq[Seq[(String,Double)]] = {
val v = svd.V
val topTerms = new ArrayBuffer[Seq[(String,Double)]]()
val arr = v.toArray
for (i <- 0 until numConcepts) {
val offs = i * v.numRows
val termWeights = arr.slice(offs,offs + v.numRows).zipWithIndex
val sorted = termWeights.sortBy(-_._1)
topTerms += sorted.take(numTerms ).map {
case (score,id) =>( termIds(id), score)
}
}
topTerms
}
def topDocsInTopConcepts(
svd:SingularValueDecomposition[RowMatrix,Matrix],
numConcepts: Int , numDocs: Int,docIds: Map[Long,String])
: Seq[Seq[(String,Double)]] = {
val u = svd.U
val topDocs = new ArrayBuffer[Seq[(String,Double)]]()
for ( i <- 0 until numConcepts) {
val docWeights = u.rows.map(_.toArray(i)).zipWithUniqueId()
topDocs += docWeights.top(numDocs).map {
case (score,id) => (docIds(id),score)
}
}
topDocs
}
val topConceptTerms = topTermsInTopConcepts(svd ,6,10,termIds)
val topConceptDocs = topDocsInTopConcepts(svd ,6, 10,docIds)
for ((terms,docs) <- topConceptTerms.zip(topConceptDocs)){
println("Concept terms:" + terms.map(_._1).mkString(","))
println("Concept docs:" + docs.map(_._1).mkString("," ))
println()
}