MongoDB工具 >MongoDB Spark连接器 >Spark Connector R指南 > 聚合
将数据从MongoDB读入Spark时,请使用MongoDB的聚合管道来应用过滤规则并执行聚合操作。
考虑一个名为的集合fruit,其中包含以下文档:
fruit
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
在shell中从中添加一个pipeline参数,以指定创建DataFrame时要使用的聚合管道。read.df()sparkR
pipeline
read.df()
sparkR
agg_pipeline <- "{'$match': {'type': 'apple'}}" df <- read.df("", source = "com.mongodb.spark.sql.DefaultSource", pipeline = agg_pipeline) head(df)
注意
空参数(“”)是指用作数据源的文件。在这种情况下,我们的数据源是MongoDB集合,因此数据源参数为空。
在sparkR外壳程序中,该操作将输出以下输出:
_id qty type 1 1 5 apple