async-programming
from martinholovsky/claude-skills-generator
20 stars, 2 forks, updated Dec 6, 2025
npx skills add https://github.com/martinholovsky/claude-skills-generator --skill async-programming
Async Programming Skill
File Organization
- SKILL.md: Core principles, patterns, essential security (this file)
- references/security-examples.md: Race condition and resource safety examples
- references/advanced-patterns.md: Advanced async patterns and optimization
Validation Gates
Gate 0.1: Domain Expertise Validation
- Status: PASSED
- Expertise Areas: asyncio, Tokio, race conditions, resource management, concurrent safety
Gate 0.2: Vulnerability Research
- Status: PASSED (3+ issues for MEDIUM-RISK)
- Research Date: 2025-11-20
- Issues: CVE-2024-12254 (asyncio memory exhaustion), CVE-2023-28858 and CVE-2023-28859 (redis-py async race conditions)
Gate 0.11: File Organization Decision
- Decision: Split structure (MEDIUM-RISK, ~400 lines main + references)
1. Overview
Risk Level: MEDIUM
Justification: Async programming introduces race conditions, resource leaks, and timing-based vulnerabilities. Async code is rarely an attack surface in itself, but incorrect async code can cause data corruption, deadlocks, and security-sensitive races such as double-spending or time-of-check to time-of-use (TOCTOU) bugs.
You are an expert in asynchronous programming patterns for Python (asyncio) and Rust (Tokio). You write concurrent code that is free from race conditions, properly manages resources, and handles errors gracefully.
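To make the double-spending risk concrete, here is a minimal sketch of a check-then-act race across an await point, with a lock-based fix. The `Account` class and its method names are hypothetical and exist only for illustration; `asyncio.sleep(0)` stands in for whatever real I/O would sit between the check and the update.

```python
import asyncio


class Account:
    """Hypothetical account used only to illustrate a check-then-act race."""

    def __init__(self, balance: int) -> None:
        self.balance = balance
        self._lock = asyncio.Lock()

    async def withdraw_unsafe(self, amount: int) -> bool:
        # Check the balance ...
        if self.balance < amount:
            return False
        # ... yield to the event loop (stands in for a DB or network call) ...
        await asyncio.sleep(0)
        # ... then act on a check that may be stale by now.
        self.balance -= amount
        return True

    async def withdraw_safe(self, amount: int) -> bool:
        # Holding the lock makes the check and the update one atomic step
        # with respect to other coroutines that use the same lock.
        async with self._lock:
            if self.balance < amount:
                return False
            await asyncio.sleep(0)
            self.balance -= amount
            return True


async def demo() -> tuple[int, int]:
    unsafe = Account(100)
    await asyncio.gather(unsafe.withdraw_unsafe(100), unsafe.withdraw_unsafe(100))

    safe = Account(100)
    await asyncio.gather(safe.withdraw_safe(100), safe.withdraw_safe(100))
    return unsafe.balance, safe.balance
```

Running `asyncio.run(demo())` shows the unsafe account overdrawn, because both withdrawals pass the check before either one deducts, while the locked version rejects the second withdrawal.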
Core Expertise Areas
- Race condition identification and prevention
- Async resource management (connections, locks, files)
- Error handling in concurrent contexts
- Performance optimization for async workloads
- Graceful shutdown and cancellation
2. Core Principles
- TDD First: Write async tests before implementation using pytest-asyncio
- Performance Aware: Use asyncio.gather and semaphores; avoid blocking calls in coroutines
- Identify Race Conditions: Recognize shared state accessed across await points
- Protect Shared State: Use locks, atomic operations, or message passing
- Manage Resources: Ensure cleanup happens even on cancellation
- Handle Errors: Don't let one task's failure corrupt others
- Avoid Deadlocks: Consistent lock ordering, timeouts on locks
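The deadlock principle above can be sketched as follows. This is one possible approach, not the skill author's implementation: `acquire_with_timeout` and `transfer` are hypothetical helpers, the lock names are made up, and the sketch assumes `src` and `dst` are always different keys.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def acquire_with_timeout(lock: asyncio.Lock, timeout: float):
    """Acquire `lock`, or raise asyncio.TimeoutError after `timeout` seconds."""
    await asyncio.wait_for(lock.acquire(), timeout)
    try:
        yield
    finally:
        lock.release()


async def transfer(locks: dict[str, asyncio.Lock], src: str, dst: str) -> str:
    # Always lock in sorted key order, so transfer(a, b) and transfer(b, a)
    # can never each hold one lock while waiting for the other.
    # Assumes src != dst; the same key twice would wait on itself and time out.
    first, second = sorted((src, dst))
    async with acquire_with_timeout(locks[first], 1.0):
        await asyncio.sleep(0)  # yield so the opposite transfer can interleave
        async with acquire_with_timeout(locks[second], 1.0):
            return f"{src}->{dst}"


async def demo() -> list[str]:
    locks = {"a": asyncio.Lock(), "b": asyncio.Lock()}
    return await asyncio.gather(
        transfer(locks, "a", "b"),
        transfer(locks, "b", "a"),
    )
```

The ordering rule prevents the deadlock in the first place; the timeout is a backstop that turns an unexpected hang into an error the caller can handle.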
Decision Framework
| Situation | Approach |
|---|---|
| Shared mutable state | Use asyncio.Lock (Python) or tokio::sync::Mutex / RwLock (Rust) |
| Database transaction | Use atomic operations, SELECT FOR UPDATE |
| Resource cleanup | Use async context managers |
| Task coordination | Use asyncio.Event, Queue, or Semaphore |
| Background tasks | Track tasks, handle cancellation |
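For the "Background tasks" row, a minimal sketch of tracking tasks and cancelling them on shutdown might look like this. `TaskRegistry` is a hypothetical helper, not part of asyncio; the point is that the event loop keeps only weak references to tasks, so the caller should hold its own.

```python
import asyncio


class TaskRegistry:
    """Hypothetical helper that keeps strong references to background tasks."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()

    def spawn(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        # Drop the reference once the task finishes, whatever the outcome.
        task.add_done_callback(self._tasks.discard)
        return task

    async def shutdown(self) -> None:
        # Snapshot first: done-callbacks mutate the set as tasks finish.
        pending = list(self._tasks)
        for task in pending:
            task.cancel()
        # return_exceptions=True collects each CancelledError instead of raising.
        await asyncio.gather(*pending, return_exceptions=True)


async def demo() -> tuple[bool, bool]:
    registry = TaskRegistry()
    worker = registry.spawn(asyncio.sleep(60))
    await asyncio.sleep(0)  # let the worker start
    await registry.shutdown()
    await asyncio.sleep(0)  # let any remaining done-callbacks run
    return worker.cancelled(), len(registry._tasks) == 0
```

Awaiting the cancelled tasks in `shutdown` matters: it gives each task the chance to run its cleanup code before the program moves on.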
3. Implementation Workflow (TDD)
Step 1: Write Failing Test First
import asyncio

import pytest


@pytest.mark.asyncio
async def test_concurrent_counter_safety():
    """Test counter maintains consistency under concurrent access."""
    counter = SafeCounter()  # Not implemented yet - will fail

    async def increment_many():
        for _ in range(100):
            await counter.increment()

    # Run 10 concurrent incrementers
    await asyncio.gather(*[increment_many() for _ in range(10)])

    # Must be exactly 1000 (no lost updates)
    assert await counter.get() == 1000


@pytest.mark.asyncio
async def test_resource_cleanup_on_cancellation():
    """Test resources are cleaned up even when task is cancelled."""
    cleanup_called = False

    async def task_with_resource():
        nonlocal cleanup_called
        try:
            async with managed_resource() as resource:  # Not implemented yet
                await asyncio.sleep(10)  # Long operation
        finally:
            # Runs after the context manager exits, even on cancellation
            cleanup_called = True

    task = asyncio.create_task(task_with_resource())
    await asyncio.sleep(0.1)
    task.cancel()
    with pytest.raises(asyncio.CancelledError):
        await task

    assert cleanup_called  # Cleanup must happen
Step 2: Implement Minimum to Pass
import asyncio
from contextlib import asynccontextmanager


class SafeCounter:
    def __init__(self):
        self._value = 0
        self._lock = asyncio.Lock()

    async def increment(self) -> int:
        async with self._lock:
            self._value += 1
            return self._value

    async def get(self) -> int:
        async with self._lock:
            return self._value


@asynccontextmanager
async def managed_resource():
    # acquire_resource() / release_resource() are placeholders for your own code
    resource = await acquire_resource()
    try:
        yield resource
    finally:
        await release_resource(resource)  # Always runs
Step 3: Refactor Following Patterns
Apply performance patterns, add timeouts, improve error handling.
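As a rough illustration of what such a refactor might add, the sketch below combines a semaphore (bounded concurrency), a per-item timeout, and `return_exceptions=True` (error isolation). `fetch_one` is a hypothetical stand-in for real I/O, and the `limit` and `timeout` defaults are arbitrary.

```python
import asyncio


async def fetch_one(item: int) -> int:
    """Hypothetical stand-in for real I/O; item 3 fails, item 4 hangs."""
    if item == 3:
        raise ValueError("bad item")
    if item == 4:
        await asyncio.sleep(60)
    await asyncio.sleep(0.01)
    return item * 2


async def fetch_all(items: list[int], limit: int = 2, timeout: float = 0.2) -> list:
    semaphore = asyncio.Semaphore(limit)  # cap concurrent operations

    async def guarded(item: int) -> int:
        async with semaphore:
            # Per-item timeout so one hung call cannot hold a slot forever.
            return await asyncio.wait_for(fetch_one(item), timeout)

    # return_exceptions=True returns failures as values instead of
    # propagating the first one to the caller.
    return await asyncio.gather(
        *(guarded(i) for i in items), return_exceptions=True
    )
```

With this shape, the caller receives one entry per input, in input order, and can decide per item whether an exception should be retried, logged, or re-raised.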
Step 4: Run Full Verification
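# Run async tests
pytest tests/ -v --asyncio-mode=auto
# Surface blocking calls: asyncio debug mode logs callbacks slower than 100 ms
PYTHONASYNCIODEBUG=1 pytest tests/ -v --asyncio-mode=auto
# Run tests in parallel as a stress check (requires pytest-xdist)
pytest tests/ -v -n auto --asyncio-mode=auto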
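The blocking-call check can also be done from Python. The sketch below assumes asyncio's documented debug-mode behavior of logging a warning on the `asyncio` logger when a callback runs longer than `loop.slow_callback_duration` (0.1 s by default). `ListHandler`, `blocks_the_loop`, `offloaded`, and `slow_callback_warnings` are hypothetical names used only for this illustration.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects log messages in memory so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def blocks_the_loop() -> None:
    time.sleep(0.2)  # blocking call: nothing else can run meanwhile


async def offloaded() -> None:
    # asyncio.to_thread (Python 3.9+) runs the blocking call in a worker thread.
    await asyncio.to_thread(time.sleep, 0.2)


def slow_callback_warnings(coro_fn) -> list[str]:
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        # debug=True enables asyncio debug mode for this run.
        asyncio.run(coro_fn(), debug=True)
    finally:
        logger.removeHandler(handler)
    # Slow-callback warnings start with "Executing ... took N seconds".
    return [m for m in handler.messages if m.startswith("Executing")]
```

Calling `slow_callback_warnings(blocks_the_loop)` should return at least one warning, while the `offloaded` variant keeps the loop responsive because the sleep happens off the event-loop thread.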
4. Performance Patterns
Pattern 1: asyncio.gather for Concurrency
# BAD - Sequential execution
async def fetch_all_sequential(urls: list[str]) -> list[str]:
    results = []
    for url in urls:
        result = await fetch
...
Repository Stats
- Stars: 20
- Forks: 2
- License: The Unlicense