event-driven
from cosmix/loom
A curated list of agents, skills and a CLAUDE.md starter for your agentic sessions!
6 stars · 0 forks · Updated Jan 26, 2026
npx skills add https://github.com/cosmix/loom --skill event-driven
SKILL.md
Event-Driven Architecture
Overview
Event-driven architecture (EDA) builds loosely coupled, scalable systems whose components communicate through events rather than direct calls. This skill covers message queues, pub/sub patterns, event sourcing, CQRS, distributed transaction management with sagas, and data streaming with Kafka.
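As a rough illustration of communicating through events rather than direct calls, the following is a minimal in-memory pub/sub sketch. `EventBus`, `OrderPlaced`, and the `order.placed` topic are hypothetical names chosen for this example, not part of the skill; a real system would put a broker such as RabbitMQ or Kafka between the publisher and its subscribers.

```typescript
// Minimal in-memory pub/sub bus (illustrative only; all names are hypothetical).
type Handler<T> = (event: T) => void;

class EventBus {
  private handlers = new Map<string, Handler<any>[]>();

  // Register a handler for a topic; returns a function that unsubscribes it.
  subscribe<T>(topic: string, handler: Handler<T>): () => void {
    const list = this.handlers.get(topic) ?? [];
    list.push(handler);
    this.handlers.set(topic, list);
    return () => {
      const current = this.handlers.get(topic) ?? [];
      this.handlers.set(
        topic,
        current.filter((h) => h !== handler)
      );
    };
  }

  // Deliver an event to every current subscriber; returns how many were notified.
  publish<T>(topic: string, event: T): number {
    const list = this.handlers.get(topic) ?? [];
    for (const handler of list) handler(event);
    return list.length;
  }
}

interface OrderPlaced {
  orderId: string;
  total: number;
}

const bus = new EventBus();
const emailed: string[] = [];
const revenue = { total: 0 };

// Two independent subscribers; the publisher knows about neither of them.
const unsubscribeEmail = bus.subscribe<OrderPlaced>("order.placed", (e) => {
  emailed.push(e.orderId);
});
bus.subscribe<OrderPlaced>("order.placed", (e) => {
  revenue.total += e.total;
});

const delivered = bus.publish<OrderPlaced>("order.placed", {
  orderId: "o-1",
  total: 42,
});
```

The publisher only names a topic, so subscribers can be added or removed (here via `unsubscribeEmail`) without touching the publishing code. That is the loose coupling the overview describes.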
Available Agents
- senior-software-engineer (Opus) - Architecture design, pattern selection, distributed system design
- software-engineer (Sonnet) - Event handler implementation, consumer/producer code
- security-engineer (Opus) - Event security, authorization patterns, message encryption
- senior-infrastructure-engineer (Opus) - Message broker setup, Kafka clusters, queue configuration
Key Concepts
Message Queues
RabbitMQ Implementation:
import { randomUUID } from "node:crypto";
import amqp, { Channel, Connection } from "amqplib";

interface QueueConfig {
  name: string;
  durable: boolean;
  deadLetterExchange?: string;
  messageTtl?: number;
  maxRetries?: number;
}

// Option types used by publish() and consume()
interface PublishOptions {
  messageId?: string;
  headers?: Record<string, unknown>;
}

interface ConsumeOptions {
  prefetch?: number;
  maxRetries?: number;
}

class RabbitMQClient {
  private connection: Connection | null = null;
  private channel: Channel | null = null;

  async connect(url: string): Promise<void> {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();

    // Handle connection errors
    this.connection.on("error", (err) => {
      console.error("RabbitMQ connection error:", err);
      this.reconnect(url);
    });
  }

  // Minimal reconnect: retry after a fixed delay. Queues and consumers
  // must be set up again by the caller once the connection is back.
  private reconnect(url: string, delayMs = 5000): void {
    this.connection = null;
    this.channel = null;
    setTimeout(() => {
      this.connect(url).catch((err) => {
        console.error("RabbitMQ reconnect failed:", err);
        this.reconnect(url, delayMs);
      });
    }, delayMs);
  }

  async setupQueue(config: QueueConfig): Promise<void> {
    if (!this.channel) throw new Error("Not connected");

    const options: amqp.Options.AssertQueue = {
      durable: config.durable,
      arguments: {},
    };

    // Rejected or expired messages are routed to this exchange
    if (config.deadLetterExchange) {
      options.arguments!["x-dead-letter-exchange"] = config.deadLetterExchange;
    }
    if (config.messageTtl) {
      options.arguments!["x-message-ttl"] = config.messageTtl;
    }

    await this.channel.assertQueue(config.name, options);
  }

  async publish(
    queue: string,
    message: unknown,
    options?: PublishOptions
  ): Promise<void> {
    if (!this.channel) throw new Error("Not connected");

    const content = Buffer.from(JSON.stringify(message));
    const publishOptions: amqp.Options.Publish = {
      persistent: true, // survive broker restarts (queue must be durable too)
      messageId: options?.messageId || randomUUID(),
      timestamp: Date.now(),
      headers: options?.headers,
    };

    this.channel.sendToQueue(queue, content, publishOptions);
  }

  async consume<T>(
    queue: string,
    handler: (
      message: T,
      ack: () => void,
      nack: (requeue?: boolean) => void
    ) => Promise<void>,
    options?: ConsumeOptions
  ): Promise<void> {
    if (!this.channel) throw new Error("Not connected");

    // Limit the number of unacknowledged messages in flight
    await this.channel.prefetch(options?.prefetch || 10);

    await this.channel.consume(queue, async (msg) => {
      if (!msg) return; // consumer was cancelled by the broker

      try {
        const content: T = JSON.parse(msg.content.toString());
        const retryCount =
          (msg.properties.headers?.["x-retry-count"] as number) || 0;

        await handler(
          content,
          () => this.channel!.ack(msg),
          (requeue = false) => {
            if (requeue && retryCount < (options?.maxRetries ?? 3)) {
              // Republish with an incremented retry count, then ack the
              // original so it is not dead-lettered as well
              this.publish(queue, content, {
                headers: { "x-retry-count": retryCount + 1 },
              })
                .then(() => this.channel!.ack(msg))
                .catch((err) => {
                  console.error("Retry publish failed:", err);
                  this.channel!.nack(msg, false, false);
                });
            } else {
              this.channel!.nack(msg, false, false); // Send to DLQ
            }
          }
        );
      } catch (error) {
        console.error("Message processing error:", error);
        this.channel!.nack(msg, false, false);
      }
    });
  }
}
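The retry handling in `consume` comes down to one decision per rejected message: republish it with an incremented `x-retry-count` header, or give up and let the broker dead-letter it. The sketch below isolates that decision as a pure function so it can be checked without a broker. It assumes the same header name and default of 3 retries used above; `nextAction` and `RetryDecision` are hypothetical names, not part of amqplib.

```typescript
// Hypothetical helper (not an amqplib API): decides what to do with a
// message whose handler asked for a negative acknowledgement.
type RetryDecision =
  | { action: "retry"; headers: Record<string, unknown> }
  | { action: "dead-letter" };

function nextAction(
  headers: Record<string, unknown> | undefined,
  requeue: boolean,
  maxRetries = 3
): RetryDecision {
  // A missing or non-numeric header counts as zero previous retries.
  const raw = headers?.["x-retry-count"];
  const retryCount = typeof raw === "number" ? raw : 0;

  if (requeue && retryCount < maxRetries) {
    // Carry existing headers forward and bump the counter.
    return {
      action: "retry",
      headers: { ...headers, "x-retry-count": retryCount + 1 },
    };
  }
  // Either the handler did not ask for a retry or the budget is spent.
  return { action: "dead-letter" };
}

// First failure with no headers yet: retry with the counter set to 1.
const first = nextAction(undefined, true);
// Retry budget already used up: dead-letter.
const exhausted = nextAction({ "x-retry-count": 3 }, true);
// Handler rejected without asking for a retry: dead-letter.
const rejected = nextAction({ "x-retry-count": 0 }, false);
```

Keeping the decision separate from the channel calls makes the retry policy easy to unit test and to swap out, for example for one that also adds a delay between attempts.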
AWS SQS Implementation:
import {
  SQSClient,
  SendMessageCommand,
  ReceiveMessageCommand,
  DeleteMessageCommand,
} from "@aws-sdk/client-sqs";

interface SQSMessage<T> {
  id: string;
  body: T;
  receiptHandle: string;
  approximateReceiveCount: number;
}

class SQSQueue<T> {
  private client: SQSClient;
  private queueUrl: string;
  // FIFO queue names always end in ".fifo"
  private isFifo: boolean;

  constructor(queueUrl: string, region: string = "us-east-1") {
    this.client = new SQSClient({ region });
    this.queueUrl = queueUrl;
    this.isFifo = queueUrl.endsWith(".fifo");
  }

  async send(
    message: T,
    options?: { delaySeconds?: number; deduplicationId?: string }
  ): Promise<string> {
    const command = new SendMessageCommand({
      QueueUrl: this.queueUrl,
      MessageBody: JSON.stringify(message),
      // Per-message delays are only supported on standard queues
      DelaySeconds: this.isFifo ? undefined : options?.delaySeconds,
      // Deduplication and group IDs apply to FIFO queues only. A group ID is
      // required on every FIFO send, even without an explicit deduplication ID.
      MessageDeduplicationId: this.isFifo ? options?.deduplicationId : undefined,
      MessageGroupId: this.isFifo ? "default" : undefined,
    });

    const response = await this.client.send(command);
    return response.MessageId!;
  }

  async receive(
    maxMessages: number = 10,
    waitTimeSeconds: number = 20
  ): Promise<SQSMessage<T>[]> {
    const command = new ReceiveMessageCommand({
      QueueUrl: this.queueUrl,
      MaxNumberOfMessages: maxMessages,
      WaitTimeSeconds:
...