使用GO实现Paxos共识算法的方法

算法
这篇文章主要介绍了使用GO实现Paxos共识算法,本文给大家介绍的非常详细,对大家的学习或工作,具有一定的参考借鉴价值,需要的朋友可以参考下

什么是Paxos共识算法

最初的服务往往都是通过单体架构对外提供的,即单Server-单Database模式。随着业务的不断扩展,用户和请求数都在不断上升,如何应对大量的请求就成了每个服务都需要解决的问题,这也就是我们常说的高并发。为了解决单台服务器面对高并发的苍白无力,可以通过增加服务器数量来解决,即多Server-单Database(Master-Slave)模式,此时的压力就来到了数据库一方,数据库的IO效率决定了整个服务的效率,继续增加Server数量将无法提升服务性能。这就衍生出了当前火热的微服务架构。当用户请求经由负载均衡分配到某一服务实例上后,如何保证该服务的其他实例最终能够得到相同的数据变化呢?这就要用到Paxos分布式共识协议,Paxos解决的就是共识问题,也就是一段时间后,无论get哪一个服务实例,都能获取到相同的数据。目前国内外的分布式产品很多都使用了Paxos协议,可以说Paxos几乎就是共识协议的标准和代名词。

Paxos有两种协议,我们常常提到的其实是Basic Paxos,另一种叫Multi Paxos,如无特殊说明,本文中提到的Paxos协议均为Basic Paxos。

Paxos协议是由图灵奖获得者Leslie Lamport于1998年在其论文《The Part-Time Parliament》中首次提出的,讲述了一个希腊小岛Paxos是如何通过决议的。但由于该论文晦涩艰深,当时的计算机界大牛们也没几个人能理解。于是Lamport2001年再次发表了《Paxos Made Simple》,摘要部分是这么写的:

The Paxos algorithm, when presented in plain English, is very simple.

翻译过来就是:不会吧,不会吧,这么简单的Paxos算法不会真的有人弄不懂吧?然而事实却是很多人对Paxos都望而却步,理解Paxos其实并不难,但是Paxos的难点在于工程化,如何利用Paxos协议写出一个能过够真正在生产环境中跑起来的服务才是Paxos最难的地方,关于Paxos的工程化可以参考微信后台团队撰写的《微信自研生产级paxos类库PhxPaxos实现原理介绍》

Paxos如何保证一致性的

Paxos协议一共有两个阶段:Prepare和Propose,两种角色:Proposer和Acceptor,每一个服务实例既是Proposer,同时也是Acceptor,Proposer负责提议,Acceptor决定是否接收来自Proposer的提议,一旦提议被多数接受,那么我们就可以宣称对该提议包含的值达成了一致,而且不会再改变。

阶段一:Prepare 准备

  • Proposer生成全局唯一ProposalID(时间戳+ServerID)
  • Proposer向所有Acceptor(包括Proposer自己)发送Prepare(n = ProposalID)请求
  • Acceptor比较n和minProposal, if n > minProposal, minProposal = n,Acceptor返回已接受的提议(acceptedProposal, acceptedValue)
  • 承诺1:不再接受n <= minProposal的Prepare请求
  • 承诺2:不再接受n < minProposal的Propose请求
  • 应答1:返回此前已接受的提议
  • 当Proposer收到大于半数的返回后
  • Prepare请求被拒绝,重新生成ProposalID并发送Prepare请求
  • Prepare请求被接受且有已接受的提议,选择最大的ProposalID对应的值作为提议的值
  • Prepare请求被接受且没有已接受的提议,可选择任意提议值

    阶段二:Propose 提议

  • Proposer向所有Acceptor(包括Proposer自己)发送Accept(n=ProposalID,value=ProposalValue)请求
  • Acceptor比较n和minProposal, if n >= minProposal, minProposal = n, acceptedValue = value,返回已接受的提议(minProposal,acceptedValue)
  • 当Proposer收到大于半数的返回后
  • Propose请求被拒绝,重新生成ProposalID并发送Prepare请求
  • Propose请求被接受,则数据达成一致性

一旦提议被半数以上的服务接受,那么我们就可以宣称整个服务集群在这一提议上达成了一致。

