23
2020
11

Etcd怎么实现分布式锁?

首先思考下Etcd是什么?可能很多人第一反应可能是一个键值存储仓库,却没有重视官方定义的后半句,用于配置共享和服务发现。


A highly-available key value store for shared configuration and service discovery.

实际上,etcd 作为一个受到 ZooKeeper 与 doozer 启发而催生的项目,除了拥有与之类似的功能外,更专注于以下四点。


简单:基于 HTTP+JSON 的 API 让你用 curl 就可以轻松使用。

安全:可选 SSL 客户认证机制。

快速:每个实例每秒支持一千次写操作。

可信:使用 Raft 算法充分实现了分布式。

但是这里我们主要讲述Etcd如何实现分布式锁?


因为 Etcd 使用 Raft 算法保持了数据的强一致性,某次操作存储到集群中的值必然是全局一致的,所以很容易实现分布式锁。锁服务有两种使用方式,一是保持独占,二是控制时序。


保持独占即所有获取锁的用户最终只有一个可以得到。etcd 为此提供了一套实现分布式锁原子操作 CAS(CompareAndSwap)的 API。通过设置prevExist值,可以保证在多个节点同时去创建某个目录时,只有一个成功。而创建成功的用户就可以认为是获得了锁。


控制时序,即所有想要获得锁的用户都会被安排执行,但是获得锁的顺序也是全局唯一的,同时决定了执行顺序。etcd 为此也提供了一套 API(自动创建有序键),对一个目录建值时指定为POST动作,这样 etcd 会自动在目录下生成一个当前最大的值为键,存储这个新的值(客户端编号)。同时还可以使用 API 按顺序列出所有当前目录下的键值。此时这些键的值就是客户端的时序,而这些键中存储的值可以是代表客户端的编号。


在这里Ectd实现分布式锁基本实现原理为:


在ectd系统里创建一个key

如果创建失败,key存在,则监听该key的变化事件,直到该key被删除,回到1

如果创建成功,则认为我获得了锁

应用示例:


package etcdsync


import (

 "fmt"

 "io"

 "os"

 "sync"

 "time"


 "github.com/coreos/etcd/client"

 "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"

)


const (

 defaultTTL = 60

 defaultTry = 3

 deleteAction = "delete"

 expireAction = "expire"

)


// A Mutex is a mutual exclusion lock which is distributed across a cluster.

type Mutex struct {

 key    string

 id     string // The identity of the caller

 client client.Client

 kapi   client.KeysAPI

 ctx    context.Context

 ttl    time.Duration

 mutex  *sync.Mutex

 logger io.Writer

}


// New creates a Mutex with the given key which must be the same

// across the cluster nodes.

// machines are the ectd cluster addresses

func New(key string, ttl int, machines []string) *Mutex {

 cfg := client.Config{

  Endpoints:               machines,

  Transport:               client.DefaultTransport,

  HeaderTimeoutPerRequest: time.Second,

 }


 c, err := client.New(cfg)

 if err != nil {

  return nil

 }


 hostname, err := os.Hostname()

 if err != nil {

  return nil

 }


 if len(key) == 0 || len(machines) == 0 {

  return nil

 }


 if key[0] != '/' {

  key = "/" + key

 }


 if ttl < 1 {

  ttl = defaultTTL

 }


 return &Mutex{

  key:    key,

  id:     fmt.Sprintf("%v-%v-%v", hostname, os.Getpid(), time.Now().Format("20060102-15:04:05.999999999")),

  client: c,

  kapi:   client.NewKeysAPI(c),

  ctx: context.TODO(),

  ttl: time.Second * time.Duration(ttl),

  mutex:  new(sync.Mutex),

 }

}


// Lock locks m.

// If the lock is already in use, the calling goroutine

// blocks until the mutex is available.

