testing-quality
from pluginagentmarketplace/custom-plugin-data-engineer
Data Engineer Plugin - ETL pipelines, data infrastructure, and data processing tools
1 star · 1 fork · Updated Jan 5, 2026
npx skills add https://github.com/pluginagentmarketplace/custom-plugin-data-engineer --skill testing-quality
SKILL.md
Testing & Data Quality
Production testing strategies with pytest, data validation, and quality frameworks.
Quick Start
import pytest
from unittest.mock import Mock, patch
import pandas as pd
# Fixtures for test data
@pytest.fixture
def sample_dataframe():
    """Provide a three-row DataFrame with ``id``, ``name`` and ``amount`` columns."""
    columns = {
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"],
        "amount": [100.0, 200.0, 300.0],
    }
    return pd.DataFrame(columns)
@pytest.fixture
def mock_database():
    """Patch ``app.db.connection`` while a test runs.

    Yields the patched mock; its ``query`` method is configured to return a
    single canned row.
    """
    canned_rows = [{"id": 1, "value": 100}]
    with patch("app.db.connection") as connection:
        connection.query.return_value = canned_rows
        yield connection
# Unit test with AAA pattern
class TestDataTransformer:
    """Unit tests for ``DataTransformer`` and ``apply_tax`` in Arrange/Act/Assert form."""

    def test_calculates_total_correctly(self, sample_dataframe):
        """The total over the sample fixture's amounts is 600.0."""
        # Arrange
        transformer = DataTransformer()
        # Act
        result = transformer.calculate_total(sample_dataframe)
        # Assert
        assert result == 600.0

    def test_handles_empty_dataframe(self):
        """An empty DataFrame is rejected with a ``ValueError``."""
        # Arrange
        empty_df = pd.DataFrame()
        transformer = DataTransformer()
        # Act & Assert
        with pytest.raises(ValueError, match="Empty dataframe"):
            transformer.calculate_total(empty_df)

    @pytest.mark.parametrize(
        "input_val,expected",
        [
            (100, 110),
            (0, 0),
            (-50, -55),
        ],
    )
    def test_apply_tax(self, input_val, expected):
        """``apply_tax`` at a 10% rate yields the expected taxed amount."""
        result = apply_tax(input_val, rate=0.10)
        # Compare with a tolerance: in binary floating point 100 * 1.10 is
        # 110.00000000000001, so an exact == can fail depending on how
        # apply_tax computes its result.
        assert result == pytest.approx(expected)
Core Concepts
1. Data Validation with Pydantic
from pydantic import BaseModel, Field, field_validator
from datetime import datetime
from typing import Optional
class DataRecord(BaseModel):
    # One validated input record. Documented with comments rather than a
    # docstring so the model's generated schema description is unchanged.

    # Non-empty identifier; the validator below also requires a "REC-" prefix.
    id: str = Field(..., min_length=1)
    # Amount, constrained to be >= 0; rounded to 2 places by the validator below.
    amount: float = Field(..., ge=0)
    timestamp: datetime
    # Optional label; defaults to None when omitted.
    category: Optional[str] = None

    @field_validator("id")
    @classmethod
    def validate_id_format(cls, value):
        """Return the id unchanged when it carries the ``REC-`` prefix, else raise."""
        if value.startswith("REC-"):
            return value
        raise ValueError("ID must start with 'REC-'")

    @field_validator("amount")
    @classmethod
    def round_amount(cls, value):
        """Return the amount rounded to two decimal places."""
        rounded = round(value, 2)
        return rounded
# Validation
def process_records(raw_data: list[dict]) -> list[DataRecord]:
    """Build ``DataRecord`` objects from raw dicts, skipping invalid ones.

    Args:
        raw_data: Dicts whose keys are passed as keyword arguments to
            ``DataRecord``.

    Returns:
        The records that validated successfully, in input order. Items that
        fail validation are logged at WARNING level and left out.
    """
    # Imported here so the function is self-contained: the snippet's import
    # line brings in BaseModel, Field and field_validator only, and no logger
    # is defined alongside it.
    import logging

    from pydantic import ValidationError

    logger = logging.getLogger(__name__)

    valid_records = []
    for item in raw_data:
        try:
            record = DataRecord(**item)
        except ValidationError as e:
            logger.warning(f"Invalid record: {e}")
        else:
            # Only reached when construction succeeded.
            valid_records.append(record)
    return valid_records