需要注意的是,在一个服务集群中以上两个阶段是很有可能同时发生的。 例如:实例A已完成Prepare阶段,并发送了Propose请求。同时实例B开始了Prepare阶段,并生成了更大的ProposalID发送Prepare请求,可能导致实例A的Propose请求被拒绝。 每个服务实例也是同时在扮演Proposer和Acceptor角色,向其他服务发送请求的同时,可能也在处理别的服务发来的请求。

使用GO语言实现Paxos协议

服务注册与发现

由于每个服务实例都是在执行相同的代码,那我们要如何知晓其他服务实例的入口呢(IP和端口号)?方法之一就是写死在代码中,或者提供一份配置文件。服务启动后可以读取该配置文件。但是这种方法不利于维护,一旦我们需要移除或添加服务则需要在每个机器上重新休息配置文件。

除此之外,我们可以通过一个第三方服务:服务的注册与发现来注册并获知当前集群的总服务实例数,即将本地的配置文件改为线上的配置服务。

服务注册:Register函数,服务实例启动后通过调用这个RPC方法将自己注册在服务管理中

func (s *Service) Register(args *RegisterArgs, reply *RegisterReply) error {
 s.mu.Lock()
 defer s.mu.Unlock()
 
 server := args.ServerInfo
 for _, server := range s.Servers {
  if server.IPAddress == args.ServerInfo.IPAddress && server.Port == args.ServerInfo.Port {
   reply.Succeed = false
   return nil
  }
 }
 reply.ServerID = len(s.Servers)
 reply.Succeed = true
 s.Servers = append(s.Servers, server)
 
 fmt.Printf("Current registerd servers:\n%v\n", s.Servers)
 
 return nil
}

服务发现:GetServers函数,服务通过调用该RPC方法获取所有服务实例的信息(IP和端口号)

func (s *Service) GetServers(args *GetServersArgs, reply *GetServersReply) error {
 // return all servers
 reply.ServerInfos = s.Servers
 
 return nil
}

Prepare阶段

Proposer,向所有的服务发送Prepare请求,并等待直到半数以上的服务返回结果,这里也可以等待所有服务返回后再处理,但是Paxos协议可以容忍小于半数的服务宕机,因此我们只等待大于N/2个返回即可。当返回的结果有任何一个请求被拒绝,那Proposer即认为这次的请求被拒绝,返回重新生成ProposalID并发送新一轮的Prepare请求。

func (s *Server) CallPrepare(allServers []ServerInfo, proposal Proposal) PrepareReply {
 returnedReplies := make([]PrepareReply, 0)
 for _, otherS := range allServers {
  // use a go routine to call every server
  go func(otherS ServerInfo) {
   delay := rand.Intn(10)
   time.Sleep(time.Second * time.Duration(delay))
   args := PrepareArgs{s.Info, proposal.ID}
   reply := PrepareReply{}
   fmt.Printf("【Prepare】Call Prepare on %v:%v with proposal id %v\n", otherS.IPAddress, otherS.Port, args.ProposalID)
   if Call(otherS, "Server.Prepare", &args, &reply) {
    if reply.HasAcceptedProposal {
     fmt.Printf("【Prepare】%v:%v returns accepted proposal: %v\n", otherS.IPAddress, otherS.Port, reply.AcceptedProposal)
    } else {
     fmt.Printf("【Prepare】%v:%v returns empty proposal\n", otherS.IPAddress, otherS.Port)
    }
    s.mu.Lock()
    returnedReplies = append(returnedReplies, reply)
    s.mu.Unlock()
   }
  }(otherS)
 }
 for {
  // wait for responses from majority
  if len(returnedReplies) > (len(allServers))/2.0 {
   checkReplies := returnedReplies
   // three possible response
   // 1. deny the prepare, and return an empty/accepted proposal
   // as the proposal id is not higher than minProposalID on server (proposal id <= server.minProposalID)
   // 2. accept the prepare, and return an empty proposal as the server has not accept any proposal yet
   // 3. accept the prepare, and return an accepted proposal
   // check responses from majority
   // find the response with max proposal id
   acceptedProposal := NewProposal()
   for _, r := range checkReplies {
    // if any response refused the prepare, this server should resend prepare
    if !r.PrepareAccepted {
     return r
    }
    if r.HasAcceptedProposal && r.AcceptedProposal.ID > acceptedProposal.ID {
     acceptedProposal = r.AcceptedProposal
    }
   }
   // if some other server has accepted proposal, return that proposal with max proposal id
   // if no other server has accepted proposal, return an empty proposal
   return PrepareReply{HasAcceptedProposal: !acceptedProposal.IsEmpty(), AcceptedProposal: acceptedProposal, PrepareAccepted: true}
  }
  //fmt.Printf("Waiting for response from majority...\n")
  time.Sleep(time.Second * 1)
 }
}

