聚合管道作为替代
聚合流水线 比map-reduce提供更好的性能和更一致的接口。
为了执行map-reduce操作,MongoDB提供了
mapReduce命令,并在mongo外壳中提供了db.collection.mapReduce()wrapper方法。
如果map-reduce数据集不断增长,则可能需要执行增量map-reduce而不是每次都对整个数据集执行map-reduce操作。
要执行增量映射减少:
query参数指定仅与新文档匹配的条件
。out参数指定reduce将新结果合并到现有输出集合中的操作。请考虑以下示例,在该示例中,您usersessions对每天要在集合上运行的map-reduce操作进行调度。
该usersessions集合包含每天记录用户会话的文档,例如:
运行第一个map-reduce操作,如下所示:
定义映射的映射函数userid到包含字段的对象total_time,count和avg_time:
用两个参数定义相应的reduce函数,
key并values计算总时间和计数。将key对应于userid和values是数组,其元素对应于映射到各个对象
userid中mapFunction。
有两个参数定义finalize函数key和
reducedValue。该函数修改reducedValue文档以添加另一个字段average并返回修改后的文档。
在执行映射简化usersessions使用收集
mapFunction的reduceFunction,和
finalizeFunction功能。将结果输出到集合
session_stats。如果session_stats集合已经存在,则该操作将替换内容:
查询session_stats集合以验证结果:
该操作返回以下文档:
以后,随着usersessions集合的增长,您可以运行其他map-reduce操作。例如,将新文档添加到
usersessions集合中:
最终,对usersessions集合执行增量map-reduce
,但是使用该query字段仅选择新文档。将结果输出到collection session_stats,但是reduce将内容与增量map-reduce的结果进行比较:
查询session_stats集合以验证结果:
该操作返回以下文档:
前提条件:将集合设置为原始状态:
使用可用的聚合管道运算符,您可以重写map-reduce示例,而无需定义自定义函数:
和$group分组userid,得出:
total_time使用$sum操作count使用$sum操作avg_time使用$avg操作该操作返回以下文档:
该$project阶段调整输出文档的形状以反映map-reduce的输出,该输出具有两个字段_id和
value。如果不需要镜像_idand value结构,则该阶段是可选的
。
该$merge阶段将结果输出到
session_stats_agg集合。如果现有文档_id与新结果相同,则该操作将应用指定的管道,以根据结果和现有文档计算total_time,count和avg_time。如果是相同的,现有的文档_id中session_stats_agg,操作插入文档。
查询session_stats_agg集合以验证结果:
该操作返回以下文档:
将新文档添加到usersessions集合中:
$match在管道的开头添加一个阶段以指定日期过滤器:
查询session_stats_agg集合以验证结果:
该操作返回以下文档:
可选的。为了避免$match每次运行时都必须修改聚合管道的
日期条件,可以在帮助函数中定义包装聚合:
然后,要运行,您只需将开始日期传递给该updateSessionStats()函数: