api-development

from pluginagentmarketplace/custom-plugin-data-engineer

Data Engineer Plugin - ETL pipelines, data infrastructure, and data processing tools

1 star · 1 fork · Updated Jan 5, 2026
npx skills add https://github.com/pluginagentmarketplace/custom-plugin-data-engineer --skill api-development

SKILL.md

API Development

Production-grade API development with FastAPI, REST best practices, and data service patterns.

Quick Start

from datetime import datetime, timezone
from typing import Optional

import uvicorn
from fastapi import FastAPI, HTTPException, Depends, Query
from pydantic import BaseModel, Field

# Application object that the route decorators below register their handlers on.
app = FastAPI(title="Data API", version="1.0.0")

# Pydantic models for validation
class DataRecord(BaseModel):
    """A single data point; request body of POST /data and item type of DataResponse.data."""

    id: str = Field(..., description="Unique identifier")
    value: float = Field(..., ge=0, description="Non-negative value")
    # Timezone-aware UTC default: datetime.utcnow() is deprecated since Python 3.12
    # and returns a naive value that is easy to misread as local time.
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    metadata: Optional[dict] = None

    # Pydantic v2 reads configuration from `model_config`; the nested
    # `class Config` form is deprecated there and emits a warning.
    model_config = {
        "json_schema_extra": {
            "example": {"id": "rec-001", "value": 42.5, "metadata": {"source": "sensor-1"}}
        }
    }

class DataResponse(BaseModel):
    # Envelope returned by GET /data: one page of records plus paging details.
    data: list[DataRecord]  # records on the requested page
    total: int  # result of count_records() for the same date filter
    page: int  # page number echoed back from the request
    page_size: int  # page size echoed back from the request

@app.get("/data", response_model=DataResponse)
async def get_data(
    page: int = Query(1, ge=1),
    page_size: int = Query(100, ge=1, le=1000),
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None
):
    """Retrieve paginated data records with optional date filtering."""
    # Keyword arguments are evaluated left to right, so the page query still
    # runs before the count.
    # NOTE(review): start_date > end_date is passed through unchecked — confirm
    # that query_database/count_records cope with an inverted range.
    return DataResponse(
        data=query_database(page, page_size, start_date, end_date),
        total=count_records(start_date, end_date),
        page=page,
        page_size=page_size,
    )

@app.post("/data", status_code=201)
async def create_data(record: DataRecord):
    """Create a new data record."""
    # Only the write can raise the duplicate-key error, so it is the only
    # statement kept inside the try body.
    try:
        save_to_database(record)
    except DuplicateKeyError as exc:
        # Chain the storage error so the 409 keeps its root cause attached.
        raise HTTPException(status_code=409, detail="Record already exists") from exc
    return {"status": "created", "id": record.id}

# Serve the app with uvicorn on port 8000 when this module is executed directly.
if __name__ == "__main__":
    # 0.0.0.0 listens on every network interface, not only on localhost.
    uvicorn.run(app, host="0.0.0.0", port=8000)

Core Concepts

1. Dependency Injection

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session

# Bearer scheme instance; verify_token receives its credentials via Depends(security).
security = HTTPBearer()

