掘金 后端 ( ) • 2024-04-24 15:37

前言

最近项目中有一个需求,某些表的数据 不是由系统业务产生的,而是有可能是从别的第三方系统抽取过来的。而这些数据可能量很大,所以Mysql只作为一个持久化存储,当系统中去查询这些数据时,希望从MongoDB中查询,这里就涉及到一个MySQL中数据实时迁移到MongoDB的问题,本文就来简单介绍一下SpringBoot整合MongoDB的步骤,以及MySQL数据实时迁移到MongoDB的一种方案。

SpringBoot整合MongoDB

依赖+配置文件

pom.xml

    <!--引入Mongodb-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb</artifactId>
    </dependency>

application.yml

spring:
  data:
    mongodb:
      uri: mongodb://localhost:27017
      database: demo

实体类+Repository

创建文档对应的实体类,标注@Document,指定集合名称,id字段标注@Id

@Data
@Document("t_log")
public class LogEntity implements Serializable {
    private static final long serialVersionUID = 1L;

    @Id
    private String id;
    
    //省略其他字段...
}

创建Repository类,作用类似于service

public interface LogRepository extends MongoRepository<LogEntity, String> {
}

后续就可以在代码中注入LogRepository,调用它的方法操作MongoDB了。

数据迁移

数据迁移的方案如下:

  1. 在MySQL中,需要迁移数据的表中,添加触发器,当触发增删改三种操作时,将表名以及该条数据的主键记录到一个中间表。
  2. 项目启动后,起一个异步线程去轮询中间表,只要有数据,就根据表名和主键查询出该条数据,再根据操作的类型更新到MongoDB
  3. 上述操作完毕后,删除中间表的数据。

下面贴一下详细的步骤

触发器

中间表表结构如下:

image.png

在需要迁移数据的表中添加触发器:

image.png

事件监听器

监听上下文刷新事件,实现在项目启动之后,触发自定义逻辑

@Component
public class UploadService {

    @EventListener
    public void onEventListener(ContextRefreshedEvent event) {
        new Thread(() -> {
            while (true) {
                upload();
                //休眠200毫秒 避免cpu空转
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
    }
}

轮询

@Component
public class UploadService {

    @Autowired
    private UploadLogService uploadLogService;

    @Autowired
    private LogService logService;

    @Autowired
    private LogRepository logRepository;
    
    private static final ThreadPoolTaskExecutor threadPool = UploadThreadPool.getTaskExecutor();

    private long maxId = 0;

    /**
     * 监听项目启动事件
     *
     * @param event
     */
    @EventListener
    public void onEventListener(ContextRefreshedEvent event) {
        new Thread(() -> {
            while (true) {
                upload();
                //休眠200毫秒 避免cpu空转
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();
    }

    private void upload() {
        //获取最小id值
        UploadLogEntity one = uploadLogService.getOne(new QueryWrapper<UploadLogEntity>().orderByAsc("id").last("limit 1"));
        if (one == null) return;
        //赋值最小id值,取数根据id来取 >= id
        maxId = maxId > 0 ? maxId : one.getId();
        //查询数据
        List<UploadLogEntity> list = uploadLogService.list(new QueryWrapper<UploadLogEntity>().ge("id", maxId).last("limit 1000"));
        if (list.size() == 0) return;
        maxId = list.get(list.size() - 1).getId() + 1;
        //线程池异步迁移数据
        list.forEach(data -> threadPool.execute(() -> {
            Long id = data.getId();//中间表数据id
            String tableName = data.getTableName();//表名
            String keyValue = data.getKeyValue();//主键名
            String type = data.getType();

            if ("t_log".equals(tableName)) {
                if ("1".equals(type)) {
                    LogEntity entity = logService.getById(keyValue);
                    logRepository.save(entity);
                } else if ("2".equals(type)) {
                    logRepository.deleteById(keyValue);
                }
            } else if ("t_demo".equals(tableName)) {
                //依次类推,判断每个可能的表名,分别进行处理
            }
    }

    //单例线程池
    static class UploadThreadPool {
        private static ThreadPoolTaskExecutor taskExecutor = taskExecutor();

        private UploadThreadPool() {
        }

        public static ThreadPoolTaskExecutor getTaskExecutor() {
            return taskExecutor;
        }

        private static ThreadPoolTaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(20);//核心线程数
            executor.setMaxPoolSize(50);//最大线程数
            executor.setQueueCapacity(100);//等待队列长度
            executor.setThreadNamePrefix("upload-thread-");//线程名称前缀
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//拒绝策略
            executor.initialize();//线程池初始化
            return executor;
        }
    }
}

总结

以上,就完成了MySQL数据实时迁移MongoDB。