掘金 后端 ( ) • 2024-05-04 10:25

在开发C端产品时,用户行为上报是企业获取第一手用户信息的重要手段,对于提升产品质量、增强市场竞争力具有重要意义。通过分析用户在 APP、小程序、Web 端等平台上报的行为数据(如浏览、点击、滑动等),产品经理可以了解用户对产品的使用习惯和偏好,进而优化产品功能和用户体验。分析用户行为数据还能够帮助企业洞察市场趋势、了解用户需求,从而为产品迭代和市场策略的制定提供依据。同时,基于用户行为数据,企业可以构建用户画像,实现精准营销和个性化推荐,提升用户满意度和忠诚度。

下面我们将使用 OpenResty(一个基于 Nginx 的平台,可以更方便地在 Nginx 中运行 Lua 脚本),通过 Lua 语言在 Nginx 中将用户行为数据发送到 Kafka。

创建 nginx 配置目录 ./nginx/conf,在该目录下新建 default.conf 和 nginx.conf

# Virtual host for the behaviour-collection endpoint.
# The DOMAIN placeholder in server_name is a template variable, not an nginx
# variable; it has to be substituted before nginx loads this file.
server {

  listen 80;
  listen [::]:80;

  server_name ${DOMAIN};
  server_tokens off;
  gzip on;
  gzip_disable "msie6";

  gzip_vary on;
  gzip_proxied any;
  gzip_comp_level 6;
  gzip_buffers 16 8k;
  gzip_http_version 1.1;
  gzip_min_length 256;
  gzip_types text/plain text/css application/json application/x-javascript text/xml application/xml application/xml+rss text/javascript application/vnd.ms-fontobject application/x-font-ttf font/opentype image/svg+xml image/x-icon;

  charset utf-8;
  root /usr/local/openresty/nginx/html;

  index index.html;

  location / {
    index index.html;
  }

  error_page   500 502 503 504  /50x.html;
  location = /50x.html {
    root   /usr/local/openresty/nginx/html;
  }

  # Liveness probe; kept out of the access log.
  location /status {
    access_log off;
    return 200 "UP\n";
  }

  # Behaviour upload endpoint: POST only, body limited to 2m.
  location /post {
    default_type 'text/plain';
    lua_code_cache on;
    # The buffer is as large as the maximum body and file spooling is off,
    # so an accepted body is held in memory in one piece.
    client_max_body_size 2m;
    client_body_buffer_size 2m;
    client_body_in_single_buffer on;
    client_body_in_file_only off;
    limit_except POST {
      deny  all;
    }
    content_by_lua_block {
      -- Read the body in the content phase; the log-phase script fetches it again.
      ngx.req.read_body()
      if ngx.req.get_body_data() == nil then
        -- No in-memory body available: reject the request.
        return ngx.exit(400)
      end
      -- Answer immediately; forwarding happens in the log phase below.
      return ngx.exit(200)
    }
    log_by_lua_file /opt/openresty/lua/content-parser.lua;
  }
}
# The `docker-openresty` file `nginx.vh.default.conf` is copied to
# `/etc/nginx/conf.d/default.conf`.  It contains the `server` section
# of the upstream `nginx.conf`.
#
# See https://github.com/openresty/docker-openresty/blob/master/README.md#nginx-config-files
#

#user  nobody;
# A single worker process is started.
worker_processes  1;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;


events {
  # Upper bound on simultaneous connections per worker.
  worker_connections  1024;
}


