event-driven

from cosmix/loom

A curated list of agents, skills and a CLAUDE.md starter for your agentic sessions!

6 stars0 forksUpdated Jan 26, 2026
npx skills add https://github.com/cosmix/loom --skill event-driven

SKILL.md

Event-Driven Architecture

Overview

Event-driven architecture (EDA) enables loosely coupled, scalable systems by communicating through events rather than direct calls. This skill covers message queues, pub/sub patterns, event sourcing, CQRS, distributed transaction management with sagas, and data streaming with Kafka.

Available Agents

  • senior-software-engineer (Opus) - Architecture design, pattern selection, distributed system design
  • software-engineer (Sonnet) - Event handler implementation, consumer/producer code
  • security-engineer (Opus) - Event security, authorization patterns, message encryption
  • senior-infrastructure-engineer (Opus) - Message broker setup, Kafka clusters, queue configuration

Key Concepts

Message Queues

RabbitMQ Implementation:

import amqp, { Channel, Connection } from "amqplib";

interface QueueConfig {
  name: string;
  durable: boolean;
  deadLetterExchange?: string;
  messageTtl?: number;
  maxRetries?: number;
}

class RabbitMQClient {
  private connection: Connection | null = null;
  private channel: Channel | null = null;

  async connect(url: string): Promise<void> {
    this.connection = await amqp.connect(url);
    this.channel = await this.connection.createChannel();

    // Handle connection errors
    this.connection.on("error", (err) => {
      console.error("RabbitMQ connection error:", err);
      this.reconnect(url);
    });
  }

  async setupQueue(config: QueueConfig): Promise<void> {
    if (!this.channel) throw new Error("Not connected");

    const options: amqp.Options.AssertQueue = {
      durable: config.durable,
      arguments: {},
    };

    if (config.deadLetterExchange) {
      options.arguments!["x-dead-letter-exchange"] = config.deadLetterExchange;
    }
    if (config.messageTtl) {
      options.arguments!["x-message-ttl"] = config.messageTtl;
    }

    await this.channel.assertQueue(config.name, options);
  }

  async publish(
    queue: string,
    message: unknown,
    options?: PublishOptions
  ): Promise<void> {
    if (!this.channel) throw new Error("Not connected");

    const content = Buffer.from(JSON.stringify(message));
    const publishOptions: amqp.Options.Publish = {
      persistent: true,
      messageId: options?.messageId || crypto.randomUUID(),
      timestamp: Date.now(),
      headers: options?.headers,
    };

    this.channel.sendToQueue(queue, content, publishOptions);
  }

  async consume<T>(
    queue: string,
    handler: (
      message: T,
      ack: () => void,
      nack: (requeue?: boolean) => void
    ) => Promise<void>,
    options?: ConsumeOptions
  ): Promise<void> {
    if (!this.channel) throw new Error("Not connected");

    await this.channel.prefetch(options?.prefetch || 10);

    await this.channel.consume(queue, async (msg) => {
      if (!msg) return;

      try {
        const content: T = JSON.parse(msg.content.toString());
        const retryCount =
          (msg.properties.headers?.["x-retry-count"] as number) || 0;

        await handler(
          content,
          () => this.channel!.ack(msg),
          (requeue = false) => {
            if (requeue && retryCount < (options?.maxRetries || 3)) {
              // Requeue with incremented retry count
              this.channel!.nack(msg, false, false);
              this.publish(queue, content, {
                headers: { "x-retry-count": retryCount + 1 },
              });
            } else {
              this.channel!.nack(msg, false, false); // Send to DLQ
            }
          }
        );
      } catch (error) {
        console.error("Message processing error:", error);
        this.channel!.nack(msg, false, false);
      }
    });
  }
}

AWS SQS Implementation:

import {
  SQSClient,
  SendMessageCommand,
  ReceiveMessageCommand,
  DeleteMessageCommand,
} from "@aws-sdk/client-sqs";

interface SQSMessage<T> {
  id: string;
  body: T;
  receiptHandle: string;
  approximateReceiveCount: number;
}

class SQSQueue<T> {
  private client: SQSClient;
  private queueUrl: string;

  constructor(queueUrl: string, region: string = "us-east-1") {
    this.client = new SQSClient({ region });
    this.queueUrl = queueUrl;
  }

  async send(
    message: T,
    options?: { delaySeconds?: number; deduplicationId?: string }
  ): Promise<string> {
    const command = new SendMessageCommand({
      QueueUrl: this.queueUrl,
      MessageBody: JSON.stringify(message),
      DelaySeconds: options?.delaySeconds,
      MessageDeduplicationId: options?.deduplicationId,
      MessageGroupId: options?.deduplicationId ? "default" : undefined,
    });

    const response = await this.client.send(command);
    return response.MessageId!;
  }

  async receive(
    maxMessages: number = 10,
    waitTimeSeconds: number = 20
  ): Promise<SQSMessage<T>[]> {
    const command = new ReceiveMessageCommand({
      QueueUrl: this.queueUrl,
      MaxNumberOfMessages: maxMessages,
      WaitTimeSeconds:

...
Read full content

Repository Stats

Stars6
Forks0