掘金 后端 ( ) • 2024-04-15 17:39

一、背景

随着业务的不断扩展,洞窝积累了大量的用户数据。这些数据主要包括用户的会员数据、用户行为埋点数据以及基于服务端埋点的业务数据。为了统一管理这些数据,更好的分析了解用户,以及基于数据分析对用户精准触达和实时营销,我们建设了洞窝智能营销系统。

二、整体架构

架构.png

三、全域数据接入

洞窝智能营销平台目前接入的数据源包括mysql、doris、oracle、hive等离线数据源和kafka、rocketmq实时数据源。

3.1 外部数据源动态接入技术方案

用户提供外部数据源的数据库地址、端口号、用户名、密码等信息,DMA就会尝试连接到定义的数据库,如果连接成功就会加入到内部维护的数据源列表中,根据不同的数据库类型获取数据表列表、数据表的元数据信息,使用户感觉像连接本地数据库一样方便。 DMA使用mybatisplus的dynamic-datasource插件来实现这一功能。 动态数据源源码.png 引入dynamic-datasource-spring-boot-starter依赖后,我们发现DynamicRoutingDataSource这一核心类已经注册为Spring Bean,所以在我们的业务里直接注入即可。通过动态数据源的业务ID为key。 当我们需要使用某一外部数据源时,我们可以根据key来切换数据源,代码如下: 动态切换数据源源码.png 这里有一个问题,当我们手动切换数据源时,我们需要手动清除掉数据源标识,因为这个数据源标识是存在线程上下文中的。如果发生异常时,线程被重复利用,那么会导致数据源混乱。

3.2 流式读取数据源

一般DMA处理的离线数据数据量在百万级,所以我们在读取外部数据源数据时,需要使用流的方式,避免一次性加载过多数据导致程序内存溢出。代码如下: 流式读取数据源源码.png

四、ID Mapping

前面说到我们的数据来自于多个业务,那么我们就需要使用ID Mapping技术整合多种ID类型,形成统一的用户ID(did),打破数据孤岛,这是提供更全面更准确用户触达的基础。 IDMapping生成的did与各实体Id的对应关系保存在redis中,都采用keyvalue数据格式存储。 一方面要根据实体ID找到对应的did(用于IDMapping场景),一方面要根据did找到对应的实体ID(用于营销触达等场景) 因此要存储两类映射: 1、did -> 实体ID 格式为:dma:did:xxx:idxx -> xxx 2、实体ID -> did 格式为:dma:idxx:xxx -> xxx ID Mapping流程.png 代码如下:

private Long findDid(Long mappingIdId, String mappingIdValue, Map otherIdValue) {
    String mainKey = String.format(DataCenterConst.DMA_OTHER_ID_KEY, mappingIdId, mappingIdValue);
    String mainLockName = String.format(DataCenterConst.DMA_MAPPING_LOCK, mappingIdValue);
    RLock mainLock = redissonClient.getLock(mainLockName);
    mainLock.lock();
    Long did;
    //查redis
    try {
        RBucket mainBucket = redissonClient.getBucket(mainKey);
        did = mainBucket.get();
        if (did != null) {
            otherIdValue.put(new IdType(mappingIdId), mappingIdValue);
            saveMapping(did, otherIdValue);
        }
    } finally {
        mainLock.unlock();
    }
    if (did != null) {
        return did;
    }
    String lockName = String.format(DataCenterConst.DMA_MAPPING_LOCK, "null");
    RLock lock = redissonClient.getLock(lockName);
    lock.lock();
    try {
        RBuckets buckets = redissonClient.getBuckets();
        String[] keys = otherIdValue.entrySet().stream().map(item -> String.format(DataCenterConst.DMA_OTHER_ID_KEY, item.getKey(), item.getValue())).toArray(String[]::new);
        Map dids = buckets.get(keys);
        if (dids == null || dids.size() == 0) {
            did = incrementAndGetDid();
            log.info("otherIdValue : {} ===>> did : {}", otherIdValue, did);
        } else {
            Long tempDid = null;
            Set values = new HashSet<>(dids.values());
            if (values.size() == 1) {
                tempDid = new ArrayList<>(values).get(0);
            } else {
                for (Map.Entry en : otherIdValue.entrySet()) {
                    String k = String.format(DataCenterConst.DMA_OTHER_ID_KEY, en.getKey().getId(), en.getValue());
                    if (dids.containsKey(k)) {
                        tempDid = dids.get(k);
                        break;
                    }
                }
            }
            RBucket bucket = redissonClient.getBucket(String.format(DataCenterConst.DMA_DID_KEY, tempDid, mappingIdId));
            String mId = bucket.get();
            if (mId != null && !mId.equals(mappingIdValue)) {
                did = incrementAndGetDid();
                log.info("tempDid : {}, mainId: {} ===>> did : {}", tempDid, mId, did);
            } else {
                did = tempDid;
            }
        }
        otherIdValue.put(new IdType(mappingIdId), mappingIdValue);
        saveMapping(did, otherIdValue);
    } finally {
        lock.unlock();
    }
    return did;
}

五、标签圈选与人群预估