http {
  include       mime.types;
  # Fallback Content-Type for responses whose type is not otherwise determined.
  default_type  application/octet-stream;

  #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
  #                  '$status $body_bytes_sent "$http_referer" '
  #                  '"$http_user_agent" "$http_x_forwarded_for"';

  #access_log  logs/access.log  main;

  sendfile        on;
  #tcp_nopush     on;

  #keepalive_timeout  0;
  keepalive_timeout  65;

  #gzip  on;

  # Pull in every server definition placed under /etc/nginx/conf.d/
  # (the Dockerfile copies default.conf there).
  include /etc/nginx/conf.d/*.conf;
}

创建目录nginx/lua, 然后在目录下创建content-parser.lua

local zlib = require "zlib"
local cjson = require "cjson"
local kafka_producer = require "resty.kafka.producer"

-- Deployment settings. The three placeholders below are template variables and
-- are not valid Lua as written: docker-entrypoint.sh runs envsubst over this
-- file so that each one is replaced by the value of the environment variable of
-- the same name. The broker list and the producer options are substituted as
-- raw Lua table literals; the topic is substituted inside a string literal.
local KAFKA_BROKERS = ${KAFKA_BROKERS}
local KAFKA_TOPIC = "${KAFKA_TOPIC}"
local KAFKA_PRODUCER_CONF = ${KAFKA_PRODUCER_CONF}

--- Inflate a compressed request body.
-- @param data string|nil  raw compressed bytes
-- @return string|nil  the inflated text; nil when `data` is nil or when
--   inflation raised an error (the error is written to the nginx error log)
local function gunzip(data)
    if data == nil then
        return nil
    end
    -- A fresh inflate stream is created for every call.
    local stream = zlib.inflate()
    -- The stream raises on malformed input, so call it in protected mode.
    local ok, body = pcall(stream, data)
    if not ok then
        -- On failure `body` carries the error value; report it instead of
        -- dropping the request silently.
        ngx.log(ngx.WARN, "gunzip failed: ", tostring(body))
        return nil
    end
    return body
end

--- Flatten the request headers into a plain table for the Kafka payload.
-- Every "-" in a header name becomes "_" and a leading "x_" is dropped;
-- header values that are tables are JSON-encoded into a string.
-- Three extra fields are added afterwards and overwrite any header that maps
-- to the same key: `ip`, `start_time` and `server_time`.
-- NOTE(review): the "x_" prefix check is case-sensitive, so this presumably
-- expects header names to arrive lower-cased -- confirm against the caller.
-- @param header_data table  header name -> value (string or table)
-- @return table  normalised headers plus ip / start_time / server_time
local function parse_headers(header_data)
    local rt = {}
    for k, v in pairs(header_data) do
        -- "-" is a magic character in Lua patterns; escape it so the intent
        -- (replace literal dashes) does not depend on parser leniency.
        local pk = string.gsub(k, "%-", "_")
        local pv = v
        if type(v) == "table" then
            pv = cjson.encode(v)
        end
        if string.sub(pk, 1, 2) == 'x_' then
            pk = string.sub(pk, 3)
        end
        rt[pk] = pv
    end
    -- Client address: prefer proxy-supplied headers, then the socket peer.
    -- NOTE(review): relies on the header table resolving these spellings.
    rt['ip'] = header_data["X-REAL-IP"] or header_data["X_FORWARDED_FOR"] or ngx.var.remote_addr or "0.0.0.0"
    -- Both timestamps are scaled by 1000 (presumably seconds -> milliseconds).
    rt['start_time'] = ngx.req.start_time() * 1000
    rt['server_time'] = ngx.now() * 1000
    return rt
end

--- Split a request body into its non-empty lines.
-- Runs of CR and/or LF act as separators, so blank lines never appear in the
-- result.
-- @param log_data string|nil  body text
-- @return table  array of lines; empty when `log_data` is nil or ''
local function parse_lines(log_data)
    local lines = {}
    if log_data ~= nil and log_data ~= '' then
        for segment in string.gmatch(log_data, '[^\r\n]+') do
            lines[#lines + 1] = segment
        end
    end
    return lines
end

-- Log-phase entry point: turn every line of the request body into one Kafka
-- message, each carrying the normalised request headers.
local headers_data = parse_headers(ngx.req.get_headers())

-- Inflate the body first when the client declared it as gzip.
local body_data = ngx.req.get_body_data()
if headers_data["content_encoding"] == 'gzip' then
    body_data = gunzip(body_data)
end
local parsed_lines = parse_lines(body_data)

-- Nothing to forward (empty body or failed inflate).
if nil == parsed_lines or 0 == #parsed_lines then
    return
end

local p = kafka_producer:new(KAFKA_BROKERS, KAFKA_PRODUCER_CONF)
local server_time = headers_data['server_time']

for _i = 1, #parsed_lines do
    -- c: line content, f: log type, t/l: server receive time, n: thread name,
    -- i: thread id, m: main-thread flag, h: request headers.
    local ll = {c = parsed_lines[_i], f = 1, t = server_time, l = server_time, n = 'main', i = 1, m = true, h = headers_data}
    -- Encode once so the same string can be sent and, on failure, logged.
    local payload = cjson.encode(ll)
    local ok, err = p:send(KAFKA_TOPIC, nil, payload)
    if not ok then
        -- Pass the pieces as separate arguments: concatenating the table `ll`
        -- (as before) raises an error, and `err` is not guaranteed to be a string.
        ngx.log(ngx.ERR, "send kafka err:", err, ", data:", payload)
    end
end

nginx/www下创建nginx的错误页面50x.html

<!DOCTYPE html>
<!-- Static error page served for 500/502/503/504 responses. -->
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <title>服务器遇到错误</title>
</head>
<body>
    <h1>哎呀!</h1>
    <!-- Placeholder address: replace with the real support mailbox. -->
    <p>服务器遇到了一个错误。请稍后再试或者<a href="mailto:support@example.com">联系我们</a>以获得帮助。</p>
</body>
</html>

index.html

<!DOCTYPE html>
<!-- Default landing page served from the document root. -->
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Welcome to My Nginx Server</title>
    <style>
        body {
            width: 35em;
            margin: 0 auto;
            font-family: Arial, Sans-serif;
        }
    </style>
</head>
<body>
    <h1>Welcome to My Nginx Server!</h1>
    <p>If you see this message, your Nginx web server is up and running.</p>
    <!-- A few sample links; rel="noopener noreferrer" keeps the opened page from reaching back into this one. -->
    <ul>
        <li><a href="https://www.nginx.com/" target="_blank" rel="noopener noreferrer">Nginx Official Website</a></li>
        <li><a href="https://nginx.org/en/docs/" target="_blank" rel="noopener noreferrer">Nginx Official Documentation</a></li>
        <li><a href="https://www.gnu.org/software/bash/" target="_blank" rel="noopener noreferrer">GNU Bash</a></li>
        <li><a href="https://www.kernel.org/" target="_blank" rel="noopener noreferrer">The Linux Kernel</a></li>
    </ul>
    <hr>
    <footer>
        <p>Thank you for visiting.</p>
    </footer>
</body>
</html>

nginx目录下创建docker-entrypoint.sh

#!/bin/sh
# Container entrypoint: fill the environment placeholders into the nginx
# virtual host and the Lua script, then run OpenResty in the foreground.
set -e

# render FILE VARS
# Substitute the environment variables listed in VARS into FILE, in place.
# Output goes to a temporary file first: redirecting envsubst straight back
# onto its own input makes the shell truncate the file before it is read,
# which leaves it empty. Limiting the variable list keeps any other
# dollar-prefixed text in the file (for example nginx variables) untouched.
render() {
  tmp_file="$(mktemp)"
  envsubst "$2" < "$1" > "$tmp_file"
  # Overwrite the contents rather than replace the file, so its mode and owner stay as they were.
  cat "$tmp_file" > "$1"
  rm -f "$tmp_file"
}

render /etc/nginx/conf.d/default.conf '${DOMAIN}'
render /opt/openresty/lua/content-parser.lua '${KAFKA_BROKERS} ${KAFKA_TOPIC} ${KAFKA_PRODUCER_CONF}'

# Print the rendered virtual host to the container log for troubleshooting.
cat /etc/nginx/conf.d/default.conf

echo "start run openresty"

# exec replaces the shell so OpenResty receives container signals directly.
exec /usr/local/openresty/bin/openresty -g "daemon off;"

编写Dockerfile

#!/bin/bash -ilex
# Image for the behaviour-collection gateway: OpenResty plus lua-zlib,
# lua-resty-kafka and lua-resty-nsq, with the nginx config, Lua script and
# static pages from the nginx/ directory.
FROM openresty/openresty:alpine-fat

# https://github.com/openresty/docker-openresty/
# http://totogoo.com/article/143/nginx-lua-nsq.html

ARG LUA_KAFKA_VERSION="0.06"

LABEL lua_kafka_version="${LUA_KAFKA_VERSION}"

RUN echo $PATH \
    && whoami

# lua-zlib is used by the Lua script to inflate gzip request bodies.
RUN mkdir -p /opt/openresty/lua/ \
    && apk add --no-cache git mercurial \
    && apk add --no-cache zlib-dev \
    && luarocks install lua-zlib

WORKDIR /opt/openresty

# Install the Kafka and NSQ client libraries into OpenResty's lualib and
# clear the stock html directory.
RUN curl -fSL https://github.com/doujiang24/lua-resty-kafka/archive/v${LUA_KAFKA_VERSION}.tar.gz -o lua-resty-kafka-${LUA_KAFKA_VERSION}.tar.gz \
    && tar xzf lua-resty-kafka-${LUA_KAFKA_VERSION}.tar.gz \
    && rm -f lua-resty-kafka-${LUA_KAFKA_VERSION}.tar.gz \
    && mv lua-resty-kafka-${LUA_KAFKA_VERSION}/lib/resty/kafka /usr/local/openresty/lualib/resty/ \
    && rm -fr lua-resty-kafka-${LUA_KAFKA_VERSION} \
    && curl -fSL https://github.com/rainingmaster/lua-resty-nsq/archive/master.zip -o lua-resty-nsq-master.zip \
    && unzip lua-resty-nsq-master.zip \
    && rm -f lua-resty-nsq-master.zip \
    && mv lua-resty-nsq-master/lib/resty/nsq /usr/local/openresty/lualib/resty/ \
    && rm -fr lua-resty-nsq-master \
    && rm -f /usr/local/openresty/nginx/html/*

COPY nginx/conf/nginx.conf /usr/local/openresty/nginx/conf/
COPY nginx/conf/default.conf /etc/nginx/conf.d/
COPY nginx/lua /opt/openresty/lua/
ADD nginx/www /usr/local/openresty/nginx/html/

COPY nginx/docker-entrypoint.sh /opt/openresty/
# The exec-form ENTRYPOINT below runs the script directly, so it must be executable.
RUN chmod +x /opt/openresty/docker-entrypoint.sh

# Defaults; override at run time. The two table values are pasted verbatim
# into the Lua script, so they must be valid Lua table literals. They are
# wrapped in single quotes because nested double quotes are stripped by the
# Dockerfile parser, which would turn "127.0.0.1" into a bare, invalid token.
ENV DOMAIN="my-domain.com"
ENV KAFKA_BROKERS='{{host="127.0.0.1",port=9092}}'
ENV KAFKA_TOPIC="ubt-log"
# The script runs in the log_by_lua phase, where cosockets are disabled, so a
# sync producer cannot send from there; async hands delivery to a timer.
ENV KAFKA_PRODUCER_CONF='{ producer_type = "async" }'



EXPOSE 80
ENTRYPOINT ["./docker-entrypoint.sh"]

客户端通过 HTTP POST 方法上报,地址为 /post;请求体按行拆分(支持 gzip 压缩),每一行(JSON 字符串)对应一条 Kafka 消息。Kafka 消息体中各字段含义如下

/**
 * One user-behaviour log event as carried in a Kafka message.
 * Field names are single letters to match the keys of the message body.
 */
@Data
@EqualsAndHashCode
public class UbtLogEvent implements Serializable {

    private static final long serialVersionUID = -5035290892623289431L;

    /**
     * Client request headers (names are lower-case, with dashes replaced by underscores).
     */
    private Map<String, Object> h;

    /**
     * Client thread name; always "main" for real-time events.
     */
    private String n;

    /**
     * Client thread id; always 1 for real-time events.
     */
    private Integer i;

    /**
     * Whether this is a main-thread log; always true for real-time events.
     * NOTE(review): the other fields use the single-letter message keys, and the
     * Lua producer emits this flag under the key "m" -- confirm that "main" is
     * mapped to that key during deserialization.
     */
    private Boolean main;

    /**
     * Log type: 1 for client event tracking, 2 for server-side simulated data.
     */
    private Integer f;

    /**
     * Server-side receive time (milliseconds).
     */
    private Long t;

    /**
     * Client time (milliseconds); equal to t for real-time events.
     */
    private Long l;

    /**
     * Content, a JSON string.
     */
    private String c;
}

完整代码请参考https://github.com/AOKI123/app-log-nginx