掘金 后端 ( ) • 2024-04-25 17:05

theme: awesome-green highlight: atelier-sulphurpool-light

一、什么是Redis的缓存双写一致性

  • Redis中有数据,则必须保证和数据库中的数据是相同的

  • Redis中没有数据,则从数据库的查询最新的数据回写到Redis中

二、Redsi的缓存操作分类

2.1 只读缓存

Redis的只读缓存模式是一种缓存策略,其中数据首先从后端数据库加载到Redis缓存中。当进行读操作时,如果请求的数据在Redis缓存中命中(即存在),则直接返回缓存中的数据,不访问后端数据库。

如果Redis缓存不命中(即数据不存在于缓存中),则直接从后端数据库中读取数据,没有回写机制

2.2 读写缓存

Redis同时接受读和写请求。在读写缓存模式下,当业务应用进行数据访问时,它会首先查询Redis缓存中是否保存了相应的数据。

如果Redis缓存命中(即数据存在于缓存中),则对于读请求,它会直接从缓存中返回数据;对于写请求,它会更新缓存中的数据,并可能根据配置策略将更新后的数据异步写回后端数据库。这种策略可以减少对后端数据库的访问压力,提高系统的性能和响应速度。

如果Redis缓存不命中(即数据不存在于缓存中),则对于读请求,它会从后端数据库中读取数据,并将其加载到缓存中,然后再返回给请求方;对于写请求,它会将新数据写入后端数据库,并在缓存中创建或更新相应的数据项。

2.2.1 同步直写策略

  • 将数据写入数据库的同时也要更新Redis,保持Redis与数据库的内容是同步一致的;

2.2.2 异步缓写策略

  • 在系统运行期间,数据库的数据有所更新,但是允许Redis更新时间存在一定是误差

  • 如果更新失败,我们可以借助消息中间件来进行修补,重新写入数据到缓存中

2.2.3 代码实现

① 小厂,并发量很低 <=> qps很小


public Bottom findBottomById(Integer id) {
    final String CACHE_KEY_Bottom = "bottom:";
    Bottom bottom = null;
    String key = CACHE_KEY_Bottom + id;
    
    //1. 先查Redis,查不到再去查Mysql
    bottom = (Bottom) redisTemplate.opsForValue().get(key);

    if (bottom == null) {
        //2. 查询Mysql
        bottom = bottomMapper.selectByKey(id);
        if (bottom == null) { 
            return bottom;
        } else {
            //3. mysql有则回写到Redis
            redisTemplate.opsForValue().set(key, bottom);
        }
    }
    return bottom;
}

② 大厂 ,并发量很高 <=> qps很高

public Bottom findBottomById(Integer id) {

    final String CACHE_KEY_Bottom = "bottom:";
    Bottom bottom = null;
    String key = CACHE_KEY_Bottom + id;

    
    // 1. 先查Redis
    bottom = (Bottom) redisTemplate.opsForValue().get(key);
    if (bottom == null) {
        // 2. Redis为空,加锁再去查
        synchronized (UserService.class) {
            //3. 第2次查询redis
            bottom = (Bottom) redisTemplate.opsForValue().get(key);
            //4 . 第二次查询Redis为空,查询Mysql
            if (bottom == null) {
                //5. 查询Mysql
                bottom = bottomMapper.selectByKey(id);
                if (bottom == null) {
                    return null;
                } else {
                    //6. 查到数据,回写到Redis 
                    redisTemplate.opsForValue().setIfAbsent(key, bottom, 7L, TimeUnit.DAYS);
                }
            }
        }
    }
    return bottom;
}

三、缓存的更新策略

3.1 先更新数据库,再更新缓存

  • 回写Redis的时候可能出现失败,则出现Redis与Mysql数据不一致
  • 高并发情况下,线程的快慢不一样,导致Redis的回写时间不同造成Redis与Mysql数据不一致

3.2 先更新缓存,再更新数据库

  • 高并发情况下,线程的快慢不一样,导致Redis要写入Mysql不同步,也会导致数据不一致

3.3 先删除缓存,再更新数据库

  • 高并发的情况下,如果数据库更新失败或超时或返回不及时,就会导致后续的请求线程访问缓存时没有数据缓存缺失,没有命中马上去读my那么从数据库当中读了旧数据,导致数据不一致

解决思路:延迟双删

延迟双删的基本步骤如下:

  1. 删除缓存:当数据在数据库中更新后,首先立即删除缓存中的数据。
  2. 更新数据库:然后更新数据库中的数据。
  3. 设置睡眠时间:等待一段时间后(例如,几百毫秒),再次删除缓存中的数据。

