交易次数
在MongoDB中,对单个文档的操作是原子的。因为您可以使用嵌入式文档和数组来捕获单个文档结构中的数据之间的关系,而不是在多个文档和集合之间进行规范化,所以这种单文档原子性消除了许多实际用例中对多文档事务的需求。
对于需要原子性地读写多个文档(在单个或多个集合中)的情况,MongoDB支持多文档事务。使用分布式事务,可以跨多个操作,集合,数据库,文档和分片使用事务。
以下示例重点介绍了交易API的关键组件:
该示例使用新的回调API来处理事务,该API启动事务,执行指定的操作并提交(或因错误而中止)。新的回调API还结合了重试逻辑
TransientTransactionError
或
UnknownTransactionCommitResult
提交错误。
重要
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = MongoClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections. CRUD operations in transactions must be on existing collections.
client.get_database(
"mydb1", write_concern=wc_majority).foo.insert_one({'abc': 0})
client.get_database(
"mydb2", write_concern=wc_majority).bar.insert_one({'xyz': 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
def callback(session):
collection_one = session.client.mydb1.foo
collection_two = session.client.mydb2.bar
# Important:: You must pass the session to the operations.
collection_one.insert_one({'abc': 1}, session=session)
collection_two.insert_one({'xyz': 999}, session=session)
# Step 2: Start a client session.
with client.start_session() as session:
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(
callback, read_concern=ReadConcern('local'),
write_concern=wc_majority,
read_preference=ReadPreference.PRIMARY)
该示例使用新的回调API来处理事务,该API启动事务,执行指定的操作并提交(或因错误而中止)。新的回调API还结合了重试逻辑
TransientTransactionError
或
UnknownTransactionCommitResult
提交错误。
重要
/*
For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl";
For a sharded cluster, connect to the mongos instances; e.g.
String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017:27017/admin";
*/
final MongoClient client = MongoClients.create(uri);
/*
Prereq: Create collections. CRUD operations in transactions must be on existing collections.
*/
client.getDatabase("mydb1").getCollection("foo")
.withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("abc", 0));
client.getDatabase("mydb2").getCollection("bar")
.withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("xyz", 0));
/* Step 1: Start a client session. */
final ClientSession clientSession = client.startSession();
/* Step 2: Optional. Define options to use for the transaction. */
TransactionOptions txnOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.LOCAL)
.writeConcern(WriteConcern.MAJORITY)
.build();
/* Step 3: Define the sequence of operations to perform inside the transactions. */
TransactionBody txnBody = new TransactionBody<String>() {
public String execute() {
MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo");
MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar");
/*
Important:: You must pass the session to the operations.
*/
coll1.insertOne(clientSession, new Document("abc", 1));
coll2.insertOne(clientSession, new Document("xyz", 999));
return "Inserted into collections in different databases";
}
};
try {
/*
Step 4: Use .withTransaction() to start a transaction,
execute the callback, and commit (or abort on error).
*/
clientSession.withTransaction(txnBody, txnOptions);
} catch (RuntimeException e) {
// some error handling
} finally {
clientSession.close();
}
该示例使用新的回调API来处理事务,该API启动事务,执行指定的操作并提交(或因错误而中止)。新的回调API还结合了重试逻辑
TransientTransactionError
或
UnknownTransactionCommitResult
提交错误。
重要
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
const client = new MongoClient(uri);
await client.connect();
// Prereq: Create collections. CRUD operations in transactions must be on existing collections.
await client
.db('mydb1')
.collection('foo')
.insertOne({ abc: 0 }, { w: 'majority' });
await client
.db('mydb2')
.collection('bar')
.insertOne({ xyz: 0 }, { w: 'majority' });
// Step 1: Start a Client Session
const session = client.startSession();
// Step 2: Optional. Define options to use for the transaction
const transactionOptions = {
readPreference: 'primary',
readConcern: { level: 'local' },
writeConcern: { w: 'majority' }
};
// Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error)
// Note: The callback for withTransaction MUST be async and/or return a Promise.
try {
await session.withTransaction(async () => {
const coll1 = client.db('mydb1').collection('foo');
const coll2 = client.db('mydb2').collection('bar');
// Important:: You must pass the session to the operations
await coll1.insertOne({ abc: 1 }, { session });
await coll2.insertOne({ xyz: 999 }, { session });
}, transactionOptions);
} finally {
await session.endSession();
await client.close();
}
该示例使用新的回调API来处理事务,该API启动事务,执行指定的操作并提交(或因错误而中止)。新的回调API还结合了重试逻辑
TransientTransactionError
或
UnknownTransactionCommitResult
提交错误。
重要
/*
* For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
* uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
* For a sharded cluster, connect to the mongos instances; e.g.
* uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
*/
$client = new \MongoDB\Client($uriString);
// Prerequisite: Create collections.
$client->selectCollection(
'mydb1',
'foo',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
]
)->insertOne(['abc' => 0]);
$client->selectCollection(
'mydb2',
'bar',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
]
)->insertOne(['xyz' => 0]);
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
$callback = function (\MongoDB\Driver\Session $session) use ($client) {
$client
->selectCollection('mydb1', 'foo')
->insertOne(['abc' => 1], ['session' => $session]);
$client
->selectCollection('mydb2', 'bar')
->insertOne(['xyz' => 999], ['session' => $session]);
};
// Step 2: Start a client session.
$session = $client->startSession();
// Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
$transactionOptions = [
'readConcern' => new \MongoDB\Driver\ReadConcern(\MongoDB\Driver\ReadConcern::LOCAL),
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
'readPreference' => new \MongoDB\Driver\ReadPreference(\MongoDB\Driver\ReadPreference::RP_PRIMARY),
];
\MongoDB\with_transaction($session, $callback, $transactionOptions);
该示例使用新的回调API来处理事务,该API启动事务,执行指定的操作并提交(或因错误而中止)。新的回调API合并了针对“ TransientTransactionError”或 “ UnknownTransactionCommitResult”提交错误的重试逻辑 。
重要
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = AsyncIOMotorClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections. CRUD operations in transactions must be on existing collections.
await client.get_database(
"mydb1", write_concern=wc_majority).foo.insert_one({'abc': 0})
await client.get_database(
"mydb2", write_concern=wc_majority).bar.insert_one({'xyz': 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
async def callback(my_session):
collection_one = my_session.client.mydb1.foo
collection_two = my_session.client.mydb2.bar
# Important:: You must pass the session to the operations.
await collection_one.insert_one({'abc': 1}, session=my_session)
await collection_two.insert_one({'xyz': 999}, session=my_session)
# Step 2: Start a client session.
async with await client.start_session() as session:
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
await session.with_transaction(
callback, read_concern=ReadConcern('local'),
write_concern=wc_majority,
read_preference=ReadPreference.PRIMARY)
该示例使用新的回调API来处理事务,该API启动事务,执行指定的操作并提交(或因错误而中止)。新的回调API还结合了重试逻辑
TransientTransactionError
或
UnknownTransactionCommitResult
提交错误。
重要
static bool
with_transaction_example (bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_write_concern_t *wc = NULL;
mongoc_read_concern_t *rc = NULL;
mongoc_read_prefs_t *rp = NULL;
mongoc_collection_t *coll = NULL;
bool success = false;
bool ret = false;
bson_t *doc = NULL;
bson_t *insert_opts = NULL;
mongoc_client_session_t *session = NULL;
mongoc_transaction_opt_t *txn_opts = NULL;
/* For a replica set, include the replica set name and a seedlist of the
* members in the URI string; e.g.
* uri_repl = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:" \
* "27017/?replicaSet=myRepl";
* client = mongoc_client_new (uri_repl);
* For a sharded cluster, connect to the mongos instances; e.g.
* uri_sharded =
* "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
* client = mongoc_client_new (uri_sharded);
*/
client = get_client ();
/* Prereq: Create collections. */
wc = mongoc_write_concern_new ();
mongoc_write_concern_set_wmajority (wc, 1000);
insert_opts = bson_new ();
mongoc_write_concern_append (wc, insert_opts);
coll = mongoc_client_get_collection (client, "mydb1", "foo");
doc = BCON_NEW ("abc", BCON_INT32 (0));
ret = mongoc_collection_insert_one (
coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
bson_destroy (doc);
mongoc_collection_destroy (coll);
coll = mongoc_client_get_collection (client, "mydb2", "bar");
doc = BCON_NEW ("xyz", BCON_INT32 (0));
ret = mongoc_collection_insert_one (
coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
/* Step 1: Start a client session. */
session = mongoc_client_start_session (client, NULL /* opts */, error);
if (!session) {
goto fail;
}
/* Step 2: Optional. Define options to use for the transaction. */
txn_opts = mongoc_transaction_opts_new ();
rp = mongoc_read_prefs_new (MONGOC_READ_PRIMARY);
rc = mongoc_read_concern_new ();
mongoc_read_concern_set_level (rc, MONGOC_READ_CONCERN_LEVEL_LOCAL);
mongoc_transaction_opts_set_read_prefs (txn_opts, rp);
mongoc_transaction_opts_set_read_concern (txn_opts, rc);
mongoc_transaction_opts_set_write_concern (txn_opts, wc);
/* Step 3: Use mongoc_client_session_with_transaction to start a transaction,
* execute the callback, and commit (or abort on error). */
ret = mongoc_client_session_with_transaction (
session, callback, txn_opts, NULL /* ctx */, NULL /* reply */, error);
if (!ret) {
goto fail;
}
success = true;
fail:
bson_destroy (doc);
mongoc_collection_destroy (coll);
bson_destroy (insert_opts);
mongoc_read_concern_destroy (rc);
mongoc_read_prefs_destroy (rp);
mongoc_write_concern_destroy (wc);
mongoc_transaction_opts_destroy (txn_opts);
mongoc_client_session_destroy (session);
mongoc_client_destroy (client);
return success;
}
/* Define the callback that specifies the sequence of operations to perform
* inside the transactions. */
static bool
callback (mongoc_client_session_t *session,
void *ctx,
bson_t **reply,
bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_collection_t *coll = NULL;
bson_t *doc = NULL;
bool success = false;
bool ret = false;
client = mongoc_client_session_get_client (session);
coll = mongoc_client_get_collection (client, "mydb1", "foo");
doc = BCON_NEW ("abc", BCON_INT32 (1));
ret =
mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
bson_destroy (doc);
mongoc_collection_destroy (coll);
coll = mongoc_client_get_collection (client, "mydb2", "bar");
doc = BCON_NEW ("xyz", BCON_INT32 (999));
ret =
mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error);
if (!ret) {
goto fail;
}
success = true;
fail:
mongoc_collection_destroy (coll);
bson_destroy (doc);
return success;
}
该示例使用新的回调API来处理事务,该API启动事务,执行指定的操作并提交(或因错误而中止)。新的回调API还结合了重试逻辑
TransientTransactionError
或
UnknownTransactionCommitResult
提交错误。
重要
// mongocxx :: instance构造函数和析构函数
分别
初始化和关闭驱动程序。因此,必须在使用驱动程序之前创建mongocxx :: instance,并且//
在使用该驱动程序之前必须保持活动状态。mongocxx :: instance inst {};
//对于副本集,在URI
字符串中
包含副本集名称和成员的种子列表;例如// uriString =
//'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017 /?replicaSet = myRepl'
//对于分片群集,请连接到mongos实例;例如
// uriString ='mongodb://mongos0.example.com:27017,mongos1.example.com:27017 /'
mongocxx :: 客户 端客户端{ mongocxx :: uri { “ mongodb:// localhost /?replicaSet = replset” }};
write_concern wc_majority {};
wc_majority 。acknowledge_level (write_concern :: 水平:: k_majority );
read_concern rc_local {};
rc_local 。acknowledge_level (read_concern :: 水平:: k_local );
read_preference rp_primary {};
rp_primary 。模式(read_preference :: read_mode :: k_primary );
//先决条件:创建集合。
auto foo = client [ “ mydb1” ] [ “ foo” ];
自动 栏 = 客户端[ “ mydb2” ] [ “栏” ];
尝试 {
options :: insert opts ;
选择。write_concern (wc_majority );
富。insert_one (make_document (kvp (“ abc” , 0 )), opts );
吧。insert_one (make_document (kvp (“ xyz” , 0 )), opts );
} catch (const mongocxx :: exception & e ) {
std :: cout << “插入时发生异常:” << e 。什么() << std :: endl ;
返回 EXIT_FAILURE ;
}
//步骤1:定义回调,该回调指定
//事务
内部要执行的操作的顺序。client_session :: with_transaction_cb callback = [ &](client_session * session ) {
//重要提示:必须将会话传递给操作。
富。insert_one (* session , make_document (kvp (“ abc” , 1 )));
吧。insert_one (* session , make_document (kvp (<