MongoDB工具 >MongoDB Spark连接器 >Spark Connector Scala指南 > 数据集和SQL
源代码
有关包含以下示例的源代码,请参见 SparkSQL.scala。
本教程可以作为独立的Scala应用程序运行,也可以作为Spark Shell中的单个命令运行。
将以下文档插入characters
集合:
在火花2.0新,一个DataFrame
由a表示Dataset
的
Rows
,现在的别名Dataset[Row]
。
蒙戈星火连接器提供了
com.mongodb.spark.sql.DefaultSource
创建类
DataFrames
和Datasets
从MongoDB的。使用连接器的
MongoSpark
助手来帮助创建DataFrame
:
该操作将打印以下内容:
注意
默认情况下,从MongoDB中读取内容SparkSession
是通过对数据库中的文档进行采样来推断架构的。要显式声明一个模式,请参阅显式声明一个模式。
另外,您可以使用SparkSession
方法创建DataFrames:
注意
filters
与DataFrames或Spark SQL一起使用时,基础的Mongo Connector代码构造一个聚合管道,以在将数据发送到Spark之前过滤MongoDB中的数据。
以下示例过滤并输出年龄小于100的字符:
该操作输出以下内容:
默认情况下,从MongoDB中读取内容SparkSession
是通过对集合中的文档进行采样来推断架构的。您还可以使用
来显式定义架构,从而消除采样所需的额外查询。case class
注意
如果为架构提供案例类,则MongoDB 仅返回声明的字段。这有助于最小化通过电线发送的数据。
以下语句创建一个,然后使用它为定义架构:Character
case class
DataFrame
重要
对于自包含的Scala应用程序,Character
应使用该类在方法之外定义该类。
该操作将输出以下输出:
转换时,可以使用的情况下类DataFrame
的
Dataset
,如下面的例子:
本MongoRDD
类提供帮助者的转换RDD
,以
DataFrames
和Datasets
。以下示例将SparkContext
对象传递
到,MongoSpark.load()
该对象返回
RDD
,然后对其进行转换:
在数据集上运行SQL查询之前,必须为数据集注册一个临时视图。
以下操作注册一个
characters
表,然后查询该表以查找所有100个或更旧的字符:
MongoDB Spark Connector提供了将DataFrames持久存储到MongoDB中的集合的功能。
以下示例使用MongoSpark.save(DataFrameWriter)
方法将方法保存centenarians
到hundredClub
MongoDB 的集合中并验证保存,从hundredClub
集合中读取:
如果集合已经存在,则DataFrameWriter包括在写入结果之前.mode("overwrite")
删除
hundredClub
集合的。
在Spark Shell中,该操作将输出以下输出:
MongoSpark.save(dataFrameWriter)
是通过DataFrameWriter进行配置和保存的简写。以下示例使用DataFrameWriter直接将DataFrames写入MongoDB:
Spark支持有限数量的数据类型,以确保所有BSON类型都可以往返于Spark DataFrame / Dataset中。对于任何不受支持的Bson类型,将创建自定义StructType。
下表显示了Bson类型和Spark类型之间的映射:
Bson类型 | 火花类型 |
---|---|
Document |
StructType |
Array |
ArrayType |
32-bit integer |
Integer |
64-bit integer |
Long |
Binary data |
Array[Byte] 或StructType :{ subType: Byte, data: Array[Byte]} |
Boolean |
Boolean |
Date |
java.sql.Timestamp |
DBPointer |
StructType : { ref: String , oid: String} |
Double |
Double |
JavaScript |
StructType : { code: String } |
JavaScript with scope |
StructType : { code: String , scope: String } |
Max key |
StructType : { maxKey: Integer } |
Min key |
StructType : { minKey: Integer } |
Null |
null |
ObjectId |
StructType : { oid: String } |
Regular Expression |
StructType : { regex: String , options: String } |
String |
String |
Symbol |
StructType : { symbol: String } |
Timestamp |
StructType : { time: Integer , inc: Integer } |
Undefined |
StructType : { undefined: Boolean } |
为了帮助更好地支持数据集,已创建以下Scala案例类(
com.mongodb.spark.sql.fieldTypes
)和JavaBean类(
com.mongodb.spark.sql.fieldTypes.api.java.
)来表示不受支持的BSON类型:
Bson类型 | Scala案例类 | JavaBean |
---|---|---|
Binary data |
Binary |
Binary |
DBPointer |
DBPointer |
DBPointer |
JavaScript |
JavaScript |
JavaScript |
JavaScript with scope |
JavaScriptWithScope |
JavaScriptWithScope |
Max key |
MaxKey |
MaxKey |
Min key |
MinKey |
MinKey |
ObjectId |
ObjectId |
ObjectId |
Regular Expression |
RegularExpression |
RegularExpression |
Symbol |
Symbol |
Symbol |
Timestamp |
Timestamp |
Timestamp |
Undefined |
Undefined |
Undefined |
为了方便起见,所有BSON类型也可以表示为String值。但是,这些值会丢失其所有原始类型信息,并且如果保存回MongoDB,则会存储为字符串。