掘金 后端 ( ) • 2024-03-27 17:13

  分布式系统需要多个节点协同来完成一个特定的任务,为了保证节点之间协同的高效性以及整个系统运行的稳定性,需要有一个 leader 节点进行决策。同时,考虑到分布式系统对分区容错的要求,在检测到当前 leader 节点不可达时,也需要选举新的 leader 节点。

⒈ Bully 算法的运行机制

  Bully 算法以其简单、高效而常用于小规模分布式系统的 leader 节点选举。在系统中,每一个节点都会分配一个唯一的 ID 来标识节点的优先级,优先级最高的节点会被选举为 leader 节点。

  系统中的其他节点会定期检测当前 leader 节点的状态,如果某个节点检测到当前的 leader 节点没有响应,则该节点会发起新的 leader 节点选举过程。

  在新发起的 leader 节点选举过程中,该节点只会向优先级高于自身的节点发送 leader 节点选举消息。如果这些高优先级的节点都没有响应选举消息,那么该节点会被选举为新的 leader 节点,并向系统中其他节点广播该消息。如果有高优先级的节点响应了选举消息,那么高优先级的节点会向比自身优先级更高的节点重新发送选举 leader 节点的消息,循环往复,直至选举出新的 leader 节点。

  Bully 算法中的消息类型

  Bully 算法中主要有三种消息类型,其作用如下:

  • Election 消息:发起选举新 leader 节点的消息
  • Alive 消息:处于活跃状态的高优先级节点对 Election 消息的响应
  • Elected 消息:新选出的 leader 节点对其他节点的广播消息

⒉ Bully 算法的工作流程

  假设在分布式系统中有 5 个节点,ID 分别为 n₁、n₂、n₃、n₄、n₅,默认情况下,优先级最高的 n₅ 节点被选举为 leader 节点。

image.png

  在某一时刻,n₂ 节点向 n₅ 节点发送请求,检测当前 leader 节点的状态,而 n₅ 节点没有响应。此时 n₂ 节点会向优先级高于自身的 n₃、n₄、n₅ 节点发送 Election 消息选举新的 leader 节点。

image.png

  此时,如果 n₃、n₄、n₅ 节点都没有响应 n₂ 节点的 Election 消息,则 n₂ 节点会被选举为新的 leader 节点;如果有节点响应 n₂ 节点的 Eleciton 消息,则 n₂ 节点的工作到此结束,响应 n₂ 节点的节点会继续进行新的 leader 节点的选举。

image.png

  由于 n₅ 节点已经没有响应,所以新的 leader 节点选举会在 n₃ 和 n₄ 节点之间进行。而 n₄ 节点的优先级高于 n₃ 节点,所以最终 n₄ 节点被选为新的 leader 节点。

image.png image.png

  n₄ 节点被选举为新的 leader 节点之后,会在系统中进行广播,各个节点也会相应的更新最新的 leader 节点信息。

image.png

  如果某一时刻,n₅ 节点重启,那么此时又会触发新的一轮 leader 节点选举过程,最终 n₅ 节点又会重新成为新的 leader 节点。

image.png

⒊ Bully 算法的代码实现

  首先定义程序中需要用到的消息结构体以及消息类型。

// message.go
package main

type Message struct {
   From        string
   Type        int
}

const (
   Ping    = iota
   Pong
   Election
   Alive
   Elected
)

  要进行 leader 节点选举,需要在各节点之间通信,所以各节点相互之间需要建立连接,并且每个节点都需要记录与其他节点之间的连接信息。另外,非 leader 节点还需要对 leader 节点进行周期性的心跳检测,以便在当前 leader 节点失效时能够及时发现,并重新进行新的 leader 节点选举。

// node.go
package main

import (
   "log"
   "net"
   "net/rpc"
   "strings"
   "sync"
   "time"
)

// 各节点以及监听地址的映射信息
var nodeMap = map[string]string{
   "node1":    "127.0.0.1:8081",
   "node2":    "127.0.0.1:8082",
   "node3":    "127.0.0.1:8083",
   "node4":    "127.0.0.1:8084",
   "node5":    "127.0.0.1:8085",
}

// 节点信息
type Node struct {
   Id          string
   Addr        string
   LeaderId    string
   RpcClient   *rpc.Client
   Siblings    *Siblings
}

