聚合管道作为替代
聚合流水线 比map-reduce提供更好的性能和更一致的接口。
为了执行map-reduce操作,MongoDB提供了
mapReduce
命令,并在mongo
外壳中提供了db.collection.mapReduce()
wrapper方法。
如果map-reduce数据集不断增长,则可能需要执行增量map-reduce而不是每次都对整个数据集执行map-reduce操作。
要执行增量映射减少:
query
参数指定仅与新文档匹配的条件
。out
参数指定reduce
将新结果合并到现有输出集合中的操作。请考虑以下示例,在该示例中,您usersessions
对每天要在集合上运行的map-reduce操作进行调度。
该usersessions
集合包含每天记录用户会话的文档,例如:
运行第一个map-reduce操作,如下所示:
定义映射的映射函数userid
到包含字段的对象total_time
,count
和avg_time
:
用两个参数定义相应的reduce函数,
key
并values
计算总时间和计数。将key
对应于userid
和values
是数组,其元素对应于映射到各个对象
userid
中mapFunction
。
有两个参数定义finalize函数key
和
reducedValue
。该函数修改reducedValue
文档以添加另一个字段average
并返回修改后的文档。
在执行映射简化usersessions
使用收集
mapFunction
的reduceFunction
,和
finalizeFunction
功能。将结果输出到集合
session_stats
。如果session_stats
集合已经存在,则该操作将替换内容:
查询session_stats
集合以验证结果:
该操作返回以下文档:
以后,随着usersessions
集合的增长,您可以运行其他map-reduce操作。例如,将新文档添加到
usersessions
集合中:
最终,对usersessions
集合执行增量map-reduce
,但是使用该query
字段仅选择新文档。将结果输出到collection session_stats
,但是reduce
将内容与增量map-reduce的结果进行比较:
查询session_stats
集合以验证结果:
该操作返回以下文档:
前提条件:将集合设置为原始状态:
使用可用的聚合管道运算符,您可以重写map-reduce示例,而无需定义自定义函数:
和$group
分组userid
,得出:
total_time
使用$sum
操作count
使用$sum
操作avg_time
使用$avg
操作该操作返回以下文档:
该$project
阶段调整输出文档的形状以反映map-reduce的输出,该输出具有两个字段_id
和
value
。如果不需要镜像_id
and value
结构,则该阶段是可选的
。
该$merge
阶段将结果输出到
session_stats_agg
集合。如果现有文档_id
与新结果相同,则该操作将应用指定的管道,以根据结果和现有文档计算total_time,count和avg_time。如果是相同的,现有的文档_id
中session_stats_agg
,操作插入文档。
查询session_stats_agg
集合以验证结果:
该操作返回以下文档:
将新文档添加到usersessions
集合中:
$match
在管道的开头添加一个阶段以指定日期过滤器:
查询session_stats_agg
集合以验证结果:
该操作返回以下文档:
可选的。为了避免$match
每次运行时都必须修改聚合管道的
日期条件,可以在帮助函数中定义包装聚合:
然后,要运行,您只需将开始日期传递给该updateSessionStats()
函数: