可容错的 server proxy 实现

最近需要对手头的一个 rpc proxy 进行升级改造,升级之后要能做到:

  • 自动上线下线故障节点
  • 支持动态扩容和缩容
  • 支持对后端的 server 进行权重分流和负载均衡

这些目标要实现的功能和 MongoDB driver client 的功能很相似,MongoDB 本身就是分布式的系统,支持 replicateSet 和 shard 模式,每个 server 除了读写限制之外还支持使用 tag 来区分,client 能够根据不同的读写策略以及 tag 路由到不同的 server,比如支持:

  • primaryRead
  • secondaryRead
  • nearestRead

而且如果 primary 故障下线了,client 可以自动侦测集群新选举的 primary 进行写操作。

由此可见,MongoDB driver 的设计思路和实现都可以很好的借鉴拿来在我要改造的 rpc client 中实现,这才是开源的真正好处,站在巨人的肩膀上看得更远。

我们先来看看一下 MongoDB driver client 的设计目标,详见server-selection-next-generation-mongodb-drivers

设计 MongoDB client driver 的三个目标

MongoDB client driver 在选择 server 时有三个设计目标:

1. 第一个目标是可预见的

比如说如果一个 application 开发时是用的是单机的 MongoDB 实例,但是部署的时候可能是 replicateSet 的 MongoDB 集群,随着演变还有可能使用的是 shard 集群,但是不论 MongoDB 实例是什么模式,application 代码都应该始终都应该保持一致,只在必要的时候做出适当调整。举个例子来说,比如 application 用 driver 查询使用的是 secondary,那么不论 application 部署在单机实例上,replicateSet 实例上,还是 shard 实例上,它都能正常工作。

2. 第二个目标是有弹性

这就意味着,如果 driver 在选择某个 server 时检测到 server 故障了,driver 应该尝试去找可替代的 server 而不是立即抛出错误。比如对于写,有故障应该等待 primary 节点恢复,或者在 shard 集群中切换到其他 mongos 上。

3. 第三个目标是选中的 server 应该低时延的

也就是说如果多个 server 满足某种 operation,低时延的 server 应该首先被选择

MongoDB 的设计目标决定了 client driver 除了常规 client 应该包含的部分如:

  • network process (网络处理)
  • connection pool (连接池)
  • message process (消息协议处理)
  • error process (容错异常处理)

还包括如下几个组件来帮助它完成以上目标:

  • monitor (server 可用性监控、RTT 计算等任务)
  • server selection (按照一定的规则从候选 server 中选出可用 server)

proxy 的升级改造

我们的 proxy 是 tornado 实现的,基于 tornado 的 TCPServer 和 TCPClient,他们都是基于 tornado 的异步 IOStream。

整个升级过程可以用以下图来描述。

其中对故障节点健康状态监测是通过 tornado 提供的 PeriodicCallback 实现的,PeriodicCallback 提供了定时任务的启停以及状态检测,如果故障列表中有节点加入,主动触发检测任务的启动,开启周期性检测,每次检测完毕之后,更新每个节点的检测状态(包括检测次数和最后一次检测时间),用于下次检测的决策依据。

为了让检测尽量做到均衡,自己实现了一个有序集合来保存故障节点,一个节点在一个周期内都会有机会被检测一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class SortedSet(object):
"""有续集合,用来解决 upstream 检测时的顺序问题
"""
def __init__(self):
self._q = collections.deque()
self._s = dict()
def __contains__(self, key):
return key in self._s
def setdefault(self, key, value):
if key not in self._s:
self._q.append((key, value))
return self._s.setdefault(key, value)
def popleft(self):
key, value = self._q.popleft()
self._s.pop(key, None)
return key, value
def append(self, item):
key, value = item
self._s[key] = value
self._q.append((key, value))
def remove(self, key):
"""线性删除
"""
self._s.pop(key, None)
new_q = collections.deque()
while self._q:
existed_key, value = self._q.pop()
if existed_key != key:
new_q.appendleft((existed_key, value))
self._q = new_q
def __str__(self):
return 'set={0}, queue={1}'.format(self._s, self._q)
__repr__ = __str__

有序集合用了 dqueuedict 两种结构,dict 提供 O(1) 的查找,dqueue 保证了节点的有序。

得益于与 tornado 的单线程模式,由于没有开启多线程,整个代码都不需要有锁的出现,写的比较顺畅。

tornado 这货是我最喜爱的框架之一,为此还造了一个基于它的另一个框架,感兴趣的可以看一下 turbo

三月沙 wechat
扫描关注 wecatch 的公众号