Unix domain socket 实现和使用

问题描述

现在的我司有个推送的业务 Python 实现,每次要对几百万用户进行特定推送(具体业务实现也比较复杂,有很多过滤条件),并且要实时统计推送用户中 Android 和 IPhone 各占比多少,推送的用户是根据特定的条件过滤出来的,这些用户过滤出来之后只以用户 id 的形式存在,并不知道他们所属的设备是什么,而且这批用户中还包含大量的非注册用户,因此如果要知道用户所属设备必须再次从数据库获取。

在这个推送的早期,用户量其实很少,程序员每次都是从数据库中全量获取所有数据,当时内存占用不到 1 G。随着业务规模的扩大,每次推送的用户越来越多,导致程序占用的内存到了 10 个G,对问题进行分析之后发现,最耗内存的包含两部分:一是筛选用户每次几百万数据全部加载内存,然后依次进行筛选过滤;二是对推送用户进行统计是实时统计的,每次也是几百万用户数据全部加载,然后再过滤筛选。

目标

  1. 降低内存的占用量
  2. 保证推送在百万级别下依然客观

解决方案

  1. 避免全量用户进行计算

    对满足条件的用户一次筛选一批,然后进行计算,然后再筛选下一批,而不是一次全部加载再计算,避免内存过早被占用过大

  2. 实时统计部分进行分离

    对需要统计的百万用户进行分离统计

1 和 2 设计原则都是基于生产者消费者模式,1 可以借助 gevent 或 thread 进行多 worker 协同过滤筛选,既保证速度又避免内存过多占用;由于是在本机进行,统计分离可以借助外部存储实现,比如推送过程中把推送数据写入外部存储,再进行统计,为了达到实时的目的,在此实现了一个 UNIX domain server 作为 consumer

Unix domain socket

Unix domain socket 必须是同主机间进程才能使用,而且 API 完全兼容 TCP socket,使用简单,但是不同于 TCP socket 它没有 TCP 网络协议栈的约束,不需要打包拆包、计算校验和、维护序号和应答等,只是将应用层数据从一个进程拷贝到另一个进程。这是因为,IPC机制本质上是可靠的通讯,而网络协议是为不可靠的通讯设计的。Unix Domain Socket也提供面向流和面向数据包两种API接口,类似于TCP和UDP,但是面向消息的UNIX Domain Socket也是可靠的,消息既不会丢失也不会顺序错乱。

使用 unix domain socket

创建 unix domain socket server 的过程和创建普通 TCP socket 没有太大差异

1
2
3
4
5
6
7
8
9
10
11
def socket_server():
dirname = os.path.dirname(os.path.abspath(__file__))
# 指定 socket 是 unix domain socket
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sockname = os.path.join(dirname, os.path.basename(__file__) + '.sock')
if os.path.exists(sockname):
os.remove(sockname)
server.bind(sockname)
server.listen(2046)
while 1:
conn, addr = server.accept()

客户端与 TCP 的区别就是不是通过 IP 和 PORT 来识别 server 而是通过 pathname

1
2
3
dirname = os.path.dirname(os.path.abspath(__file__))
client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
client.connect(os.path.join(DIRNAME, 'push_count_server.py.sock'))

注意

  1. 不同于 TCP socket server,如果 backlog 满了,client 再去尝试连接 domain socket server 会直接被拒绝,报 connection refused,无法再继续重试。
  2. 每次启动 server 都需要检查使用的 pathname 是否已经存在,如果存在需要删除 unlink(sockname) 然后再 bind。

参考资料

三月沙 wechat
扫描关注 wecatch 的公众号