在开发C端产品时,用户行为上报是企业获取第一手用户信息的重要手段,对于提升产品质量、增强市场竞争力具有重要意义,通过分析用户在APP、小程序、WEB端等平台上报的行为数据,如浏览、点击、滑动等,产品经理可以了解用户对产品的使用习惯和偏好,进而优化产品功能和用户体验。分析用户行为数据能够帮助企业洞察市场趋势,了解用户需求,从而为产品迭代和市场策略的制定提供依据。同时基于用户行为数据,企业可以构建用户画像,实现精准营销和个性化推荐,提升用户满意度和忠诚度。
下面我们将使用 OpenResty(一个基于 Nginx 的平台,可以方便地在 Nginx 中运行 Lua 脚本),用 Lua 在 Nginx 中把用户行为数据发送到 Kafka。
创建 nginx 配置目录 ./nginx/conf,并在其中新建 default.conf 和 nginx.conf 两个文件(下面先给出 default.conf,接着是 nginx.conf):
# Virtual host for the user-behaviour collection endpoint.
# The DOMAIN placeholder in server_name is filled in from the container
# environment by docker-entrypoint.sh (envsubst) before OpenResty starts.
server {
listen 80;
listen [::]:80;
server_name ${DOMAIN};
server_tokens off;
# Compress text-like responses; bodies shorter than 256 bytes are sent as-is.
gzip on;
gzip_disable "msie6";
gzip_vary on;
gzip_proxied any;
gzip_comp_level 6;
gzip_buffers 16 8k;
gzip_http_version 1.1;
gzip_min_length 256;
gzip_types text/plain text/css application/json application/x-javascript text/xml application/xml application/xml+rss text/javascript application/vnd.ms-fontobject application/x-font-ttf font/opentype image/svg+xml image/x-icon;
charset utf-8;
# Static pages (index.html, 50x.html) are copied here by the Dockerfile.
root /usr/local/openresty/nginx/html;
index index.html;
location / {
index index.html;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root /usr/local/openresty/nginx/html;
}
# Health-check endpoint: always answers 200 "UP" and is not access-logged.
location /status {
access_log off;
return 200 "UP\n";
}
# Collection endpoint. The content phase only checks that a body was read
# and replies 200 (or 400); the events are forwarded to Kafka afterwards,
# in the log phase, by content-parser.lua.
location /post {
default_type 'text/plain';
lua_code_cache on;
# Keep the whole body (at most 2 MB) in a single in-memory buffer:
# ngx.req.get_body_data() returns nil when the body was spooled to a
# temporary file, which would make both phases treat it as missing.
client_max_body_size 2m;
client_body_buffer_size 2m;
client_body_in_single_buffer on;
client_body_in_file_only off;
# Only POST is accepted; every other method is denied.
limit_except POST {
deny all;
}
content_by_lua_block {
ngx.req.read_body()
if ngx.req.get_body_data() == nil then
ngx.exit(400)
end
ngx.exit(200)
}
# Runs for every request after the response phase, including the 400 case.
# NOTE(review): cosockets are disabled in log_by_lua*, so the Kafka producer
# configured for this script must use producer_type "async".
log_by_lua_file /opt/openresty/lua/content-parser.lua;
}
}
# The `docker-openresty` file `nginx.vh.default.conf` is copied to
# `/etc/nginx/conf.d/default.conf`. It contains the `server` section
# of the upstream `nginx.conf`.
#
# See https://github.com/openresty/docker-openresty/blob/master/README.md#nginx-config-files
#
# Top-level OpenResty settings. user, pid, error_log and access_log are
# left at the build defaults.
worker_processes 1;

events {
    worker_connections 1024;
}

