MongoDB工具 >MongoDB Kafka连接器 >水槽连接器指南 > 水槽后处理器
在本页面
后处理器是接收器连接器类,在从Kafka主题中读取后SinkDocument
,该连接器类会修改中的数据
,该类包含SinkRecord
键和值字段的BSON表示
。该连接器应用了一系列后处理器,其中每个后处理器均以上提供的顺序执行SinkDocument
,并且结果存储在MongoDB集合中。
后处理器执行数据修改任务,例如设置文档_id
字段,键或值字段投影,重命名字段以及编辑敏感信息。您可以使用以下预构建的后处理器,也可以通过扩展PostProcessor
类来实现自己的后处理器:
后处理器名称 | 描述 |
---|---|
DocumentIdAdder | 完整路径:
com.mongodb.kafka.connect.sink.processor.DocumentIdAdder 使用已配置的策略插入
_id 字段。也可以看看 |
BlacklistKeyProjector | 完整路径:
com.mongodb.kafka.connect.sink.processor.BlacklistKeyProjector 从接收器记录中删除匹配的关键字段。
|
黑名单价值投影仪 | 完整路径:
com.mongodb.kafka.connect.sink.processor.BlacklistValueProjector 从接收器记录中删除匹配的值字段。
|
白名单关键投影仪 | 完整路径:
com.mongodb.kafka.connect.sink.processor.WhitelistKeyProjector 仅包含接收器记录中匹配的关键字段。
|
白名单ValueProjector | 完整路径:
com.mongodb.kafka.connect.sink.processor.WhitelistValueProjector 匹配接收器记录中的值字段。
|
KafkaMetaAdder | 完整路径:
com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder 向文档添加由Kafka主题,分区和偏移量的串联组成的字段。
|
重命名 | 完整路径:
com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping 重命名与指定键或值字段完全匹配的字段。
|
重命名正则表达式 | 完整路径:
com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex 重命名与正则表达式匹配的字段。
|
您可以通过指定一个以逗号分隔的全限定PostProcessor
类名列表来配置后处理器链:
注意
如果DocumentIdAdder
后处理器不存在,则会自动添加到链中的第一个位置。
本节说明MongoDB Kafka Connector中包含的后处理器可用的配置选项。
在DocumentIdAdder
后处理器提供了_id
现场的SinkDocument
之前被写入MongoDB的集合。此后处理器使用一种策略进行配置,该策略包含用于生成的值的逻辑_id
。此连接器提供以下策略:
策略名称 | 描述 |
---|---|
BsonOid策略 | 完整路径:
com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy DocumentIdAdder 后处理器的默认值。生成一个MongoDB BSON ObjectId。
|
KafkaMetaDataStrategy | 完整路径:
com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy 构建一个由Kafka主题,分区和偏移量的串联组成的字符串。
|
FullKey策略 | 完整路径:
com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy 使用的完整密钥结构
SinkDocument 。如果没有密钥,则默认为空白文档。
|
ProvidedInKeyStrategy | 完整路径:
com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy 使用
_id 在键结构中指定的字段(SinkDocument 如果存在)。如果缺少该字段,则引发异常。
|
ProvidedInValueStrategy | 完整路径:
com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy 使用
_id 在的值结构中指定的字段(SinkDocument 如果存在)。如果缺少该字段,则引发异常。
|
部分密钥策略 | 完整路径:
com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy 使用的键结构的黑名单或白名单投影
SinkDocument 。如果没有密钥,则默认为空白文档。
|
部分价值策略 | 完整路径:
com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy 使用的值结构的黑名单或白名单投影
SinkDocument 。如果不存在任何值,则默认为空白文档。
|
Uuid策略 | 完整路径:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy 生成一个随机UUID作为字符串。
|
您可以document.id.strategy
如下分配属性:
要定义自定义策略,请创建一个实现IdStrategy接口的类,
并提供指向该document.id.strategy
设置的标准路径。
选择的策略可能会对传递语义产生影响
BSON ObjectId或UUID策略只能保证至少一次交付,因为新ID将在重试或重新处理时生成。如果保证文档_id的字段唯一,其他策略则允许一次发送。
本节提供了示例投影配置,以显示它们如何过滤以下样本记录:
注意
以下示例配置包含[key|value]
占位符值,它们代表key
或value
以避免重复。在创建配置时,指定一种适合您的用例的方法。
在以下示例接收器配置中,我们指定黑名单投影以及要从记录中省略的特定字段:
注意
您可以使用“。” (点)表示法引用记录中的子文档。您也可以使用它来引用数组中文档的字段。
应用投影后,记录包含以下数据:
在以下示例接收器配置中,我们指定白名单投影以及要包括在记录中的特定字段:
注意
您可以使用“。” 记录中引用子文档的符号。您也可以使用它来引用数组中文档的字段。
应用投影后,记录包含以下数据:
先前的示例投影配置演示了字段名称上的精确字符串匹配。投影list
设置还支持以下与字段名称匹配的通配符模式:
*
”(星号):匹配指定文档中级别的任意长度的字符串。**
”(双星):匹配当前和指定嵌套级别的所有嵌套级别。下面的示例演示如何使用以下示例记录中的每个通配符模式和投影输出:
白名单通配符示例
*
下面的示例中的通配符模式匹配数组中命名的所有键
temp
,forecast
并且所有字段都嵌套在其下方的单个级别上。
应用投影后,记录包含以下数据:
下例中的**
通配符模式匹配包含该字段的所有级别的所有键scale
。
应用投影后,记录包含以下数据:
黑名单通配符示例
通配符也可以用于匹配特定级别的所有字段名称,如以下黑名单投影配置示例所示:
应用投影后,记录包含以下数据:
连接器配置还支持**
(双星号)通配符,该通配符与指定它的当前级别和所有嵌套级别相匹配。
本节提供了后处理器RenameByMapping
和RenameByRegex
后处理器的示例配置,以显示它们如何更新接收器记录中的字段名称。字段重命名参数指定是使用点表示法还是使用JSON数组中的匹配和替换字符串模式来更新记录中的
key
或value
文档。
重命名后处理器示例的字段使用以下样本接收器记录:
关键文件
有价文件
该RenameByMapping
后处理器设置为对象的数组。数组中的每个对象都包含以下JSON元素键:
每个对象在oldName
元素中包含要匹配的文本,在元素中包含替换文本newName
。
键名 | 描述 |
---|---|
旧名称 | 包含与要替换的文本匹配的字符串。 |
新名字 | 包含oldName 字段中定义的字符串的所有匹配项的替换文本。 |
应用RenameByMapping
后处理器后,记录包含以下数据:
关键文件
有价文件
该RenameByRegex
后处理器设置为对象的数组。数组中的每个对象都包含以下JSON元素键:
键名 | 描述 |
---|---|
正则表达式 | 包含匹配字段以执行替换的正则表达式。 |
图案 | 包含与要替换的文本匹配的正则表达式。 |
新名字 | 包含pattern 字段中定义的所有正则表达式匹配项的替换文本。 |
例
应用RenameByMapping
后处理器后,记录包含以下数据:
关键文件
有价文件
后处理器应用了以下更改:
crepes
将匹配。在匹配的字段中,所有“已购买”的实例都将替换为“数量”。确保重命名不会导致同一文档中的重复键
重命名后处理器会更新JSON文档的关键字字段,这可能导致文档中的关键字重复。如果替换密钥在当前级别已经存在,则他们跳过重命名步骤。
一个写模型定义了一个MongoDB的集合进行批量的写操作的行为。连接器的默认写模型为 ReplaceOneModel,其中 ReplaceOptions 设置为upsert模式。
您可以通过在mongodb.writemodel.strategy
配置设置中指定自定义模型来覆盖默认写入模型
。连接器提供了以下策略:
写模型 | 描述 |
---|---|
ReplaceOneDefaultStrategy | 通过该
_id 字段最多替换一个与当前文档匹配的文档。writemodel.strategy 配置设置的默认值。 |
ReplaceOneBusinessKeyStrategy | 最多替换一个与设置提供的过滤器匹配的文档
document.id.strategy 。设置以下配置:
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy 也可以看看 WriteModel策略中的使用示例:业务密钥。 |
DeleteOneDefaultStrategy | document.id.strategy 仅当文档包含空值记录时,才删除最多一个与设置指定的ID相匹配的文档。设置配置设置时隐式指定
mongodb.delete.on.null.values=true 。您可以使用以下配置对此进行显式设置:
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneDefaultStrategy |
UpdateOneTimestamps策略 | 在文档中添加
_insertedTS (插入时间戳)和_modifiedTS (修改后的时间戳)字段。设置以下配置:
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy 也可以看看 WriteModel策略中的使用示例:插入和修改的时间戳。 |
注意
MongoDB Kafka Connector的未来版本将允许开发人员在配置设置中指定用户创建的自定义策略。
业务密钥是由接收器记录中的数据组成的值,该值将其标识为唯一文档。此示例使用记录中多个字段中包含的数据定义业务密钥,并指示后处理器生成用于插入但不用于更新的BSON ObjectId。
要配置此策略,需要执行以下步骤:
PartialValueStrategy
将ID 指定为ID策略以标识属于业务密钥的字段。ReplaceOneBusinessKeyStrategy
writemodel策略。在此示例中,我们通过分别位于flight_no
和的
当前航班号和机场跟踪飞机的容量airport_code
。一个示例消息包含以下内容:
为了实施该策略,我们首先在MongoDB Shell中的flight_no
和airport_code
字段上创建一个唯一索引
:
接下来,我们指定PartialValueStrategy
要包含在业务密钥中的策略和字段,并ReplaceOneBusinessKeyStrategy
在配置文件中指定
writemodel策略:
插入到集合中的样本数据包含以下内容:
当连接器处理与具有相同业务关键字字段值的现有文档匹配的接收器数据时,它将使用新值更新文档而不更改_id
字段。
投影后期处理器与PartialValueStrategy不兼容
该PartialValueStrategy
ID策略使用
[key|value].projection.type
和[key|value].projection.list
设置来定义哪些字段用来形成_id
场。由于黑名单和白名单后处理器使用相同的投影设置,因此无法单独指定它们。
在连接器外部使用单消息转换(SMT)来格式化源数据(如果需要)。
本示例说明了如何跟踪由连接器插入的文档的创建和更新时间戳。该
UpdateOneTimestampsStrategy
自定义写入模式战略执行以下任务:
_insertedTS
and _modifiedTS
字段将设置为当前时间。_modifiedTS
字段将更新为当前时间。通过UpdateOneTimestampsStrategy
在配置文件中指定来进行设置,如下所示:
对于此示例,我们要跟踪火车沿其路线的位置。该_modifiedTS
字段为我们提供了第一个位置报告保存到集合中的时间和日期。
头寸报告在价值凭证中包含以下数据:
writemodel策略设置为UpdateOneTimestampsStrategy
附加创建和修改的时间戳,而document id策略设置为ProvidedInValueStrategy
使用有价文档的_id
字段来标识列车。
在处理火车的初始消息之后插入的MongoDB文档包含以下数据:
一个小时后,火车报告其沿路线的当前位置。该
position
和_modifiedTS
字段更新: