掘金 后端 ( ) • 2024-05-08 10:37

前言

作者本人是一名互联网公司工作的码农,之前在一家做社交App的创业公司担任技术负责人,在此期间从0到1搭建了一套基于ElasticSearch实现的实时推荐系统,在本文章中将介绍给各位童鞋,并欢迎各位童鞋吐槽和建议。

需求背景

我们需要实现一款基于人与人之间进行推荐的服务,来帮助用户快速匹配到自己喜欢的用户,帮助用户快速交友,消费主体是用户、被推荐的也是用户。在这种场景下更强调推荐的实时性,想象一下,如果给你推荐的用户离你比较近、当前正好在线,那么你和对方聊天的欲望是不是更高呢。

整体架构

使用ES来实现推荐,实际是利用了ES中的function_score query这个强大的功能,在function_score query中我们可以随意的使用我们自定义的function来计算最终的_score,这样我们就能把我们的推荐算法写在function中来得出推荐排序。
可参考官方文档对于function_score query的介绍: ElasticSearch Doc
基于这个思路,那么我们需要把我们推荐算法要用的指标、特征需要全部抽取到ES中,有实时的特征也有离线的特征,下面是整个工程的架构图

image.png

图中绿色线条表示实时特征数据(用户行为日志)采集和计算最终落地ElasticSearch,棕黄色的线条表示用户的基础信息,也就是用户的Profile资料,紫色的线条表示离线数仓的构建,数据经过ETL加工后最终得到的特征数据出仓进入ElasticSearch。

用户行为采集

OpenResty是一个基于 Nginx 的平台, 性能非常强劲,可以使用 Lua 脚本在 Nginx 中的运行实现各种功能, 可以通过lua语言实现在nginx中将用户行为发送到kafka。
使用OpenResty-lua-kafka搭建的用户行为日志采集服务可以参考我的另外一篇文章 # nginx+lua+kafka实现高性能用户行为打点服务
我们把用户行为日志简称为ubt-log

行为特征计算

假如我们的算法中需要使用一个特征指标——这个用户最近5分钟内的受欢迎程度, 它的计算方式为like_rate_last_5min = cnt_be_liked_last_5min / cnt_be_exposed_last_5min, 也就是5分钟内被其他用户喜欢的次数 / 5分钟内被曝光的次数
首先需要客户端在每次用户在首页被推荐曝光时和每次对其他用户点击喜欢的时候上报打点, event依次为user_exposeduser_like,点位数据包含当前用户ID以及被曝光/喜欢的用户ID,经过打点服务器后数据将会在kafka中,我们通过flink-kafka-connector消费kafka的数据,并使用time_window来分别统计每个用户5分钟内被曝光的次数和被喜欢的次数,也就是cnt_be_liked_last_5mincnt_be_exposed_last_5min, 然后加上当前计算时间的水印last_5min_window_ts, 将这3个字段的数据sink输出到ES中。

特征值算法函数

在ES中提供了默认的很多函数,我们也可以自定义我们的算法函数,在ES中使用painless语言来编写函数脚本,和Java很像,下面我给出该算法的实现(示例)。

GET /_search
{
  "query": {
    "function_score": {
      "script_score": {
        "script": {
          "source": """
          long currentTime = Instant.now().millis;
          long gap = currentTime - doc['last_5min_window_ts'].value;
          if (gap > 300000) {
              return 0.1;
          }
          long exp = doc['cnt_be_exposed_last_5min'].value;
          long likes = doc['cnt_be_liked_last_5min'].value;
          return (double)likes / exp ;
          """
        }
      }
    }
  }
}

用户Profile同步

首先我们需要借助alibaba开源的中间件canal来完成数据库binlog输出到kafka,然后使用flink来将数据增量的同步到ES,这里也可以使用flink-cdc的方式将mysql数据增量同步到ES,不过这里使用kafka来做数据中转的好处是离线数仓就不需要再同步一次了,后面会讲到。
用户的Profile中包含了用户当前最新的地理位置坐标,当然这需要客户端每次在启动App时上报给服务端,服务端更新mysql表中用户的地理位置坐标。
这样我们得到了每个用户的地理位置或者是生日这些基础字段,我们可以通过这两个字段写出对应的推荐函数。