睡眠时间的设定为:线程A的睡眠时间 大于线程B读取数据再写入缓存的时间

代码实现:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public Bottom findBottomById(Integer id) {
    final String CACHE_KEY_Bottom = "bottom:";
    Bottom bottom = null;
    String key = CACHE_KEY_Bottom + id;

    // 1. 从Redis中获取数据
    bottom = (Bottom) redisTemplate.opsForValue().get(key);
    if (bottom == null) {
        // 2. 如果Redis中没有数据,则加锁查询数据库
        synchronized (UserService.class) {
            // 再次从Redis中获取数据,防止在加锁期间其他线程已经更新了Redis
            bottom = (Bottom) redisTemplate.opsForValue().get(key);
            if (bottom == null) {
                // 3. 从数据库中查询数据
                bottom = bottomMapper.selectByKey(id);
                if (bottom != null) {
                    // 4. 将数据写入Redis
                    redisTemplate.opsForValue().set(key, bottom, 7L, TimeUnit.DAYS);
                    
                    // 5. 使用延迟队列或者定时任务来实现延迟删除
                    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
                    executorService.schedule(() -> {
                        // 在延迟一段时间后再次删除缓存
                        redisTemplate.delete(key);
                    }, 500, TimeUnit.MILLISECONDS); // 延迟500毫秒后删除
                }
            }
        }
    }
    return bottom;
}

3.4 先更新数据库,再删除缓存

  • 假如缓存删除失败或者来不及,导致请求再次访问redis时缓存命中,读取到的是缓存旧值。

解决思路:使用消息中间件

  1. 更新数据库数据
  2. 数据库会将操作信息写入binlog日志当中
  3. 订阅程序提取出所需要的数据以及key
  4. 另起一段非业务代码,获得该信息
  5. 尝试删除缓存操作,发现删除失败
  6. 将这些信息发送至消息队列
  7. 重新从消息队列中获得该数据,重试操作。

四、canal的实战方案

4.1 何为Canal

Canal 是一个用于 MySQL 数据库变更数据捕获(Change Data Capture,简称 CDC)的开源工具,由阿里巴巴开发并开源。

其主要功能是监听 MySQL 的 binlog(二进制日志),解析 binlog 中的数据变更事件(如 INSERT、UPDATE、DELETE),并将这些事件发布到消息队列(如 Kafka)中,以便其他系统或应用能够实时消费这些变更数据。

4.2 Canal的工作原理

Canal的工作原理主要涉及以下几个步骤:

  1. 连接MySQL Master: Canal模拟作为MySQL的Slave,连接到MySQL Master上,并建立与Master的二进制日志(binlog)的订阅。

  2. 解析binlog: 当MySQL Master上的数据发生变化(INSERT、UPDATE、DELETE等)时,这些变更会写入到binlog中。Canal作为Slave连接到Master后,会读取Master上的binlog,并通过解析binlog来获取数据变更事件。

  3. 数据变更事件发布: Canal将解析得到的数据变更事件转换成标准化的格式,然后发布到消息队列(如Kafka)中。这样,其他系统或应用就可以订阅这些事件,并实时消费处理。

  4. 消费数据变更事件: 在你的业务场景中,可以有一个或多个消费者(Consumer)订阅Canal发布到Kafka的数据变更事件。当事件到达时,消费者会读取事件内容,并根据业务逻辑进行相应的处理。例如,在缓存更新场景中,消费者可以读取变更事件,并更新Redis中的缓存数据。

  5. 保持数据一致性: 通过实时监听和处理MySQL的数据变更事件,Canal能够帮助保持其他系统(如Redis)与MySQL的数据一致性。当MySQL中的数据发生变化时,通过Canal和消费者的协作,可以实时更新其他系统中的数据,确保数据的一致性。

4.3 Canal的代码实现

4.3.1 配置Mysql

①查看mysql版本

select VERSION()

image.png

②当前的主机二进制日志

show master status

image.png

③查看show variables like 'log_bin';

show variables like 'log_bin'

image.png

④开启 MySQL的binlog写入功能:修改mysql的配置文件mysql.cnf

image.png

log-bin=mysql-bin #开启 binlog

binlog-format=ROW #选择 ROW 模式

server_id=1    #配置MySQL replaction需要定义,不要和canal的 slaveId重复 image.png

⑤重启mysql

docker restart 14a

