async-programming

from martinholovsky/claude-skills-generator

20 stars · 2 forks · Updated Dec 6, 2025
npx skills add https://github.com/martinholovsky/claude-skills-generator --skill async-programming

SKILL.md

Async Programming Skill

File Organization

  • SKILL.md: Core principles, patterns, essential security (this file)
  • references/security-examples.md: Race condition and resource safety examples
  • references/advanced-patterns.md: Advanced async patterns and optimization

Validation Gates

Gate 0.1: Domain Expertise Validation

  • Status: PASSED
  • Expertise Areas: asyncio, Tokio, race conditions, resource management, concurrent safety

Gate 0.2: Vulnerability Research

  • Status: PASSED (3+ issues for MEDIUM-RISK)
  • Research Date: 2025-11-20
  • Issues: CVE-2024-12254 (asyncio memory), Redis race condition (CVE-2023-28858/9)

Gate 0.11: File Organization Decision

  • Decision: Split structure (MEDIUM-RISK, ~400 lines main + references)

1. Overview

Risk Level: MEDIUM

Justification: Async programming introduces race conditions, resource leaks, and timing-based vulnerabilities. While not directly exposed to external attacks, improper async code can cause data corruption, deadlocks, and security-sensitive race conditions like double-spending or TOCTOU (time-of-check-time-of-use).
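As a rough illustration of the double-spending case, the sketch below shows a check-then-act race across an await point and one way to close it with asyncio.Lock. The Account class, its methods, and the spend_twice helper are hypothetical names invented for this example, and asyncio.sleep(0) stands in for whatever I/O a real withdrawal would await.

```python
import asyncio


class Account:
    """Hypothetical account used only to illustrate a check-then-act race."""

    def __init__(self, balance: int) -> None:
        self.balance = balance
        self._lock = asyncio.Lock()

    async def _authorize(self) -> None:
        # Stand-in for any awaited I/O (database, payment gateway, ...).
        await asyncio.sleep(0)

    async def withdraw_unsafe(self, amount: int) -> bool:
        if self.balance >= amount:       # time of check
            await self._authorize()      # other tasks may run here
            self.balance -= amount       # time of use: balance may have changed
            return True
        return False

    async def withdraw_safe(self, amount: int) -> bool:
        # Holding the lock makes the check and the update one atomic step.
        async with self._lock:
            if self.balance >= amount:
                await self._authorize()
                self.balance -= amount
                return True
            return False


async def spend_twice(safe: bool) -> tuple[list[bool], int]:
    """Attempt two concurrent withdrawals of the full balance."""
    account = Account(balance=100)
    withdraw = account.withdraw_safe if safe else account.withdraw_unsafe
    results = await asyncio.gather(withdraw(100), withdraw(100))
    return list(results), account.balance


if __name__ == "__main__":
    print(asyncio.run(spend_twice(safe=False)))  # ([True, True], -100)
    print(asyncio.run(spend_twice(safe=True)))   # ([True, False], 0)
```

In the unsafe variant both tasks pass the balance check before either one deducts, so the account is overdrawn. The locked variant serializes the check and the update, so the second withdrawal is refused.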

You are an expert in asynchronous programming patterns for Python (asyncio) and Rust (Tokio). You write concurrent code that is free from race conditions, properly manages resources, and handles errors gracefully.

Core Expertise Areas

  • Race condition identification and prevention
  • Async resource management (connections, locks, files)
  • Error handling in concurrent contexts
  • Performance optimization for async workloads
  • Graceful shutdown and cancellation

2. Core Principles

  1. TDD First: Write async tests before implementation using pytest-asyncio
  2. Performance Aware: Use asyncio.gather and semaphores; avoid blocking calls
  3. Identify Race Conditions: Recognize shared state accessed across await points
  4. Protect Shared State: Use locks, atomic operations, or message passing
  5. Manage Resources: Ensure cleanup happens even on cancellation
  6. Handle Errors: Don't let one task's failure corrupt others
  7. Avoid Deadlocks: Consistent lock ordering, timeouts on locks
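Principle 7 can be sketched as follows, assuming two locks that are taken together. The Wallet class and transfer function are hypothetical; the idea shown is that every caller acquires locks in the same global order (here, by wallet_id) and bounds each acquisition with asyncio.wait_for, so that opposite-direction transfers cannot wait on each other forever.

```python
import asyncio


class Wallet:
    """Hypothetical wallet; `wallet_id` gives every lock a global rank."""

    def __init__(self, wallet_id: int, balance: int) -> None:
        self.wallet_id = wallet_id
        self.balance = balance
        self.lock = asyncio.Lock()


async def transfer(src: Wallet, dst: Wallet, amount: int, timeout: float = 1.0) -> bool:
    if src is dst:
        return False  # the same lock cannot be taken twice
    # Always lock the lower id first, whichever direction the money moves.
    first, second = sorted((src, dst), key=lambda w: w.wallet_id)
    try:
        await asyncio.wait_for(first.lock.acquire(), timeout)
    except asyncio.TimeoutError:
        return False
    try:
        try:
            await asyncio.wait_for(second.lock.acquire(), timeout)
        except asyncio.TimeoutError:
            return False
        try:
            if src.balance < amount:
                return False
            await asyncio.sleep(0)  # stand-in for awaited I/O while both locks are held
            src.balance -= amount
            dst.balance += amount
            return True
        finally:
            second.lock.release()
    finally:
        first.lock.release()


async def demo() -> tuple[int, int]:
    a, b = Wallet(1, 100), Wallet(2, 100)
    # Opposite-direction transfers are the classic deadlock setup.
    await asyncio.gather(transfer(a, b, 30), transfer(b, a, 10))
    return a.balance, b.balance


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (80, 120)
```

The timeout is a backstop rather than the main defense: with a consistent order, a cycle of waiting tasks cannot form in the first place.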

Decision Framework

Situation             | Approach
Shared mutable state  | Use asyncio.Lock (Python) or tokio::sync::RwLock (Rust)
Database transaction  | Use atomic operations, SELECT FOR UPDATE
Resource cleanup      | Use async context managers
Task coordination     | Use asyncio.Event, Queue, or Semaphore
Background tasks      | Track tasks, handle cancellation
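For the "Background tasks" row, one possible shape is a small registry that holds a reference to each task and cancels whatever is still running at shutdown. TaskRegistry, spawn, and shutdown are hypothetical names for this sketch; the asyncio calls themselves (create_task, add_done_callback, gather with return_exceptions=True) are standard library.

```python
import asyncio


class TaskRegistry:
    """Hypothetical helper that tracks background tasks until shutdown."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()

    def spawn(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        # Keep a strong reference so the task is not garbage-collected mid-flight.
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def shutdown(self) -> int:
        """Cancel whatever is still running and wait for it to finish."""
        pending = [t for t in self._tasks if not t.done()]
        for task in pending:
            task.cancel()
        # return_exceptions=True collects CancelledError instead of raising it.
        await asyncio.gather(*pending, return_exceptions=True)
        return len(pending)


async def demo() -> tuple[int, list[str]]:
    events: list[str] = []

    async def worker(name: str) -> None:
        try:
            await asyncio.sleep(3600)
        finally:
            events.append(f"{name} cleaned up")  # runs on cancellation too

    registry = TaskRegistry()
    registry.spawn(worker("a"))
    registry.spawn(worker("b"))
    await asyncio.sleep(0)  # let both workers start
    cancelled = await registry.shutdown()
    return cancelled, sorted(events)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (2, ['a cleaned up', 'b cleaned up'])
```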

3. Implementation Workflow (TDD)

Step 1: Write Failing Test First

import pytest
import asyncio

@pytest.mark.asyncio
async def test_concurrent_counter_safety():
    """Test counter maintains consistency under concurrent access."""
    counter = SafeCounter()  # Not implemented yet - will fail

    async def increment_many():
        for _ in range(100):
            await counter.increment()

    # Run 10 concurrent incrementers
    await asyncio.gather(*[increment_many() for _ in range(10)])

    # Must be exactly 1000 (no lost updates)
    assert await counter.get() == 1000

@pytest.mark.asyncio
async def test_resource_cleanup_on_cancellation():
    """Test resources are cleaned up even when task is cancelled."""
    cleanup_called = False

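    async def task_with_resource():
        nonlocal cleanup_called
        try:
            async with managed_resource() as resource:  # Not implemented yet
                await asyncio.sleep(10)  # Long operation
        finally:
            # Code placed after the async with block is skipped on cancellation,
            # so the flag is set in a finally clause, once the context manager has exited.
            cleanup_called = True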

    task = asyncio.create_task(task_with_resource())
    await asyncio.sleep(0.1)
    task.cancel()

    with pytest.raises(asyncio.CancelledError):
        await task

    assert cleanup_called  # Cleanup must happen

Step 2: Implement Minimum to Pass

import asyncio
from contextlib import asynccontextmanager

class SafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = asyncio.Lock()

    async def increment(self) -> int:
        async with self._lock:
            self._value += 1
            return self._value

    async def get(self) -> int:
        async with self._lock:
            return self._value

@asynccontextmanager
async def managed_resource():
    resource = await acquire_resource()
    try:
        yield resource
    finally:
        await release_resource(resource)  # Always runs
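The managed_resource code above calls acquire_resource and release_resource, which the source does not define. A minimal way to exercise it, assuming simple in-memory stand-ins for both (the open_resources set and the cancel_while_holding helper are likewise invented for this sketch), might look like this:

```python
import asyncio
from contextlib import asynccontextmanager

# Hypothetical in-memory stand-ins; real code would open a connection, file, etc.
open_resources: set[int] = set()


async def acquire_resource() -> int:
    await asyncio.sleep(0)
    handle = len(open_resources) + 1
    open_resources.add(handle)
    return handle


async def release_resource(handle: int) -> None:
    await asyncio.sleep(0)
    open_resources.discard(handle)


@asynccontextmanager
async def managed_resource():
    resource = await acquire_resource()
    try:
        yield resource
    finally:
        await release_resource(resource)  # runs on normal exit and on cancellation


async def cancel_while_holding() -> tuple[bool, int]:
    async def holder() -> None:
        async with managed_resource():
            await asyncio.sleep(3600)

    task = asyncio.create_task(holder())
    await asyncio.sleep(0.01)  # give the task time to acquire
    held_before_cancel = len(open_resources) == 1
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return held_before_cancel, len(open_resources)


if __name__ == "__main__":
    print(asyncio.run(cancel_while_holding()))  # (True, 0)
```

One caveat: the release step is itself awaited, so a second cancellation arriving during cleanup could interrupt it. Whether that matters depends on the resource.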

Step 3: Refactor Following Patterns

Apply performance patterns, add timeouts, improve error handling.
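As one possible reading of this refactoring step, the sketch below bounds each job with asyncio.wait_for and collects failures with gather(return_exceptions=True), so that a slow or failing job does not take the others down. The run_isolated helper and the fast, slow, and broken jobs are hypothetical.

```python
import asyncio


async def run_isolated(jobs: dict, timeout: float) -> tuple[dict, dict]:
    """Run every job concurrently and split successes from failures."""
    outcomes = await asyncio.gather(
        *(asyncio.wait_for(job, timeout) for job in jobs.values()),
        return_exceptions=True,  # one failure must not cancel the others
    )
    ok, failed = {}, {}
    for name, outcome in zip(jobs, outcomes):
        if isinstance(outcome, Exception):
            failed[name] = type(outcome).__name__
        else:
            ok[name] = outcome
    return ok, failed


async def demo() -> tuple[dict, dict]:
    async def fast() -> str:
        await asyncio.sleep(0.01)
        return "done"

    async def slow() -> str:
        await asyncio.sleep(10)  # exceeds the timeout below
        return "never"

    async def broken() -> str:
        raise ValueError("bad input")

    jobs = {"fast": fast(), "slow": slow(), "broken": broken()}
    return await run_isolated(jobs, timeout=0.2)


if __name__ == "__main__":
    print(asyncio.run(demo()))
    # ({'fast': 'done'}, {'slow': 'TimeoutError', 'broken': 'ValueError'})
```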

Step 4: Run Full Verification

# Run async tests
pytest tests/ -v --asyncio-mode=auto

# Check for blocking calls: asyncio debug mode logs callbacks that run longer than 100 ms
PYTHONASYNCIODEBUG=1 pytest tests/ -v --asyncio-mode=auto

# Run tests in parallel across CPU cores (requires pytest-xdist)
pytest tests/ -v -n auto --asyncio-mode=auto

4. Performance Patterns

Pattern 1: asyncio.gather for Concurrency

# BAD - Sequential execution
async def fetch_all_sequential(urls: list[str]) -> list[str]:
    results = []
    for url in urls:
        result = await fetch

...
Repository Stats

Stars: 20
Forks: 2
License: The Unlicense