kubernetes中基于etcd实现集中的数据存储,今天来学习下基于etcd如何实现数据读取一致性、更新一致性、事务的具体实现
1. 数据的存储与版本
1.1 数据存储的转换
在k8s中有部分数据的存储是需要经过处理之后才能存储的,比如secret这种加密的数据,既然要存储就至少包含两个操作,加密存储,解密读取,transformer就是为了完成该操作而实现的,其在进行etcd数据存储的时候回对数据进行加密,而在读取的时候,则会进行解密
1.2 资源版本revision
在etcd中进行修改(增删改)操作的时候,都会递增revision,而在k8s中也通过该值来作为k8s资源的ResourceVersion,该机制也是实现watch的关键机制,在操作etcd解码从etcd获取的数据的时候,会通过versioner组件来为资源动态的修改该值
1.3 数据模型的映射
将数据从etcd中读取后,数据本身就是一个字节数组,如何将对应的数据转换成我们真正的运行时对象呢?还记得我们之前的scheme与codec么,在这里我们知道对应的数据编码格式,也知道资源对象的类型,则通过codec、字节数组、目标类型,我们就可以完成对应数据的反射
2. 查询接口一致性
etcd中的数据写入是基于leader单点写入和集群quorum机制实现的,并不是一个强一致性的数据写入,则如果如果我们访问的节点不存在quorum的半数节点内,则可能造成短暂的数据不一致,针对一些强一致的场景,我们可以通过其revision机制来进行数据的读取, 保证我们读取到更新之后的数据
<span class="c1">// 省略非核心代码</span>
<span class="kd">func</span> <span class="p">(</span><span class="nx">s</span> <span class="o">*</span><span class="nx">store</span><span class="p">)</span> <span class="nx">Get</span><span class="p">(</span><span class="nx">ctx</span> <span class="nx">context</span><span class="p">.</span><span class="nx">Context</span><span class="p">,</span> <span class="nx">key</span> <span class="kt">string</span><span class="p">,</span> <span class="nx">resourceVersion</span> <span class="kt">string</span><span class="p">,</span> <span class="nx">out</span> <span class="nx">runtime</span><span class="p">.</span><span class="nx">Object</span><span class="p">,</span> <span class="nx">ignoreNotFound</span> <span class="kt">bool</span><span class="p">)</span> <span class="kt">error</span> <span class="p">{</span>
<span class="c1">// 获取key</span>
<span class="nx">getResp</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">s</span><span class="p">.</span><span class="nx">client</span><span class="p">.</span><span class="nx">KV</span><span class="p">.</span><span class="nx">Get</span><span class="p">(</span><span class="nx">ctx</span><span class="p">,</span> <span class="nx">key</span><span class="p">,</span> <span class="nx">s</span><span class="p">.</span><span class="nx">getOps</span><span class="o">...</span><span class="p">)</span>
<span class="c1">// 检测当前版本,是否达到最小版本的</span>
<span class="k">if</span> <span class="nx">err</span> <span class="p">=</span> <span class="nx">s</span><span class="p">.</span><span class="nx">ensureMinimumResourceVersion</span><span class="p">(</span><span class="nx">resourceVersion</span><span class="p">,</span> <span class="nb">uint64</span><span class="p">(</span><span class="nx">getResp</span><span class="p">.</span><span class="nx">Header</span><span class="p">.</span><span class="nx">Revision</span><span class="p">));</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">err</span>
<span class="p">}</span>
<span class="c1">// 执行数据转换</span>
<span class="nx">data</span><span class="p">,</span> <span class="nx">_</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">s</span><span class="p">.</span><span class="nx">transformer</span><span class="p">.</span><span class="nx">TransformFromStorage</span><span class="p">(</span><span class="nx">kv</span><span class="p">.</span><span class="nx">Value</span><span class="p">,</span> <span class="nx">authenticatedDataString</span><span class="p">(</span><span class="nx">key</span><span class="p">))</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">storage</span><span class="p">.</span><span class="nx">NewInternalError</span><span class="p">(</span><span class="nx">err</span><span class="p">.</span><span class="nx">Error</span><span class="p">())</span>
<span class="p">}</span>
<span class="c1">// 解码数据</span>
<span class="k">return</span> <span class="nx">decode</span><span class="p">(</span><span class="nx">s</span><span class="p">.</span><span class="nx">codec</span><span class="p">,</span> <span class="nx">s</span><span class="p">.</span><span class="nx">versioner</span><span class="p">,</span> <span class="nx">data</span><span class="p">,</span> <span class="nx">out</span><span class="p">,</span> <span class="nx">kv</span><span class="p">.</span><span class="nx">ModRevision</span><span class="p">)</span>
<span class="p">}</span>
www#gaodaima.com来源gao.dai.ma.com搞@代*码网搞代码
3. 创建接口实现<img class=”md_compiled ” src=”https://cdn.nlark.com/yuque/0/2020/png/97498/1583385558145-f8e8cc59-b8a6-4706-befb-73989c2ebc3f.png” alt=”image.png” title=”” >
创建一个接口数据则会首先进行资源对象的检查,避免重复创建对象,此时会先通过资源对象的version字段来进行初步检查,然后在利用etcd的事务机制来保证资源创建的原子性操作
<span class="c1">// 省略非核心代码</span>
<span class="kd">func</span> <span class="p">(</span><span class="nx">s</span> <span class="o">*</span><span class="nx">store</span><span class="p">)</span> <span class="nx">Create</span><span class="p">(</span><span class="nx">ctx</span> <span class="nx">context</span><span class="p">.</span><span class="nx">Context</span><span class="p">,</span> <span class="nx">key</span> <span class="kt">string</span><span class="p">,</span> <span class="nx">obj</span><span class="p">,</span> <span class="nx">out</span> <span class="nx">runtime</span><span class="p">.</span><span class="nx">Object</span><span class="p">,</span> <span class="nx">ttl</span> <span class="kt">uint64</span><span class="p">)</span> <span class="kt">error</span> <span class="p">{</span>
<span class="k">if</span> <span class="nx">version</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">s</span><span class="p">.</span><span class="nx">versioner</span><span class="p">.</span><span class="nx">ObjectResourceVersion</span><span class="p">(</span><span class="nx">obj</span><span class="p">);</span> <span class="nx">err</span> <span class="o">==</span> <span class="kc">nil</span> <span class="o">&&</span> <span class="nx">version</span> <span class="o">!=</span> <span class="mi">0</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">errors</span><span class="p">.</span><span class="nx">New</span><span class="p">(</span><span class="s">"resourceVersion should not be set on objects to be created"</span><span class="p">)</span>
<span class="p">}</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">s</span><span class="p">.</span><span class="nx">versioner</span><span class="p">.</span><span class="nx">PrepareObjectForStorage</span><span class="p">(</span><span class="nx">obj</span><span class="p">);</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">fmt</span><span class="p">.</span><span class="nx">Errorf</span><span class="p">(</span><span class="s">"PrepareObjectForStorage failed: %v"</span><span class="p">,</span> <span class="nx">err</span><span class="p">)</span>
<span class="p">}</span>
<span class="c1">// 将数据编码</span>
<span class="nx">data</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">runtime</span><span class="p">.</span><span class="nx">Encode</span><span class="p">(</span><span class="nx">s</span><span class="p">.</span><span class="nx">codec</span><span class="p">,</span> <span class="nx">obj</span><span class="p">)</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">err</span>
<span class="p">}</span>
<span class="c1">// 转换数据</span>
<span class="nx">newData</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">s</span><span class="p">.</span><span class="nx">transformer</span><span class="p">.</span><span class="nx">TransformToStorage</span><span class="p">(</span><span class="nx">data</span><span class="p">,</span> <span class="nx">authenticatedDataString</span><span class="p">(</span><span class="nx">key</span><span class="p">))</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">storage</span><span class="p">.</span><span class="nx">NewInternalError</span><span class="p">(</span><span class="nx">err</span><span class="p">.</span><span class="nx">Error</span><span class="p">())</span>
<span class="p">}</span>
<span class="nx">startTime</span> <span class="o">:=</span> <span class="nx">time</span><span class="p">.</span><span class="nx">Now</span><span class="p">()</span>
<span class="c1">// 事务操作</span>
<span class="nx">txnResp</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">s</span><span class="p">.</span><span class="nx">client</span><span class="p">.</span><span class="nx">KV</span><span class="p">.</span><span class="nx">Txn</span><span class="p">(</span><span class="nx">ctx</span><span class="p">).</span><span class="nx">If</span><span class="p">(</span>
<span class="nx">notFound</span><span class="p">(</span><span class="nx">key</span><span class="p">),</span> <span class="c1">// 如果之前不存在 这里是利用的etcd的ModRevision即修改版本为0, 寓意着对应的key不存在</span>
<span class="p">).</span><span class="nx">Then</span><span class="p">(</span>
<span class="nx">clientv3</span><span class="p">.</span><span class="nx">OpPut</span><span class="p">(</span><span class="nx">key</span><span class="p">,</span> <span class="nb">string</span><span class="p">(</span><span class="nx">newData</span><span class="p">),</span> <span class="nx">opts</span><span class="o">...</span><span class="p">),</span> <span class="c1">// put修改数据</span>
<span class="p">).</span><span class="nx">Commit</span><span class="p">()</span>
<span class="nx">metrics</span><span class="p">.</span><span class="nx">RecordEtcdRequestLatency</span><span class="p">(</span><span class="s">"create"</span><span class="p">,</span> <span class="nx">getTypeName</span><span class="p">(</span><span class="nx">obj</span><span class="p">),</span> <span class="nx">startTime</span><span class="p">)</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">err</span>
<span class="p">}</span>
<span class="k">if</span> <span class="p">!</span><span class="nx">txnResp</span><span class="p">.</span><span class="nx">Succeeded</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">storage</span><span class="p">.</span><span class="nx">NewKeyExistsError</span><span class="p">(</span><span class="nx">key</span><span class="p">,</span> <span class="mi">0</span><span class="p">)</span>
<span class="p">}</span>
<span class="k">if</span> <span class="nx">out</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="c1">// 获取对应的Revision</span>
<span class="nx">putResp</span> <span class="o">:=</span> <span class="nx">txnResp</span><span class="p">.</span><span class="nx">Responses</span><span class="p">[</span><span class="mi">0</span><span class="p">].</span><span class="nx">GetResponsePut</span><span class="p">()</span>
<span class="k">return</span> <span class="nx">decode</span><span class="p">(</span><span class="nx">s</span><span class="p">.</span><span class="nx">codec</span><span class="p">,</span> <span class="nx">s</span><span class="p">.</span><span class="nx">versioner</span><span class="p">,</span> <span class="nx">data</span><span class="p">,</span> <span class="nx">out</span><span class="p">,</span> <span class="nx">putResp</span><span class="p">.</span><span class="nx">Header</span><span class="p">.</span><span class="nx">Revision</span><span class="p">)</span>
<span class="p">}</span>
<span class="k">return</span> <span class="kc">nil</span>
<span class="p">}</span>
<span class="kd">func</span> <span class="nx">notFound</span><span class="p">(</span><span class="nx">key</span> <span class="kt">string</span><span class="p">)</span> <span class="nx">clientv3</span><span class="p">.</span><span class="nx">Cmp</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">clientv3</span><span class="p">.</span><span class="nx">Compare</span><span class="p">(</span><span class="nx">clientv3</span><span class="p">.</span><span class="nx">ModRevision</span><span class="p">(</span><span class="nx">key</span><span class="p">),</span> <span class="s">"="</span><span class="p">,</span> <span class="mi">0</span><span class="p">)</span>
<span class="p">}</span>
4. 删除接口的实现
删除接口主要是通过CAS和事务机制来共同实现,确保在etcd不发生异常的情况,即使并发对同个资源来进行删除操作也能保证至少有一个节点成功
<span class="c1">// 省略非核心代码</span>
<span class="kd">func</span> <span class="p">(</span><span class="nx">s</span> <span class="o">*</span><span class="nx">store</span><span class="p">)</span> <span class="nx">conditionalDelete</span><span class="p">(</span><span class="nx">ctx</span> <span class="nx">context</span><span class="p">.</span><span class="nx">Context</span><span class="p">,</span> <span class="nx">key</span> <span class="kt">string</span><span class="p">,</span> <span class="nx">out</span> <span class="nx">runtime</span><span class="p">.</span><span class="nx">Object</span><span class="p">,</span> <span class="nx">v</span> <span class="nx">reflect</span><span class="p">.</span><span class="nx">Value</span><span class="p">,</span> <span class="nx">preconditions</span> <span class="o">*</span><span class="nx">storage</span><span class="p">.</span><span class="nx">Preconditions</span><span class="p">,</span> <span class="nx">validateDeletion</span> <span class="nx">storage</span><span class="p">.</span><span class="nx">ValidateObjectFunc</span><span class="p">)</span> <span class="kt">error</span> <span class="p">{</span>
<span class="nx">startTime</span> <span class="o">:=</span> <span class="nx">time</span><span class="p">.</span><span class="nx">Now</span><span class="p">()</span>
<span class="c1">// 获取当前的key的数据</span>
<span class="nx">getResp</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">s</span><span class="p">.</span><span class="nx">client</span><span class="p">.</span><span class="nx">KV</span><span class="p">.</span><span class="nx">Get</span><span class="p">(</span><span class="nx">ctx</span><span class="p">,</span> <span class="nx">key</span><span class="p">)</span>
<span class="k">for</span> <span class="p">{</span>
<span class="c1">// 获取当前的状态</span>
<span class="nx">origState</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">s</span><span class="p">.</span><span class="nx">getState</span><span class="p">(</span><span class="nx">getResp</span><span class="p">,</span> <span class="nx">key</span><span class="p">,</span> <span class="nx">v</span><span class="p">,</span> <span class="kc">false</span><span class="p">)</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">err</span>
<span class="p">}</span>
<span class="nx">txnResp</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">s</span><span class="p">.</span><span class="nx">client</span><span class="p">.</span><span class="nx">KV</span><span class="p">.</span><span class="nx">Txn</span><span class="p">(</span><span class="nx">ctx</span><span class="p">).</span><span class="nx">If</span><span class="p">(</span>
<span class="nx">clientv3</span><span class="p">.</span><span class="nx">Compare</span><span class="p">(</span><span class="nx">clientv3</span><span class="p">.</span><span class="nx">ModRevision</span><span class="p">(</span><span class="nx">key</span><span class="p">),</span> <span class="s">"="</span><span class="p">,</span> <span class="nx">origState</span><span class="p">.</span><span class="nx">rev</span><span class="p">),</span> <span class="c1">// 如果修改版本等于当前状态,就尝试删除</span>
<span class="p">).</span><span class="nx">Then</span><span class="p">(</span>
<span class="nx">clientv3</span><span class="p">.</span><span class="nx">OpDelete</span><span class="p">(</span><span class="nx">key</span><span class="p">),</span> <span class="c1">// 删除</span>
<span class="p">).</span><span class="nx">Else</span><span class="p">(</span>
<span class="nx">clientv3</span><span class="p">.</span><span class="nx">OpGet</span><span class="p">(</span><span class="nx">key</span><span class="p">),</span> <span class="c1">// 获取</span>
<span class="p">).</span><span class="nx">Commit</span><span class="p">()</span>
<span class="k">if</span> <span class="p">!</span><span class="nx">txnResp</span><span class="p">.</span><span class="nx">Succeeded</span> <span class="p">{</span>
<span class="c1">// 获取最新的数据重试事务操作</span>
<span class="nx">getResp</span> <span class="p">=</span> <span class="p">(</span><span class="o">*</span><span class="nx">clientv3</span><span class="p">.</span><span class="nx">GetResponse</span><span class="p">)(</span><span class="nx">txnResp</span><span class="p">.</span><span class="nx">Responses</span><span class="p">[</span><span class="mi">0</span><span class="p">].</span><span class="nx">GetResponseRange</span><span class="p">())</span>
<span class="nx">klog</span><span class="p">.</span><span class="nx">V</span><span class="p">(</span><span class="mi">4</span><span class="p">).</span><span class="nx">Infof</span><span class="p">(</span><span class="s">"deletion of %s failed because of a conflict, going to retry"</span><span class="p">,</span> <span class="nx">key</span><span class="p">)</span>
<span class="k">continue</span>
<span class="p">}</span>
<span class="c1">// 将最后一个版本的数据解码到out里面,然后返回</span>
<span class="k">return</span> <span class="nx">decode</span><span class="p">(</span><span class="nx">s</span><span class="p">.</span><span class="nx">codec</span><span class="p">,</span> <span class="nx">s</span><span class="p">.</span><span class="nx">versioner</span><span class="p">,</span> <span class="nx">origState</span><span class="p">.</span><span class="nx">data</span><span class="p">,</span> <span class="nx">out</span><span class="p">,</span> <span class="nx">origState</span><span class="p">.</span><span class="nx">rev</span><span class="p">)</span>
<span class="p">}</span>
<span class="p">}</span>
5. 更新接口的实现
更新接口实现上与删除接口并无本质上的差别,但是如果多个节点同时进行更新,CAS并发操作必然会有一个节点成功,当发现已经有节点操作成功,则当前节点其实并不需要再做过多的操作,直接返回即可
<span class="c1">// 省略非核心代码</span>
<span class="kd">func</span> <span class="p">(</span><span class="nx">s</span> <span class="o">*</span><span class="nx">store</span><span class="p">)</span> <span class="nx">GuaranteedUpdate</span><span class="p">(</span>
<span class="nx">ctx</span> <span class="nx">context</span><span class="p">.</span><span class="nx">Context</span><span class="p">,</span> <span class="nx">key</span> <span class="kt">string</span><span class="p">,</span> <span class="nx">out</span> <span class="nx">runtime</span><span class="p">.</span><span class="nx">Object</span><span class="p">,</span> <span class="nx">ignoreNotFound</span> <span class="kt">bool</span><span class="p">,</span>
<span class="nx">preconditions</span> <span class="o">*</span><span class="nx">storage</span><span class="p">.</span><span class="nx">Preconditions</span><span class="p">,</span> <span class="nx">tryUpdate</span> <span class="nx">storage</span><span class="p">.</span><span class="nx">UpdateFunc</span><span class="p">,</span> <span class="nx">suggestion</span> <span class="o">...</span><span class="nx">runtime</span><span class="p">.</span><span class="nx">Object</span><span class="p">)</span> <span class="kt">error</span> <span class="p">{</span>
<span class="c1">// 获取当前key的最新数据</span>
<span class="nx">getCurrentState</span> <span class="o">:=</span> <span class="kd">func</span><span class="p">()</span> <span class="p">(</span><span class="o">*</span><span class="nx">objState</span><span class="p">,</span> <span class="kt">error</span><span class="p">)</span> <span class="p">{</span>
<span class="nx">startTime</span> <span class="o">:=</span> <span class="nx">time</span><span class="p">.</span><span class="nx">Now</span><span class="p">()</span>
<span class="nx">getResp</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">s</span><span class="p">.</span><span class="nx">client</span><span class="p">.</span><span class="nx">KV</span><span class="p">.</span><span class="nx">Get</span><span class="p">(</span><span class="nx">ctx</span><span class="p">,</span> <span class="nx">key</span><span class="p">,</span> <span class="nx">s</span><span class="p">.</span><span class="nx">getOps</span><span class="o">...</span><span class="p">)</span>
<span class="nx">metrics</span><span class="p">.</span><span class="nx">RecordEtcdRequestLatency</span><span class="p">(</span><span class="s">"get"</span><span class="p">,</span> <span class="nx">getTypeName</span><span class="p">(</span><span class="nx">out</span><span class="p">),</span> <span class="nx">startTime</span><span class="p">)</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="kc">nil</span><span class="p">,</span> <span class="nx">err</span>
<span class="p">}</span>
<span class="k">return</span> <span class="nx">s</span><span class="p">.</span><span class="nx">getState</span><span class="p">(</span><span class="nx">getResp</span><span class="p">,</span> <span class="nx">key</span><span class="p">,</span> <span class="nx">v</span><span class="p">,</span> <span class="nx">ignoreNotFound</span><span class="p">)</span>
<span class="p">}</span>
<span class="c1">// 获取当前数据</span>
<span class="kd">var</span> <span class="nx">origState</span> <span class="o">*</span><span class="nx">objState</span>
<span class="kd">var</span> <span class="nx">mustCheckData</span> <span class="kt">bool</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="nx">suggestion</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span> <span class="o">&&</span> <span class="nx">suggestion</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="c1">// 如果提供了建议的数据,则会使用,</span>
<span class="nx">origState</span><span class="p">,</span> <span class="nx">err</span> <span class="p">=</span> <span class="nx">s</span><span class="p">.</span><span class="nx">getStateFromObject</span><span class="p">(</span><span class="nx">suggestion</span><span class="p">[</span><span class="mi">0</span><span class="p">])</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">err</span>
<span class="p">}</span>
<span class="c1">//但是需要检测数据</span>
<span class="nx">mustCheckData</span> <span class="p">=</span> <span class="kc">true</span>
<span class="p">}</span> <span class="k">else</span> <span class="p">{</span>
<span class="c1">// 尝试重新获取数据</span>
<span class="nx">origState</span><span class="p">,</span> <span class="nx">err</span> <span class="p">=</span> <span class="nx">getCurrentState</span><span class="p">()</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">err</span>
<span class="p">}</span>
<span class="p">}</span>
<span class="nx">transformContext</span> <span class="o">:=</span> <span class="nx">authenticatedDataString</span><span class="p">(</span><span class="nx">key</span><span class="p">)</span>
<span class="k">for</span> <span class="p">{</span>
<span class="c1">// 检查对象是否已经更新, 主要是通过检测uuid/revision来实现</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">preconditions</span><span class="p">.</span><span class="nx">Check</span><span class="p">(</span><span class="nx">key</span><span class="p">,</span> <span class="nx">origState</span><span class="p">.</span><span class="nx">obj</span><span class="p">);</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="c1">// If our data is already up to date, return the error</span>
<span class="k">if</span> <span class="p">!</span><span class="nx">mustCheckData</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">err</span>
<span class="p">}</span>
<span class="c1">// 如果检查数据一致性错误,则需要重新获取</span>
<span class="nx">origState</span><span class="p">,</span> <span class="nx">err</span> <span class="p">=</span> <span class="nx">getCurrentState</span><span class="p">()</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">err</span>
<span class="p">}</span>
<span class="nx">mustCheckData</span> <span class="p">=</span> <span class="kc">false</span>
<span class="c1">// Retry</span>
<span class="k">continue</span>
<span class="p">}</span>
<span class="c1">// 删除当前的版本数据revision</span>
<span class="nx">ret</span><span class="p">,</span> <span class="nx">ttl</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">s</span><span class="p">.</span><span class="nx">updateState</span><span class="p">(</span><span class="nx">origState</span><span class="p">,</span> <span class="nx">tryUpdate</span><span class="p">)</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="c1">// If our data is already up to date, return the error</span>
<span class="k">if</span> <span class="p">!</span><span class="nx">mustCheckData</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">err</span>
<span class="p">}</span>
<span class="c1">// It"s possible we were working with stale data</span>
<span class="c1">// Actually fetch</span>
<span class="nx">origState</span><span class="p">,</span> <span class="nx">err</span> <span class="p">=</span> <span class="nx">getCurrentState</span><span class="p">()</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">err</span>
<span class="p">}</span>
<span class="nx">mustCheckData</span> <span class="p">=</span> <span class="kc">false</span>
<span class="c1">// Retry</span>
<span class="k">continue</span>
<span class="p">}</span>
<span class="c1">// 编码数据</span>
<span class="nx">data</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">runtime</span><span class="p">.</span><span class="nx">Encode</span><span class="p">(</span><span class="nx">s</span><span class="p">.</span><span class="nx">codec</span><span class="p">,</span> <span class="nx">ret</span><span class="p">)</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">err</span>
<span class="p">}</span>
<span class="k">if</span> <span class="p">!</span><span class="nx">origState</span><span class="p">.</span><span class="nx">stale</span> <span class="o">&&</span> <span class="nx">bytes</span><span class="p">.</span><span class="nx">Equal</span><span class="p">(</span><span class="nx">data</span><span class="p">,</span> <span class="nx">origState</span><span class="p">.</span><span class="nx">data</span><span class="p">)</span> <span class="p">{</span>
<span class="c1">// 如果我们发现我们当前的数据与获取到的数据一致,则会直接跳过</span>
<span class="k">if</span> <span class="nx">mustCheckData</span> <span class="p">{</span>
<span class="nx">origState</span><span class="p">,</span> <span class="nx">err</span> <span class="p">=</span> <span class="nx">getCurrentState</span><span class="p">()</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">err</span>
<span class="p">}</span>
<span class="nx">mustCheckData</span> <span class="p">=</span> <span class="kc">false</span>
<span class="k">if</span> <span class="p">!</span><span class="nx">bytes</span><span class="p">.</span><span class="nx">Equal</span><span class="p">(</span><span class="nx">data</span><span class="p">,</span> <span class="nx">origState</span><span class="p">.</span><span class="nx">data</span><span class="p">)</span> <span class="p">{</span>
<span class="c1">// original data changed, restart loop</span>
<span class="k">continue</span>
<span class="p">}</span>
<span class="p">}</span>
<span class="k">if</span> <span class="p">!</span><span class="nx">origState</span><span class="p">.</span><span class="nx">stale</span> <span class="p">{</span>
<span class="c1">// 直接返回数据</span>
<span class="k">return</span> <span class="nx">decode</span><span class="p">(</span><span class="nx">s</span><span class="p">.</span><span class="nx">codec</span><span class="p">,</span> <span class="nx">s</span><span class="p">.</span><span class="nx">versioner</span><span class="p">,</span> <span class="nx">origState</span><span class="p">.</span><span class="nx">data</span><span class="p">,</span> <span class="nx">out</span><span class="p">,</span> <span class="nx">origState</span><span class="p">.</span><span class="nx">rev</span><span class="p">)</span>
<span class="p">}</span>
<span class="p">}</span>
<span class="c1">// 砖汉数据</span>
<span class="nx">newData</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">s</span><span class="p">.</span><span class="nx">transformer</span><span class="p">.</span><span class="nx">TransformToStorage</span><span class="p">(</span><span class="nx">data</span><span class="p">,</span> <span class="nx">transformContext</span><span class="p">)</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">storage</span><span class="p">.</span><span class="nx">NewInternalError</span><span class="p">(</span><span class="nx">err</span><span class="p">.</span><span class="nx">Error</span><span class="p">())</span>
<span class="p">}</span>
<span class="nx">opts</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">s</span><span class="p">.</span><span class="nx">ttlOpts</span><span class="p">(</span><span class="nx">ctx</span><span class="p">,</span> <span class="nb">int64</span><span class="p">(</span><span class="nx">ttl</span><span class="p">))</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">err</span>
<span class="p">}</span>
<span class="nx">trace</span><span class="p">.</span><span class="nx">Step</span><span class="p">(</span><span class="s">"Transaction prepared"</span><span class="p">)</span>
<span class="nx">startTime</span> <span class="o">:=</span> <span class="nx">time</span><span class="p">.</span><span class="nx">Now</span><span class="p">()</span>
<span class="c1">// 事务更新数据</span>
<span class="nx">txnResp</span><span class="p">,</span> <span class="nx">err</span> <span class="o">:=</span> <span class="nx">s</span><span class="p">.</span><span class="nx">client</span><span class="p">.</span><span class="nx">KV</span><span class="p">.</span><span class="nx">Txn</span><span class="p">(</span><span class="nx">ctx</span><span class="p">).</span><span class="nx">If</span><span class="p">(</span>
<span class="nx">clientv3</span><span class="p">.</span><span class="nx">Compare</span><span class="p">(</span><span class="nx">clientv3</span><span class="p">.</span><span class="nx">ModRevision</span><span class="p">(</span><span class="nx">key</span><span class="p">),</span> <span class="s">"="</span><span class="p">,</span> <span class="nx">origState</span><span class="p">.</span><span class="nx">rev</span><span class="p">),</span>
<span class="p">).</span><span class="nx">Then</span><span class="p">(</span>
<span class="nx">clientv3</span><span class="p">.</span><span class="nx">OpPut</span><span class="p">(</span><span class="nx">key</span><span class="p">,</span> <span class="nb">string</span><span class="p">(</span><span class="nx">newData</span><span class="p">),</span> <span class="nx">opts</span><span class="o">...</span><span class="p">),</span>
<span class="p">).</span><span class="nx">Else</span><span class="p">(</span>
<span class="nx">clientv3</span><span class="p">.</span><span class="nx">OpGet</span><span class="p">(</span><span class="nx">key</span><span class="p">),</span>
<span class="p">).</span><span class="nx">Commit</span><span class="p">()</span>
<span class="nx">metrics</span><span class="p">.</span><span class="nx">RecordEtcdRequestLatency</span><span class="p">(</span><span class="s">"update"</span><span class="p">,</span> <span class="nx">getTypeName</span><span class="p">(</span><span class="nx">out</span><span class="p">),</span> <span class="nx">startTime</span><span class="p">)</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">err</span>
<span class="p">}</span>
<span class="nx">trace</span><span class="p">.</span><span class="nx">Step</span><span class="p">(</span><span class="s">"Transaction committed"</span><span class="p">)</span>
<span class="k">if</span> <span class="p">!</span><span class="nx">txnResp</span><span class="p">.</span><span class="nx">Succeeded</span> <span class="p">{</span>
<span class="c1">// 重新获取数据</span>
<span class="nx">getResp</span> <span class="o">:=</span> <span class="p">(</span><span class="o">*</span><span class="nx">clientv3</span><span class="p">.</span><span class="nx">GetResponse</span><span class="p">)(</span><span class="nx">txnResp</span><span class="p">.</span><span class="nx">Responses</span><span class="p">[</span><span class="mi">0</span><span class="p">].</span><span class="nx">GetResponseRange</span><span class="p">())</span>
<span class="nx">klog</span><span class="p">.</span><span class="nx">V</span><span class="p">(</span><span class="mi">4</span><span class="p">).</span><span class="nx">Infof</span><span class="p">(</span><span class="s">"GuaranteedUpdate of %s failed because of a conflict, going to retry"</span><span class="p">,</span> <span class="nx">key</span><span class="p">)</span>
<span class="nx">origState</span><span class="p">,</span> <span class="nx">err</span> <span class="p">=</span> <span class="nx">s</span><span class="p">.</span><span class="nx">getState</span><span class="p">(</span><span class="nx">getResp</span><span class="p">,</span> <span class="nx">key</span><span class="p">,</span> <span class="nx">v</span><span class="p">,</span> <span class="nx">ignoreNotFound</span><span class="p">)</span>
<span class="k">if</span> <span class="nx">err</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">err</span>
<span class="p">}</span>
<span class="nx">trace</span><span class="p">.</span><span class="nx">Step</span><span class="p">(</span><span class="s">"Retry value restored"</span><span class="p">)</span>
<span class="nx">mustCheckData</span> <span class="p">=</span> <span class="kc">false</span>
<span class="k">continue</span>
<span class="p">}</span>
<span class="c1">// 获取put响应</span>
<span class="nx">putResp</span> <span class="o">:=</span> <span class="nx">txnResp</span><span class="p">.</span><span class="nx">Responses</span><span class="p">[</span><span class="mi">0</span><span class="p">].</span><span class="nx">GetResponsePut</span><span class="p">()</span>
<span class="k">return</span> <span class="nx">decode</span><span class="p">(</span><span class="nx">s</span><span class="p">.</span><span class="nx">codec</span><span class="p">,</span> <span class="nx">s</span><span class="p">.</span><span class="nx">versioner</span><span class="p">,</span> <span class="nx">data</span><span class="p">,</span> <span class="nx">out</span><span class="p">,</span> <span class="nx">putResp</span><span class="p">.</span><span class="nx">Header</span><span class="p">.</span><span class="nx">Revision</span><span class="p">)</span>
<span class="p">}</span>
<span class="p">}</span>
6. 未曾讲到的地方
transformer的实现和注册地方我并没有找到,只看到了几个覆盖资源类型的地方,还有list/watch接口,后续再继续学习,今天就先到这里,下次再见
kubernetes学习笔记地址: https://www.yuque.com/baxiaoshi/tyado3
微信号:baxiaoshi2020 欢迎一起交流学习分享 公共号:图解源码