In the previous overview topic, we introduced an event-driven architecture that combines the transactional outbox pattern with saga orchestration. Let us now implement the transactional outbox pattern in MySQL 8.
Outbox Event Table Schema
CREATE TABLE `outbox_events` (
  `id` bigint unsigned NOT NULL AUTO_INCREMENT,
  `type` tinyint unsigned NOT NULL,
  `message` mediumtext NOT NULL,
  `isConsumed` tinyint(1) NOT NULL DEFAULT '0',
  `createdAt` datetime NOT NULL,
  `updatedAt` datetime NOT NULL,
  PRIMARY KEY (`id`),
  KEY `outbox_events_is_consumed_created_at` (`isConsumed`,`createdAt`),
  KEY `createdAt` (`createdAt`)
) ENGINE=InnoDB AUTO_INCREMENT=916 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
- id is a bigint unsigned.
- type represents an event name of the form {resource}.outbox.command.{verb}; it is stored as tinyint unsigned to optimize queries and storage.
- message holds the event payload, which is commonly serialized as JSON.
- isConsumed indicates whether the event has been consumed.
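Storing type as a tinyint implies a mapping between the numeric code and the readable event name somewhere in the application. The following is a minimal sketch of such a mapping. The names OUTBOX_EVENT_TYPES, toTypeCode, and toTypeName are hypothetical, and only code 1 (used later in this article) comes from the source.

```javascript
// Hypothetical lookup table: event name -> tinyint code.
// Only code 1 appears in this article; further entries would be assumptions.
const OUTBOX_EVENT_TYPES = Object.freeze({
  'ORDER.OUTBOX.EVENT.CREATED': 1,
});

// Reverse lookup: tinyint code -> event name.
const OUTBOX_EVENT_NAMES = Object.freeze(
  Object.fromEntries(
    Object.entries(OUTBOX_EVENT_TYPES).map(([name, code]) => [code, name])
  )
);

// Translate a readable event name into the code stored in the `type` column.
function toTypeCode(name) {
  const code = OUTBOX_EVENT_TYPES[name];
  if (code === undefined) {
    throw new RangeError(`Unknown outbox event type: ${name}`);
  }
  return code;
}

// Translate a stored code back into the readable event name.
function toTypeName(code) {
  const name = OUTBOX_EVENT_NAMES[code];
  if (name === undefined) {
    throw new RangeError(`Unknown outbox event code: ${code}`);
  }
  return name;
}
```

Keeping the table in one place means the database only ever sees small integers, while logs and handlers can still work with readable names.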
Define Outbox Event Dispatcher
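// The module path is an assumption; point it at your own repository module.
const OutboxEventRepository = require('outbox-event-repository');

class OutboxEventDispatcher {
  // Inserts the outbox event, optionally inside the caller's transaction.
  static dispatch(outboxEvent, { transaction } = {}) {
    return OutboxEventRepository.createOutboxEvent(outboxEvent, { transaction });
  }
}
module.exports = OutboxEventDispatcher;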
Define Outbox Event
class OrderCreatedOutboxEvent {
  /**
   * @param {object} message the order
   * @param {integer} message.id the order id
   */
  constructor(message) {
    // means 'ORDER.OUTBOX.EVENT.CREATED'
    this.type = 1;
    if (message !== undefined) {
      // you can plug in your own serialization, e.g. fast-json-stringify instead of JSON.stringify
      this.message = JSON.stringify(message);
    }
  }
}
module.exports = OrderCreatedOutboxEvent;
Dispatch Outbox Event
const {
dispatch,
} = require('outbox-event-dispatcher');
const OrderCreatedOutboxEvent = require('order-created-outbox-event');
// Transaction and OrderRepository are assumed to come from the application's data layer.
async function createOrder(creatingOrder) {
  const txn = new Transaction();
  // The callback is async because it awaits the repository and the dispatcher.
  return await txn.commit(async transaction => {
    const order = await OrderRepository.createOrder(creatingOrder, { transaction });
    // INSERT INTO outbox_events (`id`,`type`,`message`,`isConsumed`,`createdAt`,`updatedAt`) VALUES (DEFAULT,?,?,?,?,?);
    await dispatch(
      new OrderCreatedOutboxEvent(order),
      { transaction }
    );
    return order;
  });
}
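The point of passing the same transaction to both writes is that the order row and the outbox event commit or roll back together. The following is a minimal in-memory sketch of that property, not the MySQL-backed implementation. InMemoryDb, its transaction method, createOrderAtomically, and the failOnDispatch flag are hypothetical names introduced only for illustration.

```javascript
// In-memory stand-in for a database with transactions (an assumption for
// illustration; the article's Transaction class is backed by MySQL).
class InMemoryDb {
  constructor() {
    this.orders = [];
    this.outboxEvents = [];
  }

  // Runs `work` against staged copies and publishes them only if it resolves.
  async transaction(work) {
    const staged = {
      orders: [...this.orders],
      outboxEvents: [...this.outboxEvents],
    };
    // If `work` throws, the staged copies are discarded (rollback).
    const result = await work(staged);
    // Commit: both tables become visible at the same time.
    this.orders = staged.orders;
    this.outboxEvents = staged.outboxEvents;
    return result;
  }
}

async function createOrderAtomically(db, creatingOrder, { failOnDispatch = false } = {}) {
  return db.transaction(async (tx) => {
    const order = { id: tx.orders.length + 1, ...creatingOrder };
    tx.orders.push(order);
    if (failOnDispatch) {
      // Simulates a failing outbox insert inside the transaction.
      throw new Error('outbox insert failed');
    }
    tx.outboxEvents.push({
      type: 1,
      message: JSON.stringify(order),
      isConsumed: false,
    });
    return order;
  });
}
```

When the simulated outbox insert fails, neither the order nor the event is persisted, so the orchestrator never misses an event for a committed order and never sees an event for an order that was rolled back.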
So far, we have implemented outbox event insertion in the API layer. Next, we introduce the orchestrator implementation.
Optimize Polling Outbox Event
A simple query is enough to implement FIFO behavior:
SELECT
`id`,
`type`,
`message`,
`isConsumed`,
`createdAt`,
`updatedAt`
FROM
`outbox_events` AS `outbox_events` USE INDEX (OUTBOX_EVENTS_IS_CONSUMED_CREATED_AT)
WHERE
`outbox_events`.`isConsumed` = FALSE
ORDER BY `outbox_events`.`createdAt` ASC , `outbox_events`.`id` ASC
LIMIT 1;
With this simple query, the orchestrator is hard to scale. The query does not lock the rows it reads, so the orchestrator can run as only one replica; additional replicas would read the same events. Without the ability to scale out, events may be consumed too slowly in a high-traffic environment.
In MySQL 8, the new SKIP LOCKED feature resolves this challenge. We can refactor the simple SQL as follows:
START TRANSACTION;
SELECT
`id`,
`type`,
`message`,
`isConsumed`,
`createdAt`,
`updatedAt`
FROM
`outbox_events` AS `outbox_events` USE INDEX (OUTBOX_EVENTS_IS_CONSUMED_CREATED_AT)
WHERE
`outbox_events`.`isConsumed` = FALSE
ORDER BY `outbox_events`.`createdAt` ASC , `outbox_events`.`id` ASC
LIMIT 1 FOR UPDATE SKIP LOCKED;
With SKIP LOCKED, we can scale the orchestrator to multiple replicas.
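To make the effect of SKIP LOCKED concrete, here is a small in-memory simulation of how two replicas claim disjoint batches. It is only a sketch of the locking semantics, not of InnoDB itself. OutboxTable and its methods are hypothetical names, and createdAt is a plain number for brevity.

```javascript
// Simulates `SELECT ... WHERE isConsumed = FALSE ORDER BY createdAt, id
// LIMIT n FOR UPDATE SKIP LOCKED` over an in-memory table.
class OutboxTable {
  constructor(rows) {
    this.rows = rows;
    this.lockedBy = new Map(); // row id -> id of the transaction holding the lock
  }

  selectForUpdateSkipLocked(txId, limit) {
    const claimed = this.rows
      .filter((row) => {
        const owner = this.lockedBy.get(row.id);
        // Skip rows locked by other transactions instead of waiting for them.
        return !row.isConsumed && (owner === undefined || owner === txId);
      })
      .sort((a, b) => a.createdAt - b.createdAt || a.id - b.id)
      .slice(0, limit);
    for (const row of claimed) this.lockedBy.set(row.id, txId);
    return claimed;
  }

  // Marks the transaction's rows as consumed and releases its locks.
  commit(txId) {
    for (const [rowId, owner] of this.lockedBy) {
      if (owner !== txId) continue;
      this.rows.find((row) => row.id === rowId).isConsumed = true;
      this.lockedBy.delete(rowId);
    }
  }
}

const table = new OutboxTable([
  { id: 1, createdAt: 100, isConsumed: false },
  { id: 2, createdAt: 100, isConsumed: false },
  { id: 3, createdAt: 200, isConsumed: false },
]);

// Replica A claims the two oldest events; replica B skips them and gets the next one.
const batchA = table.selectForUpdateSkipLocked('replica-a', 2);
const batchB = table.selectForUpdateSkipLocked('replica-b', 2);
console.log(batchA.map((row) => row.id)); // [ 1, 2 ]
console.log(batchB.map((row) => row.id)); // [ 3 ]
```

Note that ordering is preserved within each batch, but with several replicas the global processing order across batches is no longer strictly FIFO.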
However, we now face another challenge. When the query returns no rows, a gap lock is acquired, because MySQL uses the repeatable read (RR) isolation level by default. The insert intention lock taken when inserting an outbox event must then wait until that gap lock is released.
Therefore, we use the read committed (RC) isolation level, which avoids the gap lock when the query returns no rows. Let us refactor the SQL again:
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
START TRANSACTION;
SELECT
`id`,
`type`,
`message`,
`isConsumed`,
`createdAt`,
`updatedAt`
FROM
`outbox_events` AS `outbox_events` USE INDEX (OUTBOX_EVENTS_IS_CONSUMED_CREATED_AT)
WHERE
`outbox_events`.`isConsumed` = FALSE
ORDER BY `outbox_events`.`createdAt` ASC , `outbox_events`.`id` ASC
LIMIT 1 FOR UPDATE SKIP LOCKED;
So far so good. In a high-traffic environment, we can increase LIMIT to reduce the number of SQL round trips.
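As a rough illustration of why a larger LIMIT helps, the sketch below counts how many polling queries are needed to drain a backlog. countPollsToDrain is a hypothetical helper, and it assumes every poll returns a full batch until the backlog is empty, which ignores concurrent inserts and other replicas.

```javascript
// Counts the polling queries needed to drain `pendingEvents` rows
// when each query claims at most `limit` rows.
function countPollsToDrain(pendingEvents, limit) {
  if (!Number.isInteger(limit) || limit <= 0) {
    throw new RangeError('limit must be a positive integer');
  }
  let polls = 0;
  let remaining = pendingEvents;
  while (remaining > 0) {
    remaining -= Math.min(limit, remaining);
    polls += 1;
  }
  return polls;
}

console.log(countPollsToDrain(1000, 1));   // 1000
console.log(countPollsToDrain(1000, 100)); // 10
```

The trade-off is that a larger batch keeps its row locks for longer, since the transaction stays open until the whole batch has been handed to the queue.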
Orchestrator Implementation
const { EventEmitter } = require('events');

// Transaction, ISOLATION_LEVELS, OutboxEventRepository, and Queue are assumed to be
// provided by the application's data and messaging layers.
class Orchestrator extends EventEmitter {
  constructor() {
    super();
    this.on('outbox', async function () {
      try {
        // SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
        const txn = new Transaction({
          isolationLevel: ISOLATION_LEVELS.READ_COMMITTED,
        });
        // START TRANSACTION;
        const outboxEvents = await txn.commit(async transaction => {
          // SELECT * FROM outbox_events WHERE isConsumed = FALSE
          // ORDER BY `outbox_events`.`createdAt` ASC , `outbox_events`.`id` ASC
          // LIMIT 100 FOR UPDATE SKIP LOCKED;
          const outboxEvents = await OutboxEventRepository.findOutboxEvents({
            limit: 100,
            transaction,
          });
          const messages = outboxEvents.map(e => e.message);
          // Abstract Queue interface; you can use Kafka, RabbitMQ, or Redis as the queue.
          await Queue.add(messages);
          // UPDATE outbox_events SET `isConsumed`=TRUE, updatedAt='2024-12-07 11:48:05'
          // WHERE `id` IN (...) AND `isConsumed` = FALSE;
          await OutboxEventRepository.setOutboxEventsConsumed(outboxEvents, { transaction });
          return outboxEvents;
        });
        this.emit('outbox:ack', null, outboxEvents);
      } catch (error) {
        this.emit('outbox:ack', error);
      }
    });
    this.on('outbox:ack', function (error, outboxEvents) {
      if (error !== null) {
        // On failure, retry after a short delay.
        setTimeout(() => this.emit('outbox'), 1000);
      } else if (outboxEvents.length) {
        // More events may be pending, so poll again immediately.
        this.emit('outbox');
      } else {
        // Nothing to consume, so wait before polling again.
        setTimeout(() => this.emit('outbox'), 1000);
      }
    });
    this.emit('outbox');
  }
}
module.exports = Orchestrator;
Summary
To implement the transactional outbox pattern in MySQL 8 with scalable, performant event delivery, we need to:
- Use SKIP LOCKED: multiple orchestrator replicas can consume outbox events in parallel.
- Use the RC isolation level: it avoids the gap lock taken when the query returns no rows (inserting an outbox event takes an insert intention lock, which must wait for the gap lock to be released).
- Increase LIMIT: it reduces the number of query round trips.