Word2Vec与文章相似度--相似度计算

2.7.4.2 相似度计算

  • 目的:计算18号Python频道的文章之间相似度
  • 步骤:
    • 1、读取数据,进行类型处理(数组到Vector)
    • 2、BRP进行FIT

读取数据,进行类型处理(数组到Vector)

from pyspark.ml.linalg import Vectors
# 选取部分数据做测试
article_vector = w2v.spark.sql("select article_id, articlevector from article_vector where channel_id=18 limit 10")
train = articlevector.select(['article_id', 'articleVector'])

def _array_to_vector(row):
    return row.article_id, Vectors.dense(row.articleVector)

train = train.rdd.map(_array_to_vector).toDF(['article_id', 'articleVector'])

BRP进行FIT

  • class pyspark.ml.feature.BucketedRandomProjectionLSH(inputCol=None, outputCol=None, seed=None, numHashTables=1, bucketLength=None)
    • inputCol=None:输入特征列
    • outputCol=None:输出特征列
    • numHashTables=1:哈希表数量,几个hash function对数据进行hash操作
    • bucketLength=None:桶的数量,值越大相同数据进入到同一个桶的概率越高
    • method:
      • approxSimilarityJoin(df1, df2, 2.0, distCol='EuclideanDistance')
      • 计算df1每个文章相似的df2数据集的数据
from pyspark.ml.feature import BucketedRandomProjectionLSH

brp = BucketedRandomProjectionLSH(inputCol='articleVector', outputCol='hashes', numHashTables=4.0, bucketLength=10.0)
model = brp.fit(train)

计算相似的文章以及相似度

similar = model.approxSimilarityJoin(test, train, 2.0, distCol='EuclideanDistance')

similar.sort(['EuclideanDistance']).show()

2.7.4.3 问题3

对于计算出来的相似度,是要在推荐的时候使用。那么我们所知的是,HIVE只适合在离线分析时候使用,因为运行速度慢,所以只能将相似度存储到HBASE当中

  • hbase

2.7.5 文章相似度存储

  • 目的:将所有文章对应相似度文章及其相似度保存

  • 步骤:

    • 调用foreachPartition
      • foreachPartition不同于map和mapPartition,主要用于离线分析之后的数据落地,如果想要返回新的一个数据DF,就使用map后者。

我们需要建立一个HBase存储文章相似度的表

create 'article_similar', 'similar'

# 存储格式如下:key:为article_id, 'similar:article_id', 结果为相似度
put 'article_similar', '1', 'similar:1', 0.2
put 'article_similar', '1', 'similar:2', 0.34
put 'article_similar', '1', 'similar:3', 0.267
put 'article_similar', '1', 'similar:4', 0.56
put 'article_similar', '1', 'similar:5', 0.7
put 'article_similar', '1', 'similar:6', 0.819
put 'article_similar', '1', 'similar:8', 0.28

定义保存HBASE函数,确保我们的happybase连接hbase启动成功,Thrift服务打开。hbase集群出现退出等问题常见原因,配置文件hadoop目录,地址等,还有

  • ntpdate 0.cn.pool.ntp.org或者ntpdate ntp1.aliyun.com
  • hbase-daemon.sh start thrift
def save_hbase(partition):
    import happybase
    pool = happybase.ConnectionPool(size=3, host='hadoop-master')

    with pool.connection() as conn:
        # 建议表的连接
        table = conn.table('article_similar')
        for row in partition:
            if row.datasetA.article_id == row.datasetB.article_id:
                pass
            else:
                table.put(str(row.datasetA.article_id).encode(),
                         {"similar:{}".format(row.datasetB.article_id).encode(): b'%0.4f' % (row.EuclideanDistance)})
        # 手动关闭所有的连接
        conn.close()

similar.foreachPartition(save_hbase)

版权声明:本文为weixin_43894652原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。