问题描述
现在的我司有个推送的业务 Python 实现,每次要对几百万用户进行特定推送(具体业务实现也比较复杂,有很多过滤条件),并且要实时统计推送用户中 Android 和 IPhone 各占比多少,推送的用户是根据特定的条件过滤出来的,这些用户过滤出来之后只以用户 id 的形式存在,并不知道他们所属的设备是什么,而且这批用户中还包含大量的非注册用户,因此如果要知道用户所属设备必须再次从数据库获取。
在这个推送的早期,用户量其实很少,程序员每次都是从数据库中全量获取所有数据,当时内存占用不到 1 G。随着业务规模的扩大,每次推送的用户越来越多,导致程序占用的内存到了 10 个G,对问题进行分析之后发现,最耗内存的包含两部分:一是筛选用户每次几百万数据全部加载内存,然后依次进行筛选过滤;二是对推送用户进行统计是实时统计的,每次也是几百万用户数据全部加载,然后再过滤筛选。
目标
- 降低内存的占用量
- 保证推送在百万级别下依然客观
解决方案
避免全量用户进行计算
对满足条件的用户一次筛选一批,然后进行计算,然后再筛选下一批,而不是一次全部加载再计算,避免内存过早被占用过大
实时统计部分进行分离
对需要统计的百万用户进行分离统计
1 和 2 设计原则都是基于生产者消费者模式,1 可以借助 gevent 或 thread 进行多 worker 协同过滤筛选,既保证速度又避免内存过多占用;由于是在本机进行,统计分离可以借助外部存储实现,比如推送过程中把推送数据写入外部存储,再进行统计,为了达到实时的目的,在此实现了一个 UNIX domain server 作为 consumer。
Unix domain socket
Unix domain socket 必须是同主机间进程才能使用,而且 API 完全兼容 TCP socket,使用简单,但是不同于 TCP socket 它没有 TCP 网络协议栈的约束,不需要打包拆包、计算校验和、维护序号和应答等,只是将应用层数据从一个进程拷贝到另一个进程。这是因为,IPC机制本质上是可靠的通讯,而网络协议是为不可靠的通讯设计的。Unix Domain Socket也提供面向流和面向数据包两种API接口,类似于TCP和UDP,但是面向消息的UNIX Domain Socket也是可靠的,消息既不会丢失也不会顺序错乱。
使用 unix domain socket
创建 unix domain socket server 的过程和创建普通 TCP socket 没有太大差异
1 | def socket_server(): |
客户端与 TCP 的区别就是不是通过 IP 和 PORT 来识别 server 而是通过 pathname
1 | dirname = os.path.dirname(os.path.abspath(__file__)) |
注意
- 不同于 TCP socket server,如果 backlog 满了,client 再去尝试连接 domain socket server 会直接被拒绝,报 connection refused,无法再继续重试。
- 每次启动 server 都需要检查使用的 pathname 是否已经存在,如果存在需要删除 unlink(sockname) 然后再 bind。