⑥再次查看show variables like 'log_bin';

image.png

⑦授权canal连接MySQL账号

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'cancal';

FLUSH PRIVILEGES;

select host,user,plugin from mysql.user ;

image.png

4.3.2 配置Canal

1. 下载

https://github.com/alibaba/canal/releases/tag/canal-1.1.6

image.png

2.解压

创建目录

mkdir /root/mycanal

将压缩包放入并解压

tar -zxvf

3.配置

修改/mycanal/conf/example路径下instance.properties文件

image.png

4.启动

centos需要先安装jdk1.8 并配置环境变量

cd /root/mycanal/bin/

./startup.sh

image.png

5.查看

①查看 server 日志

cd /root/mycanal/logs/canal/

cat canal.log

image.png

②查看 样例example 的日志

cd /root/mycanal/logs/example/

cat example.log

image.png

4.3.3 编写Java代码

① 修改pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.canal</groupId>
    <artifactId>canal_demo02</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.14</version>
        <relativePath/>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <junit.version>4.12</junit.version>
        <log4j.version>1.2.17</log4j.version>
        <lombok.version>1.16.18</lombok.version>
        <mysql.version>5.1.47</mysql.version>
        <druid.version>1.1.16</druid.version>
        <mapper.version>4.1.5</mapper.version>
        <mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version>
    </properties>

    <dependencies>
        <!--canal-->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.0</version>
        </dependency>
        <!--SpringBoot通用依赖模块-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--swagger2-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--SpringBoot与Redis整合依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <!--SpringBoot与AOP-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
        </dependency>
        <!--Mysql数据库驱动-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <!--SpringBoot集成druid连接池-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>${druid.version}</version>
        </dependency>
        <!--mybatis和springboot整合-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>${mybatis.spring.boot.version}</version>
        </dependency>
        <!--通用基础配置junit/devtools/test/log4j/lombok/hutool-->
        <!--hutool-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.2.3</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <optional>true</optional>
        </dependency>
        <!--persistence-->
        <dependency>
            <groupId>javax.persistence</groupId>
            <artifactId>persistence-api</artifactId>
            <version>1.0.2</version>
        </dependency>
        <!--通用Mapper-->
        <dependency>
            <groupId>tk.mybatis</groupId>
            <artifactId>mapper</artifactId>
            <version>${mapper.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.8.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

②.修改yaml配置

spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/bottom?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.druid.test-while-idle=false

③ 注释掉主启动类的run

@SpringBootApplication
public class CanalTestApp
{
    public static void main(String[] args)
    {
        //SpringApplication.run(CanalTestApp.class,args);
    }
}

③ 编写一个RedisUtils

public class RedisUtils
{
    public static final String  REDIS_IP_ADDR = "192.168.118.130";
    public static final String  REDIS_pwd = "123456";
    public static JedisPool jedisPool;

    static {
        JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(20);
        jedisPoolConfig.setMaxIdle(10);
        jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000,REDIS_pwd);
    }

    public static Jedis getJedis() throws Exception {
        if(null!=jedisPool){
            return jedisPool.getResource();
        }
        throw new Exception("Jedispool is not ok");
    }

}

④ 编写CanalUntils工具类

public class CanalUntils
{
    public static final Integer _60SECONDS = 60;
    public static final String  REDIS_IP_ADDR = "192.168.111.185";

    private static void redisInsert(List<Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns)
        {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    private static void redisDelete(List<Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns)
        {
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.del(columns.get(0).getValue());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    private static void redisUpdate(List<Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns)
        {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
                System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    public static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                //获取变更的row数据
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
            }
            //获取变动类型
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else if (eventType == EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else {//EventType.UPDATE
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }


    public static void main(String[] args)
    {
        System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");

        //=================================
        // 创建链接canal服务端
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR, 11111),
                "example",
                "",
                "");
        int batchSize = 1000;
        //空闲空转计数器
        int emptyCount = 0;
        System.out.println("---------------------canal init OK,开始监听mysql变化------");
        try {
            connector.connect();
            //connector.subscribe(".*\\..*");
            connector.subscribe("bigdata.t_user");
            connector.rollback();
            int totalEmptyCount = 10 * _60SECONDS;
            while (emptyCount < totalEmptyCount) {
                System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                } else {
                    //计数器重新置零
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
        } finally {
            connector.disconnect();
        }
    }
}

五、总结

本文主要介绍了Redis的缓存相关问题,什么是Redis的读写缓存,读写缓存会产生什么问题,以及如何解决这些问题