掘金 后端 ( ) • 2024-05-03 09:56

theme: condensed-night-purple highlight: androidstudio

主从复制原理

  1. 建立连接

    1. 从节点在配置了 replicaof 配置了主节点的ip和port
    2. 从库执行replicaof 并发送psync命令
  2. 同步数据到从库

    1. 主库bgsave生成RDB文件,并发送给从库,同时为每一个slave开辟一块 replication buffer 缓冲区记录从生成rdb文件开始收到的所有写命令。
    2. 从库清空数据并加载rdb
  3. 发送新写的命令给从库

    1. 从节点加载 RDB 完成后,主节点将 replication buffer 缓冲区的数据(增量写命令)发送到从节点,slave 接收并执行,从节点同步至主节点相同的状态。
  4. 基于长连接传播

    1. 方便后续命令传输,除了写命令,还维持着心跳机制

网络模型

  • 阻塞IO

  • 非阻塞IO

    • 没啥用,还是需要等待数据准备就绪
  • IO多路复用

  • 信号驱动IO

  • 异步IO

IO多路复用

文件描述符:简称FD

select

流程:

  1. 创建fd_set rfds

    1. 假设要监听的fd为1,2,5
  2. 执行select(5+1,null,null,3)

    1. 第一个参数为最大长度+1,后面是等待时间
  3. 内核遍历fd

    1. 没有数据,休眠

      1. 等待数据就绪或者超时
  4. 遍历fd_set 找到就绪的数据

存在的问题

  • 需要将整个fd_set从用户空间拷贝到内核空间,select结束后还要拷贝回用户空间
  • 需要遍历一次
  • 最大为监听1024

poll

将数据改为了链表,还是需要遍历

epoll

  1. epoll_create 创建epoll实例
  2. epoll_ctl 添加需要监听的fd,关联callback
  3. epoll_wait 等待fd就绪

epoll_wait有两种通知模式

  • levelTriggered 简称LT 当FD有数据可读的时候,会重复通知多次,直到数据处理完成。是epoll的默认模式
  • EdgeTriggered 简称ET 。当FD有数据可读的时候只会被通知一次,不管数据是否处理完成

ET模式避免了LT的惊群效应。

ET模式最好结合非阻塞IO。

附带解析失败的tcp连接redis客户端 使用go编写的


package main

import (
    "bufio"
    "container/list"
    "errors"
    "fmt"
    "io"
    "net"
    "strconv"
)

type RedisClient struct {
    conn   net.Conn
    writer *bufio.Writer
    reader *bufio.Reader
}

func main() {
    conn, err := net.Dial("tcp", "127.0.0.1:6379")
    if err != nil {
       fmt.Println("连接redis失败", err)
    }
    client := RedisClient{conn: conn, writer: bufio.NewWriter(conn), reader: bufio.NewReader(conn)}

    client.sendRequest([]string{"set", "name", "方块"})
    //zrange boards:2024-4 0 -1
    client.sendRequest([]string{"zrange", "boards:2024-4", "0", "-1"})
    //var buffer [1024]byte
    //n, _ := conn.ReadToEnd(buffer[:])
    //fmt.Println(buffer[:n])
}

func (client *RedisClient) sendRequest(args []string) {

    length := len(args)
    firstCommand := fmt.Sprintf("%s%d", "*", length)
    client.writeCommand(firstCommand)
    for _, s := range args {
       n := len(s)
       client.writeCommand("$" + strconv.Itoa(n))
       client.writeCommand(s)
       println(n, s)
    }
    response := client.handleResponse()
    fmt.Printf("%v", response)

}
func (client *RedisClient) writeCommand(s string) {
    client.conn.Write([]byte(s + "\r\n"))

}
func (client *RedisClient) handleResponse() interface{} {

    //先读取第一个字符
    r, _, _ := client.reader.ReadRune()
    flag := string(r)
    fmt.Println("第一个操作数:" + flag)
    switch flag {
    case "+":
       //一行字符串
       return client.ReadLine()
    case "-":
       //异常
       return client.ReadLine()
    case ":":
       //数字
       line := client.ReadLine()
       res, _ := strconv.Atoi(line)
       return res
    case "$":
       // 多行字符串
       readRune, _, _ := client.reader.ReadRune()
       length := string(readRune)
       if length == "-1" {
          return nil
       } else if length == "0" {
          return ""
       }
       lll, _ := strconv.Atoi(length)
       bytes := make([]byte, lll)
       n, _ := client.reader.Read(bytes)
       return string(bytes[:n])
    case "*":
       //多行字符串
       return client.readBulkString()
    default:
       return errors.New("错误")
    }
}
func (client *RedisClient) ReadLine() string {
    bytes, _, _ := client.reader.ReadLine()
    return string(bytes)
}

func (client *RedisClient) ReadToEnd() string {
    var size = 1024
    bytes := make([]byte, size)
    var temp = ""
    for {
       n, err := client.reader.Read(bytes)
       if err == io.EOF || n == 0 {
          break
       }
       temp += string(bytes[:n])
    }

    return temp
}
func (client *RedisClient) readBulkString() interface{} {
    end := client.ReadToEnd()
    fmt.Println(end)
    counts, _ := strconv.Atoi(client.ReadLine())
    if counts <= 0 {
       return nil
    }
    var resList = list.List{}
    for i := 0; i < counts; i++ {
       res := client.handleResponse()
       resList.PushBack(res)
    }
    return resList

}