MongoDB工具 >MongoDB Spark连接器 >Spark Connector Python指南 > 聚合
将数据从MongoDB读入Spark时,请使用MongoDB的聚合管道来应用过滤规则并执行聚合操作。
考虑一个名为的集合fruit,其中包含以下文档:
fruit
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
在shell中从中添加option()方法,以指定创建DataFrame时要使用的聚合管道。spark.read()pyspark
option()
spark.read()
pyspark
pipeline = "{'$match': {'type': 'apple'}}" df = spark.read.format("mongo").option("pipeline", pipeline).load() df.show()
在pyspark外壳程序中,该操作将输出以下输出:
+---+---+-----+ |_id|qty| type| +---+---+-----+ |1.0|5.0|apple| +---+---+-----+