# Database session dependency
def get_db():
    """Yield a session created by ``SessionLocal`` and close it afterwards.

    The ``finally`` clause runs whether the consumer finishes normally or an
    exception is thrown into the generator, so ``close()`` is always called.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()

# Authentication dependency
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict:
    """Decode the bearer token and return its claims.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme.

    Returns:
        The payload produced by ``jwt.decode`` for an HS256 token verified
        with ``SECRET_KEY``.

    Raises:
        HTTPException: 401 carrying a ``WWW-Authenticate: Bearer`` challenge
            when ``jwt.decode`` rejects the token.
    """
    # NOTE(review): ``jwt`` and ``SECRET_KEY`` are not defined in this snippet;
    # presumably PyJWT and an application setting — confirm before use.
    try:
        return jwt.decode(credentials.credentials, SECRET_KEY, algorithms=["HS256"])
    except jwt.InvalidTokenError as exc:
        # A 401 should tell the client which authentication scheme to retry with.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

# Rate limiting dependency
class RateLimiter:
    """Callable dependency that rejects clients exceeding a per-minute quota.

    An instance is passed to ``Depends``; the actual bookkeeping is delegated
    to ``is_rate_limited``.
    """

    def __init__(self, requests_per_minute: int = 60):
        """Store the quota that is later forwarded to ``is_rate_limited``."""
        self.requests_per_minute = requests_per_minute

    async def __call__(self, request: Request):
        """Raise a 429 ``HTTPException`` when the caller is over its quota."""
        # ``request.client`` can be None when the peer address is unavailable;
        # fall back to one shared bucket instead of failing with AttributeError.
        client_ip = request.client.host if request.client else "unknown"
        if is_rate_limited(client_ip, self.requests_per_minute):
            raise HTTPException(status_code=429, detail="Rate limit exceeded")

# Usage
@app.get("/protected")
async def protected_endpoint(
    user: dict = Depends(verify_token),
    db: Session = Depends(get_db),
    _: None = Depends(RateLimiter(100))
):
    # The session and the rate limiter are requested as dependencies but not
    # used in the body; only the "sub" claim of the decoded token is echoed.
    subject = user["sub"]
    return {"user": subject}

2. Error Handling

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from pydantic import ValidationError

# Application object for this example; the exception handlers below attach to it.
app = FastAPI()

class APIError(Exception):
    """Application error carrying a machine-readable code and an HTTP status.

    Attributes:
        code: Short identifier placed in the error payload.
        message: Human-readable description; also the ``str()`` of the error.
        status_code: HTTP status used for the response (400 by default).
    """

    def __init__(self, code: str, message: str, status_code: int = 400):
        # Hand all three values to Exception so ``args`` is complete however the
        # error was constructed; copy and pickle rebuild the instance from ``args``.
        super().__init__(code, message, status_code)
        self.code = code
        self.message = message
        self.status_code = status_code

    def __str__(self) -> str:
        # Log lines and tracebacks show the message instead of the raw args tuple.
        return str(self.message)

@app.exception_handler(APIError)
async def api_error_handler(request: Request, exc: APIError):
    """Render an ``APIError`` as a JSON body using the error's own status code."""
    error_body = {"code": exc.code, "message": exc.message}
    return JSONResponse(content={"error": error_body}, status_code=exc.status_code)

@app.exception_handler(ValidationError)
async def validation_error_handler(request: Request, exc: ValidationError):
    """Answer a pydantic ``ValidationError`` with a 422 listing ``exc.errors()``.

    NOTE(review): this is registered for ``pydantic.ValidationError`` only;
    FastAPI is documented to raise ``RequestValidationError`` for invalid
    request data, so confirm which of the two this handler is meant to cover.
    """
    error_body = {"code": "VALIDATION_ERROR", "details": exc.errors()}
    return JSONResponse(content={"error": error_body}, status_code=422)

@app.exception_handler(Exception)
async def generic_error_handler(request: Request, exc: Exception):
    """Log an otherwise unhandled exception and answer with a generic 500.

    The response body carries a fixed message only; the exception text goes
    to the log and is not returned to the client.
    """
    # NOTE(review): ``logger`` is not defined in this snippet; presumably a
    # module-level logging.getLogger(...) — confirm.
    # Lazy %-formatting leaves message building to the logging framework, and
    # passing the exception object as exc_info records its traceback without
    # depending on an exception being actively handled at this point.
    logger.error("Unhandled error: %s", exc, exc_info=exc)
    return JSONResponse(
        status_code=500,
        content={"error": {"code": "INTERNAL_ERROR", "message": "An unexpected error occurred"}}
    )

3. Background Tasks

from fastapi import BackgroundTasks
from celery import Celery

# Simple background tasks
@app.post("/reports")
async def generate_report(background_tasks: BackgroundTasks, report_id: str):
    # Hand process_report to the injected BackgroundTasks object, then return an
    # acknowledgement without calling process_report directly.
    background_tasks.add_task(process_report, report_id)
    acknowledgement = {"status": "processing", "report_id": report_id}
    return acknowledgement

def process_report(report_id: str):
    """Produce the report identified by ``report_id`` and persist it.

    The 60-second sleep stands in for the long-running work; ``save_report``
    is called once it finishes.
    """
    # Imported locally because this snippet never imports ``time`` at module
    # level, which would make the sleep below fail with NameError.
    import time

    # Long-running task
    time.sleep(60)
    save_report(report_id)

# Celery for distributed tasks
# Celery application named 'tasks' with a Redis broker at localhost:6379.
# NOTE(review): the broker URL is hard-coded; presumably it should come from
# configuration outside local development — confirm.
celery_app = Celery('tasks', broker='redis://localhost:6379')

@celery_app.task
def async_etl_job(job_id: str):
    """Celery task that runs ``run_etl_pipeline`` for the given ``job_id``."""
    run_etl_pipeline(job_id)

@app.

...
Read full content

Repository Stats

Stars: 1
Forks: 1
License: Other