GET /_search
{
  "query": {
    "function_score": {
      "functions": [
        {
          "exp": {
            "birthday": {
              "origin": "1994-10-10",
              "scale": "1y",
              "offset": "2y",
              "decay": 0.3
            }
          }
        },
        {
          "gauss": {
            "location": {
              "origin": "31.1976,121.482",
              "scale": "10km",
              "offset": "30km",
              "decay": 0.7
            }
          }
        }
      ],
      "score_mode": "multiply"
    }
  }
}

上面两个函数分别使用了expgauss两种衰减函数,这两种衰减函数在ElasticSearch中是默认内置的,可以看官方文档的介绍,这样就使用距离和年龄两个函数共同计算推荐排序。

image.png

离线数仓搭建

  1. 通过Serverless云函数(或者其他方式)消费Kakfa中的数据,比如ubt-logbinlog等,可以每消费1000条数据然后输出一个JSON文件上传到hdfs, 我这里用了hadoop-cos插件来将腾讯云对象存储COS作为HDFS文件系统,阿里云和AWS也都有相同的实现。注意每天使用一个新的文件夹,文件夹名称可以叫做date=20240101,这将作为一个hive表的分区。
  2. 搭建离线Hive数仓,可以参考apache-hadoopapache-hive的官方文档来部署Hadoop和Hive,或者在云厂商直接购买EMR产品,这样比较省心。
  3. 创建一个hive external table并将Location指向COS中的地址
use dw;
CREATE EXTERNAL TABLE `ods_ubt_log`
(
    `id`        string COMMENT 'from deserializer',
    `eventtype` string COMMENT 'from deserializer',
    `firedby`   string COMMENT 'from deserializer',
    `timestamp` bigint COMMENT 'from deserializer',
    `data`      struct<`_type`:string,`client_ip`:string,event:string,....> COMMENT 'from deserializer',
    `_type`     string COMMENT 'from deserializer'
)
PARTITIONED BY (`date` string)
ROW FORMAT SERDE  'org.openx.data.jsonserde.JsonSerDe'
STORED AS
    INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 'cosn://${bucket}/ubt-log';

添加Hive表的分区

ALTER TABLE dw.ods_ubt_log ADD IF NOT EXISTS PARTITION (`date`='20240101');

这样我们就把用户的行为日志全部落在了离线数仓,用同样的方法可以将binlog落地,然后从binlog中根据table加工分离出每张mysql表每天的全量数据,通过数据集成将所有数据落地到数仓后通过SparkSQL来加工数据表(ETL)和计算各种需要的特征指标,比如用户每天最常活跃的时间段、用户聊天平均回复的时长等,然后将特征数据也同步到ES中。

算法调参

真实场景下可能会有上百个函数共同计算推荐分数,很难衡量调整参数后我们的推荐算法是变得更好还是更坏,我们可以借助AB Test来调整算法,上线后通过分析实验对照、用户的整体like_rate、用户留存等各种指标来衡量决策算法/参数调整的效果。

其他

除了算法得到的推荐结果,很多场景下也有强逻辑的一些推荐,比如对方A喜欢了B,那么会用强逻辑将B优先推荐给A,以达到双方快速匹配的需求。本文中的介绍是整个推荐系统的核心,但也只是冰山一角,很多细节没有展现,比如还有召回、避免重复推荐、重回收、去中心化、预推荐、新用户保量等各种推荐策略组合,后续有时间我将推出专栏将每一个细节展现出来。

总结

本文讲述了基于ElasticSearch来实现实时推荐系统的一个落地的思路,和大厂的搜推工程相比是比较简单的、轻量级的,也不是一个千人千面的个性化推荐系统,但是在当你的App用户量还不是太多(比如DAU在数十万级别)这个阶段时,也能提供一个思路,和大厂的搜推工程相比,其实架子就已经在前期搭好了,数据的采集、数仓的构建这些都是相通的,后期用户量上来了再搭建搜推工程、特征平台就会简单很多,系统都是循序渐进的,在某个阶段都会有最适合当前阶段的架构,希望本文可以给你提供一些帮助。