Acceptor,通过比较ProposalID和minProposal,如果ProposalID小于等于minProposal,则拒绝该Prepare请求,否则更新minProposal为ProposalID。最后返回已接受的提议

func (s *Server) Prepare(args *PrepareArgs, reply *PrepareReply) error {
 s.mu.Lock()
 defer s.mu.Unlock()
 // 2 promises and 1 response
 // Promise 1
 // do not accept prepare request which ProposalID <= minProposalID
 // Promise 2
 // do not accept propose request which ProposalID < minProposalID
 // Response 1
 // respond with accepted proposal if any
 if reply.PrepareAccepted = args.ProposalID > s.minProposalID; reply.PrepareAccepted {
  // ready to accept the proposal with Id s.minProposalID
  s.minProposalID = args.ProposalID
 }
 reply.HasAcceptedProposal = s.readAcceptedProposal()
 reply.AcceptedProposal = s.Proposal
 return nil
}

Propose阶段

Proposer,同样首先向所有的服务发送Propose请求,并等待知道半数以上的服务返回结果。如果返回的结果有任何一个请求被拒绝,则Proposer认为这次的请求被拒绝,返回重新生成ProposalID并发送新一轮的Prepare请求

func (s *Server) CallPropose(allServers []ServerInfo, proposal Proposal) ProposeReply {
 returnedReplies := make([]ProposeReply, 0)
 for _, otherS := range allServers {
  go func(otherS ServerInfo) {
   delay := rand.Intn(5000)
   time.Sleep(time.Millisecond * time.Duration(delay))
   args := ProposeArgs{otherS, proposal}
   reply := ProposeReply{}
   fmt.Printf("【Propose】Call Propose on %v:%v with proposal: %v\n", otherS.IPAddress, otherS.Port, args.Proposal)
   if Call(otherS, "Server.Propose", &args, &reply) {
    fmt.Printf("【Propose】%v:%v returns: %v\n", otherS.IPAddress, otherS.Port, reply)
    s.mu.Lock()
    returnedReplies = append(returnedReplies, reply)
    s.mu.Unlock()
   }
  }(otherS)
 }
 for {
  // wait for responses from majority
  if len(returnedReplies) > (len(allServers))/2.0 {
   checkReplies := returnedReplies
   for _, r := range checkReplies {
    if !r.ProposeAccepted {
     return r
    }
   }
   return checkReplies[0]
  }
  time.Sleep(time.Second * 1)
 }
}

Acceptor,通过比较ProposalID和minProposal,如果ProposalID小于minProposal,则拒绝该Propose请求,否则更新minProposal为ProposalID,并将提议持久化到本地磁盘中。

func (s *Server) Propose(args *ProposeArgs, reply *ProposeReply) error {
 if s.minProposalID <= args.Proposal.ID {
  s.mu.Lock()
  s.minProposalID = args.Proposal.ID
  s.Proposal = args.Proposal
  s.SaveAcceptedProposal()
  s.mu.Unlock()
 
  reply.ProposeAccepted = true
 }
 
 reply.ProposalID = s.minProposalID
 
 return nil
}

运行

运行结果:

这里我一共开启了3个服务实例,并在每次请求之前加入了随机的延迟,模拟网络通信中的延迟,因此每个服务的每个请求并不是同时发出的

动图一张:

静态结果一张:

可以看到3个服务尽管一开始会尝试以他们自己的端口号(5001,5002,5003)作为提议值,在Prepare/Propose失败后,都会重新生成更大的ProposalID并开启新一轮的提议过程(Prepare,Propose),且最后都以5003达成一致。

小结

至此,我们就用GO实现了Paxos协议的核心逻辑。但显而易见的是,这段代码仍然存在很多问题,完全无法满足生产环境的需求

  • 通过channel而不是mutex锁来共享数据
  • 如何处理服务实例的移除和增加
  • 如何避免陷入活锁