1-Intro

场景: 一个用户行为系统, like, fav 等等都会涉及到计数系统.

目标:

  • 海量数据: 假设存量千亿的行为数据
  • 写入 qps : 百万
  • 读取 qps : 亿

一个用户行为大致大致需要 100Bytes, 算上索引假设 150Bytes

  • userId
  • itemId
  • itemOwnerId
  • releationType
  • eventAt
  • status
  • ..

上面的是核心写入模型:

  • 存量数据粗估是: 15T 左右, 假设一个数据中心至少3副本, 就是 45T
  • 增量的数据, 假设1天平均活跃时间12个小时, 1天增量就是 43G

基于上面的写入模型构建2个读取模型, 假设有2种事件, 点赞和收藏

  • itemCounter: 点赞数和收藏数

    • itemId
    • likeCnt
    • favCnt
  • itemLatestTime: 最后收藏时间和最后点赞时间

    • itemId
    • latestLikeAt
    • latestFavAt

Tips

我们为写操作和读操作使用不同的模型,是比较典型的 CQRS 架构风格, 在高性能设计中 CQRS 扩展性有明显优势

Question: Mysql 是否能顶的住写

Mysql 能提供的:

  • InnoDb
    • B+ 树的索引
    • 主键上有自适应 Hash
  • TokuDb: 使用 Fractal Tree, 也被称为 T-Tree 的数据结构来顶.
    • 单机写入能顶 1W 到 5W 大概这个级别, 有报道说配合优化, 批处理能突破 10W

Facebook , 根据他们的 Blog, 应该还能提供 LSM 结构的 RocksDB 引擎, 对写入也非常友好.

我们使用用 Mysql 分片 + Toku引擎 或者 RocksDB 引擎 来抗写, 然后基于 CDC connector 使用多级缓存来抗读, 是一个 传统的, 有一定可行性的方案 .

思路.

写的大致思路:

  1. 使用事件溯源的设计模式, 业务层写入到 mq 则结束, 例如使用 Kafka. 这应该能很轻松的顶住百万 QPS 的写入操作
  2. 应用层面, 这里可以使用熟悉的语言处理上面的 mq 消息.
  3. 瓶颈大概率集中在 存储的数据库这里了.
    • 批处理能有效的进行事件压缩
    • 上面的 Mysql 是能顶住的
    • 这里还是选择使用 ScyllaDB 作为主存, 理由是更容易做分布式和异地多活, 而且 ScyllaDB 的内置压缩算法比较丰富,可选,能有效的节省存储成本

读的大致思路:

  1. 这里考虑使用 Flink 来构建数据管道. 当然也可以在上面的业务代码中直接处理.
    • flink 本身有很强的扩展性, 无论是性能还是功能
    • source: 可以基于 ScyllaDb cdcKafka, 也可以在上面的应用那里直接发送 Kafka, 使用 eventAt 作为处理时间
    • transform: 按照 itemId 进行聚合, incr decr 或者更新 最后更新时间
    • sink: counter 表和 cache
  2. 分布式缓存, 优先考虑 mcrouter 或者 Redis 如果选用
    • Redis, 数据结构可以是 itemIdKeyHash, HashKey 是行为类型
  3. 要支持这个读流量 要上多级缓存, 应用层也要缓存
  4. 多级缓存估计还不够,要做 多数据中心分担流量, 目前的技术选型中, ScyllaDB Redis 等等都比较容易做
  5. 可以考虑使用边缘计算, AWSLambda@Edge, 配合 Redis 的多数据中心, 进一步提升性能

2-Implementation

多数据中心: 假设有3个数据中心北京1个 BJ, 上海2个 SH1 SH2 实现异地多活中 最简单的 2地3中心

  • cb.bj.com
  • cb.sh.com

入由可以由边缘计算或者客户端层实现, 一个简单的方案就是 客户端 用一个定时器 探测离哪个中心更近就使用哪个 做出入口.

每个数据中心都能独立运行,是一个完整的 Deployment Unit

  • 数据层例如 , ScyllaDB 可以参考 Scylla Multiple Data Center 只需要改 2个配置, 一个 RACK 一个 DC , 注意设置支持多 MutlpleDCSnitch 复制策略即可,不麻烦

  • 应用层要注意要指明希望去的 DC, 一致性由 XXX 改为 LOCAL_XXX, 例如 LOCAL_QUORUM 或者 LOCAL_ONE , 可以参考 Consistent Level

scylladb schema

CREATE TABLE t_act_like
(  
    user_id bigint,  
    state int,  
    item_id text,  
    event_at bigint,
    PRIMARY KEY (user_id, item_id)  
);  
 
 
CREATE TABLE t_like_counter
(  
    item_id bigint,  
    count counter,  
    PRIMARY KEY (item_id)  
);  
 
 
CREATE TABLE t_like_at
(  
    item_id bigint,  
    last_like_at bigint,  
    PRIMARY KEY (item_id)  
);  

消费者伪代码

 
fun batchHandleLikeEvent(events: List<LikeDto>) {
	//1. 按照 userId -> itemId 分组,并且取到 eventAt 时间最新的那一条
	val compressed: List<LikeDto> = compressEvents(events)
 
	//2. 这里可以 并发处理多个不同的 Dto
	compressed.forEach {handleLikeEvent(it)}
}
 
fun handleLikeEvent(event: LikeDto): Boolean {
	// 1. 查询 当前 event 是否是延迟事件,这样能实现有序消费, 不怕 kafka 来的时候乱序
	val eventInDb = findOne(event.userId, event.itemId);
	if (eventInDb != null && eventInDb.eventAt >= event.eventAt)  {
		return false
	} 
 
	// 2. 如果 state 一致, 则完全不用处理.
	if (eventInDb != null && eventInDb.state = event.state) {
		return false
	}
 
	// 3. 先动数据库
	likeDao.save(event)
 
	// 4. 可选操作1: 如果担心延迟太高,提前修改缓存也是可以的, 最后基于 counter 表实现最终一致性
	// optionalUpdateItemCounter(event)
 
	// 5. 可选操作2: 这里可以发送一个  确认的点赞或者取消点赞数据, 或者直接 cdc 用来改 counter 并修改 cache
	optionalSendConfirmedEvent(event)
 
	// 6. 可选操作3: 这里还可以 直接异步的修改 counter
	optionalIncrItemCounter(event)
}
 
  • 我们这里 最关键的是修改了 event, 也就是 primary , 后续的不管是 计数器 还是缓存,都走这里进行 补偿, 实现最终一致性