缓存和发布订阅都是后端开发中常用的手段,其中缓存主要是用于可丢失数据的暂存,发布订阅主要是用于消息传递,今天给大家介绍一个k8s中带有发布订阅的缓存实现,其目标是给定一个时间,只关注该时间后续的事件,主要是用于近实时状态数据的获取
1. 业务背景
在k8s中的kubelet中支持不同的容器运行时,为了缓存容器运行时当前所有可见的Pod/Container就构造了一个Cache结构,当一个事件发生后,kubelet接收到事件后,此时需要获取当前Pod的状态,此时要获取的状态,就必须要求是在事件产生后的最新的状态,而不能是之前的状态,
2. 核心实现
2.1 数据与订阅记录
2.1.1 状态数据
状态数据主要是存储一个pod的状态数据
<span class="kd">type</span> <span class="nx">data</span> <span class="kd">struct</span> <span class="p">{</span>
<span class="c1">// 存储Pod的状态</span>
<span class="nx">status</span> <span class="o">*</span><span class="nx">PodStatus</span>
<span class="c1">// 试图检测Pod状态出错信息</span>
<span class="nx">err</span> <span class="kt">error</span>
<span class="c1">// 上次数据的修改时间</span>
<span class="nx">modified</span> <span class="nx">time</span><span class="p">.</span><span class="nx">Time</span>
<span class="p">}</span>
www#gaodaima.com来源gao@!dai!ma.com搞$$代^@码!网搞代码
2.1.2 订阅记录
订阅记录其实指的是一个订阅需求,其通过一个chan来进行数据通知,其中time字段是过滤条件,即只有时间大于time的记录才允许被加入到chan中
<span class="kd">type</span> <span class="nx">subRecord</span> <span class="kd">struct</span> <span class="p">{</span>
<span class="nx">time</span> <span class="nx">time</span><span class="p">.</span><span class="nx">Time</span>
<span class="nx">ch</span> <span class="kd">chan</span> <span class="o">*</span><span class="nx">data</span>
<span class="p">}</span>
2.2 Cache实现
2.2.1 核心成员结构
cache里面的数据在kubelet每次进行PLEG更新的时候,都会更新timestamp,并且会重新获取最新的Pod状态进行填充cache,所以这里会更新timestamp,寓意着让之前旧的状态都过期,并且会针对旧的订阅的进行数据的返回
<span class="c1">// cache implements Cache.</span>
<span class="kd">type</span> <span class="nx">cache</span> <span class="kd">struct</span> <span class="p">{</span>
<span class="c1">// 读写锁</span>
<span class="nx">lock</span> <span class="nx">sync</span><span class="p">.</span><span class="nx">RWMutex</span>
<span class="c1">// 存储Pod的状态数据,用于满足不带时间戳的状态获取</span>
<span class="nx">pods</span> <span class="kd">map</span><span class="p">[</span><span class="nx">types</span><span class="p">.</span><span class="nx">UID</span><span class="p">]</span><span class="o">*</span><span class="nx">data</span>
<span class="c1">// 全局时间戳,即当前缓存中的数据,至少都要比该时间戳新</span>
<span class="nx">timestamp</span> <span class="o">*</span><span class="nx">time</span><span class="p">.</span><span class="nx">Time</span>
<span class="c1">//存储对应Pod的定语记录列表</span>
<span class="nx">subscribers</span> <span class="kd">map</span><span class="p">[</span><span class="nx">types</span><span class="p">.</span><span class="nx">UID</span><span class="p">][]</span><span class="o">*</span><span class="nx">subRecord</span>
<span class="p">}</span>
2.2.3 普通状态数据获取
普通状态获取即直接通过Map来进行数据的返回
<span class="kd">func</span> <span class="p">(</span><span class="nx">c</span> <span class="o">*</span><span class="nx">cache</span><span class="p">)</span> <span class="nx">Get</span><span class="p">(</span><span class="nx">id</span> <span class="nx">types</span><span class="p">.</span><span class="nx">UID</span><span class="p">)</span> <span class="p">(</span><span class="o">*</span><span class="nx">PodStatus</span><span class="p">,</span> <span class="kt">error</span><span class="p">)</span> <span class="p">{</span>
<span class="nx">c</span><span class="p">.</span><span class="nx">lock</span><span class="p">.</span><span class="nx">RLock</span><span class="p">()</span>
<span class="k">defer</span> <span class="nx">c</span><span class="p">.</span><span class="nx">lock</span><span class="p">.</span><span class="nx">RUnlock</span><span class="p">()</span>
<span class="nx">d</span> <span class="o">:=</span> <span class="nx">c</span><span class="p">.</span><span class="nx">get</span><span class="p">(</span><span class="nx">id</span><span class="p">)</span>
<span class="k">return</span> <span class="nx">d</span><span class="p">.</span><span class="nx">status</span><span class="p">,</span> <span class="nx">d</span><span class="p">.</span><span class="nx">err</span>
<span class="p">}</span>
2.2.4 默认状态构造器
当发现当前的cahce中并不存在对应的数据,则是直接根据ID来生成一个默认的状态数据
<span class="kd">func</span> <span class="p">(</span><span class="nx">c</span> <span class="o">*</span><span class="nx">cache</span><span class="p">)</span> <span class="nx">get</span><span class="p">(</span><span class="nx">id</span> <span class="nx">types</span><span class="p">.</span><span class="nx">UID</span><span class="p">)</span> <span class="o">*</span><span class="nx">data</span> <span class="p">{</span>
<span class="nx">d</span><span class="p">,</span> <span class="nx">ok</span> <span class="o">:=</span> <span class="nx">c</span><span class="p">.</span><span class="nx">pods</span><span class="p">[</span><span class="nx">id</span><span class="p">]</span>
<span class="k">if</span> <span class="p">!</span><span class="nx">ok</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">makeDefaultData</span><span class="p">(</span><span class="nx">id</span><span class="p">)</span>
<span class="p">}</span>
<span class="k">return</span> <span class="nx">d</span>
<span class="p">}</span>
<span class="c1">// 默认状态构造器</span>
<span class="kd">func</span> <span class="nx">makeDefaultData</span><span class="p">(</span><span class="nx">id</span> <span class="nx">types</span><span class="p">.</span><span class="nx">UID</span><span class="p">)</span> <span class="o">*</span><span class="nx">data</span> <span class="p">{</span>
<span class="k">return</span> <span class="o">&</span><span class="nx">data</span><span class="p">{</span><span class="nx">status</span><span class="p">:</span> <span class="o">&</span><span class="nx">PodStatus</span><span class="p">{</span><span class="nx">ID</span><span class="p">:</span> <span class="nx">id</span><span class="p">},</span> <span class="nx">err</span><span class="p">:</span> <span class="kc">nil</span><span class="p">}</span>
<span class="p">}</span>
2.2.5 最新状态数据获取
会给定一个时间戳,只有当当前缓存的数据的时间在该时间戳之后,才有效,否则返回nil,这里有个关键点就是timestamp的相关设计,因为在每个PLEG周期中,都会更新timestamp
如果minTime<globalTimestamp, 则意味着在已经有新一轮的更新,而你这个事件还是上一轮的事件,则可能就是事件的处理太慢,此时就会将之前缓存的状态,直接返回,因为下一轮很有可能会有新的事件到来
<span class="kd">func</span> <span class="p">(</span><span class="nx">c</span> <span class="o">*</span><span class="nx">cache</span><span class="p">)</span> <span class="nx">getIfNewerThan</span><span class="p">(</span><span class="nx">id</span> <span class="nx">types</span><span class="p">.</span><span class="nx">UID</span><span class="p">,</span> <span class="nx">minTime</span> <span class="nx">time</span><span class="p">.</span><span class="nx">Time</span><span class="p">)</span> <span class="o">*</span><span class="nx">data</span> <span class="p">{</span>
<span class="c1">// 获取当前的状态</span>
<span class="nx">d</span><span class="p">,</span> <span class="nx">ok</span> <span class="o">:=</span> <span class="nx">c</span><span class="p">.</span><span class="nx">pods</span><span class="p">[</span><span class="nx">id</span><span class="p">]</span>
<span class="c1">// 如果全局时间戳大于给定的时间,则会直接返回</span>
<span class="nx">globalTimestampIsNewer</span> <span class="o">:=</span> <span class="p">(</span><span class="nx">c</span><span class="p">.</span><span class="nx">timestamp</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="o">&&</span> <span class="nx">c</span><span class="p">.</span><span class="nx">timestamp</span><span class="p">.</span><span class="nx">After</span><span class="p">(</span><span class="nx">minTime</span><span class="p">))</span>
<span class="k">if</span> <span class="p">!</span><span class="nx">ok</span> <span class="o">&&</span> <span class="nx">globalTimestampIsNewer</span> <span class="p">{</span>
<span class="c1">// 状态没有缓存,但是全局时间比最小时间新,就直接返回</span>
<span class="k">return</span> <span class="nx">makeDefaultData</span><span class="p">(</span><span class="nx">id</span><span class="p">)</span>
<span class="p">}</span>
<span class="c1">// 如果之前数据的时间在获取时间之后,或者全局时间已经更新</span>
<span class="k">if</span> <span class="nx">ok</span> <span class="o">&&</span> <span class="p">(</span><span class="nx">d</span><span class="p">.</span><span class="nx">modified</span><span class="p">.</span><span class="nx">After</span><span class="p">(</span><span class="nx">minTime</span><span class="p">)</span> <span class="o">||</span> <span class="nx">globalTimestampIsNewer</span><span class="p">)</span> <span class="p">{</span>
<span class="k">return</span> <span class="nx">d</span>
<span class="p">}</span>
<span class="c1">// The pod status is not ready.</span>
<span class="k">return</span> <span class="kc">nil</span>
<span class="p">}</span>
2.2.6 订阅状态管道构造
订阅管道最终会返回一个状态的管道,同时会进行检查,如果发现当前有可用数据,则会直接丢进管道中,否则则创建一个subRecords订阅记录,并保存
<span class="kd">func</span> <span class="p">(</span><span class="nx">c</span> <span class="o">*</span><span class="nx">cache</span><span class="p">)</span> <span class="nx">subscribe</span><span class="p">(</span><span class="nx">id</span> <span class="nx">types</span><span class="p">.</span><span class="nx">UID</span><span class="p">,</span> <span class="nx">timestamp</span> <span class="nx">time</span><span class="p">.</span><span class="nx">Time</span><span class="p">)</span> <span class="kd">chan</span> <span class="o">*</span><span class="nx">data</span> <span class="p">{</span>
<span class="nx">ch</span> <span class="o">:=</span> <span class="nb">make</span><span class="p">(</span><span class="kd">chan</span> <span class="o">*</span><span class="nx">data</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span>
<span class="nx">c</span><span class="p">.</span><span class="nx">lock</span><span class="p">.</span><span class="nx">Lock</span><span class="p">()</span>
<span class="k">defer</span> <span class="nx">c</span><span class="p">.</span><span class="nx">lock</span><span class="p">.</span><span class="nx">Unlock</span><span class="p">()</span>
<span class="c1">// 获取状态数据</span>
<span class="nx">d</span> <span class="o">:=</span> <span class="nx">c</span><span class="p">.</span><span class="nx">getIfNewerThan</span><span class="p">(</span><span class="nx">id</span><span class="p">,</span> <span class="nx">timestamp</span><span class="p">)</span>
<span class="k">if</span> <span class="nx">d</span> <span class="o">!=</span> <span class="kc">nil</span> <span class="p">{</span>
<span class="c1">// 如果已经有状态数据,则立即返回</span>
<span class="nx">ch</span> <span class="o"><-</span> <span class="nx">d</span>
<span class="k">return</span> <span class="nx">ch</span>
<span class="p">}</span>
<span class="c1">// 否则添加一个订阅记录到subscribers中对应的列表中</span>
<span class="nx">c</span><span class="p">.</span><span class="nx">subscribers</span><span class="p">[</span><span class="nx">id</span><span class="p">]</span> <span class="p">=</span> <span class="nb">append</span><span class="p">(</span><span class="nx">c</span><span class="p">.</span><span class="nx">subscribers</span><span class="p">[</span><span class="nx">id</span><span class="p">],</span> <span class="o">&</span><span class="nx">subRecord</span><span class="p">{</span><span class="nx">time</span><span class="p">:</span> <span class="nx">timestamp</span><span class="p">,</span> <span class="nx">ch</span><span class="p">:</span> <span class="nx">ch</span><span class="p">})</span>
<span class="k">return</span> <span class="nx">ch</span>
<span class="p">}</span>
2.2.7 通知清理过期管道
通知的时候回根据subRecord的订阅时间进行检测,如果订阅时间已经超过当前的 timestamp则直接获取数据进行返回,最后只会保留那些还未过期的订阅记录
<span class="kd">func</span> <span class="p">(</span><span class="nx">c</span> <span class="o">*</span><span class="nx">cache</span><span class="p">)</span> <span class="nx">notify</span><span class="p">(</span><span class="nx">id</span> <span class="nx">types</span><span class="p">.</span><span class="nx">UID</span><span class="p">,</span> <span class="nx">timestamp</span> <span class="nx">time</span><span class="p">.</span><span class="nx">Time</span><span class="p">)</span> <span class="p">{</span>
<span class="c1">// 获取事件的ID列表</span>
<span class="nx">list</span><span class="p">,</span> <span class="nx">ok</span> <span class="o">:=</span> <span class="nx">c</span><span class="p">.</span><span class="nx">subscribers</span><span class="p">[</span><span class="nx">id</span><span class="p">]</span>
<span class="k">if</span> <span class="p">!</span><span class="nx">ok</span> <span class="p">{</span>
<span class="c1">// No one to notify.</span>
<span class="k">return</span>
<span class="p">}</span>
<span class="nx">newList</span> <span class="o">:=</span> <span class="p">[]</span><span class="o">*</span><span class="nx">subRecord</span><span class="p">{}</span>
<span class="c1">// 遍历所有的订阅记录subRecords</span>
<span class="k">for</span> <span class="nx">i</span><span class="p">,</span> <span class="nx">r</span> <span class="o">:=</span> <span class="k">range</span> <span class="nx">list</span> <span class="p">{</span>
<span class="c1">// 如果这些订阅记录的时间在timestamp之前,就不进行操作, 即当前管道时间>timestamp</span>
<span class="k">if</span> <span class="nx">timestamp</span><span class="p">.</span><span class="nx">Before</span><span class="p">(</span><span class="nx">r</span><span class="p">.</span><span class="nx">time</span><span class="p">)</span> <span class="p">{</span>
<span class="nx">newList</span> <span class="p">=</span> <span class="nb">append</span><span class="p">(</span><span class="nx">newList</span><span class="p">,</span> <span class="nx">list</span><span class="p">[</span><span class="nx">i</span><span class="p">])</span>
<span class="k">continue</span>
<span class="p">}</span>
<span class="c1">// 获取一个数据返回, 同时关闭管道</span>
<span class="nx">r</span><span class="p">.</span><span class="nx">ch</span> <span class="o"><-</span> <span class="nx">c</span><span class="p">.</span><span class="nx">get</span><span class="p">(</span><span class="nx">id</span><span class="p">)</span>
<span class="nb">close</span><span class="p">(</span><span class="nx">r</span><span class="p">.</span><span class="nx">ch</span><span class="p">)</span>
<span class="p">}</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="nx">newList</span><span class="p">)</span> <span class="o">==</span> <span class="mi">0</span> <span class="p">{</span>
<span class="c1">// 如果不存在订阅记录,则就删除对应的key</span>
<span class="nb">delete</span><span class="p">(</span><span class="nx">c</span><span class="p">.</span><span class="nx">subscribers</span><span class="p">,</span> <span class="nx">id</span><span class="p">)</span>
<span class="p">}</span> <span class="k">else</span> <span class="p">{</span>
<span class="c1">// 剩余的订阅列表</span>
<span class="nx">c</span><span class="p">.</span><span class="nx">subscribers</span><span class="p">[</span><span class="nx">id</span><span class="p">]</span> <span class="p">=</span> <span class="nx">newList</span>
<span class="p">}</span>
<span class="p">}</span>
2.2.8 全局时间戳更新
全局时间戳更新,则会遍历所有的订阅,以最新的全局时间戳作为时间,进行通知
<span class="kd">func</span> <span class="p">(</span><span class="nx">c</span> <span class="o">*</span><span class="nx">cache</span><span class="p">)</span> <span class="nx">UpdateTime</span><span class="p">(</span><span class="nx">timestamp</span> <span class="nx">time</span><span class="p">.</span><span class="nx">Time</span><span class="p">)</span> <span class="p">{</span>
<span class="nx">c</span><span class="p">.</span><span class="nx">lock</span><span class="p">.</span><span class="nx">Lock</span><span class="p">()</span>
<span class="k">defer</span> <span class="nx">c</span><span class="p">.</span><span class="nx">lock</span><span class="p">.</span><span class="nx">Unlock</span><span class="p">()</span>
<span class="nx">c</span><span class="p">.</span><span class="nx">timestamp</span> <span class="p">=</span> <span class="o">&</span><span class="nx">timestamp</span>
<span class="c1">// Notify all the subscribers if the condition is met.</span>
<span class="k">for</span> <span class="nx">id</span> <span class="o">:=</span> <span class="k">range</span> <span class="nx">c</span><span class="p">.</span><span class="nx">subscribers</span> <span class="p">{</span>
<span class="nx">c</span><span class="p">.</span><span class="nx">notify</span><span class="p">(</span><span class="nx">id</span><span class="p">,</span> <span class="o">*</span><span class="nx">c</span><span class="p">.</span><span class="nx">timestamp</span><span class="p">)</span>
<span class="p">}</span>
<span class="p">}</span>
2.2.9 Pod事件更新通知函数
更新的时候,则会调用notify来进行通知
<span class="kd">func</span> <span class="p">(</span><span class="nx">c</span> <span class="o">*</span><span class="nx">cache</span><span class="p">)</span> <span class="nx">Set</span><span class="p">(</span><span class="nx">id</span> <span class="nx">types</span><span class="p">.</span><span class="nx">UID</span><span class="p">,</span> <span class="nx">status</span> <span class="o">*</span><span class="nx">PodStatus</span><span class="p">,</span> <span class="nx">err</span> <span class="kt">error</span><span class="p">,</span> <span class="nx">timestamp</span> <span class="nx">time</span><span class="p">.</span><span class="nx">Time</span><span class="p">)</span> <span class="p">{</span>
<span class="nx">c</span><span class="p">.</span><span class="nx">lock</span><span class="p">.</span><span class="nx">Lock</span><span class="p">()</span>
<span class="k">defer</span> <span class="nx">c</span><span class="p">.</span><span class="nx">lock</span><span class="p">.</span><span class="nx">Unlock</span><span class="p">()</span>
<span class="c1">// 进行事件的通知</span>
<span class="k">defer</span> <span class="nx">c</span><span class="p">.</span><span class="nx">notify</span><span class="p">(</span><span class="nx">id</span><span class="p">,</span> <span class="nx">timestamp</span><span class="p">)</span>
<span class="c1">// 保存最新的状态数据 </span>
<span class="nx">c</span><span class="p">.</span><span class="nx">pods</span><span class="p">[</span><span class="nx">id</span><span class="p">]</span> <span class="p">=</span> <span class="o">&</span><span class="nx">data</span><span class="p">{</span><span class="nx">status</span><span class="p">:</span> <span class="nx">status</span><span class="p">,</span> <span class="nx">err</span><span class="p">:</span> <span class="nx">err</span><span class="p">,</span> <span class="nx">modified</span><span class="p">:</span> <span class="nx">timestamp</span><span class="p">}</span>
<span class="p">}</span>
今天就到这里,这些数据结构和设计有很多值得学习地方,希望大家能多多交流,一起学习云原生相关的设计与关键实现
公共号:图解源码 欢迎一起交流学习分享, 电子书地址: https://www.yuque.com/baxiaoshi/tyado3