- 点击、浏览
- 热门商品、近期热门商品、分类热门商品,流量统计
- 收藏、喜欢、评分、打标签
- 用户画像(用户信息标签化),推荐列表(结合特征工程和机器学习算法)
- 下订单、支付、登录
- 刷单监控,订单失效监控,恶意登录(短时间内频繁登录失败)监控
偏好统计在这不做主要实现,涉及到机器学习算法相关的知识,因此本系统只设计实现统计分析与风险控制这两大模块。
- 实时热门商品统计
- 实时热门页面流量统计
- 实时访问流量统计
- APP 市场推广统计
- 页面广告点击量统计
- 页面广告黑名单过滤
- 恶意登录监控
- 订单支付失效监控
- 支付实时对账
使用网上搜集的淘宝网某一天随机一百万用户的所有行为(包括点击、购买、 收藏 、喜欢 )。数据集特征如下:
字段 | 数据类型 | 说明 |
---|---|---|
userId | Long | 用户ID |
itemID | Long | 商品ID |
categoryID | Integer | 商品类别ID |
behavior | String | 用户行为('pv','buy','cart','fav) |
timestamp | Long | 行为发生的时间戳,单位秒 |
数据举例:
543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
服务器端收集的用户访问日志,数据集特征如下:
字段 | 数据类型 | 说明 |
---|---|---|
ip | String | 访问者IP |
userID | Long | 访问者ID |
eventTime | Long | 访问时间 |
method | String | 访问方法(GET/POST/PUT/DELETE) |
url | String | 访问的URL |
数据举例:
83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-search.png
83.149.9.216 - - 17/05/2015:10:05:43 +0000 GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png
83.149.9.216 - - 17/05/2015:10:05:47 +0000 GET /presentations/logstash-monitorama-2013/plugin/highlight/highlight.js
83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js
83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js
统计近1小时内的热门商品,每5分钟更新一次,输出topN 的商品ID。
- 抽取出业务时间戳,告诉 Flink 框架基于业务时间做窗口
- 过滤出浏览点击(“pv”)行为进行统计
- 按一小时的窗口大小,每 5 分钟统计一次,做滑动窗口聚合(Sliding Window)
- 按每个窗口聚合,输出每个窗口中点击量前 N 名的商品
读取服务器日志中的每一行 log 统计在一段时间内用户访问 每一个 url 的次数 ,然后排序输出显示 。
与实现热门商品统计的思路相同,开一个10分钟的滑动窗口,每隔5秒滑动一次,输出最近 10 分钟内访问 量最多的前 N 个 URL。
实现统计一个网站在一个小时内总浏览量(PV,Page View)。
按照'pv'对用户行为进行过滤,然后开一个小时的滚动窗口,统计一个小时内的数量。
在一段时间内到底有多少不同的用户访问了网站。即存在一个统计指标是网站的独立访客数(Unique Visitor UV )。 UV指的是一段时间(比如一小时)内访问网站的总人数, 1 天内同一访客的多次访问只记录为一个访客。
根据用户行为数据中的userId来区分不同的用户,有两种的实现思路:
1)第一种跟之前统计PV思路类似,在窗口函数中计算不同userId 的集合数量作为uv数量。
2)第二种使用使用布隆过滤器( Bloom Filter )对用户状态进行压缩。
第一种思路是把所有数据的userId 都存在了窗口计算的状态里,在窗口收集数据的过程中,状态会不断增大。但是在数据量很大的情况下,内存估计会爆掉而且性能下降,可以想到的一个思路是将数据存到Redis中进行缓存,但是如果数据量很大的时候可能需要扩展Redis集群,这样做明显代价太大。
因此更好的思路上从过滤出的数据上进行压缩,使用布隆过滤器( Bloom Filter )。不需要完整地存储用户ID 的信息,只要知道他在不在就行了。因此用一位( bit )就可以表示一个用户的状态。
本质上布隆过滤器是一种数据结构,比较巧妙的概率型数据结构(probabilistic data structure ),特点是高效地插入和查询,可以用来告诉你 “某样东西一定不存 在或者可能存在”。它本身是一个很长的二进制向量,既然是二进制的向量,那么显而易见的,存放的不是 0 ,就是 1 。 相比于传统的 List 、 Set 、 Map 等数据结构,它更高效、占用空间更少,但是缺点是其返回的结果是概率性的,而不是确切的。
解决思路就是利用某种方法(一般是Hash 函数)把每个数据,对应到一个位图的某一位上去;如果数据存在,那一位就是1 ,不存在则为 0 。
随着智能手机的普及,在如今的电商网站中已经有越来越多的用户来自移动端,相比起传统浏览器的登录方式,手机APP 成为了更多用户访问电商网站的首选 。对于电商企业来说,一般会通过各种不同的渠道对自己的 APP 进行市场推广,而这些渠道的统计数据(比如,不同网站上广告链接的点击量、 APP 下载量 )就成了市场营销的重要商业指标。
自己创建一些模拟数据,分为用户行为和来源渠道,然后实现SourceFunction,随机生成一些数据。
// 定义用户行为和来源渠道
List<String> behaviorList = Arrays.asList("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL");
List<String> channelList = Arrays.asList("app store", "wechat", "weibo");
分(不分)渠道统计APP的推广数量的总量
先过滤掉非推广行为("UNINSTALL")分渠道开窗统计,同时按照渠道"channel"和用户行为"behavior"两个key来对数据进行分组统计,自定义预聚合函数和窗口函数,完成累加和窗口聚合操作。
与上面思路类似,更简单的是直接对窗口的全部数据分到一个组统计数量即可,不区分渠道。
对于广告的统计,最简单也最重要的就是页面广告的点击量,网站往往需要根据广告点击量来制定定价策略和调整推广方式,而且也可以借此收集用户的偏好信 息。更加具体的应用是,我们可以根据用户的地理位置进行划分,从而总结出不同省份用户对不同广告的偏好,这样更有助于广告的精准投放。
以按省份区分不同用户为例,统计用户点击广告的数量。数据使用网上搜集的测试数据,如下所示:
userId,adId,province,city,timestamp
543462,1715,beijing,beijing,1511658000
662867,2244074,guangdong,guangzhou,1511658060
561558,3611281,guangdong,shenzhen,1511658120
894923,1715,beijing,beijing,1511658180
834377,2244074,shanghai,shanghai,1511658240
存在一种问题是同一用户的重复点击是会叠加计算的。在实际场景中,同一用户确实可能反复点开同一个广告,这也说明了用户对广告更大的兴趣;但是如果用户在一段时间非常频繁地点击广告, 这显然不是一个正常行为,有刷点击量的嫌疑。所以我们可以对一段时间内(比如一天内)的用户点击行为进行约束,如果对同一个广告点击超过一定限额(比如 100 次),应该把该用户加入黑名单并报警,此后其点击行为不应该再统计。
对数据根据province
分组,开1小时的滑动窗口,滑动步长5秒,统计窗口内的点击数量。整理思路类似之前统计热门商品。
使用两个ValueState记录用户的点击次数以及其是否标记为黑名单用户,每次发生用户点击事件的时候,如果不存在计时器就注册一个定时器,然后更新累加计数次数,如果到达设定的计数上限,就加入黑名单,并且把该报警信息用侧输出流输出。当计时器触发的时候就清空当前的状态。
对于网站而言,用户登录并不是频繁的业务操作。如果一个用户短时间内频繁登录失败,就有可能是出现了程序的恶意攻击,比如密码暴力破解 。因此我们考虑,应该对用户的登录失败动作进行统计,具体来说,如果同一用户(可以是不同 IP在 2 秒之内连续两次登录失败,就认为存在恶意登录的风险,输出相关的信息进行报警提示。这是电商网站、也是几乎所有网站风控的基本一环。
为方便测试,自定义一些数据集,表明用户的ip,登录时间以及登录状态,示例数据如下:
useID,ip,loginState,timestamp
5402,83.149.11.115,success,1558430815
23064,66.249.3.15,fail,1558430826
5692,80.149.25.29,fail,1558430833
7233,86.226.15.75,success,1558430832
5692,80.149.25.29,success,1558430840
由于同样引入了时间,我们可以想到,最简单的方法其实与之前的热门统计类似,只需要按照用户 ID 分流,然后遇到登录失败的事件时将其保存在 ListState 中, 然后设置一个定时器, 2 秒后触发。定时器触发时检查状态中的登录失败事件个数,如果大于等于 2 ,那么就输出报警信息。
Flink 为我们提供了 CEP( Complex Event Processing ,复杂事件处理库) ,用于在流中筛选符合某种复杂模式的事件。因此还可以使用该方法来处理,而且能应对的复杂情形更多,结果更健壮。
定义匹配模式Pattern为连续两次的登录失败,条件都是登录状态为"fail",时间间隔2s,然后就能在数据流中匹配出定义好的模式,最后检出符合匹配条件的流。检测的时候自定义一个PatternSelectFunction进行筛选,找出两次失败登录的事件信息,进行封装输出。
在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的交易数据来做一个实时对账。
在用户提交订单后的15分钟内检测用户是否完成了支付行为。
自定义数据集,自己构造一些数据,包括订单id,交易类型,交易码,和时间戳,目前就设置两种交易类型"create"和"pay"代表创建订单和支付。示例数据如下:
orderId,txType,timestamp
34729,create,,1558430842
34730,create,,1558430843
34729,pay,sd76f87d6,1558430844
34730,pay,3hu3k2432,1558430845
34731,create,,1558430846
利用 CEP 库来实现这个功能。先将事件流按照订单号 orderId 分流,然后定义这样的一个事件模式: 在 15 分钟内,事件“ create ”与 pay 非严格紧邻。将模式应用到数据流上后就可以得到pattern stream,然后调用select方法,就可以同时获取到匹配出的事件和未匹配的事件。
同样可以利用Process Function ,自定义实现检测订单超时的功能。为了简化问题,只考虑超时报警的情形,在 pay 事件超时未发生的情况下,输出超时报警信息。 一个简单的思路是,可以在订单的create 事件到来后注册定时器, 15 分钟后触发;然后再 用一个布尔类型的 Value 状态来作为标识位,表明 pay 事件是否发生过。如果 pay 事件已经发生,状态被置为 true ,那么就不再需要做什么操作;而如果 pay 事件一直没来,状态一直为 false ,到定时器触发时,就应该输出超时报警信息。
对于订单支付事件,用户支付完成其实并不算完,还得确认平台账户上是否到账了。而往往这会来自不同的日志信息,所以我们要同时读入两条流的数据来做合并处理。
为方便测试,又手动构造了些交易信息,包括支付交易码,支付渠道和时间戳,示例数据如下:
ewr342as4,wechat,1558430845
sd76f87d6,wechat,1558430847
3hu3k2432,alipay,1558430848
利用 connect 将两条流进行连接,然后用自定义的CoProcessFunction 进行处理。对两个流保存两个状态代表订单支付事件和交易事件。当且仅当两个状态都存在的时候才输出正常数据。否则有一方状态不存在时就设置一个计时器等待另一个事件,当定时器触发的时候就根据一方是否为空判断是否两者匹配上了。不匹配的输出到侧输出流即可。这样做还能避免再设置ValueState保存时间戳,因为只有一方等待另一方的时候才注册定时器,因此逻辑上前后是对应的。