// 系统中其他节点的信息
type Siblings struct {
   Nodes       map[string]*Node
   *sync.RWMutex
}
/*********** Siblings method start ***********/
func newSiblings() *Siblings {
   siblings := &Siblings{
      Nodes:   make(map[string]*Node),
      RWMutex: &sync.RWMutex{},
   }

   return siblings
}

func (siblings *Siblings) addSibling(nodeId string, rpcClient *rpc.Client) {
   siblings.Lock()
   defer siblings.Unlock()

   siblings.Nodes[nodeId] = &Node{
      Id:        nodeId,
      RpcClient: rpcClient,
   }
}

func (siblings *Siblings) getSibling(nodeId string) *Node {
   siblings.Lock()
   defer siblings.Unlock()

   return siblings.Nodes[nodeId]
}

func (siblings *Siblings) deleteSibling(nodeId string) {
   siblings.Lock()
   defer siblings.Unlock()

   delete(siblings.Nodes, nodeId)
}

func (siblings *Siblings) ListSiblings() string {
   siblings.Lock()
   defer siblings.Unlock()

   nodeIds := make([]string, 0, len(siblings.Nodes))

   for nodeId := range siblings.Nodes {
      nodeIds = append(nodeIds, nodeId)
   }

   return strings.Join(nodeIds, ", ")
}

func (siblings *Siblings) sliceSiblings() []*Node {
   siblings.Lock()
   defer siblings.Unlock()

   nodes := make([]*Node, 0, len(siblings.Nodes))

   for _, node := range siblings.Nodes {
      nodes = append(nodes, node)
   }

   return nodes
}
/*********** Siblings method end ***********/

// 创建新的节点
func NewNode(id string) *Node {
   node := &Node{
      Id:        id,
      Addr:      nodeMap[id],
      RpcClient: nil,
      Siblings:  nil,
   }

   node.Siblings = newSiblings()

   return node
}

// 监听当前节点
func (node *Node) NewListener() (net.Listener, error) {
   listener, err := net.Listen("tcp", node.Addr)

   return listener, err
}

// 连接系统中其他节点
func (node *Node) ConnectToSiblings() {
   for nodeId, nodeAddr := range nodeMap {
      if nodeId == node.Id {
         // 跳过与自身的连接
         continue
      }

      rpcClient := node.connectToSibling(nodeAddr)
      // 检测节点通信是否正常
      pingMessage := Message{
         From: node.Id,
         Type: Ping,
      }
      response, _ := node.CommunicateWithSibling(rpcClient, pingMessage)

      if response.Type == Pong {
         log.Printf("与节点 %s 通信正常\n", nodeId)
         node.Siblings.addSibling(nodeId, rpcClient)
      } else {
         log.Printf("与节点 %s 通信异常,返回:%+v\n", nodeId, response)
      }
   }
}

// 连接到单个节点
func (node *Node) connectToSibling(nodeAddr string) *rpc.Client {
retry:
   rpcClient, err := rpc.Dial("tcp", nodeAddr)

   if err != nil {
      log.Printf("节点 %s 连接到节点 %s 失败: %v,即将进行重试……\n", node.Id, nodeAddr, err)
      time.Sleep(1 * time.Second)
      goto retry
   }

   return rpcClient
}

// 与节点进行通信
func (node *Node) CommunicateWithSibling(rpcClient *rpc.Client, message Message) (Message, error) {
   var response Message

   err := rpcClient.Call("Node.RespondTheMessage", message, &response)

   if err != nil {
      log.Printf("与节点通信失败:%s\n", err)
   }

   return response, err
}

// 响应消息
func (node *Node) RespondTheMessage(message Message, response *Message) error {
   response.From = node.Id

   switch message.Type {
   case Ping:
      response.Type = Pong
      // 如果之前的 leader 节点重启之后重新与系统中其他节点连接,那么当前的节点也需要与这个重启的节点建立连接
      if node.Siblings.getSibling(message.From) == nil {
         log.Printf("节点 %s 重新与节点 %s 建立连接\n", node.Id, message.From)
         rpcClient := node.connectToSibling(nodeMap[message.From])
         node.Siblings.addSibling(message.From, rpcClient)
      }
   case Election:
      response.Type = Alive
   case Elected:
      node.LeaderId = message.From
      log.Printf("leader 节点选举完成,新的 leader 节点为 %s\n", node.LeaderId)
      // 对新的 leader 进行心跳检测
      go node.heartBeat()
      response.Type = Alive
   }

   return nil
}