2. Great Expectations
import great_expectations as gx
from great_expectations.checkpoint import Checkpoint
class DataQualityError(Exception):
    """Raised when the dataset fails one or more expectations."""


# Initialize context
context = gx.get_context()

# Create expectations
validator = context.sources.pandas_default.read_csv("data/orders.csv")

# Column expectations
validator.expect_column_to_exist("order_id")
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")

# Value expectations
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=10000)
validator.expect_column_values_to_be_in_set("status", ["pending", "completed", "cancelled"])

# Pattern matching
validator.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")

# Run validation
results = validator.validate()
if not results.success:
    # Report only the expectations that failed, not the full result set.
    failed_expectations = [r for r in results.results if not r.success]
    raise DataQualityError(f"Validation failed: {failed_expectations}")
3. Integration Testing
import pytest
from testcontainers.postgres import PostgresContainer
from sqlalchemy import create_engine
@pytest.fixture(scope="module")
def postgres_container():
    """Run a ``postgres:16-alpine`` container for the tests of one module.

    The container object is yielded from inside its ``with`` block, so the
    context manager is exited when the fixture is torn down.
    """
    image = "postgres:16-alpine"
    with PostgresContainer(image) as container:
        yield container
@pytest.fixture
def db_engine(postgres_container):
    """Yield a SQLAlchemy engine bound to the test container with a fresh ``users`` table.

    Args:
        postgres_container: Fixture object exposing ``get_connection_url()``.

    Yields:
        The engine; it is disposed at teardown even if setup fails.
    """
    # Imported here because the snippet's import line only brings in create_engine.
    from sqlalchemy import text

    engine = create_engine(postgres_container.get_connection_url())
    try:
        # This fixture runs once per test while the container is shared across
        # the module, so rebuild the table each time: a bare CREATE TABLE would
        # fail on the second test, and rows from earlier tests would leak.
        # engine.begin() commits when the block exits without an error.
        with engine.begin() as conn:
            conn.execute(text("DROP TABLE IF EXISTS users"))
            # NOT NULL gives the schema a constraint for tests that insert
            # name=None and expect the database to reject it.
            conn.execute(
                text("CREATE TABLE users (id SERIAL PRIMARY KEY, name TEXT NOT NULL)")
            )
        yield engine
    finally:
        # Cleanup
        engine.dispose()
class TestDatabaseOperations:
    """Integration tests for ``UserRepository`` using the ``db_engine`` fixture."""

    def test_insert_and_query(self, db_engine):
        """A single inserted user is returned by ``get_all``."""
        # Arrange
        repo = UserRepository(db_engine)
        # Act
        repo.insert(User(name="Test User"))
        users = repo.get_all()
        # Assert
        assert len(users) == 1
        assert users[0].name == "Test User"

    def test_transaction_rollback(self, db_engine):
        """A rejected insert raises ``IntegrityError`` and leaves no rows behind."""
        # Imported here because the snippet's import lines do not bring it in.
        # NOTE(review): assumes UserRepository surfaces SQLAlchemy's
        # IntegrityError unchanged -- confirm against the repository code.
        from sqlalchemy.exc import IntegrityError

        repo = UserRepository(db_engine)
        with pytest.raises(IntegrityError):
            repo.insert(User(name=None))  # Violates constraint
        # Verify rollback
        assert repo.count() == 0
4. Mocking External Services
from unittest.mock import Mock, patch, MagicMock
import responses
class TestAPIClient:
@responses.activate
def test_fetch_data_success(self):
# Mock HTTP response
responses.add(
responses.GET,
"https://api.example.com/da
...
Repository Stats
Stars: 1
Forks: 1
License: Other