Skip to main content

7. Top K Problem

题目

寻找数据流中出现最频繁的 k 个元素(find top k frequent items in a data stream)。这个问题也称为 Heavy Hitters. 这题也是从实践中提炼而来的,例如搜索引擎的热搜榜,找出访问网站次数最多的前 10 个 IP 地址,等等。

这类型的题目大多都是设计一个类似排行榜的系统,通过对海量数据的聚合处理最后获得排名前 K 的东西,比如:

  • 系统设计,一个 music app,选出 top 10 songs。
  • 实现前 5 分钟,1 小时,24 小时内分享最多的 post 的系统
  • Design Twitter Trend, Determine trending topics
  • 一个 K recent contact 的 service,就是当用户把鼠标点到 chat 对话框的时候,自动弹出 K 个最近的联系人。follow-up 是如果要弹出 K 个最熟悉的人怎么设计,以及资源估计(需要多少台机器来做数据存储,多少个处理 request 等等)
  • Desgin an Advertisement statistic system
  • 一个大型系统中 Top K 个 Exception

总结

特点是写频繁,不用完全精确(精确程度需要和面试官确认)。由于是海量数据,推荐异步操作, 核心就是三个 service

  • 数据搜集(可以通过采样、缓存等方式优化)
  • 数据聚合(一致性哈希分配机器,桶,堆和队列进行统计)
  • 数据查询(缓存等优化)

需求分析

直接需求

为了行文方便,我们以社交网络用户用的最多的话题(hashtag) # 为例子。需求有

  • 在大量分享中选出前 k 个热门话题
  • 热门分享要根据时间分为 10 分钟,1 小时,1 天热门
  • 用户每次发帖,带上话题(hashtag) # ,记录为一次

隐含需求

以下内容需要跟面试官确认。

  • 写频繁 (write heavy)
  • 数据不要求有非常高的实时性(可以有适当延迟)
  • 数据不要求有非常高的精确性(只需要知道 top k, 不需要知道具体分享的次数)
  • 高可用性(一般系统都需要)
  • 数据存储一致性 Consistency: 最终一致即可 Eventual consistency,CAP 中更侧重 A,P
  • Top K 的 k 有上限,比如 100 以内
  • 10 分钟,1 小时,1 天是从当前时间往前推算的,时间是一个固定数字

估算 Estimation

  • DAU: 1 Billion, 20% of user post everyday
  • Write QPS: 1B * 20% / 86400 ~ 200M/100K ~ 2k QPS, peak QPS ~ 6k
  • Read QPS: 假设为写的 10%, 200 QPS
  • Data storage: 假设 word (20 Bytes), count(8 Bytes) , 200M/day * 28 Bytes ~ 200*30MB ~ 6GB / day

数据量虽然不大,但是 QPS 比较高,需要多台机器处理。

服务设计 Service Design

整体设计原理大致类似于 Map-Reduce。我们先从最简单的情况开始计算,假如只需要单机处理少量数据,我们会把系统分为以下几个服务:

数据搜集 Data Collection

每当用户发帖的时候,搜集帖子中的话题#,进行分析统计,类似于 Map-Reduce 中的 Map

数据聚合 Data Aggregation & 数据查询 Query

将搜集到的数据聚合统计,类似于 Map-Reduce 中的 reduce。返回 x 时间段内的 k 个热门话题。从算法和数据结构的角度来说,为了返回 k 个热门话题,最简单的办法是去数据库查询时间范围内所有的记录,然后做一个排序,显然这样效率很低。通常为了返回 top k,我们只需要在内存内维护一个小根堆(min heap)即可。当写频繁的时候,如果直接每次更新堆,频繁改动堆效率也十分低下。所以我们还需要通过其他数据结构作为辅助。这里我们可以选用队列 (Queue)和哈希表(HashMap)。

我们用哈希表 Map<String, Long> 存放每个话题出现的次数。同时我们将时间划分为多个桶(bucket)放入队列,桶的大小根据精度来确定,如果需要更高的精度,甚至可以以 1 秒为单位划分。假设我们不需要十分精确,当需要统计过去 5 分钟的数据,我们可以以 30 秒为单位,将 5 分钟划分为 10 个桶。需要一个小时,我们可以 1 分钟为单位划分,需要一天的数据,我们可以以半小时为单位划分。当某个话题# 被用户发出的时候,我们把该话题放入当前(队尾)30 秒,当前分钟,当前 30 分钟所对应的桶里面。