// leader 节点心跳检测
func (node *Node) heartBeat() {
ping:
   leader := node.Siblings.getSibling(node.LeaderId)

   if leader == nil {
      log.Printf("获取当前 leader 节点失败,nodeId: %s, leaderId: %s, siblings: %s\n",
         node.Id, node.LeaderId, node.Siblings.ListSiblings())
      return
   }

   message := Message{
      From: node.Id,
      Type: Ping,
   }

   response, err := node.CommunicateWithSibling(leader.RpcClient, message)

   if err != nil {
      log.Printf("当前 leader 节点失效,开始选举新的 leader 节点\n")
      node.Siblings.deleteSibling(node.LeaderId)
      node.LeaderId = ""
      // 选举新的 leader 节点
      node.Elect()
      return
   }

   log.Printf("leader 节点心跳检测响应:%v\n", response)
   if response.Type == Pong {
      time.Sleep(5 * time.Second)
      goto ping
   }
}

// 选举 leader 节点
func (node *Node) Elect() {
   isHighestNode := true

   siblings := node.Siblings.sliceSiblings()

   for _, sibling := range siblings {
      if strings.Compare(node.Id, sibling.Id) > 0 {
         continue
      }

      log.Printf("%s 发送 election 消息到 %s\n", node.Id, sibling.Id)
      message := Message{
         From: node.Id,
         Type: Election,
      }

      response, _ := node.CommunicateWithSibling(sibling.RpcClient, message)

      if response.Type == Alive {
         isHighestNode = false
      }
   }

   if isHighestNode {
      node.LeaderId = node.Id
      electedMessage := Message{
         From: node.Id,
         Type: Elected,
      }
      // 广播消息
      node.broadCast(electedMessage)
   }
}

// 广播消息
func (node *Node) broadCast(message Message) {
   siblings := node.Siblings.sliceSiblings()

   for _, sibling := range siblings {
      log.Printf("节点 %s 广播 elected 消息到 %s\n", node.Id, sibling.Id)
      response, err := node.CommunicateWithSibling(sibling.RpcClient, message)
      if err != nil {
         log.Printf("广播 elected 消息到 %s 失败:%s\n", sibling.Id, err)
      } else {
         log.Printf("广播 elected 消息到 %s 得到响应:%v\n", sibling.Id, response)
      }
   }
}

  程序在启动之初,首先监听当前节点的端口,然后与系统中其他节点建立连接,之后进行初始的 leader 节点选举。

// main.go
package main

import (
   "log"
   "net/rpc"
   "os"
   "os/signal"
   "time"
)

func main() {
   if len(os.Args) != 2 {
      log.Fatal("传参错误!")
   }

   nodeId := os.Args[1]
   node := NewNode(nodeId)

   listener, err := node.NewListener()
   if err != nil {
      log.Fatal("监听当前节点失败:", err.Error())
   }
   defer listener.Close()

   rpcServer := rpc.NewServer()
   rpcServer.Register(node)

   go rpcServer.Accept(listener)

   node.ConnectToSiblings()

   log.Printf("节点 %s 连接到 %s\n", nodeId, node.Siblings.ListSiblings())
   log.Println("等待系统启动……")

   time.Sleep(10 * time.Second)

   node.Elect()

   ch := make(chan os.Signal, 1)
   signal.Notify(ch, os.Interrupt)
   <- ch
}

为测试方便,程序只是通过监听不同端口来模拟分布式系统的多个节点。另外,程序也省去了分布式系统中应有的服务注册以及发现机制,各个节点信息都被硬编码到了代码中。

⒋ Bully 算法的限制与不足

  Bully 算法要求系统中各节点之间进行大量的消息交互,这些消息交互的开销会随着系统规模的增长而增加,并可能最终成为系统的瓶颈。

  网络故障会影响系统节点之间的通信,部分节点会由于网络故障得不到当前 leader 节点的响应而发起新的 leader 节点选举,最终导致一个系统中同时存在多个 leader 节点。

  Bully 算法通过节点的优先级确定 leader 节点,这导致部分低优先级的节点永远没有机会被选举为 leader 节点。另外,如果某些高优先级的节点频繁出现宕机/重启,相应的 leader 选举也会被频繁触发。