很多时候,运营人员和数据分析师为了达到更好的用户触达效果,需要不断优化人群的圈选范围。因此我们提供了自助圈选人群的功能,系统使用者使用系统提供的标签和用户行为,对各种人群进行交并差计算来圈选出理想的人群。 为了实现秒级响应人群预估量,我们使用doris的bitmap数据结构。通过解析,将人群规则转化为底层doris bitmap的交并差集计算,效率非常高。 人群圈选.png

六、用户画像

用户画像是对现实世界中用户的建模,可以把用户的标签汇总起来,更好的分析洞察用户。我们使用doris的聚合模型来实现用户画像功能。我们建立了一个用户画像宽表,当标签计算完成后触发用户画像宽表的更新,利用doris聚合模型replace_if_not_null的特性来实现部分列更新,代码如下:

<insert id="insertCustomLabel">
    insert into ${tableName} (did, c_${id}) select did,label_value from custom_label_summary lateral view explode_bitmap(did_bitmap) tmp AS did where label_id=#{id} and compute_time=#{computeTime}
</insert>

七、任务编排

DMA有大量的标签、人群计算任务,这些任务都是需要定时计算的,并且有些任务是有依赖的,因此我们需要引入调度框架,并且要实现任务的简单编排。经过调研,我们选择了quartz调度框架。quartz是一款开源且具有丰富特性的任务调度库,它可以基于mysql来实现分布式调度。 在洞窝智能营销平台中,下游任务依赖于上游任务,当一个下游任务依赖的上游任务都没有更新时,下游任务的执行结果和上一次是完全相同的,也就没有必要让该任务执行。因此在任务执行前要判断该任务的前置任务是否更新过。 任务依赖.png 如上图,task4依赖task1、task2、task3,2:00分task4被执行,2:05分task3被执行,当2:10分task4再次被调度时,发现上游任务task3执行过(2:05 > 2:00),那么task4就需要执行。

public abstract class DependExecutor extends IExecutor {
    @Override
    public boolean needExecute(ScheduleTask task, ScheduleContext sc, LocalDateTime startTime) {
        //手动调度直接执行
        if (sc.getTriggerType() == 3) {
            return true;
        }
        List dependTasks = scheduleTaskManager.getDependTasks(task.getId());
        //没有依赖直接执行
        if (dependTasks.size() == 0) {
            return true;
        }
        //没有执行过直接执行
        if (task.getLastExecuteTime() == null) {
            return true;
        }
        List updatedTasks = dependTasks.stream().filter(item -> item.getLastExecuteTime() != null && item.getLastExecuteTime().isAfter(task.getLastExecuteTime())).collect(Collectors.toList());
        if (updatedTasks.size() != 0) {
            return true;
        } else {
            return false;
        }
    }
}

八、多租户

作为一个SAAS平台,我们采用多租户共享模式架构来实现租户之间的隔离。我们选择使用mybatisplus的多租户插件来实现SQL解析,为每个SQL加上租户ID作为过滤条件,从而避免查询出不同租户的数据,同时避免了手动涉足过滤条件的重复劳动。 首先我们创建一个TenantLineHandler,在这个处理器里我们指定了哪些表可以忽略租户ID,并且指定了如何获取租户ID。

public class DmaTenantLineHandler implements TenantLineHandler {

    private List<String> tables;
    private List<String> patternTables;

    public DmaTenantLineHandler(List<String> tables, List<String> patternTables) {
        this.tables = tables;
        this.patternTables = patternTables;
    }

    @Override
    public boolean ignoreTable(String tableName) {
        boolean result = tables.stream().anyMatch(table -> table.equalsIgnoreCase(tableName));
        if (!result) {
            result = patternTables.stream().anyMatch(pattern -> {
                Pattern p = Pattern.compile(pattern);
                Matcher matcher = p.matcher(tableName);
                return matcher.matches();
            });
        }

        return result;
    }

    @Override
    public Expression getTenantId() {
        SaasUser saasUser = SaasUserContext.getSaasUser();
        return new LongValue(saasUser == null ? 0L : saasUser.getTenantId());
    }
}

然后,我们添加上多租户拦截器

@Configuration
public class MyBatisConfig {

    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor(TenantLineHandler tenantHandler){
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(new TenantLineInnerInterceptor(tenantHandler));
        interceptor.addInnerInterceptor(new PaginationInnerInterceptor());
        return interceptor;
    }

    @Bean
    public TenantLineHandler tenantLineHandler() {
        List<String> tables = new ArrayList<>();
        tables.add("easy_tenant");
        tables.add("easy_menu");
        tables.add("easy_menu_operate");

        tables.add("easy_outbound_city");

        List<String> pattionTables = new ArrayList<>();
        pattionTables.add("wx_.+");
        return new DmaTenantLineHandler(tables, pattionTables);
    }
}

上文简单介绍了洞窝智能营销平台,并且列举了在建设过程中一些实践。目前洞窝智能营销平台已经上线运行大半年了,使运营可以更全面准确的分析洞察用户,更精准的触达运营客户,为用户增长业务提供了很大的帮助。