http {
    include mime.types;
    default_type application/octet-stream;

    sendfile on;
    keepalive_timeout 65;

    # Virtual hosts (the collector's default.conf) are loaded from conf.d.
    include /etc/nginx/conf.d/*.conf;
}
创建目录 nginx/lua,然后在该目录下创建 content-parser.lua。它在 Nginx 的 log 阶段运行,负责解析请求体,并把其中的每一行作为一条消息发送到 Kafka:
local zlib = require "zlib"
local cjson = require "cjson"
local kafka_producer = require "resty.kafka.producer"
local KAFKA_BROKERS = ${KAFKA_BROKERS}
local KAFKA_TOPIC = "${KAFKA_TOPIC}"
local KAFKA_PRODUCER_CONF = ${KAFKA_PRODUCER_CONF}
--- Decompress a gzip-encoded request body.
-- Decompression failures are logged and reported as nil. The request's
-- events are then dropped, but the reason is visible in the error log
-- instead of disappearing silently.
-- @tparam string|nil data compressed bytes (nil when no body was read)
-- @treturn string|nil the inflated bytes, or nil on missing input / error
local function gunzip(data)
    if data == nil or data == "" then
        return nil
    end
    -- A new inflate stream is created for every body.
    local stream = zlib.inflate()
    local ok, body = pcall(stream, data)
    if not ok then
        -- On failure pcall returns the error message in place of the body.
        ngx.log(ngx.WARN, "failed to gunzip request body: ", tostring(body))
        return nil
    end
    return body
end
--- Return the first address found in a client-address header value, or nil.
-- ngx.req.get_headers() yields a table when a header is repeated, and
-- X-Forwarded-For may list several hops ("client, proxy1, ...") of which
-- only the first names the originating client. Empty values yield nil so
-- the caller can fall back to the next source.
-- NOTE(review): these headers are client-controlled unless a trusted proxy
-- in front of this server overwrites them.
local function first_forwarded_addr(value)
    if type(value) == "table" then
        value = value[1]
    end
    if type(value) ~= "string" then
        return nil
    end
    return value:match("^%s*([^,%s]+)")
end

--- Flatten request headers into the map stored in the message's "h" field.
-- Every "-" in a header name becomes "_" and a leading "x_" is removed
-- (e.g. "x-app-version" becomes "app_version"). Repeated headers, which
-- arrive as tables, are stored as their JSON encoding. Three derived keys
-- are added: ip, start_time and server_time (times in milliseconds).
-- @tparam table header_data result of ngx.req.get_headers()
-- @treturn table
local function parse_headers(header_data)
    local rt = {}
    for name, value in pairs(header_data) do
        -- "%-" escapes the pattern magic character; gsub's count is dropped.
        local key = name:gsub("%-", "_")
        if key:sub(1, 2) == "x_" then
            key = key:sub(3)
        end
        if type(value) == "table" then
            value = cjson.encode(value)
        end
        rt[key] = value
    end
    rt['ip'] = first_forwarded_addr(header_data["X-REAL-IP"])
        or first_forwarded_addr(header_data["X_FORWARDED_FOR"])
        or ngx.var.remote_addr
        or "0.0.0.0"
    rt['start_time'] = ngx.req.start_time() * 1000
    rt['server_time'] = ngx.now() * 1000
    return rt
end
--- Split a request body into its non-empty lines.
-- Both CR and LF act as separators, so "\n", "\r\n" and blank lines never
-- produce empty entries.
-- @tparam string|nil log_data raw (already decompressed) body
-- @treturn table array of line strings; empty when there is no body
local function parse_lines(log_data)
    local lines = {}
    if log_data ~= nil and log_data ~= '' then
        for line in string.gmatch(log_data, '[^\r\n]+') do
            lines[#lines + 1] = line
        end
    end
    return lines
end
-- Script entry point, run once per request by log_by_lua_file. Each line of
-- the body, which may be gzip-compressed, becomes one Kafka message.
-- NOTE(review): cosockets are disabled in log_by_lua*, so a "sync" producer
-- cannot connect from here; KAFKA_PRODUCER_CONF should select
-- producer_type = "async".
local headers_data = parse_headers(ngx.req.get_headers())
local body = ngx.req.get_body_data()
local encoding = headers_data["content_encoding"]
-- Content-coding names are case-insensitive, so "GZIP" is accepted too.
if type(encoding) == "string" and encoding:lower() == 'gzip' then
    body = gunzip(body)
end

local parsed_lines = parse_lines(body)
if parsed_lines == nil or #parsed_lines == 0 then
    return
end

local p = kafka_producer:new(KAFKA_BROKERS, KAFKA_PRODUCER_CONF)
-- Real-time events: both t (server time) and l (client time) carry the
-- server receive time.
local received_at = headers_data['server_time']
for _, line in ipairs(parsed_lines) do
    -- Encode once; the same string is sent and, on failure, logged.
    local message = cjson.encode({
        c = line, f = 1, t = received_at, l = received_at,
        n = 'main', i = 1, m = true, h = headers_data,
    })
    local ok, err = p:send(KAFKA_TOPIC, nil, message)
    if not ok then
        -- Log the encoded string. Concatenating the Lua table itself raised
        -- "attempt to concatenate a table value" and lost the error.
        ngx.log(ngx.ERR, "send kafka err:", tostring(err), ", data:", message)
    end
end
在 nginx/www 目录下创建 Nginx 的错误页面 50x.html:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>服务器遇到错误</title>
</head>
<body>
<!-- Error page served for 500/502/503/504 via error_page in default.conf. -->
<h1>哎呀!</h1>
<!-- TODO: support@example.com is a placeholder; replace it with the real support mailbox. -->
<p>服务器遇到了一个错误。请稍后再试或者<a href="mailto:support@example.com">联系我们</a>以获得帮助。</p>
</body>
</html>
以及首页 index.html:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Welcome to My Nginx Server</title>
<style>
body {
width: 35em;
margin: 0 auto;
font-family: Arial, Sans-serif;
}
</style>
</head>
<body>
<h1>Welcome to My Nginx Server!</h1>
<p>If you see this message, your Nginx web server is up and running.</p>
<!-- A few example links. rel="noopener noreferrer" keeps the newly opened tab from accessing this page through window.opener. -->
<ul>
<li><a href="https://www.nginx.com/" target="_blank" rel="noopener noreferrer">Nginx Official Website</a></li>
<li><a href="https://nginx.org/en/docs/" target="_blank" rel="noopener noreferrer">Nginx Official Documentation</a></li>
<li><a href="https://www.gnu.org/software/bash/" target="_blank" rel="noopener noreferrer">GNU Bash</a></li>
<li><a href="https://www.kernel.org/" target="_blank" rel="noopener noreferrer">The Linux Kernel</a></li>
</ul>
<hr>
<footer>
<p>Thank you for visiting.</p>
</footer>
</body>
</html>
在 nginx 目录下创建容器启动脚本 docker-entrypoint.sh。它先用 envsubst 把环境变量填入配置模板,再以前台方式启动 OpenResty:
#!/bin/sh
# Container entrypoint: fill the ${VAR} placeholders in the nginx vhost and
# the Lua log handler from the environment, then run OpenResty in the
# foreground so it stays PID 1's child process of record.
set -e

# render_template FILE VARS
#   Substitute only the variables listed in VARS into FILE, in place.
#   The output is written to a temporary file first: redirecting envsubst
#   straight back onto its input ("envsubst < f > f") truncates f before it
#   is read and leaves it empty. Limiting the variable list keeps any other
#   "$name" text in the templates untouched.
render_template() {
    tmp="$1.tmp"
    envsubst "$2" < "$1" > "$tmp"
    mv "$tmp" "$1"
}

render_template /etc/nginx/conf.d/default.conf '${DOMAIN}'
render_template /opt/openresty/lua/content-parser.lua '${KAFKA_BROKERS} ${KAFKA_TOPIC} ${KAFKA_PRODUCER_CONF}'

cat /etc/nginx/conf.d/default.conf
echo "start run openresty"
exec /usr/local/openresty/bin/openresty -g "daemon off;"
在 nginx 目录的上一级(即构建上下文的根目录,Dockerfile 中的 COPY 路径都相对于此)编写 Dockerfile:
# OpenResty image for the user-behaviour collector:
# nginx + lua-zlib (gzip bodies) + lua-resty-kafka (Kafka producer).
FROM openresty/openresty:alpine-fat
# https://github.com/openresty/docker-openresty/
# http://totogoo.com/article/143/nginx-lua-nsq.html
ARG LUA_KAFKA_VERSION="0.06"
LABEL lua_kafka_version="${LUA_KAFKA_VERSION}"
RUN echo $PATH \
    && whoami
# lua-zlib is built against zlib-dev; content-parser.lua uses it to gunzip bodies.
RUN mkdir -p /opt/openresty/lua/ \
    && apk add --no-cache git mercurial \
    && apk add --no-cache zlib-dev \
    && luarocks install lua-zlib
WORKDIR /opt/openresty
RUN curl -fSL https://github.com/doujiang24/lua-resty-kafka/archive/v${LUA_KAFKA_VERSION}.tar.gz -o lua-resty-kafka-${LUA_KAFKA_VERSION}.tar.gz \
    && tar xzf lua-resty-kafka-${LUA_KAFKA_VERSION}.tar.gz \
    && rm -f lua-resty-kafka-${LUA_KAFKA_VERSION}.tar.gz \
    && mv lua-resty-kafka-${LUA_KAFKA_VERSION}/lib/resty/kafka /usr/local/openresty/lualib/resty/ \
    && rm -fr lua-resty-kafka-${LUA_KAFKA_VERSION} \
    && curl -fSL https://github.com/rainingmaster/lua-resty-nsq/archive/master.zip -o lua-resty-nsq-master.zip \
    && unzip lua-resty-nsq-master.zip \
    && rm -f lua-resty-nsq-master.zip \
    && mv lua-resty-nsq-master/lib/resty/nsq /usr/local/openresty/lualib/resty/ \
    && rm -fr lua-resty-nsq-master \
    && rm -f /usr/local/openresty/nginx/html/*
COPY nginx/conf/nginx.conf /usr/local/openresty/nginx/conf/
COPY nginx/conf/default.conf /etc/nginx/conf.d/
COPY nginx/lua /opt/openresty/lua/
ADD nginx/www /usr/local/openresty/nginx/html/
COPY nginx/docker-entrypoint.sh /opt/openresty/
# The entrypoint must be executable even if the build context lost the mode bit.
RUN chmod +x /opt/openresty/docker-entrypoint.sh
# Runtime settings, substituted into default.conf / content-parser.lua by the
# entrypoint. The Kafka values are Lua table literals: single quotes keep
# their inner double quotes. With the value wrapped in double quotes, Docker
# strips the inner quotes, producing invalid Lua such as host=127.0.0.1.
# The producer must be async, because log_by_lua* cannot use the cosockets
# a sync producer needs.
ENV DOMAIN="my-domain.com"
ENV KAFKA_BROKERS='{{host="127.0.0.1",port=9092}}'
ENV KAFKA_TOPIC="ubt-log"
ENV KAFKA_PRODUCER_CONF='{ producer_type = "async" }'
EXPOSE 80
ENTRYPOINT ["./docker-entrypoint.sh"]
客户端使用 HTTP POST 方法向 /post 上报数据。请求体的每一行是一条 JSON 格式的事件,可以用 gzip 压缩(此时需设置请求头 Content-Encoding: gzip)。服务端会把每一行封装成一条 Kafka 消息,消息体中各字段的含义如下:
/**
 * One Kafka message written by content-parser.lua for each line of a
 * POST /post request body.
 */
@Data
@EqualsAndHashCode
public class UbtLogEvent implements Serializable {
private static final long serialVersionUID = -5035290892623289431L;
/**
 * Client request headers. Names are lowercase, "-" is replaced by "_", and a
 * leading "x_" prefix is removed. The producer also adds ip, start_time and
 * server_time entries.
 */
private Map<String, Object> h;
/**
 * Client thread name; always "main" for real-time events.
 */
private String n;
/**
 * Client thread id; always 1 for real-time events.
 */
private Integer i;
/**
 * Whether the log comes from the main thread; always true for real-time events.
 * NOTE(review): content-parser.lua writes this flag under the JSON key "m",
 * not "main". Unless the deserializer maps "m" onto this field, it will stay
 * null; confirm.
 */
private Boolean main;
/**
 * Log type: 1 for client event logs, 2 for server-simulated data.
 */
private Integer f;
/**
 * Server receive time, in milliseconds.
 */
private Long t;
/**
 * Client time, in milliseconds; equal to t for real-time events.
 */
private Long l;
/**
 * Content: one line of the request body, as a JSON string.
 */
private String c;
}
完整代码请参考:https://github.com/AOKI123/app-log-nginx