func (m *Mutex) Lock() (err error) {

 m.mutex.Lock()

 for try := 1; try <= defaultTry; try++ {

  if m.lock() == nil {

   return nil

  }

  

  m.debug("Lock node %v ERROR %v", m.key, err)

  if try < defaultTry {

   m.debug("Try to lock node %v again", m.key, err)

  }

 }

 return err

}


func (m *Mutex) lock() (err error) {

 m.debug("Trying to create a node : key=%v", m.key)

 setOptions := &client.SetOptions{

  PrevExist:client.PrevNoExist,

  TTL:      m.ttl,

 }

 resp, err := m.kapi.Set(m.ctx, m.key, m.id, setOptions)

 if err == nil {

  m.debug("Create node %v OK [%q]", m.key, resp)

  return nil

 }

 m.debug("Create node %v failed [%v]", m.key, err)

 e, ok := err.(client.Error)

 if !ok {

  return err

 }


 if e.Code != client.ErrorCodeNodeExist {

  return err

 }


 // Get the already node's value.

 resp, err = m.kapi.Get(m.ctx, m.key, nil)

 if err != nil {

  return err

 }

 m.debug("Get node %v OK", m.key)

 watcherOptions := &client.WatcherOptions{

  AfterIndex : resp.Index,

  Recursive:false,

 }

 watcher := m.kapi.Watcher(m.key, watcherOptions)

 for {

  m.debug("Watching %v ...", m.key)

  resp, err = watcher.Next(m.ctx)

  if err != nil {

   return err

  }


  m.debug("Received an event : %q", resp)

  if resp.Action == deleteAction || resp.Action == expireAction {

   return nil

  }

 }


}


// Unlock unlocks m.

// It is a run-time error if m is not locked on entry to Unlock.

//

// A locked Mutex is not associated with a particular goroutine.

// It is allowed for one goroutine to lock a Mutex and then

// arrange for another goroutine to unlock it.

func (m *Mutex) Unlock() (err error) {

 defer m.mutex.Unlock()

 for i := 1; i <= defaultTry; i++ {

  var resp *client.Response

  resp, err = m.kapi.Delete(m.ctx, m.key, nil)

  if err == nil {

   m.debug("Delete %v OK", m.key)

   return nil

  }

  m.debug("Delete %v falied: %q", m.key, resp)

  e, ok := err.(client.Error)

  if ok && e.Code == client.ErrorCodeKeyNotFound {

   return nil

  }

 }

 return err

}


func (m *Mutex) debug(format string, v ...interface{}) {

 if m.logger != nil {

  m.logger.Write([]byte(m.id))

  m.logger.Write([]byte(" "))

  m.logger.Write([]byte(fmt.Sprintf(format, v...)))

  m.logger.Write([]byte("\n"))

 }

}


func (m *Mutex) SetDebugLogger(w io.Writer) {

 m.logger = w

}

其实类似的实现有很多,但目前都已经过时,使用的都是被官方标记为deprecated的项目。且大部分接口都不如上述代码简单。 使用上,跟Golang官方sync包的Mutex接口非常类似,先New(),然后调用Lock(),使用完后调用Unlock(),就三个接口,就是这么简单。示例代码如下:


package main


import (

 "github.com/zieckey/etcdsync"

 "log"

)


func main() {

 //etcdsync.SetDebug(true)

 log.SetFlags(log.Ldate|log.Ltime|log.Lshortfile)

 m := etcdsync.New("/etcdsync", "123", []string{"http://127.0.0.1:2379"})

 if m == nil {

  log.Printf("etcdsync.NewMutex failed")

 }

 err := m.Lock()

 if err != nil {

  log.Printf("etcdsync.Lock failed")

 } else {

  log.Printf("etcdsync.Lock OK")

 }


 log.Printf("Get the lock. Do something here.")


 err = m.Unlock()

 if err != nil {

  log.Printf("etcdsync.Unlock failed")

 } else {

  log.Printf("etcdsync.Unlock OK")

 }

}

« 上一篇 下一篇 »

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。