testing-quality

from pluginagentmarketplace/custom-plugin-data-engineer

Data Engineer Plugin - ETL pipelines, data infrastructure, and data processing tools

npx skills add https://github.com/pluginagentmarketplace/custom-plugin-data-engineer --skill testing-quality

SKILL.md

Testing & Data Quality

Production testing strategies with pytest, data validation, and quality frameworks.

Quick Start

import pytest
from unittest.mock import Mock, patch
import pandas as pd

# DataTransformer and apply_tax are the units under test;
# a minimal sketch of both follows this block.

# Fixtures for test data
@pytest.fixture
def sample_dataframe():
    return pd.DataFrame({
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"],
        "amount": [100.0, 200.0, 300.0]
    })

@pytest.fixture
def mock_database():
    with patch("app.db.connection") as mock:
        mock.query.return_value = [{"id": 1, "value": 100}]
        yield mock

# Unit test with AAA pattern
class TestDataTransformer:

    def test_calculates_total_correctly(self, sample_dataframe):
        # Arrange
        transformer = DataTransformer()

        # Act
        result = transformer.calculate_total(sample_dataframe)

        # Assert
        assert result == 600.0

    def test_handles_empty_dataframe(self):
        # Arrange
        empty_df = pd.DataFrame()
        transformer = DataTransformer()

        # Act & Assert
        with pytest.raises(ValueError, match="Empty dataframe"):
            transformer.calculate_total(empty_df)

    @pytest.mark.parametrize("input_val,expected", [
        (100, 110),
        (0, 0),
        (-50, -55),
    ])
    def test_apply_tax(self, input_val, expected):
        result = apply_tax(input_val, rate=0.10)
        assert result == expected
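
These tests reference a DataTransformer class and an apply_tax helper that the skill itself does not show. A minimal sketch of both, consistent with the assertions above (hypothetical module, not part of the plugin):

import pandas as pd

class DataTransformer:
    """Minimal implementation matching the tests above."""

    def calculate_total(self, df: pd.DataFrame) -> float:
        if df.empty:
            raise ValueError("Empty dataframe")
        return float(df["amount"].sum())

def apply_tax(value: float, rate: float) -> float:
    """Return value plus tax at the given rate, rounded to cents."""
    return round(value * (1 + rate), 2)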

Core Concepts

1. Data Validation with Pydantic

import logging
from pydantic import BaseModel, Field, ValidationError, field_validator
from datetime import datetime
from typing import Optional

logger = logging.getLogger(__name__)

class DataRecord(BaseModel):
    id: str = Field(..., min_length=1)
    amount: float = Field(..., ge=0)
    timestamp: datetime
    category: Optional[str] = None

    @field_validator("id")
    @classmethod
    def validate_id_format(cls, v):
        if not v.startswith("REC-"):
            raise ValueError("ID must start with 'REC-'")
        return v

    @field_validator("amount")
    @classmethod
    def round_amount(cls, v):
        return round(v, 2)

# Validation
def process_records(raw_data: list[dict]) -> list[DataRecord]:
    valid_records = []
    for item in raw_data:
        try:
            record = DataRecord(**item)
            valid_records.append(record)
        except ValidationError as e:
            logger.warning(f"Invalid record: {e}")
    return valid_records
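
Assuming the DataRecord model and process_records helper above, a quick usage check with one valid and one invalid record (illustrative values only):

from datetime import datetime

raw = [
    {"id": "REC-001", "amount": 19.999, "timestamp": datetime(2025, 1, 1)},
    {"id": "bad-id", "amount": -5, "timestamp": datetime(2025, 1, 1)},  # fails the prefix and ge=0 checks
]

records = process_records(raw)
assert len(records) == 1
assert records[0].amount == 20.0  # rounded to two decimals by the field validator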

2. Great Expectations

import great_expectations as gx

# Initialize context
context = gx.get_context()

# Create expectations
validator = context.sources.pandas_default.read_csv("data/orders.csv")

# Column expectations
validator.expect_column_to_exist("order_id")
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")

# Value expectations
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=10000)
validator.expect_column_values_to_be_in_set("status", ["pending", "completed", "cancelled"])

# Pattern matching
validator.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")

# Run validation
results = validator.validate()

if not results.success:
    failed_expectations = [r for r in results.results if not r.success]
    raise DataQualityError(f"Validation failed: {failed_expectations}")  # custom exception, sketched below
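
DataQualityError is not a Great Expectations class. A minimal sketch of a project-level exception plus a small wrapper around the validation call, assuming the validator built above:

class DataQualityError(Exception):
    """Raised when a dataset fails its expectation suite."""

def validate_or_raise(validator) -> None:
    """Run all registered expectations and raise with the failing ones."""
    results = validator.validate()
    if not results.success:
        failed = [r for r in results.results if not r.success]
        raise DataQualityError(f"Validation failed: {failed}")

In a pipeline, call validate_or_raise(validator) right after reading a batch so bad data fails fast instead of propagating downstream.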

3. Integration Testing

import pytest
from testcontainers.postgres import PostgresContainer
from sqlalchemy import create_engine, text
from sqlalchemy.exc import IntegrityError

# UserRepository and User come from the application under test;
# a minimal sketch of both follows this section.

@pytest.fixture(scope="module")
def postgres_container():
    """Spin up real Postgres for integration tests."""
    with PostgresContainer("postgres:16-alpine") as postgres:
        yield postgres

@pytest.fixture
def db_engine(postgres_container):
    """Create an engine and a fresh schema for each test."""
    engine = create_engine(postgres_container.get_connection_url())

    # Setup schema (NOT NULL so the rollback test below can violate a constraint)
    with engine.connect() as conn:
        conn.execute(text("CREATE TABLE users (id SERIAL PRIMARY KEY, name TEXT NOT NULL)"))
        conn.commit()

    yield engine

    # Cleanup: drop the table so tests stay independent, then release connections
    with engine.connect() as conn:
        conn.execute(text("DROP TABLE users"))
        conn.commit()
    engine.dispose()

class TestDatabaseOperations:

    def test_insert_and_query(self, db_engine):
        # Arrange
        repo = UserRepository(db_engine)

        # Act
        repo.insert(User(name="Test User"))
        users = repo.get_all()

        # Assert
        assert len(users) == 1
        assert users[0].name == "Test User"

    def test_transaction_rollback(self, db_engine):
        repo = UserRepository(db_engine)

        with pytest.raises(IntegrityError):
            repo.insert(User(name=None))  # Violates constraint

        # Verify rollback
        assert repo.count() == 0
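
The tests above assume a User model and a UserRepository data-access class from the application under test. A minimal sketch of both, matching the users schema created in the db_engine fixture (hypothetical implementation):

from dataclasses import dataclass
from sqlalchemy import text

@dataclass
class User:
    name: str
    id: int | None = None

class UserRepository:
    """Thin data-access layer over the users table."""

    def __init__(self, engine):
        self.engine = engine

    def insert(self, user: User) -> None:
        # engine.begin() commits on success and rolls back on error,
        # which is what test_transaction_rollback relies on
        with self.engine.begin() as conn:
            conn.execute(
                text("INSERT INTO users (name) VALUES (:name)"),
                {"name": user.name},
            )

    def get_all(self) -> list[User]:
        with self.engine.connect() as conn:
            rows = conn.execute(text("SELECT id, name FROM users")).fetchall()
        return [User(id=row.id, name=row.name) for row in rows]

    def count(self) -> int:
        with self.engine.connect() as conn:
            return conn.execute(text("SELECT COUNT(*) FROM users")).scalar_one()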

4. Mocking External Services

from unittest.mock import Mock, patch, MagicMock
import responses

class TestAPIClient:

    @responses.activate
    def test_fetch_data_success(self):
        # Mock HTTP response
        responses.add(
            responses.GET,
            "https://api.example.com/da

...
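
The last snippet is cut off in the source. A self-contained sketch of the same responses-based pattern, using a hypothetical endpoint and client function:

import requests
import responses

def fetch_orders(base_url: str) -> list[dict]:
    """Hypothetical client function under test."""
    resp = requests.get(f"{base_url}/orders", timeout=5)
    resp.raise_for_status()
    return resp.json()["orders"]

@responses.activate
def test_fetch_orders_success():
    # Register a fake HTTP response; no real network call is made
    responses.add(
        responses.GET,
        "https://api.example.com/orders",
        json={"orders": [{"id": 1, "amount": 100.0}]},
        status=200,
    )

    orders = fetch_orders("https://api.example.com")

    assert orders == [{"id": 1, "amount": 100.0}]
    assert len(responses.calls) == 1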
