A lib for rabbit and sqs queues
Requires amqplib
to be installed separately. If you only need RabbitMQ support you can avoid also needing to install @aws-sdk
packages by importing from "@loke/queue-kit/rabbit"
.
import { RabbitHelper } from "@loke/queue-kit"; // or "@loke/queue-kit/rabbit"
import amqp from "amqplib"; // must be installed separately
async function main() {
const amqpConnection = await amqp.connect("amqp://localhost");
const rabbitHelper = new RabbitHelper({
amqpConnection,
logger: console,
});
await rabbitHelper.assertExchange();
await rabbitHelper.assertWorkQueue("work-queue", { retryDelay: 1000 });
await rabbitHelper.bindQueue("work-queue", "thing.*");
const aborter = new AbortController();
const doneP = (async () => {
// We loop here in case the rabbit channel closes and we need to restart
// the queue handler.
while (!aborter.signal.aborted) {
try {
await rabbitHelper.handleQueue({
queueName: "work-queue",
handler: async (msg) => {
await doWork(msg.body);
},
signal: aborter.signal,
});
} catch (err) {
logger.error("Error with spend tracker queue", err);
}
}
})();
await stopSignal();
aborter.abort();
await doneP;
}
main();
Breaking change in 2.x: assertWorkQueue
now requires a retryDelay
option. This is the delay between retries when a message fails to be processed.
To achieve this a dead letter queue is created and attached to the work queue
(via the default direct exchange). Because old queues can't be changed via
assertQueue, a new one will need to be created.
await rabbitHelper.assertWorkQueue("new-queue", { retryDelay: 1000 });
await rabbitHelper.bindQueue("new-queue", "thing.*");
// bindings will already exist for the old-queue
const aborter = new AbortController();
const handler = async (msg) => {
await doWork(msg.body);
};
const doneP = await Promise.all([
rabbitHelper.handleQueue({
queueName: "old-queue",
handler,
signal: aborter.signal,
}),
rabbitHelper.handleQueue({
queueName: "new-queue",
handler,
signal: aborter.signal,
}),
]);
import { RabbitHelper } from "@loke/queue-kit"; // or "@loke/queue-kit/rabbit"
import amqp from "amqplib"; // must be installed separately
async function main() {
const amqpConnection = await amqp.connect("amqp://localhost");
const rabbitHelper = new RabbitHelper({
amqpConnection,
logger: console,
});
await rabbitHelper.assertExchange();
const aborter = new AbortController();
const doneP = (async () => {
// We loop here in case the rabbit channel closes and we need to restart
// the queue handler.
//
// Because this is a subscription queue, we need to create a new queue each
// time we start the handler. This is because the queue will be deleted when
// the channel closes. If the resource the queue is updating could have
// become stale while the channel was closed, we need to handle that too.
//
// In this example we update a cache with the latest value from the queue.
// If we loose our our connection to the queue, we need to clear the cache
// and start again.
while (!aborter.signal.aborted) {
const { queue } = await rabbitHelper.createSubscriptionQueue();
await rabbitHelper.bindQueue(queue, "thing.*");
await clearCache();
try {
await rabbitHelper.handleQueue({
queueName: queue,
handler: async (msg) => {
await updateCache(msg.body);
},
signal: aborter.signal,
});
} catch (err) {
logger.error("Error with spend tracker queue", err);
}
}
})();
await stopSignal();
aborter.abort();
await doneP;
}
main();
import { RabbitHelper } from "@loke/queue-kit"; // or "@loke/queue-kit/rabbit"
import amqp from "amqplib"; // must be installed separately
async function main() {
const amqpConnection = await amqp.connect("amqp://localhost");
const rabbitHelper = new RabbitHelper({
amqpConnection,
logger: console,
});
await rabbitHelper.publish("thing.1", {
foo: "bar",
});
}
Requires @aws-sdk/client-sqs
to be installed separately. If you only need SQS support you can avoid also needing to install amqplib
by importing from "@loke/queue-kit/sqs"
.
Handling a queue:
import { SQSHelper } from "@loke/queue-kit"; // or "@loke/queue-kit/sqs"
import { SQSClient } from "@aws-sdk/client-sqs"; // must be installed separately
async function main() {
const sqsHelper = new SQSHelper({
sqs: new SQSClient(),
logger: console,
});
const aborter = new AbortController();
const doneP = await sqsHelper.handleQueue({
queueUrl: "https://queue-url",
handler: async (msg) => {
await doWork(msg.body);
},
signal: aborter.signal,
});
await stopSignal();
aborter.abort();
await doneP;
}
Queueing work:
import { SQSHelper } from "@loke/queue-kit"; // or "@loke/queue-kit/sqs"
import { SQSClient } from "@aws-sdk/client-sqs"; // must be installed separately
async function main() {
const sqsHelper = new SQSHelper({
sqs: new SQSClient(),
logger: console,
});
await sqsHelper.sendToQueue("https://queue-url", {
foo: "bar",
});
}
import { register } from "prom-client";
import { registerMetrics } from "@loke/queue-kit"; // or "@loke/queue-kit/rabbit" or "@loke/queue-kit/sqs"
registerMetrics(register);