api-development
from pluginagentmarketplace/custom-plugin-data-engineer
Data Engineer Plugin - ETL pipelines, data infrastructure, and data processing tools
1 stars1 forksUpdated Jan 5, 2026
npx skills add https://github.com/pluginagentmarketplace/custom-plugin-data-engineer --skill api-developmentSKILL.md
API Development
Production-grade API development with FastAPI, REST best practices, and data service patterns.
Quick Start
from fastapi import FastAPI, HTTPException, Depends, Query
from pydantic import BaseModel, Field
from typing import Optional
from datetime import datetime
import uvicorn
app = FastAPI(title="Data API", version="1.0.0")
# Pydantic models for validation
class DataRecord(BaseModel):
id: str = Field(..., description="Unique identifier")
value: float = Field(..., ge=0, description="Non-negative value")
timestamp: datetime = Field(default_factory=datetime.utcnow)
metadata: Optional[dict] = None
class Config:
json_schema_extra = {
"example": {"id": "rec-001", "value": 42.5, "metadata": {"source": "sensor-1"}}
}
class DataResponse(BaseModel):
data: list[DataRecord]
total: int
page: int
page_size: int
@app.get("/data", response_model=DataResponse)
async def get_data(
page: int = Query(1, ge=1),
page_size: int = Query(100, ge=1, le=1000),
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
):
"""Retrieve paginated data records with optional date filtering."""
# Query data with pagination
records = query_database(page, page_size, start_date, end_date)
total = count_records(start_date, end_date)
return DataResponse(data=records, total=total, page=page, page_size=page_size)
@app.post("/data", status_code=201)
async def create_data(record: DataRecord):
"""Create a new data record."""
try:
save_to_database(record)
return {"status": "created", "id": record.id}
except DuplicateKeyError:
raise HTTPException(status_code=409, detail="Record already exists")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
Core Concepts
1. Dependency Injection
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
security = HTTPBearer()
# Database session dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# Authentication dependency
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict:
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return payload
except jwt.InvalidTokenError:
raise HTTPException(status_code=401, detail="Invalid token")
# Rate limiting dependency
class RateLimiter:
def __init__(self, requests_per_minute: int = 60):
self.requests_per_minute = requests_per_minute
async def __call__(self, request: Request):
client_ip = request.client.host
if is_rate_limited(client_ip, self.requests_per_minute):
raise HTTPException(status_code=429, detail="Rate limit exceeded")
# Usage
@app.get("/protected")
async def protected_endpoint(
user: dict = Depends(verify_token),
db: Session = Depends(get_db),
_: None = Depends(RateLimiter(100))
):
return {"user": user["sub"]}
2. Error Handling
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from pydantic import ValidationError
app = FastAPI()
class APIError(Exception):
def __init__(self, code: str, message: str, status_code: int = 400):
self.code = code
self.message = message
self.status_code = status_code
@app.exception_handler(APIError)
async def api_error_handler(request: Request, exc: APIError):
return JSONResponse(
status_code=exc.status_code,
content={"error": {"code": exc.code, "message": exc.message}}
)
@app.exception_handler(ValidationError)
async def validation_error_handler(request: Request, exc: ValidationError):
return JSONResponse(
status_code=422,
content={"error": {"code": "VALIDATION_ERROR", "details": exc.errors()}}
)
@app.exception_handler(Exception)
async def generic_error_handler(request: Request, exc: Exception):
logger.error(f"Unhandled error: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={"error": {"code": "INTERNAL_ERROR", "message": "An unexpected error occurred"}}
)
3. Background Tasks
from fastapi import BackgroundTasks
from celery import Celery
# Simple background tasks
@app.post("/reports")
async def generate_report(background_tasks: BackgroundTasks, report_id: str):
background_tasks.add_task(process_report, report_id)
return {"status": "processing", "report_id": report_id}
def process_report(report_id: str):
# Long-running task
time.sleep(60)
save_report(report_id)
# Celery for distributed tasks
celery_app = Celery('tasks', broker='redis://localhost:6379')
@celery_app.task
def async_etl_job(job_id: str):
run_etl_pipeline(job_id)
@app.
...
Repository Stats
Stars1
Forks1
LicenseOther