当时间过去了 30 秒/1 分钟/30 分钟,我们将更早(队首)的桶移除队列,被移除的桶里面存有多个话题的出现次数,我们在 Map 中找到对应话题,减去相应次数。同时把队尾桶中的统计数据加入 Map 中。通过桶划分,我们把频繁的+1,-1 操作变成了每隔一段时间 +x, -y,从而增加效率。同时我们保持一个大小为 K 的堆,当 Map 被更新以后,我们同时去堆里面寻找对应话题,如果堆里存在相应话题,则更新频率,如果不存在堆里,则和堆顶的元素比较,如果频率比堆顶元素的频率高,则替换堆顶元素。

查询的时候,只需要返回堆即可知道 top k 是什么。

服务扩容 Scale Up

多台机器处理

当用户发帖量巨大的时候,假如我们用一台机器来统计数据,每次用户发帖都要记录到该机器上,机器负载过大,瓶颈是请求数量,而不是数据大小。此时数据聚合(Data Aggregation)服务需要多台机器进行统计。在划分(sharding)的时候我们需要特别注意划分方式,我们应该按照话题来划分,比如第 1 台机器只处理 #ABC 这个话题,所有的 #ABC 都应该分配到机器 1 来处理,每台机器都维持哈希表,桶队列和堆,最后我们把每台机器的 top k 合并再算出最终的 top k。

如果不按照话题来划分,会导致某个局部不是 top k,但全局是 top k 的话题无法被正确统计到。例如机器 1 [#ABC: 2, #CDE: 3],则 Top1 是#CDE:3,机器 2 [#ABC: 5, #FGH:5]则 top1 是 #FGH:5, 最后综合机器 1,2,算出来的 top1 是#FGH:5。 正确的 Top1 应该是 #ABC:7。所以正确的划分方式是把 ABC 都划分到机器 1,[#ABC:2, #ABC:5], 所有的 CDE, FGH 都分到机器 2 [#CDE: 3, #FGH:5] 这样我们就可以找出正确的 top 1。

为了保证每台机器的负载均衡和灾备,我们可以考虑使用一致性哈希(consitent hashing)作为划分算法。

减少网络流量 network traffic 、存储空间

如果发帖量多,每次在多台服务器之间发, 数据搜集和聚合服务之间的流量会变得很大,频发地发送+1,-1 十分占用网络资源,此时我们可以通过缓存消息队列抽样等方法来减少网络流量。具体使用哪种方法可以根据需求来讨论,也可以都使用。

缓存

首先在数据搜集服务中进行一次缓存,考虑到我们只需要存话题和出现频率,占用存储空间很小,每台服务器都可以放一个 in-memory cache 专门来统计数据,每次用户发送带有话题 #ABC 的帖子,就更新缓存,搜集服务每隔 2~5 秒把数据发送给数据聚合服务 好处:成倍减少服务器间流量 坏处:根据发送频率不同,造成的大小不同的延时(一般几秒不是大问题)

消息队列

在数据搜集服务和数据聚合服务之间增加个消息队列,数据搜集服务可以随时根据实际情况扩容,通过消息队列作为缓存,让数据聚合服务以某个固定的 QPS 从消息队列中读取。 好处:可以削峰,比如当某个话题突然变成热门的时候,流量突然突然变大,通过队列可以保证聚合服务不会突然承受大量请求。 坏处:流量突然变大,生产速度远远大于消费速度,导致缓存越来越多,延时会相应增加。

抽样

考虑到每天用户发的帖子中,会有大量话题#只出现过一两次,而我们只需要知道 Top K,k 通常是一个比较小的数据,如果每个话题都存储,会存在长尾效应,我们存储量了大量没用的数据。因此我们可以对数据进行抽样,最简单的抽样办法就是给每个请求一个概率来判断要不要存储,比如 0.1,就只有 10%的请求会被继续处理,剩下的都丢弃,更复杂的抽样办法可以参考[海量数据处理]一文中介绍的方法。

  • 好处: 根据抽样方法不懂,可以数十倍或者数百倍减少流量。
  • 坏处: 抽样导致精度丧失,如果需要统计准确的频率,不可使用抽样。

其他

如果需求是用户每次登陆能看到最新热门话题

需要重新计算读写 QPS。 当用户每次登陆都能看到的时候,问题就从写频繁变成读写都频繁。我们只需要增加一个分布式缓存服务即可,然后聚合服务每隔一段时间 (2 秒,1 分钟,10 分钟...) 发送 push 最近的 top k 到缓存即可。

如果需要存储历史数据

我们只需要另外增加个服务,将缓存中的数据存储即可。数据库可以使用最简单的 Key-value store.