etcd的MVCC是怎么实现的
MVCC是什么
在了解之前,首先需要明白乐观锁与悲观锁的概念。乐观锁与悲观锁是两种编程思想,并不局限与编程语言。
悲观锁
在对临界资源做一些读写时候,为了防止其他人同步修改数据,直接把数据锁住,操作完成后才会释放锁,通过这种方式实现并发安全。常见的有Go的Mutex,java的synchronized等。 ###乐观锁 在对临界资源做操作时候,不锁住数据实现独占,而是判断数据有没有被他人修改过,如果修改了则返回修改失败。校验是否修改常见的方式有多版本并发控制(MVCC)等。
MVCC简介
MVCC即在对数据做修改操作时候,并不对原数据做修改,而是在数据基础上追加一个修改后的数据,并通过一个唯一的版本号做区分,版本号一般通过自增的方式;在读取数据时候,读到的实际是当前版本号对应的一份快照数据。 比如一个键值对数据K->{V.0},此时value的版本号为0。 操作1首先对数据做修改,读取到的0版本号的数据,对其做修改提交事务后便成为K-> {V.0,V.1}, 操作2之后读到的数据是版本号1的数据,对其做修改后提交事务成功变为K->{V.0, V.1, V.2}。 每次修改只是往后面进行版本号以及数据值追加,通过这种方式使得每个事务操作到的是自己版本号内的数据,实现事务之间的隔离。也可以通过指定版本号访问对应的数据。
etcd的实现
etcd就是基于MVCC机制实现的一个键值对数据库。接下来通过一个示例演示一下。
etcd版本号
首先通过一个简单的put,get例子认识一下etcd的版本号。
#首先put一个key为linugo,value为go的数据 [XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 put linugo go OK #获取数据,可以看到k与v都是用了base64加密,可以看到3个version [XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 get linugo -w=json|python -m json.tool { "count": 1, "header": { "cluster_id": 14841639068965178418, "member_id": 10276657743932975437, "raft_term": 2, "revision": 2 }, "kvs": [ { "create_revision": 2, #创建key时候对应的版本号 "key": "bGludWdv", "mod_revision": 2, #修改时候的版本号 "value": "Z28=", "version": 1 #修改次数(包含创建次数) } ] } #再次put两次 [XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 put linugo gol OK [XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 put linugo gola OK [XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 get linugo -w=json|python -m json.tool { "count": 1, "header": { "cluster_id": 14841639068965178418, "member_id": 10276657743932975437, "raft_term": 2, "revision": 4 }, "kvs": [ { "create_revision": 2,#创建的版本号依然是2 "key": "bGludWdv", "mod_revision": 4,#修改时候的版本号变为了4 "value": "Z29sYQ==", "version": 3 #修改次数为3 } ] }复制代码
可以看到创建时候对应了一个版本号,每次修改后会生成新的版本号,是不是类似于上面所说版本号叠加呢。 接下来put一个与上面不同的键值对。
#接下来put一个不同的key [XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 put linugo1 go OK #获取详细信息 [XXXX etcdctl]$ ./etcdctl --endpoints=127.0.0.1:23790 get linugo1 -w=json|python -m json.tool { "count": 1, "header": { "cluster_id": 14841639068965178418, "member_id": 10276657743932975437, "raft_term": 2, "revision": 5 }, "kvs": [ { "create_revision": 5,#创建的版本号成为了5 "key": "bGludWdvMQ==", "mod_revision": 5,#修改时候版本号成为了5 "value": "Z28=", "version": 1 #修改次数为1 } ] }复制代码
此时创建时候的版本号变成了5,这是由于etcd里面存在一个全局的总版本号revision,充当了逻辑时钟的概念,对每个key做一些修改删除操作都会触发主版本号自增。而每个key所作的就是在创建或者修改时候,记录下该value对应的主版本号。 理论上可以通过主版本号来找到任意数据的修改历史。如果双双记录的话,可以通过key查询到它对应的所有版本号,然后可以通过最新版本号找到对应的value。实际上,etcd也是这样做的。
MVCC总览
etcd维护了上面提到的两种映射关系,在内存中维护了一个B-Tree作为key与对应版本号的映射关系,这个结构叫做treeIndex,又使用了BoltDB提供了版本号与对应value的映射关系以及数据的持久化存储。
当查询一个数据时候,首先通过treeIndex定位到最新的版本号revision(如果客户端有指定版本号则查询指定的revision),再通过revision定位到对应的value。这个逻辑有点类似于MySQL中的普通索引查询。
keyIndex
在treeIndex中,key是通过keyIndex结构对应多个revision的。
type keyIndex struct { key []byte modified revision // the main rev of the last modification generations []generation }复制代码
key:对应用户put的key
modified:最后一次修改对应的revision
generations:所有代对应的revisions
type revision struct { main int64 sub int64 }复制代码
revision也并不是一个单纯的数值类型,由两个字段组成
main:主版本号,也就对应etcd中的主版本号
sub:子事务版本号,在一个事务中可能包含多个子事务(比如一个Txn请求中包含多个put操作,主版本号main不会变,每个put操作会自增生成不一样的sub)
type generation struct { ver int64 created revision // when the generation is created (put in first revision). revs []revision }复制代码
在etcd中,对数据做删除操作并没有对数据做删除,而是在generations数组后追加了一个新的generation元素,但是如果想通过版本号获取已经del的数据时也是获取不到的。在获取key最新revision时候,只需要找到generations数组最后一个generation,并找到其中revs的最后一个revision元素。
ver:本generation中的key的修改次数,对应上面示例中key的修改次数
created:创建时候对应的全局版本号revision
revs:key各个版本对应的版本号
Example
下面通过一个示例来探究一下三者的转化关系。
首先put一个新的key,查看它的keyIndex,初始版本号为112,如下图左;
对该key进行两次put操作,查看keyindex,版本号自增2,ver修改次数变为3,revs中增加了两个元素,中间图;
删除该key,查看keyindex,版本号自增,generation切片增加了一位,原revs也增加了一位,对应删除操作。如下图右。
KeyValue
接下来看下boltDB中该key对应的数据并不是一个单纯的value,同样也包含了很多其他的字段
type KeyValue struct { Key []byte //对应用户传入的key CreateRevision int64 //创建时候的版本号 ModRevision int64 //最后一次修改的版本号 Version int64 //该key修改次数 Value []byte //传入的value值 Lease int64 //租约ID }复制代码
由于数据是通过boltDB做的磁盘持久化,所以在每次查询或者修改时候直接使用boltDB走磁盘读写会导致一定的性能问题,所以在访问boltDB之前有一个缓存buffer,buffer有两块,一块读buffer(txReadBuffer),位于baseReadTx,用于读事务;一块写buffer(txWriteBuffer),位于batchTxBuffered,用于写操作以及刷盘等。
type txWriteBuffer struct { txBuffer bucket2seq map[BucketID]bool } type txReadBuffer struct { txBuffer bufVersion uint64 }复制代码
读取时候首先从buffer中读取,没有命中则从boltdb中读取; 写的时候直接写到buffer,在End事务提交时候,与读buffer进行merge操作,后台协程会定时对写buffer落盘。
读数据源码调用关系
读请求在etcd中统一对应了Range方法,请求在上层经过了封装,拦截器校验,raft数据同步等流程。
//etcdserver/apply.go func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { ...... if txn == nil { //初始化一个读事务,对部分共享区域加读锁,获取当前最新版本号, txn = a.s.kv.Read(mvcc.ConcurrentReadTxMode, trace) //事务结束,提交事务,对加锁的部分解锁 defer txn.End() } ...... //调用读事务实现的Range方法,通过key获取对应的value(因为支持范围查找,所以返回的value结构是切片) rr, err := txn.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro) if err != nil { return nil, err } //封装结果,返回客户端 resp.Header.Revision = rr.Rev resp.Count = int64(rr.Count) resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs)) for i := range rr.KVs { if r.KeysOnly { rr.KVs[i].Value = nil } resp.Kvs[i] = &rr.KVs[i] } return resp, nil }复制代码
接下来看Range方法的具体实现
//mvcc/metrics_txn.go func (tw *metricsTxnWrite) Range(ctx context.Context, key, end []byte, ro RangeOptions) (*RangeResult, error) { tw.ranges++ //用于请求指标的统计 return tw.TxnWrite.Range(ctx, key, end, ro) } //mvcc/kvstore_txn.go func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) { return tr.rangeKeys(ctx, key, end, tr.Rev(), ro) } func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) { //检验版本号是否正常的操作 ...... //获取treeindex中指定key的符合条件的版本号信息,由于可能是范围查找,版本号信息revpairs也属于切片类型 revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit)) ...... kvs := make([]mvccpb.KeyValue, limit) revBytes := newRevBytes() //对查出来的饿 for i, revpair := range revpairs[:len(kvs)] { //校验是否超时的操作 ...... //通过版本号查找对应value的操作 revToBytes(revpair, revBytes) _, vs := tr.tx.UnsafeRange(buckets.Key, revBytes, nil, 0) if len(vs) != 1 { ...... } //将查找到的value进行反序列化 if err := kvs[i].Unmarshal(vs[0]); err != nil { ...... } } return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil }复制代码
查找revision
//mvcc/index.go func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision, total int) { //对应查询单个key if end == nil { //查找treeindex中key对应的keyIndex rev, _, _, err := ti.Get(key, atRev) ... return []revision{rev}, 1 } //对应范围查找 ti.visit(key, end, func(ki *keyIndex) bool { ...... }) return revs, total } func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error) { keyi := &keyIndex{key: key} //查询出来的keyindex ti.RLock() defer ti.RUnlock() //查询出来key对应的keyIndex结构 if keyi = ti.keyIndex(keyi); keyi == nil { return revision{}, revision{}, 0, ErrRevisionNotFound } //从keyindex结构中提取出来需要的信息,即返回的信息 return keyi.get(ti.lg, atRev) } func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) { //找到有效的generation,atRev是指当前的revision或者客户端传入的指定revision g := ki.findGeneration(atRev) //walk找到有效的revision,即比atRev版本号靠前而且距离atRev最近的一个版本号 n := g.walk(func(rev revision) bool { return rev.main > atRev }) if n != -1 { //返回最终修改时的revision(即数组revs中最后的元素),创建时的revision,修改次数等 return g.revs[n], g.created, g.ver - int64(len(g.revs)-n-1), nil } return revision{}, revision{}, 0, ErrRevisionNotFound }复制代码
查value
通过上面的Revisions方法获取到了Revision,接下来要通过UnsafeRange查找到revision对应的value。该方法首先从读缓存中查找,查找不到去boltdb中查找。
//mvcc/backend/read_tx.go func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) { //对获取数量limit的检验 .... //首先从缓存readbuffer中读取,如果获取到直接返回。 keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit) //从buffer中读取,看看能读到不 if int64(len(keys)) == limit { return keys, vals } ...... //去boltDB中获取 k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys))) return append(k2, keys...), append(v2, vals...) }复制代码
写数据源码调用关系
写数据统一对应put方法,也会经过一系列的鉴权等前置步骤。
//etcdserver/apply.go func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { ...... if txn == nil { ...... //初始化一个写事务,对一些变量加锁等操作 txn = a.s.KV().Write(trace) //写事务提交 defer txn.End() } ...... //调用写事务的put方法,返回是否put后的版本号 resp.Header.Revision = txn.Put(p.Key, val, leaseID) return resp, trace, nil } //mvcc/metrics_txn.go func (tw *metricsTxnWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) { //用于指标统计的一些数据 ...... //调用put方法 return tw.TxnWrite.Put(key, value, lease) } //mvcc/kvstore_txn.go func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 { //put操作 tw.put(key, value, lease) return tw.beginRev + 1 }复制代码
put会生成新的版本号以及同步缓存的操作。
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { rev := tw.beginRev + 1 c := rev oldLease := lease.NoLease //首先查看该key是否存在,如果存在,则返回创建时的版本号(用于封装存储的value),修改次数等信息 _, created, ver, err := tw.s.kvindex.Get(key, rev) if err == nil { c = created.main ...... } ibytes := newRevBytes() //生成一个revision idxRev := revision{main: rev, sub: int64(len(tw.changes))} revToBytes(idxRev, ibytes) //修改次数加一 ver = ver + 1 //封装成要持久化存储的value kv := mvccpb.KeyValue{ Key: key, Value: value, CreateRevision: c, ModRevision: rev, Version: ver, Lease: int64(leaseID), } //序列化value d, err := kv.Marshal() //将数据(revision:value)存入buffer和boltDB中,此时并未持久化 tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d) //在treeIndex中加入(key:revision)的对应关系,就是在对应keyIndex的generations中revs后面追加一个revsion tw.s.kvindex.Put(key, idxRev) ...... } //mvcc/backend/batch_tx.go func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) { //在boltdb中加入该数据,但并未提交 t.batchTx.UnsafeSeqPut(bucket, key, value) //在缓存buffer中加入该数据 t.buf.putSeq(bucket, key, value) } func (t *batchTx) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) { t.unsafePut(bucket, key, value, true) } func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) { if err := bucket.Put(key, value); err != nil { ...... } //pending标志位自增,用于后面的合并缓存以及持久化数据 t.pending++ }复制代码
缓存合并
在请求结束时候,会调用txn对应的End()方法提交事务,
//mvcc/kvstore_txn.go unc (tw *storeTxnWrite) End() { ...... //调用写事务的Unlock tw.tx.Unlock() } //mvcc/backend/batch_tx.go func (t *batchTxBuffered) Unlock() { //pending不等于0说明有写操作 if t.pending != 0 { //读buffer加锁,在此期间读请求被阻塞 t.backend.readTx.Lock() //合并两个缓存 t.buf.writeback(&t.backend.readTx.buf) t.backend.readTx.Unlock() //如果超过限制则直接进行持久化操作 if t.pending >= t.backend.batchLimit { t.commit(false) } } t.batchTx.Unlock() } //mvcc/backend/tx_buffer.go func (txw *txWriteBuffer) writeback(txr *txReadBuffer) { //遍历写buffer并对读buffer进行merge for k, wb := range txw.buckets { rb, ok := txr.buckets[k] if !ok { delete(txw.buckets, k) txr.buckets[k] = wb continue } ...... rb.merge(wb) } ...... }复制代码
数据持久化
上面在调用put的时候将数据存入的buffer与boltDB中,但是并未进行持久化操作。真早的持久化操作是在一个后台backend协程中执行的,这个后台协程会在etcd启动时开始工作,主要负责退出时候的资源持久化以及定期进行数据的磁盘持久化。
//mvcc/backend/backend.go func (b *backend) run() { defer close(b.donec) t := time.NewTimer(b.batchInterval) defer t.Stop() for { select { case <-t.C: case <-b.stopc://收到退出信号,则进行磁盘持久化后退出协程 b.batchTx.CommitAndStop() return } //获取pending数据,如果不为0,则进行commit if b.batchTx.safePending() != 0 { b.batchTx.Commit() } t.Reset(b.batchInterval) } } //mvcc/backend/batch_tx.go func (t *batchTxBuffered) Commit() { t.Lock() t.commit(false) t.Unlock() }复制代码
数据删除
数据删除时候,数据并没有真正的删除掉,只是在keyIndex中的generations数组中增加了一个新的generation元素。删除操作对应到etcd中的DeleteRange方法,删除满足条件的数据。
//mvcc/kvstore_txn.go func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 { rrev := tw.beginRev if len(tw.changes) > 0 { rrev++ } //寻找符合条件的key keys, _ := tw.s.kvindex.Range(key, end, rrev) if len(keys) == 0 { return 0 } for _, key := range keys { //删除操作 tw.delete(key) } return int64(len(keys)) } func (tw *storeTxnWrite) delete(key []byte) { ibytes := newRevBytes() //生成一个revision idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))} revToBytes(idxRev, ibytes) //打一个标记 ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes) kv := mvccpb.KeyValue{Key: key} d, err := kv.Marshal() ...... //将数据删除的记录写到boltDB以及buffer tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d) //调用Tombstone err = tw.s.kvindex.Tombstone(key, idxRev) ...... } func (ti *treeIndex) Tombstone(key []byte, rev revision) error { keyi := &keyIndex{key: key} ti.Lock() defer ti.Unlock() //获取keyindex item := ti.tree.Get(keyi) if item == nil { return ErrRevisionNotFound } ki := item.(*keyIndex) return ki.tombstone(ti.lg, rev.main, rev.sub) } func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error { ...... //将删除操作的对应版本号写入到keyindex ki.put(lg, main, sub) //追加空的generation ki.generations = append(ki.generations, generation{}) keysGauge.Dec() return nil }复制代码
小结
本文开始分析了etcd实现MVCC的原理,之后从源码角度追溯mvcc的具体实现,仅对大体流程以及总体思路的源码进行了追溯,对于一些具体的实现并没有深入探究下去(如boltDB存储,buffer存储,etcd事务等),一些想要了解更深入的同学还需要自己更细化的读一下代码。了解etcd MVCC最重要的是弄懂keyIndex,revision,generation这三个数据结构。
作者:LinuGo_guozhao
链接:https://juejin.cn/post/7062282193988485128