Intermediate Advanced 120 min read

Chapter 26: MLOps and Production Systems

Experiment tracking, deployment, monitoring, and scaling.

Learning Objectives

  • Track experiments
  • Deploy models
  • Monitor production systems


26.1 Introduction to MLOps Intermediate

Introduction to MLOps

MLOps (Machine Learning Operations) bridges the gap between ML development and production deployment. This section covers MLOps fundamentals, workflows, and the infrastructure needed for reliable ML systems.

MLOps Fundamentals

PYTHON
from typing import List, Dict, Any
from dataclasses import dataclass
from enum import Enum

def mlops_overview():
    """Print a plain-text primer on what MLOps is and why it is needed."""
    title = "MLOPS FUNDAMENTALS"
    rule = "=" * 60
    primer = """
What is MLOps?

Definition:
  A set of practices for reliable and efficient
  deployment of ML models to production

MLOps = ML + DevOps + Data Engineering

Why MLOps?

Traditional ML Challenges:
  - Models work in notebooks but fail in production
  - No reproducibility
  - Manual deployment processes
  - No monitoring or feedback loops
  - Data and model drift undetected

MLOps Benefits:
  - Reproducible experiments
  - Automated pipelines
  - Continuous training/deployment
  - Model monitoring
  - Faster iteration cycles

MLOps vs DevOps:

DevOps:
  Code → Build → Test → Deploy → Monitor

MLOps:
  Data → Preprocess → Train → Validate → Deploy → Monitor
        ↑__________________________________|
"""
    # Banner first, then the body: one print call per piece.
    for piece in (title, rule, primer):
        print(piece)

mlops_overview()


class MLLifecycleStage(Enum):
    """Stages of an ML project's lifecycle, each mapped to a snake_case string value."""
    # Data-side stages.
    DATA_COLLECTION = "data_collection"
    DATA_PROCESSING = "data_processing"
    FEATURE_ENGINEERING = "feature_engineering"
    # Model-side stages.
    MODEL_TRAINING = "model_training"
    MODEL_EVALUATION = "model_evaluation"
    MODEL_DEPLOYMENT = "model_deployment"
    # Post-deployment stage.
    MONITORING = "monitoring"


def mlops_components():
    """Print the six core building blocks of an MLOps platform."""
    heading = "\nMLOPS COMPONENTS"
    underline = "-" * 50
    catalogue = """
1. Data Management:
   - Data versioning
   - Data validation
   - Feature stores

2. Experiment Tracking:
   - Track parameters
   - Log metrics
   - Compare runs

3. Model Registry:
   - Version models
   - Stage management
   - Metadata storage

4. ML Pipelines:
   - Orchestration
   - Automation
   - Reproducibility

5. Serving Infrastructure:
   - Model deployment
   - Scaling
   - A/B testing

6. Monitoring:
   - Performance tracking
   - Drift detection
   - Alerting
"""
    # Heading, underline, then the numbered catalogue.
    for piece in (heading, underline, catalogue):
        print(piece)

mlops_components()


@dataclass
class MLProject:
    """Descriptor of an ML project: identity, data inputs, model and artifacts."""
    name: str
    version: str
    data_sources: List[str]
    features: List[str]
    model_type: str
    metrics: Dict[str, float]
    artifacts: Dict[str, str]

    def to_config(self) -> Dict:
        """Return the project as a nested, config-style dictionary.

        The result has the keys ``name``, ``version``, ``data``, ``model``
        and ``artifacts`` (in that order). Lists and dicts held by the
        project are referenced, not copied.
        """
        data_section = dict(sources=self.data_sources, features=self.features)
        model_section = dict(type=self.model_type, metrics=self.metrics)

        config = dict(name=self.name, version=self.version)
        config['data'] = data_section
        config['model'] = model_section
        config['artifacts'] = self.artifacts
        return config

Experiment Tracking

PYTHON
from typing import Dict, Any, Optional, List
from datetime import datetime
import json
import hashlib

def experiment_tracking():
    """Print why experiments should be tracked, what to record, and common tools."""
    banner = ("\nEXPERIMENT TRACKING", "=" * 60)
    notes = """
Why Track Experiments?

Problems without tracking:
  - Which hyperparameters worked best?
  - What data was used for that model?
  - Why did performance drop?
  - Can we reproduce results?

What to Track:

1. Parameters:
   - Hyperparameters
   - Model architecture
   - Data preprocessing

2. Metrics:
   - Training loss
   - Validation accuracy
   - Custom metrics

3. Artifacts:
   - Model weights
   - Plots and figures
   - Data samples

4. Metadata:
   - Git commit
   - Environment
   - Timestamps

Popular Tools:
  - MLflow
  - Weights & Biases
  - Neptune
  - Comet ML
"""
    # Two banner lines followed by the notes, each printed separately.
    for line in banner:
        print(line)
    print(notes)

experiment_tracking()


class ExperimentTracker:
    """Minimal experiment tracker that keeps runs in memory and saves them as JSON.

    At most one run is active at a time. Parameters, metrics and artifact
    paths are logged against the active run; ``end_run`` stamps the end time,
    appends the run to ``self.runs`` and writes
    ``<storage_path>/<run_id>/run.json``.

    The ``log_*`` methods and ``end_run`` do nothing when no run is active.
    """

    def __init__(self, project_name: str, storage_path: str = "./mlruns"):
        """Create a tracker for ``project_name`` that saves under ``storage_path``."""
        self.project_name = project_name
        self.storage_path = storage_path
        self.current_run = None
        self.runs = []

    def start_run(self, run_name: Optional[str] = None) -> 'Run':
        """Start a new run, make it the active one, and return it.

        Args:
            run_name: Display name; defaults to ``run_<run_id>``.

        Note:
            A run that is already active is replaced without being ended or
            saved. Call ``end_run`` first to keep it.
        """
        run_id = self._generate_run_id()
        self.current_run = Run(
            run_id=run_id,
            run_name=run_name or f"run_{run_id}",
            project=self.project_name,
            start_time=datetime.now()
        )
        return self.current_run

    def end_run(self):
        """Finish the active run: stamp its end time, record it, and save it."""
        run = self.current_run
        if run is None:
            return

        run.end_time = datetime.now()
        # Detach the run before touching the disk, so that a failed save
        # cannot leave it active and get it appended a second time on retry.
        self.current_run = None
        self.runs.append(run)
        self._save_run(run)

    def log_params(self, params: Dict[str, Any]):
        """Merge ``params`` into the active run's parameters."""
        if self.current_run is not None:
            self.current_run.params.update(params)

    def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None):
        """Append one timestamped observation per metric to the active run.

        Args:
            metrics: Mapping of metric name to value.
            step: Optional step/epoch index stored alongside each value.
        """
        if self.current_run is None:
            return

        for name, value in metrics.items():
            history = self.current_run.metrics.setdefault(name, [])
            history.append({
                'value': value,
                'step': step,
                'timestamp': datetime.now().isoformat()
            })

    def log_artifact(self, name: str, path: str):
        """Record the path of an artifact under ``name`` (the file is not copied)."""
        if self.current_run is not None:
            self.current_run.artifacts[name] = path

    def _generate_run_id(self) -> str:
        """Return a random 8-character hex id.

        A random UUID is used instead of hashing the current timestamp, so
        two runs started within the same clock tick get different ids.
        """
        import uuid
        return uuid.uuid4().hex[:8]

    def _save_run(self, run: 'Run'):
        """Write ``run`` as JSON to ``<storage_path>/<run_id>/run.json``."""
        import os
        run_dir = os.path.join(self.storage_path, run.run_id)
        os.makedirs(run_dir, exist_ok=True)

        with open(os.path.join(run_dir, 'run.json'), 'w', encoding='utf-8') as f:
            json.dump(run.to_dict(), f, indent=2)


@dataclass
class Run:
    """A single experiment run and everything logged against it."""
    run_id: str
    run_name: str
    project: str
    start_time: datetime
    end_time: Optional[datetime] = None  # None while the run has not ended
    params: Optional[Dict[str, Any]] = None
    metrics: Optional[Dict[str, List]] = None  # metric name -> list of observations
    artifacts: Optional[Dict[str, str]] = None  # artifact name -> path
    tags: Optional[Dict[str, str]] = None

    def __post_init__(self):
        # ``None`` means "not supplied": give each instance its own container.
        # Test against None rather than truthiness so that a caller-supplied
        # (possibly still empty) dict is kept and stays aliased to the caller.
        if self.params is None:
            self.params = {}
        if self.metrics is None:
            self.metrics = {}
        if self.artifacts is None:
            self.artifacts = {}
        if self.tags is None:
            self.tags = {}

    def to_dict(self) -> Dict:
        """Return a dictionary view of the run with timestamps as ISO-8601 strings.

        ``end_time`` is ``None`` when the run has not ended. The params,
        metrics, artifacts and tags containers are referenced, not copied.
        """
        return {
            'run_id': self.run_id,
            'run_name': self.run_name,
            'project': self.project,
            'start_time': self.start_time.isoformat(),
            'end_time': self.end_time.isoformat() if self.end_time else None,
            'params': self.params,
            'metrics': self.metrics,
            'artifacts': self.artifacts,
            'tags': self.tags
        }


# Usage example
def training_with_tracking():
    """Example: run a simulated training loop with experiment tracking.

    Starts a run, logs the hyperparameters, logs one loss/accuracy pair per
    epoch, records the model artifact path, and ends the run. The run is
    ended even if the loop raises.
    """
    tracker = ExperimentTracker("sentiment-analysis")

    hyperparams = {
        'model': 'bert-base-uncased',
        'learning_rate': 2e-5,
        'batch_size': 32,
        'epochs': 3,
        'max_length': 128
    }

    # Start experiment
    tracker.start_run("baseline_bert")
    try:
        # Log hyperparameters
        tracker.log_params(hyperparams)

        # Simulate training loop; the epoch count comes from the logged
        # hyperparameters so the two cannot drift apart.
        for epoch in range(hyperparams['epochs']):
            # Training...
            train_loss = 0.5 - (epoch * 0.1)
            val_accuracy = 0.8 + (epoch * 0.05)

            tracker.log_metrics({
                'train_loss': train_loss,
                'val_accuracy': val_accuracy
            }, step=epoch)

        # Log model artifact
        tracker.log_artifact('model', './models/bert_sentiment.pt')
    finally:
        # Always close the run so that partial results are still saved.
        tracker.end_run()

training_with_tracking()

Model Registry

PYTHON
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
from datetime import datetime

def model_registry_overview():
    """Print what a model registry is, its lifecycle stages and key features."""
    header = "\nMODEL REGISTRY"
    divider = "=" * 60
    summary = """
What is a Model Registry?

Central repository for:
  - Storing trained models
  - Versioning models
  - Managing model lifecycle
  - Tracking model lineage

Model Lifecycle Stages:
  1. Development: Experimental models
  2. Staging: Candidates for production
  3. Production: Live serving models
  4. Archived: Deprecated models

Key Features:

1. Version Control:
   - Semantic versioning
   - Immutable versions
   - Rollback capability

2. Metadata:
   - Training metrics
   - Data lineage
   - Environment specs

3. Governance:
   - Approval workflows
   - Access control
   - Audit trails

4. Integration:
   - CI/CD pipelines
   - Serving systems
   - Monitoring
"""
    # Header and divider first, then the summary text.
    for piece in (header, divider, summary):
        print(piece)

model_registry_overview()


class ModelStage(Enum):
    """Lifecycle stage of a registered model version; values are lowercase strings."""
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"
    ARCHIVED = "archived"


@dataclass
class ModelVersion:
    """One version of a named model as stored in the registry."""
    name: str
    version: str
    stage: ModelStage
    source: str  # Path or URI
    run_id: str  # Link to experiment
    metrics: Dict[str, float]
    signature: Dict  # Input/output schema
    created_at: datetime
    description: str = ""
    tags: Optional[Dict[str, str]] = None

    def __post_init__(self):
        # ``None`` means "no tags supplied": give the instance its own dict.
        # Test against None rather than truthiness so that a caller-supplied
        # empty dict is kept and stays aliased to the caller.
        if self.tags is None:
            self.tags = {}


class ModelRegistry:
    """In-memory model registry.

    Versions are kept per model name in registration order, so "latest"
    always means "most recently registered".
    """

    def __init__(self):
        self.models: Dict[str, List['ModelVersion']] = {}

    def register_model(self, model_version: 'ModelVersion') -> str:
        """Register a new model version.

        Returns:
            The identifier ``"<name>/<version>"``.

        Raises:
            ValueError: If that version string is already registered for the
                model name.
        """
        versions = self.models.setdefault(model_version.name, [])

        # Versions are immutable: refuse to overwrite an existing one.
        if any(m.version == model_version.version for m in versions):
            raise ValueError(f"Version {model_version.version} exists")

        versions.append(model_version)
        return f"{model_version.name}/{model_version.version}"

    def get_model(self, name: str,
                  version: str = None,
                  stage: 'ModelStage' = None) -> 'Optional[ModelVersion]':
        """Look up a model version.

        ``version`` takes precedence over ``stage``. With ``stage`` only, the
        most recently registered version in that stage is returned. With
        neither, the most recently registered version is returned.

        Returns:
            The matching version, or ``None`` if the name or the requested
            version/stage is not found.
        """
        versions = self.models.get(name)
        if versions is None:
            return None

        if version:
            return next((m for m in versions if m.version == version), None)

        if stage:
            # Walk backwards so the newest match wins.
            return next((m for m in reversed(versions) if m.stage == stage), None)

        # Return latest
        return versions[-1] if versions else None

    def transition_stage(self, name: str, version: str,
                         new_stage: 'ModelStage') -> bool:
        """Move a model version to ``new_stage``.

        Promoting to production archives the version currently in production
        (the most recently registered one, if several are).

        Returns:
            ``True`` on success, ``False`` if the version does not exist.
        """
        model = self.get_model(name, version)
        if not model:
            return False

        # If promoting to production, demote current production
        if new_stage == ModelStage.PRODUCTION:
            current_prod = self.get_model(name, stage=ModelStage.PRODUCTION)
            if current_prod is not None and current_prod is not model:
                current_prod.stage = ModelStage.ARCHIVED

        model.stage = new_stage
        return True

    def list_models(self, name: str = None) -> List[Dict]:
        """List registered models, or the versions of one model.

        With ``name``, returns one attribute dictionary per version (empty
        list for an unknown name). Without it, returns one
        ``{'name', 'versions'}`` summary per model, where ``versions`` is a
        count.
        """
        if name:
            # Shallow copies: editing a returned dict must not silently
            # rewrite the registered version's attributes.
            return [dict(vars(m)) for m in self.models.get(name, [])]

        return [
            {'name': n, 'versions': len(v)}
            for n, v in self.models.items()
        ]

    def compare_versions(self, name: str,
                        v1: str, v2: str) -> Dict:
        """Compare the metrics that two versions have in common.

        Returns:
            ``{metric: {'v1', 'v2', 'diff', 'improved'}}`` where ``diff`` is
            ``v2 - v1`` and ``improved`` means ``v2 > v1``. Empty if either
            version is missing. Metrics follow the order of ``v1``'s metrics.
        """
        m1 = self.get_model(name, v1)
        m2 = self.get_model(name, v2)

        if not m1 or not m2:
            return {}

        comparison = {}
        for metric, val1 in m1.metrics.items():
            val2 = m2.metrics.get(metric)
            # Compare against None, not truthiness: 0.0 is a legitimate
            # metric value (e.g. a loss) and must not be dropped.
            if val1 is None or val2 is None:
                continue
            comparison[metric] = {
                'v1': val1,
                'v2': val2,
                'diff': val2 - val1,
                'improved': val2 > val1
            }

        return comparison


# Example usage
def registry_example():
    """Demo: register two versions of a model, promote one, and compare them."""
    model_name = "fraud-detector"

    def build_version(version, stage, source, run_id, metrics):
        # Both versions use the same input/output schema. A fresh dict is
        # built per version so the two do not share one signature object.
        return ModelVersion(
            name=model_name,
            version=version,
            stage=stage,
            source=source,
            run_id=run_id,
            metrics=metrics,
            signature={
                'inputs': {'features': 'float[batch, 50]'},
                'outputs': {'probability': 'float[batch, 1]'}
            },
            created_at=datetime.now()
        )

    registry = ModelRegistry()

    # Initial model: registered in development, then promoted.
    registry.register_model(build_version(
        "1.0.0", ModelStage.DEVELOPMENT,
        "s3://models/fraud-detector/v1", "abc123",
        {'accuracy': 0.92, 'f1': 0.88, 'auc': 0.95},
    ))
    registry.transition_stage(model_name, "1.0.0", ModelStage.PRODUCTION)

    # Improved candidate: registered directly in staging.
    registry.register_model(build_version(
        "1.1.0", ModelStage.STAGING,
        "s3://models/fraud-detector/v2", "def456",
        {'accuracy': 0.94, 'f1': 0.91, 'auc': 0.97},
    ))

    # Metric-by-metric comparison of the two versions.
    print("Version comparison:",
          registry.compare_versions(model_name, "1.0.0", "1.1.0"))

registry_example()

ML Pipeline Basics

PYTHON
from typing import List, Dict, Any, Callable
from dataclasses import dataclass
from enum import Enum
import time

def ml_pipelines():
    """Print what an ML pipeline is, its typical stages and common tools."""
    title = "\nML PIPELINES"
    rule = "=" * 60
    outline = """
What is an ML Pipeline?

Automated workflow that:
  - Orchestrates ML stages
  - Ensures reproducibility
  - Enables CI/CD for ML

Pipeline Components:

1. Data Ingestion:
   - Fetch data sources
   - Validate data quality

2. Preprocessing:
   - Clean and transform
   - Feature engineering

3. Training:
   - Model training
   - Hyperparameter tuning

4. Evaluation:
   - Compute metrics
   - Compare to baseline

5. Deployment:
   - Package model
   - Deploy to serving

Pipeline Tools:
  - Kubeflow Pipelines
  - Apache Airflow
  - Prefect
  - MLflow Pipelines
  - ZenML
"""
    # Title, rule, then the outline: one print call each.
    for piece in (title, rule, outline):
        print(piece)

ml_pipelines()


class StepStatus(Enum):
    """Execution state of a pipeline step; values are lowercase strings."""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class PipelineStep:
    """Definition and runtime state of one pipeline step."""
    name: str
    function: Callable
    inputs: List[str] = None  # context keys passed to ``function`` as keyword arguments
    outputs: List[str] = None  # context keys the step's result is stored under
    dependencies: List[str] = None  # names of steps that must complete first
    status: StepStatus = StepStatus.PENDING
    result: Any = None
    error: str = None

    def __post_init__(self):
        # ``None`` means "not supplied": give each step its own list.
        # Test against None rather than truthiness so that a caller-supplied
        # empty list is kept and stays aliased to the caller.
        if self.inputs is None:
            self.inputs = []
        if self.outputs is None:
            self.outputs = []
        if self.dependencies is None:
            self.dependencies = []


class SimplePipeline:
    """Simple ML pipeline orchestrator.

    Steps are registered by name and executed one after another in
    dependency order. Values flow between steps through a shared context
    dictionary keyed by the names listed in each step's inputs and outputs.
    """

    def __init__(self, name: str):
        self.name = name
        self.steps: Dict[str, 'PipelineStep'] = {}
        self.context: Dict[str, Any] = {}

    def add_step(self, step: 'PipelineStep'):
        """Register ``step`` under its name, replacing any step with that name."""
        self.steps[step.name] = step

    def run(self, initial_context: Dict = None) -> Dict:
        """Execute every step in dependency order.

        A step is skipped when one of its registered dependencies did not
        complete. An exception raised by a step marks that step as failed and
        the run continues with the remaining steps.

        Args:
            initial_context: Optional seed values for the shared context. The
                mapping is copied, so the caller's dict is not modified.

        Returns:
            ``{'pipeline', 'steps', 'outputs'}`` with each step's status and
            error, and the final context.
        """
        self.context = dict(initial_context) if initial_context else {}

        # Clear state left over from an earlier run so that an old error or
        # result cannot be reported for this run.
        for step in self.steps.values():
            step.status = StepStatus.PENDING
            step.result = None
            step.error = None

        execution_order = self._get_execution_order()

        print(f"Running pipeline: {self.name}")
        print(f"Steps: {execution_order}")

        for step_name in execution_order:
            step = self.steps[step_name]

            # Check dependencies
            if not self._deps_satisfied(step):
                step.status = StepStatus.SKIPPED
                continue

            # Execute step
            step.status = StepStatus.RUNNING
            try:
                # Only inputs present in the context are passed; a missing
                # one surfaces as the step function's own TypeError.
                inputs = {k: self.context[k] for k in step.inputs
                          if k in self.context}

                # perf_counter is monotonic, so the measured duration is not
                # affected by system clock adjustments.
                start = time.perf_counter()
                result = step.function(**inputs)
                duration = time.perf_counter() - start

                # A dict result is matched to the declared outputs by key;
                # any other result is stored under the first declared output.
                if isinstance(result, dict):
                    for output in step.outputs:
                        if output in result:
                            self.context[output] = result[output]
                elif step.outputs:
                    self.context[step.outputs[0]] = result

                step.result = result
                step.status = StepStatus.COMPLETED
                print(f"  ✓ {step_name} ({duration:.2f}s)")

            except Exception as e:
                # Deliberately broad: one failing step must not abort the
                # pipeline; the failure is recorded on the step instead.
                step.error = str(e)
                step.status = StepStatus.FAILED
                print(f"  ✗ {step_name}: {e}")

        return self._get_results()

    def _get_execution_order(self) -> List[str]:
        """Return step names ordered so that dependencies come first.

        Depth-first topological sort. Dependencies that name no registered
        step are ignored, matching ``_deps_satisfied``. Cycles are not
        reported; steps in a cycle are emitted in visit order.
        """
        order = []
        visited = set()

        def visit(name):
            if name in visited:
                return
            visited.add(name)
            for dep in self.steps[name].dependencies:
                # Skip unknown names instead of failing with KeyError.
                if dep in self.steps:
                    visit(dep)
            order.append(name)

        for name in self.steps:
            visit(name)

        return order

    def _deps_satisfied(self, step: 'PipelineStep') -> bool:
        """Return True when every registered dependency of ``step`` has completed."""
        return all(
            self.steps[dep].status == StepStatus.COMPLETED
            for dep in step.dependencies
            if dep in self.steps
        )

    def _get_results(self) -> Dict:
        """Summarise the run: per-step status and error plus the final context."""
        return {
            'pipeline': self.name,
            'steps': {
                name: {
                    'status': step.status.value,
                    'error': step.error
                }
                for name, step in self.steps.items()
            },
            'outputs': self.context
        }


# Example pipeline
def example_pipeline():
    """Demo: build and run a four-step load -> preprocess -> train -> evaluate pipeline."""

    # Step functions. Each returns a dict keyed by the outputs it produces.
    def load_data():
        print("    Loading data...")
        return {'data': [[1, 2], [3, 4], [5, 6]]}

    def preprocess(data):
        print("    Preprocessing...")
        return {'processed_data': list(data)}

    def train_model(processed_data):
        print("    Training model...")
        return {'model': 'trained_model', 'accuracy': 0.95}

    def evaluate(model, processed_data):
        print("    Evaluating...")
        return {'metrics': {'accuracy': 0.93, 'f1': 0.91}}

    # Declarative description of the steps, in registration order.
    step_specs = [
        dict(name="load_data", function=load_data,
             outputs=["data"]),
        dict(name="preprocess", function=preprocess,
             inputs=["data"], outputs=["processed_data"],
             dependencies=["load_data"]),
        dict(name="train", function=train_model,
             inputs=["processed_data"], outputs=["model", "accuracy"],
             dependencies=["preprocess"]),
        dict(name="evaluate", function=evaluate,
             inputs=["model", "processed_data"], outputs=["metrics"],
             dependencies=["train"]),
    ]

    pipeline = SimplePipeline("training_pipeline")
    for spec in step_specs:
        pipeline.add_step(PipelineStep(**spec))

    # Run pipeline
    results = pipeline.run()
    print("\nPipeline results:", results)

example_pipeline()

Summary

| Component | Purpose | Tools |
|-----------|---------|-------|
| Experiment Tracking | Reproducibility | MLflow, W&B |
| Model Registry | Model management | MLflow, Vertex AI |
| Pipelines | Automation | Kubeflow, Airflow |
| Feature Store | Feature management | Feast, Tecton |

Key takeaways:

  • MLOps bridges ML development and production
  • Experiment tracking ensures reproducibility
  • Model registry manages model lifecycle
  • Pipelines automate ML workflows
  • Version everything: data, code, models
  • Start simple, add complexity as needed

26.2 Data and Feature Management Intermediate

Data and Feature Management

Data is the foundation of ML systems. This section covers data versioning, validation, and feature stores for managing ML data at scale.

Data Versioning

PYTHON
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime
import hashlib
import json

def data_versioning():
    """Print why data should be versioned, what to version, and common tools."""
    title = "DATA VERSIONING"
    rule = "=" * 60
    guide = """
Why Version Data?

Problems without versioning:
  - "Which dataset trained this model?"
  - "What changed between versions?"
  - "Can we reproduce last month's results?"

What to Version:

1. Raw Data:
   - Original source data
   - Snapshots at collection time

2. Processed Data:
   - Cleaned datasets
   - Feature-engineered data

3. Splits:
   - Train/val/test splits
   - Cross-validation folds

4. Metadata:
   - Schema definitions
   - Statistics
   - Data quality reports

Data Versioning Tools:
  - DVC (Data Version Control)
  - LakeFS
  - Delta Lake
  - Pachyderm
"""
    # Title, rule, then the guide: one print call each.
    for piece in (title, rule, guide):
        print(piece)

data_versioning()


@dataclass
class DataVersion:
    """Metadata describing one committed version of a dataset."""
    version_id: str
    name: str
    path: str
    checksum: str
    size_bytes: int
    num_records: int
    schema: Dict[str, str]
    created_at: datetime
    parent_version: Optional[str] = None  # version_id of the previous version, if any
    description: str = ""
    tags: Optional[Dict[str, str]] = None

    def __post_init__(self):
        # ``None`` means "no tags supplied": give the instance its own dict.
        # Test against None rather than truthiness so that a caller-supplied
        # empty dict is kept and stays aliased to the caller.
        if self.tags is None:
            self.tags = {}


class DataVersionControl:
    """Simple in-memory data version control.

    Keeps an ordered list of version metadata per dataset name. Only
    metadata is stored; the data file itself is read (for its checksum,
    size and line count) but never copied.
    """

    def __init__(self, storage_path: str):
        self.storage_path = storage_path
        self.versions: Dict[str, List['DataVersion']] = {}

    def commit(self, name: str, data_path: str,
               schema: Dict, description: str = "") -> 'DataVersion':
        """Record a new version of dataset ``name`` from the file at ``data_path``.

        The new version's parent is the latest existing version of the same
        dataset, if there is one.

        Returns:
            The newly created version metadata.
        """
        import os

        # Compute checksum
        checksum = self._compute_checksum(data_path)

        # Get file stats
        stats = os.stat(data_path)

        # Generate version ID
        version_id = self._generate_version_id(name, checksum)

        # Get parent version
        history = self.versions.setdefault(name, [])
        parent = history[-1].version_id if history else None

        version = DataVersion(
            version_id=version_id,
            name=name,
            path=data_path,
            checksum=checksum,
            size_bytes=stats.st_size,
            num_records=self._count_records(data_path),
            schema=schema,
            created_at=datetime.now(),
            parent_version=parent,
            description=description
        )
        history.append(version)

        return version

    def get_version(self, name: str,
                    version_id: str = None) -> 'Optional[DataVersion]':
        """Return the requested version of ``name``, or the latest if no id is given.

        Returns ``None`` when the dataset or the version id is unknown.
        """
        if name not in self.versions:
            return None

        history = self.versions[name]
        if version_id:
            return next(
                (v for v in history if v.version_id == version_id), None
            )

        return history[-1]  # Latest

    def diff(self, name: str, v1: str, v2: str) -> Dict:
        """Compare two versions of ``name`` (changes are ``v2`` relative to ``v1``).

        Returns:
            ``{'size_change', 'records_change', 'schema_changes'}``, or an
            empty dict if either version is missing.
        """
        ver1 = self.get_version(name, v1)
        ver2 = self.get_version(name, v2)

        if not ver1 or not ver2:
            return {}

        return {
            'size_change': ver2.size_bytes - ver1.size_bytes,
            'records_change': ver2.num_records - ver1.num_records,
            'schema_changes': self._compare_schemas(
                ver1.schema, ver2.schema
            )
        }

    def _compute_checksum(self, path: str) -> str:
        """Return the first 16 hex characters of the file's SHA-256 digest."""
        hasher = hashlib.sha256()
        with open(path, 'rb') as f:
            # Read in fixed-size chunks so large files are never fully in memory.
            for chunk in iter(lambda: f.read(8192), b''):
                hasher.update(chunk)
        return hasher.hexdigest()[:16]

    def _generate_version_id(self, name: str, checksum: str) -> str:
        """Build ``<name>-<YYYYmmddHHMMSS>-<first 8 checksum chars>``."""
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        return f"{name}-{timestamp}-{checksum[:8]}"

    def _count_records(self, path: str) -> int:
        """Count the lines in the file, excluding one header line.

        Never negative: an empty file has zero records, not -1.
        """
        with open(path, 'r') as f:
            line_count = sum(1 for _ in f)
        return max(line_count - 1, 0)  # Minus header

    def _compare_schemas(self, s1: Dict, s2: Dict) -> Dict:
        """Return the sorted column names added, removed and changed from ``s1`` to ``s2``.

        Sorting gives the same output on every run, independent of set
        iteration order.
        """
        added = set(s2) - set(s1)
        removed = set(s1) - set(s2)
        changed = {k for k in s1 if k in s2 and s1[k] != s2[k]}

        return {
            'added': sorted(added),
            'removed': sorted(removed),
            'changed': sorted(changed)
        }

Data Validation

PYTHON
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import numpy as np

def data_validation():
    """Print why, when and how to validate data, plus common tools."""
    header = "\nDATA VALIDATION"
    divider = "=" * 60
    overview = """
Why Validate Data?

Data Quality Issues:
  - Missing values
  - Out-of-range values
  - Type mismatches
  - Distribution shifts
  - Schema changes

When to Validate:

1. At Ingestion:
   - Validate incoming data
   - Reject invalid batches

2. Before Training:
   - Verify training data quality
   - Check for data leakage

3. At Serving:
   - Validate prediction inputs
   - Detect anomalies

Validation Types:
  - Schema validation
  - Statistical validation
  - Business rule validation
  - Cross-feature validation

Tools:
  - Great Expectations
  - Pandera
  - TensorFlow Data Validation
  - Deequ
"""
    # Header, divider, then the overview: one print call each.
    for piece in (header, divider, overview):
        print(piece)

data_validation()


@dataclass
class ValidationResult:
    """Outcome of running a set of data validation checks."""
    passed: bool  # overall verdict
    checks_run: int  # number of checks executed
    checks_passed: int  # number of checks counted as passed
    failures: List[Dict]  # result dictionaries of failed checks
    warnings: List[Dict]  # result dictionaries reported as warnings
    statistics: Dict[str, Any]  # statistics gathered while checking


class DataValidator:
    """Data validation framework.

    Expectations are declared with the ``expect_*`` methods, each of which
    returns the validator so calls can be chained, and are then checked
    against a table with ``validate``. The table is accessed through
    ``.columns`` and ``data[column]``, as the pandas DataFrame in this
    file's example provides.
    """

    # Expectation types that read values from the column. A missing column
    # makes these fail with an error message instead of raising KeyError.
    _COLUMN_CHECKS = frozenset(
        {'values_in_range', 'not_null', 'unique', 'mean_in_range'}
    )

    def __init__(self):
        self.expectations = []

    def expect_column_to_exist(self, column: str):
        """Expect ``column`` to be present."""
        self.expectations.append({
            'type': 'column_exists',
            'column': column
        })
        return self

    def expect_column_values_in_range(self, column: str,
                                       min_val: float = None,
                                       max_val: float = None):
        """Expect all non-null values to lie in ``[min_val, max_val]``.

        Either bound may be ``None`` to leave that side open.
        """
        self.expectations.append({
            'type': 'values_in_range',
            'column': column,
            'min': min_val,
            'max': max_val
        })
        return self

    def expect_column_values_not_null(self, column: str,
                                       max_null_ratio: float = 0.0):
        """Expect the fraction of null values to be at most ``max_null_ratio``."""
        self.expectations.append({
            'type': 'not_null',
            'column': column,
            'max_null_ratio': max_null_ratio
        })
        return self

    def expect_column_values_unique(self, column: str):
        """Expect all values in ``column`` to be unique."""
        self.expectations.append({
            'type': 'unique',
            'column': column
        })
        return self

    def expect_column_mean_in_range(self, column: str,
                                     min_mean: float, max_mean: float):
        """Expect the column mean to lie in ``[min_mean, max_mean]``."""
        self.expectations.append({
            'type': 'mean_in_range',
            'column': column,
            'min': min_mean,
            'max': max_mean
        })
        return self

    def validate(self, data) -> 'ValidationResult':
        """Run every registered expectation against ``data``.

        A failed check counts as a failure unless its result carries
        ``'critical': False``, in which case it is reported as a warning.
        ``checks_passed`` is the number of checks minus the failures, so
        warnings count as passed.
        """
        failures = []
        warnings = []
        statistics = {}

        for exp in self.expectations:
            result = self._check_expectation(data, exp)
            if not result['passed']:
                if result.get('critical', True):
                    failures.append(result)
                else:
                    warnings.append(result)

            if 'stats' in result:
                statistics.update(result['stats'])

        return ValidationResult(
            passed=len(failures) == 0,
            checks_run=len(self.expectations),
            checks_passed=len(self.expectations) - len(failures),
            failures=failures,
            warnings=warnings,
            statistics=statistics
        )

    def _check_expectation(self, data, exp: Dict) -> Dict:
        """Check a single expectation and return its result dictionary.

        The result always has ``'type'`` and ``'passed'`` (a plain ``bool``).
        Column checks add ``'column'``, some add ``'stats'``, and a missing
        column adds ``'error'``. Unknown expectation types pass.
        """
        exp_type = exp['type']
        column = exp.get('column')

        if exp_type == 'column_exists':
            passed = column in data.columns
            return {'type': exp_type, 'column': column, 'passed': passed}

        # Handle a missing column in one place for every check that reads it.
        if exp_type in self._COLUMN_CHECKS and column not in data.columns:
            return {'type': exp_type, 'column': column, 'passed': False,
                    'error': f'Column {column} not found'}

        if exp_type == 'values_in_range':
            # Nulls are ignored here; not_null is the check for them.
            values = data[column].dropna()
            min_val, max_val = exp.get('min'), exp.get('max')

            passed = True
            if min_val is not None:
                passed = passed and bool((values >= min_val).all())
            if max_val is not None:
                passed = passed and bool((values <= max_val).all())

            return {
                'type': exp_type,
                'column': column,
                'passed': passed,
                'stats': {f'{column}_min': values.min(),
                         f'{column}_max': values.max()}
            }

        if exp_type == 'not_null':
            null_ratio = data[column].isnull().mean()
            return {
                'type': exp_type,
                'column': column,
                # bool() turns the comparison result into a plain bool.
                'passed': bool(null_ratio <= exp['max_null_ratio']),
                'stats': {f'{column}_null_ratio': null_ratio}
            }

        if exp_type == 'unique':
            return {'type': exp_type, 'column': column,
                    'passed': bool(data[column].is_unique)}

        if exp_type == 'mean_in_range':
            mean = data[column].mean()
            return {
                'type': exp_type,
                'column': column,
                'passed': bool(exp['min'] <= mean <= exp['max']),
                'stats': {f'{column}_mean': mean}
            }

        return {'type': exp_type, 'passed': True}


# Example usage
def validation_example():
    """Demo: validate a small table that contains a null and an outlier."""
    import pandas as pd

    # Sample data
    columns = {
        'user_id': [1, 2, 3, 4, 5],
        'age': [25, 30, None, 45, 200],  # Has null and outlier
        'score': [0.8, 0.9, 0.7, 0.85, 0.95]
    }
    data = pd.DataFrame(columns)

    # Each expect_* call returns the validator, so the expectations can be
    # declared as one chain.
    validator = (
        DataValidator()
        .expect_column_to_exist('user_id')
        .expect_column_to_exist('age')
        .expect_column_values_in_range('age', min_val=0, max_val=120)
        .expect_column_values_not_null('age', max_null_ratio=0.1)
        .expect_column_mean_in_range('score', 0.5, 1.0)
    )

    # Validate and report
    result = validator.validate(data)
    print(f"Validation passed: {result.passed}")
    print(f"Checks: {result.checks_passed}/{result.checks_run}")
    if result.failures:
        print("Failures:", result.failures)

validation_example()

Feature Stores

PYTHON
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime
import numpy as np

def feature_store_overview():
    """Print a short primer on feature stores.

    Covers what a feature store is, the problems it addresses, its
    main components, and a list of tools. Writes to stdout and returns
    nothing.
    """
    heading = "\nFEATURE STORES"
    rule = "=" * 60
    body = """
What is a Feature Store?

Centralized repository for:
  - Storing features
  - Serving features
  - Sharing features across teams

Why Feature Stores?

Problems without feature stores:
  - Duplicate feature engineering
  - Training-serving skew
  - Feature inconsistency
  - No feature reuse

Feature Store Components:

1. Feature Registry:
   - Feature definitions
   - Metadata
   - Lineage

2. Offline Store:
   - Historical features
   - Batch processing
   - Training data

3. Online Store:
   - Low-latency serving
   - Real-time features
   - Caching

4. Feature Serving:
   - Training data generation
   - Online inference

Tools:
  - Feast
  - Tecton
  - Vertex AI Feature Store
  - Amazon SageMaker Feature Store
"""
    # A single call with a newline separator emits heading, rule and
    # body on consecutive lines.
    print(heading, rule, body, sep="\n")

feature_store_overview()


@dataclass
class Feature:
    """Definition of one named feature.

    `tags` and `created_at` are optional: when they are omitted (or
    passed as a falsy value) they are filled in after construction
    with an empty dict and the current local time respectively.
    """
    name: str
    dtype: str
    description: str
    entity: str  # Primary key entity
    tags: Dict[str, str] = None
    created_at: datetime = None

    def __post_init__(self):
        # None is used as the declared default so that every instance
        # gets its own dict / timestamp rather than a shared one.
        if not self.tags:
            self.tags = {}
        if not self.created_at:
            self.created_at = datetime.now()


@dataclass
class FeatureView:
    """Collection of features for an entity

    SimpleFeatureStore registers a view under `name`, and during
    materialization reads `entity` as the key of each input record that
    holds the entity identifier and `features` for the names to extract.
    """
    name: str  # Key the view is registered and stored under
    entity: str  # Record key that holds the entity identifier
    features: List[Feature]  # Feature definitions extracted for this view
    source: str  # Data source
    ttl: int = 3600  # Time-to-live in seconds


class SimpleFeatureStore:
    """In-memory feature store with an offline and an online copy.

    Both stores are nested dicts laid out as
    ``entity_id -> view_name -> {feature_name: value}``. The offline
    copy additionally carries a ``_timestamp`` entry recording when the
    row was materialized.
    """
    def __init__(self):
        self.feature_views: Dict[str, FeatureView] = {}
        self.offline_store: Dict[str, Dict] = {}  # entity_id -> view -> features
        self.online_store: Dict[str, Dict] = {}

    def register_feature_view(self, view: FeatureView):
        """Make `view` available under its own name (replaces any previous one)."""
        self.feature_views[view.name] = view

    def materialize(self, view_name: str, data: List[Dict]):
        """Write the view's features from `data` into both stores.

        Args:
            view_name: Name of a previously registered view.
            data: Records as dicts; each must contain the view's entity
                key. Feature names missing from a record are stored as
                None.

        Raises:
            ValueError: If `view_name` has not been registered.
            KeyError: If a record lacks the entity key.
        """
        view = self.feature_views.get(view_name)
        if not view:
            raise ValueError(f"Unknown view: {view_name}")

        key_field = view.entity

        for record in data:
            entity_id = record[key_field]
            values = {spec.name: record.get(spec.name) for spec in view.features}

            # Offline copy: same values plus the materialization time
            offline_row = dict(values)
            offline_row['_timestamp'] = datetime.now().isoformat()
            self.offline_store.setdefault(entity_id, {})[view_name] = offline_row

            # Online copy: values only
            self.online_store.setdefault(entity_id, {})[view_name] = values

    def get_online_features(self, view_name: str,
                            entity_ids: List[str]) -> List[Dict]:
        """Look up online features for each requested entity.

        Returns one dict per id, in request order. Entities (or views)
        that were never materialized yield a dict holding only
        ``entity_id``.
        """
        return [
            {
                'entity_id': eid,
                **self.online_store.get(eid, {}).get(view_name, {}),
            }
            for eid in entity_ids
        ]

    def get_historical_features(self, view_name: str,
                                entity_df) -> 'pd.DataFrame':
        """Build a training frame from the offline store.

        Rows of `entity_df` whose entity id is absent from the offline
        store are dropped from the result.

        Raises:
            KeyError: If `view_name` has not been registered.
        """
        import pandas as pd

        entity_col = self.feature_views[view_name].entity
        rows = []

        for _, row in entity_df.iterrows():
            entity_id = row[entity_col]
            if entity_id not in self.offline_store:
                continue
            stored = self.offline_store[entity_id].get(view_name, {})
            rows.append({entity_col: entity_id, **stored})

        return pd.DataFrame(rows)


# Example usage
def feature_store_example():
    """Walk through register -> materialize -> online lookup.

    Registers a three-feature user view, loads three user records into
    the store, then prints the online features for two of them.
    """
    store = SimpleFeatureStore()

    # Describe the view: (name, dtype, description) per feature
    feature_specs = [
        ("age", "int", "User age"),
        ("total_purchases", "float", "Total purchase amount"),
        ("days_since_signup", "int", "Days since signup"),
    ]
    user_features = FeatureView(
        name="user_features",
        entity="user_id",
        features=[
            Feature(feat_name, feat_dtype, feat_desc, "user_id")
            for feat_name, feat_dtype, feat_desc in feature_specs
        ],
        source="users_table"
    )
    store.register_feature_view(user_features)

    # Load raw records into the offline and online stores
    field_names = ('user_id', 'age', 'total_purchases', 'days_since_signup')
    raw_rows = [
        ('u1', 25, 500.0, 100),
        ('u2', 32, 1500.0, 365),
        ('u3', 28, 200.0, 30),
    ]
    user_data = [dict(zip(field_names, values)) for values in raw_rows]
    store.materialize("user_features", user_data)

    # Fetch what an inference service would see
    online = store.get_online_features("user_features", ['u1', 'u2'])
    print("Online features:", online)

feature_store_example()

Data Quality Monitoring

PYTHON
from typing import Dict, List, Optional
from dataclasses import dataclass
import numpy as np

def data_quality_monitoring():
    """Print a checklist of data-quality dimensions and monitoring strategies.

    Writes to stdout and returns nothing.
    """
    sections = (
        "\nDATA QUALITY MONITORING",
        "=" * 60,
        """
What to Monitor:

1. Completeness:
   - Missing value rates
   - Required field presence

2. Freshness:
   - Data age
   - Update frequency

3. Accuracy:
   - Range violations
   - Format issues

4. Consistency:
   - Cross-field validation
   - Referential integrity

5. Distribution:
   - Statistical drift
   - Anomaly detection

Monitoring Strategies:

1. Schema Monitoring:
   - Detect schema changes
   - Type changes

2. Statistical Monitoring:
   - Mean/std drift
   - Distribution shift

3. Volume Monitoring:
   - Record counts
   - Size changes

4. Anomaly Detection:
   - Outlier detection
   - Pattern breaks
""",
    )
    # Heading, rule, then the body - one print per section
    for section in sections:
        print(section)

data_quality_monitoring()


@dataclass
class DataProfile:
    """Statistical profile of data

    One instance describes one column. DataProfiler fills the numeric
    fields (mean, std, min_val, max_val, percentiles) only for columns
    with a numeric dtype; for other columns they stay None.
    """
    column: str  # Column name
    dtype: str  # String form of the column dtype
    count: int  # Number of rows, nulls included
    null_count: int  # Number of null entries
    unique_count: int  # Number of distinct values as reported by nunique()
    mean: float = None
    std: float = None
    min_val: float = None
    max_val: float = None
    percentiles: Dict[str, float] = None  # Keys 'p25', 'p50', 'p75' when populated by DataProfiler


class DataProfiler:
    """Generate per-column statistical profiles of a pandas DataFrame."""
    def profile(self, data, columns: List[str] = None) -> List[DataProfile]:
        """Profile dataframe columns.

        Args:
            data: DataFrame to profile.
            columns: Column names to include; defaults to every column
                of `data`, in frame order.

        Returns:
            One DataProfile per requested column, in the same order.
        """
        if columns is None:
            columns = data.columns.tolist()

        return [self._profile_column(data, col) for col in columns]

    def _profile_column(self, data, column: str) -> DataProfile:
        """Profile a single column.

        Counts are always filled in; summary statistics are added only
        for numeric columns and are computed with nulls dropped.
        """
        col_data = data[column]

        profile = DataProfile(
            column=column,
            dtype=str(col_data.dtype),
            count=len(col_data),
            # pandas returns numpy scalars here; store plain ints to
            # match the declared field types.
            null_count=int(col_data.isnull().sum()),
            unique_count=int(col_data.nunique())
        )

        if self._is_numeric(col_data):
            numeric = col_data.dropna()
            profile.mean = numeric.mean()
            profile.std = numeric.std()
            profile.min_val = numeric.min()
            profile.max_val = numeric.max()
            profile.percentiles = {
                'p25': numeric.quantile(0.25),
                'p50': numeric.quantile(0.50),
                'p75': numeric.quantile(0.75)
            }

        return profile

    @staticmethod
    def _is_numeric(col_data) -> bool:
        """Return True when the column should receive numeric statistics."""
        try:
            return bool(np.issubdtype(col_data.dtype, np.number))
        except TypeError:
            # np.issubdtype cannot interpret pandas extension dtypes
            # (category, nullable Int64/Float64, string, ...) and raises
            # instead of returning False. Ask pandas for those; nullable
            # booleans are excluded to match how numpy bool is treated.
            from pandas.api.types import is_bool_dtype, is_numeric_dtype
            dtype = col_data.dtype
            return bool(is_numeric_dtype(dtype)) and not is_bool_dtype(dtype)


class DriftDetector:
    """Detect data drift by comparing column profiles with a reference.

    Three checks are applied per column, each against its entry in
    `self.thresholds`:
      - mean_drift: relative change of the mean
      - std_drift: relative change of the standard deviation
      - null_rate_drift: absolute change of the null fraction
    """
    def __init__(self, reference_profile: List["DataProfile"]):
        """Index the reference profiles by column name."""
        self.reference = {p.column: p for p in reference_profile}
        self.thresholds = {
            'mean_drift': 0.2,  # 20% change
            'std_drift': 0.3,
            'null_rate_drift': 0.1
        }

    def detect_drift(self, current_profile: List["DataProfile"]) -> Dict:
        """Detect drift from reference.

        Args:
            current_profile: Profiles of the data being checked.

        Returns:
            Dict keyed by column name. Columns absent from the
            reference map to ``{'status': 'new_column'}``; all others
            map to ``{'status': 'drift_detected' | 'ok', 'drifts': [...]}``
            where each drift entry names its type and the reference and
            current values.
        """
        drift_results = {}

        for profile in current_profile:
            col = profile.column
            if col not in self.reference:
                drift_results[col] = {'status': 'new_column'}
                continue

            ref = self.reference[col]
            drifts = []

            # Relative checks on the summary statistics
            for kind, ref_value, cur_value in (
                ('mean_drift', ref.mean, profile.mean),
                ('std_drift', ref.std, profile.std),
            ):
                drift = self._relative_drift(kind, ref_value, cur_value)
                if drift is not None:
                    drifts.append(drift)

            # Absolute check on the null fraction
            ref_null_rate = self._null_rate(ref)
            curr_null_rate = self._null_rate(profile)
            null_change = abs(curr_null_rate - ref_null_rate)
            if null_change > self.thresholds['null_rate_drift']:
                drifts.append({
                    'type': 'null_rate_drift',
                    'reference': ref_null_rate,
                    'current': curr_null_rate
                })

            drift_results[col] = {
                'status': 'drift_detected' if drifts else 'ok',
                'drifts': drifts
            }

        return drift_results

    def _relative_drift(self, kind: str, ref_value, cur_value) -> Optional[Dict]:
        """Return a drift entry if the relative change exceeds the threshold for `kind`."""
        # Compare against None explicitly: a statistic of exactly 0.0 is
        # a legitimate value and must not be skipped as "missing".
        if ref_value is None or cur_value is None:
            return None

        # The small epsilon keeps the ratio finite when the reference is 0
        change = abs(cur_value - ref_value) / (abs(ref_value) + 1e-10)
        if change > self.thresholds[kind]:
            return {
                'type': kind,
                'reference': ref_value,
                'current': cur_value,
                'change': change
            }
        return None

    @staticmethod
    def _null_rate(profile) -> float:
        """Return the fraction of null rows, or 0.0 for an empty column."""
        if not profile.count:
            return 0.0
        return profile.null_count / profile.count


# Example
def drift_detection_example():
    """Profile two random samples and print the drift report.

    The current sample draws 'age' around 35 instead of 30, while
    'income' is drawn from the same distribution in both samples. The
    draws are unseeded, so the printed numbers differ between runs.
    """
    import pandas as pd

    sample_size = 1000

    # Reference data
    ref_age = np.random.normal(30, 5, sample_size)
    ref_income = np.random.normal(50000, 10000, sample_size)
    ref_data = pd.DataFrame({'age': ref_age, 'income': ref_income})

    # Current data with drift (mean of 'age' shifted)
    curr_age = np.random.normal(35, 5, sample_size)
    curr_income = np.random.normal(50000, 10000, sample_size)
    curr_data = pd.DataFrame({'age': curr_age, 'income': curr_income})

    # Profile both samples, then compare current against reference
    profiler = DataProfiler()
    ref_profile = profiler.profile(ref_data)
    curr_profile = profiler.profile(curr_data)

    drift = DriftDetector(ref_profile).detect_drift(curr_profile)
    print("Drift detection:", drift)

drift_detection_example()

Summary

| Component | Purpose | Key Tool |
|-----------|---------|----------|
| Data Versioning | Track data changes | DVC |
| Data Validation | Ensure quality | Great Expectations |
| Feature Store | Manage features | Feast |
| Drift Detection | Monitor changes | Custom/Evidently |

Key takeaways:

  • Version data like code
  • Validate data at every stage
  • Feature stores prevent training-serving skew
  • Monitor data quality continuously
  • Detect drift before it impacts models
  • Automate data quality checks in pipelines

26.3 Model Deployment and Serving Advanced

Model Deployment and Serving

Deploying ML models to production requires careful consideration of serving patterns, infrastructure, and performance. This section covers model serving architectures and deployment strategies.

Model Serving Patterns

PYTHON
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from abc import ABC, abstractmethod
import time

def serving_patterns():
    """Print an overview of inference patterns and deployment options.

    Writes to stdout and returns nothing.
    """
    title = "MODEL SERVING PATTERNS"
    underline = "=" * 60
    overview = """
Serving Patterns:

1. Batch Inference:
   - Process data in batches
   - Scheduled runs (hourly, daily)
   - High throughput, high latency

2. Online Inference:
   - Real-time predictions
   - Low latency required
   - Single or small batch requests

3. Streaming Inference:
   - Process data streams
   - Near real-time
   - Event-driven

Deployment Options:

1. REST API:
   - HTTP endpoints
   - Widely supported
   - Easy to integrate

2. gRPC:
   - Binary protocol
   - Lower latency
   - Streaming support

3. Embedded:
   - Model in application
   - No network overhead
   - Edge deployment

4. Serverless:
   - Pay per request
   - Auto-scaling
   - Cold start latency
"""
    print(title, underline, overview, sep="\n")

serving_patterns()


class ModelServer(ABC):
    """Interface every model server must implement.

    Subclasses have to override all three methods before they can be
    instantiated.
    """
    @abstractmethod
    def load_model(self, model_path: str):
        """Load the model found at `model_path`."""

    @abstractmethod
    def predict(self, inputs: Dict) -> Dict:
        """Run inference on `inputs` and return the result as a dict."""

    @abstractmethod
    def health_check(self) -> bool:
        """Report whether the server is able to serve requests."""


class SimpleModelServer(ModelServer):
    """Minimal model server with simulated loading and prediction.

    Keeps a running request count and cumulative latency so that
    `get_metrics` can report an average.
    """
    def __init__(self, model_name: str):
        self.model_name = model_name
        self.model = None
        self.loaded = False
        self.request_count = 0
        self.total_latency = 0  # Cumulative prediction time in seconds

    def load_model(self, model_path: str):
        """Load model from path (simulated: only records the path)."""
        print(f"Loading model from {model_path}")
        self.model = {'path': model_path, 'type': 'sklearn'}
        self.loaded = True

    def predict(self, inputs: Dict) -> Dict:
        """Make a (simulated) prediction and record its latency.

        Args:
            inputs: Request payload; not inspected by the simulation.

        Returns:
            Dict with a fixed ``predictions`` list and the model name.

        Raises:
            RuntimeError: If `load_model` has not been called yet.
        """
        if not self.loaded:
            raise RuntimeError("Model not loaded")

        # perf_counter is monotonic and high-resolution; the wall clock
        # can jump backwards and is too coarse for short intervals.
        start = time.perf_counter()

        # Simulated prediction
        result = {
            'predictions': [0.85],
            'model': self.model_name
        }

        latency = time.perf_counter() - start
        self.request_count += 1
        self.total_latency += latency

        return result

    def health_check(self) -> bool:
        """Return True once a model has been loaded."""
        return self.loaded

    def get_metrics(self) -> Dict:
        """Return the request count and mean latency in milliseconds."""
        return {
            'request_count': self.request_count,
            # max(1, ...) avoids dividing by zero before the first request
            'avg_latency_ms': (self.total_latency / max(1, self.request_count)) * 1000
        }


@dataclass
class ServingConfig:
    """Model serving configuration

    Plain settings container. None of the code in this section reads
    these fields, so the notes below are inferred from the field names
    and should be confirmed against whatever consumes the config.
    """
    model_name: str
    model_path: str
    batch_size: int = 1  # presumably max requests grouped per inference call - TODO confirm
    max_batch_wait_ms: int = 10  # presumably how long to wait to fill a batch, in ms - TODO confirm
    num_workers: int = 1
    timeout_ms: int = 5000  # presumably per-request timeout in ms - TODO confirm
    max_concurrent: int = 100  # presumably cap on in-flight requests - TODO confirm

Containerization with Docker

PYTHON
def containerization():
    """Print why models are containerized, a sample Dockerfile, and best practices.

    Writes to stdout and returns nothing. The Dockerfile sample is laid
    out one instruction per line with `#` comments so that it can be
    copied into a file as-is.
    """
    print("\nCONTAINERIZATION")
    print("=" * 60)

    print("""
Why Containerize Models?

Benefits:
  - Reproducible environments
  - Consistent deployment
  - Easy scaling
  - Isolation

Dockerfile for ML Model:

FROM python:3.9-slim

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy model and code
COPY model/ /app/model/
COPY src/ /app/src/

WORKDIR /app

# Expose port
EXPOSE 8080

# Run server
CMD ["python", "src/server.py"]

Best Practices:
  - Use slim base images
  - Multi-stage builds
  - Cache dependencies
  - Minimize image size
  - Health checks
""")

containerization()


# Example Dockerfile generator
class DockerfileGenerator:
    """Build Dockerfile text for an ML model image through a fluent API.

    Every configuration method returns the generator itself so calls
    can be chained; `generate` renders the final text.
    """
    def __init__(self, base_image: str = "python:3.9-slim"):
        self.base_image = base_image
        self.dependencies = []
        self.copy_commands = []
        self.env_vars = {}
        self.port = 8080
        self.cmd = None

    def add_pip_dependencies(self, packages: List[str]):
        """Append pip requirement strings to install in the image."""
        self.dependencies.extend(packages)
        return self

    def add_copy(self, src: str, dst: str):
        """Queue a COPY instruction from `src` to `dst`."""
        self.copy_commands.append((src, dst))
        return self

    def set_env(self, key: str, value: str):
        """Set (or overwrite) an ENV variable."""
        self.env_vars[key] = value
        return self

    def set_port(self, port: int):
        """Set the port used for EXPOSE and the health check URL."""
        self.port = port
        return self

    def set_cmd(self, cmd: List[str]):
        """Set the container command as a list of arguments."""
        self.cmd = cmd
        return self

    def generate(self) -> str:
        """Generate Dockerfile content.

        Sections appear in a fixed order: FROM, ENV, pip install, COPY,
        WORKDIR, EXPOSE, HEALTHCHECK and, if a command was set, CMD.
        Empty sections are omitted.

        Returns:
            The Dockerfile text with lines joined by newlines.
        """
        import json
        import shlex

        lines = [f"FROM {self.base_image}", ""]

        # Environment variables
        for key, value in self.env_vars.items():
            lines.append(f"ENV {key}={value}")
        if self.env_vars:
            lines.append("")

        # Install dependencies. RUN uses the shell form, so requirement
        # specifiers such as "numpy>=1.20" must be quoted or the shell
        # treats ">" as a redirection. Plain names are left untouched.
        if self.dependencies:
            deps = " ".join(shlex.quote(dep) for dep in self.dependencies)
            lines.append(f"RUN pip install --no-cache-dir {deps}")
            lines.append("")

        # Copy files
        for src, dst in self.copy_commands:
            lines.append(f"COPY {src} {dst}")
        if self.copy_commands:
            lines.append("")

        # Working directory
        lines.append("WORKDIR /app")
        lines.append("")

        # Port
        lines.append(f"EXPOSE {self.port}")
        lines.append("")

        # Health check
        # NOTE(review): this needs curl inside the image; slim base
        # images may not ship it - confirm for the chosen base image.
        lines.append(f'HEALTHCHECK CMD curl -f http://localhost:{self.port}/health || exit 1')
        lines.append("")

        # Command: the exec form is a JSON array, so let json handle the
        # escaping of quotes and backslashes inside arguments.
        if self.cmd:
            args = [str(arg) for arg in self.cmd]
            lines.append(f"CMD {json.dumps(args, ensure_ascii=False)}")

        return "\n".join(lines)


# Example usage
def dockerfile_example():
    """Generate and print a Dockerfile for a FastAPI/uvicorn model server."""
    start_command = ['uvicorn', 'src.server:app', '--host', '0.0.0.0', '--port', '8080']

    # Each configuration call hands back the builder to continue from
    builder = DockerfileGenerator()
    builder = builder.add_pip_dependencies(['fastapi', 'uvicorn', 'scikit-learn'])
    for source, destination in (('model/', '/app/model/'), ('src/', '/app/src/')):
        builder = builder.add_copy(source, destination)
    builder = builder.set_env('MODEL_PATH', '/app/model/model.pkl')
    builder = builder.set_port(8080)
    builder = builder.set_cmd(start_command)

    dockerfile = builder.generate()
    print(dockerfile)

dockerfile_example()

Model Serving Frameworks

PYTHON
def serving_frameworks():
    """Print a list of model-serving frameworks with their key traits.

    Writes to stdout and returns nothing.
    """
    heading = "\nSERVING FRAMEWORKS"
    rule = "=" * 60
    catalogue = """
Popular Serving Frameworks:

1. TensorFlow Serving:
   - Production-grade
   - Batch support
   - Model versioning

2. TorchServe:
   - PyTorch models
   - Multi-model serving
   - Metrics built-in

3. Triton Inference Server:
   - Multi-framework
   - GPU optimized
   - Dynamic batching

4. BentoML:
   - Easy packaging
   - Multiple frameworks
   - Adaptive batching

5. FastAPI + Custom:
   - Flexible
   - Python native
   - Easy to customize

6. Seldon Core:
   - Kubernetes native
   - A/B testing
   - Explainability

7. KServe:
   - Serverless
   - Multi-framework
   - Autoscaling
"""
    print(heading, rule, catalogue, sep="\n")

serving_frameworks()


# FastAPI model server example
class FastAPIServer:
    """FastAPI-based model server (source-code generator).

    `create_app` does not start a server; it returns the Python source
    of a FastAPI application that loads a pickled model.
    """
    def __init__(self):
        self.app = None
        self.model = None

    def create_app(self, model_path: str):
        """Return the source code of a FastAPI app serving `model_path`.

        Args:
            model_path: Location of the pickled model, embedded in the
                generated code as a string literal.

        Returns:
            The application source as a string.
        """
        import json

        # Render the path as an escaped, double-quoted literal so that
        # backslashes (Windows paths) and quotes cannot break the
        # generated source. Ordinary paths render exactly as before.
        path_literal = json.dumps(model_path, ensure_ascii=False)

        # NOTE(review): the generated app unpickles the model file;
        # pickle executes arbitrary code, so only point it at trusted
        # model artifacts.
        # This would normally use fastapi
        app_code = '''
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pickle
import numpy as np

app = FastAPI(title="ML Model Server")

# Load model
with open({model_path}, "rb") as f:
    model = pickle.load(f)

class PredictRequest(BaseModel):
    features: list

class PredictResponse(BaseModel):
    predictions: list
    model_version: str = "1.0"

@app.get("/health")
def health():
    return {{"status": "healthy"}}

@app.post("/predict", response_model=PredictResponse)
def predict(request: PredictRequest):
    try:
        features = np.array(request.features)
        predictions = model.predict(features).tolist()
        return PredictResponse(predictions=predictions)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/metrics")
def metrics():
    return {{"requests": 0, "latency_ms": 0}}
'''.format(model_path=path_literal)

        return app_code


# BentoML style packaging
class ModelPackager:
    """Bundle a model object with descriptive metadata on disk.

    `save` writes two files into the target directory: ``model.pkl``
    (the pickled model) and ``metadata.json`` (name plus metadata).
    """
    def __init__(self, model, model_name: str):
        self.model = model
        self.model_name = model_name
        self.metadata = {}

    def add_metadata(self, key: str, value: Any):
        """Attach one metadata entry; returns self for chaining."""
        self.metadata[key] = value
        return self

    def save(self, path: str):
        """Write the package into directory `path`, creating it if needed."""
        import os
        import json
        import pickle

        os.makedirs(path, exist_ok=True)
        model_file = os.path.join(path, 'model.pkl')
        metadata_file = os.path.join(path, 'metadata.json')

        # Serialized model
        with open(model_file, 'wb') as model_out:
            pickle.dump(self.model, model_out)

        # Descriptor alongside it
        descriptor = {
            'name': self.model_name,
            'metadata': self.metadata
        }
        with open(metadata_file, 'w') as metadata_out:
            json.dump(descriptor, metadata_out)

        print(f"Model saved to {path}")

Deployment Strategies

PYTHON
def deployment_strategies():
    """Print the common model rollout strategies and related best practices.

    Writes to stdout and returns nothing.
    """
    sections = (
        "\nDEPLOYMENT STRATEGIES",
        "=" * 60,
        """
Deployment Strategies:

1. Rolling Deployment:
   - Gradual replacement
   - Zero downtime
   - Easy rollback

2. Blue-Green Deployment:
   - Two identical environments
   - Instant switch
   - Full rollback capability

3. Canary Deployment:
   - Small traffic percentage
   - Gradual increase
   - Risk mitigation

4. Shadow Deployment:
   - Mirror production traffic
   - Compare responses
   - No user impact

5. A/B Testing:
   - Split traffic
   - Compare models
   - Statistical validation

Best Practices:
  - Always have rollback plan
  - Monitor closely after deploy
  - Start with small traffic
  - Automate deployment process
""",
    )
    for section in sections:
        print(section)

deployment_strategies()


class DeploymentManager:
    """Track model deployments and split request traffic between them.

    Deployments are stored under the id ``"<model_name>-<version>"``.
    """
    def __init__(self):
        self.deployments: Dict[str, Dict] = {}
        self.traffic_split: Dict[str, float] = {}

    def deploy(self, model_name: str, version: str,
               config: Dict) -> str:
        """Deploy model version (simulated) and return its deployment id.

        Re-deploying an existing model/version pair overwrites its
        record.
        """
        deployment_id = f"{model_name}-{version}"

        self.deployments[deployment_id] = {
            'model': model_name,
            'version': version,
            'config': config,
            'status': 'deploying',
            'deployed_at': time.time()
        }

        # Simulate deployment
        self.deployments[deployment_id]['status'] = 'running'
        return deployment_id

    def set_traffic(self, splits: Dict[str, float]):
        """Set traffic split between deployments.

        Args:
            splits: Mapping of deployment id to traffic fraction.

        Raises:
            ValueError: If any weight is negative or the weights do not
                sum to 1.0 (within 0.01).
        """
        # A negative weight can still pass the sum check (e.g. 1.5 and
        # -0.5) but makes cumulative routing meaningless.
        if any(weight < 0 for weight in splits.values()):
            raise ValueError("Traffic weights must be non-negative")
        if abs(sum(splits.values()) - 1.0) > 0.01:
            raise ValueError("Traffic split must sum to 1.0")
        # Store a copy so later changes to the caller's dict cannot
        # bypass the validation above.
        self.traffic_split = dict(splits)

    def route_request(self) -> str:
        """Route request to deployment based on traffic split.

        Returns:
            A deployment id chosen at random, weighted by the split.

        Raises:
            IndexError: If no traffic split has been configured.
        """
        import random

        if not self.traffic_split:
            # Same exception type an empty split produced before, but
            # with a message that names the actual problem.
            raise IndexError("No traffic split configured")

        rand = random.random()
        cumulative = 0

        for deployment_id, weight in self.traffic_split.items():
            cumulative += weight
            if rand < cumulative:
                return deployment_id

        # Weights may sum to slightly under 1.0; fall back to the last entry
        return list(self.traffic_split.keys())[-1]

    def rollback(self, model_name: str, to_version: str):
        """Rollback to previous version by sending it all traffic.

        Raises:
            ValueError: If that model/version was never deployed.
        """
        target = f"{model_name}-{to_version}"
        if target not in self.deployments:
            raise ValueError(f"Version {to_version} not found")

        # Switch all traffic
        self.traffic_split = {target: 1.0}
        print(f"Rolled back to {to_version}")


class CanaryDeployment:
    """Canary deployment manager.

    Tracks the canary's traffic share and per-request outcomes for the
    ``'production'`` and ``'canary'`` sides, and decides whether the
    canary should be promoted.
    """
    def __init__(self, production_id: str, canary_id: str):
        self.production = production_id
        self.canary = canary_id
        self.canary_weight = 0.0
        self.metrics = {'production': [], 'canary': []}

    def start_canary(self, initial_weight: float = 0.05):
        """Start canary with initial traffic"""
        self.canary_weight = initial_weight
        print(f"Started canary with {initial_weight*100}% traffic")

    def increment_canary(self, increment: float = 0.1):
        """Increase canary traffic, capped at 100%."""
        self.canary_weight = min(1.0, self.canary_weight + increment)
        print(f"Canary now at {self.canary_weight*100}% traffic")

    def record_metrics(self, deployment: str, latency: float,
                       success: bool):
        """Record one request outcome.

        Args:
            deployment: Either ``'production'`` or ``'canary'``.
            latency: Observed latency of the request.
            success: Whether the request succeeded.

        Raises:
            KeyError: If `deployment` is not one of the two sides.
        """
        self.metrics[deployment].append({
            'latency': latency,
            'success': success
        })

    def evaluate(self) -> Dict:
        """Evaluate canary performance against production.

        Returns:
            ``{'status': 'insufficient_data'}`` while either side has no
            recorded requests; otherwise the success rate and mean
            latency of both sides plus a ``'promote'`` flag. The canary
            qualifies when its latency is at most 110% of production's
            and its success rate at least 95% of production's.
        """
        prod_metrics = self.metrics['production']
        canary_metrics = self.metrics['canary']

        # Both sides need at least one sample: the averages below divide
        # by the sample count, and there is nothing to compare against
        # without a production baseline.
        if not canary_metrics or not prod_metrics:
            return {'status': 'insufficient_data'}

        prod_success, prod_latency = self._summarize(prod_metrics)
        canary_success, canary_latency = self._summarize(canary_metrics)

        # Decision logic
        latency_ok = canary_latency <= prod_latency * 1.1
        success_ok = canary_success >= prod_success * 0.95

        return {
            'production': {'success_rate': prod_success, 'latency': prod_latency},
            'canary': {'success_rate': canary_success, 'latency': canary_latency},
            'promote': latency_ok and success_ok
        }

    @staticmethod
    def _summarize(samples: List[Dict]):
        """Return (success rate, mean latency) for a non-empty sample list."""
        total = len(samples)
        success_rate = sum(1 for m in samples if m['success']) / total
        mean_latency = sum(m['latency'] for m in samples) / total
        return success_rate, mean_latency

    def promote_or_rollback(self):
        """Decide whether to promote or rollback.

        The canary is rolled back unless `evaluate` explicitly reports
        that it qualifies, which includes the insufficient-data case.
        """
        evaluation = self.evaluate()

        if evaluation.get('promote'):
            self.canary_weight = 1.0
            print("Canary promoted to production")
        else:
            self.canary_weight = 0.0
            print("Canary rolled back")

Scaling and Performance

PYTHON
def scaling_performance():
    """Print scaling strategies and performance-optimization techniques.

    Writes to stdout and returns nothing.
    """
    heading = "\nSCALING AND PERFORMANCE"
    rule = "=" * 60
    notes = """
Scaling Strategies:

1. Horizontal Scaling:
   - Add more instances
   - Load balancing
   - Stateless services

2. Vertical Scaling:
   - Bigger machines
   - More GPU memory
   - Limited ceiling

3. Auto-scaling:
   - Based on metrics
   - Cost optimization
   - Handle spikes

Performance Optimization:

1. Model Optimization:
   - Quantization
   - Pruning
   - Distillation

2. Batching:
   - Dynamic batching
   - Batch size tuning

3. Caching:
   - Prediction caching
   - Feature caching

4. Hardware:
   - GPU acceleration
   - TPUs
   - Specialized chips

5. Async Processing:
   - Non-blocking I/O
   - Queue-based
"""
    print(heading, rule, notes, sep="\n")

scaling_performance()


class LoadBalancer:
    """Simple round-robin load balancer over healthy servers."""
    def __init__(self, servers: List[str]):
        self.servers = servers
        self.current = 0  # Position (within the healthy list) to hand out next
        self.health_status = {s: True for s in servers}

    def get_server(self) -> Optional[str]:
        """Get next healthy server (round-robin).

        Rotation starts with the first server in the list.

        Returns:
            The selected server, or None when no server is healthy.
        """
        healthy = [s for s in self.servers if self.health_status[s]]
        if not healthy:
            return None

        # Pick first, advance afterwards: advancing before picking would
        # skip the first server on the initial request. The modulo also
        # re-aligns the position when the healthy list has shrunk.
        index = self.current % len(healthy)
        self.current = (index + 1) % len(healthy)
        return healthy[index]

    def mark_unhealthy(self, server: str):
        """Exclude `server` from rotation."""
        self.health_status[server] = False

    def mark_healthy(self, server: str):
        """Return `server` to rotation."""
        self.health_status[server] = True


class AutoScaler:
    """Simple utilization-based auto-scaler.

    Replicas are adjusted one at a time and kept within
    ``[min_replicas, max_replicas]``. The band between the scale-down
    and scale-up thresholds is a dead zone in which nothing changes.
    """
    def __init__(self, min_replicas: int = 1, max_replicas: int = 10,
                 target_utilization: float = 0.7,
                 scale_up_margin: float = 0.1,
                 scale_down_margin: float = 0.2):
        """
        Args:
            min_replicas: Lower bound, also the starting replica count.
            max_replicas: Upper bound on replicas.
            target_utilization: Utilization the scaler steers towards.
            scale_up_margin: Scale up once utilization exceeds
                ``target_utilization + scale_up_margin``.
            scale_down_margin: Scale down once utilization falls below
                ``target_utilization - scale_down_margin``.
        """
        self.min_replicas = min_replicas
        self.max_replicas = max_replicas
        self.current_replicas = min_replicas
        self.target_utilization = target_utilization
        self.scale_up_margin = scale_up_margin
        self.scale_down_margin = scale_down_margin

    def check_scale(self, current_utilization: float) -> int:
        """Determine scaling action.

        Does not change any state; pass the result to `scale` to apply.

        Returns:
            The replica count that should be running.
        """
        if current_utilization > self.target_utilization + self.scale_up_margin:
            # Scale up
            new_replicas = min(
                self.max_replicas,
                self.current_replicas + 1
            )
        elif current_utilization < self.target_utilization - self.scale_down_margin:
            # Scale down
            new_replicas = max(
                self.min_replicas,
                self.current_replicas - 1
            )
        else:
            new_replicas = self.current_replicas

        return new_replicas

    def scale(self, target_replicas: int):
        """Execute scaling to `target_replicas` (no-op if already there)."""
        if target_replicas != self.current_replicas:
            print(f"Scaling from {self.current_replicas} to {target_replicas}")
            self.current_replicas = target_replicas

Summary

| Aspect | Options | Trade-offs |
|--------|---------|------------|
| Serving Pattern | Batch/Online/Stream | Latency vs Throughput |
| Infrastructure | Container/Serverless/Edge | Cost vs Control |
| Deployment | Rolling/Canary/Blue-Green | Risk vs Speed |
| Scaling | Horizontal/Vertical/Auto | Cost vs Performance |

Key takeaways:

  • Choose serving pattern based on latency requirements
  • Containerize for reproducibility and portability
  • Use gradual deployment strategies to reduce risk
  • Implement auto-scaling for cost efficiency
  • Monitor performance and health continuously
  • Have rollback plans ready

26.4 Model Monitoring and Observability Advanced

Model Monitoring and Observability

Production ML systems require comprehensive monitoring to ensure models perform as expected. This section covers performance monitoring, drift detection, and alerting strategies.

ML Monitoring Fundamentals

PYTHON
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import numpy as np

def monitoring_overview():
    """Print why ML models need monitoring and what to watch.

    Writes to stdout and returns nothing.
    """
    title = "ML MONITORING"
    underline = "=" * 60
    overview = """
Why Monitor ML Models?

Model Degradation Sources:
  - Data drift: Input distribution changes
  - Concept drift: Relationship changes
  - Technical issues: Latency, errors
  - Feature pipeline failures

What to Monitor:

1. Model Performance:
   - Prediction accuracy
   - Business metrics
   - A/B test results

2. Data Quality:
   - Input validation
   - Feature statistics
   - Missing values

3. Operational Health:
   - Latency
   - Throughput
   - Error rates

4. Resource Usage:
   - CPU/Memory
   - GPU utilization
   - Cost

Monitoring vs Observability:
  Monitoring: Is something wrong?
  Observability: Why is something wrong?
"""
    print(title, underline, overview, sep="\n")

monitoring_overview()


class MetricType(Enum):
    """Kinds of metric that MetricsCollector records."""
    COUNTER = "counter"  # Running total, recorded by MetricsCollector.increment
    GAUGE = "gauge"  # Latest value, recorded by MetricsCollector.set_gauge
    HISTOGRAM = "histogram"  # Individual observation, recorded by MetricsCollector.observe


@dataclass
class Metric:
    """Metric definition

    One recorded data point. MetricsCollector appends a new instance to
    its `metrics` list on every increment, set_gauge and observe call.
    """
    name: str  # Metric name without labels
    type: MetricType  # Counter, gauge or histogram
    value: float  # For counters the collector stores the running total, not the increment
    labels: Dict[str, str]  # Label set; the collector passes {} when none were given
    timestamp: datetime  # Set by the collector to datetime.now() at record time


class MetricsCollector:
    """In-memory store for counters, gauges and histograms.

    Current values live in one dict per metric kind, keyed by metric
    name plus its sorted labels. In addition every update is appended
    to `metrics` as a timestamped Metric.
    """
    def __init__(self):
        self.metrics: List[Metric] = []
        self.counters: Dict[str, float] = {}
        self.gauges: Dict[str, float] = {}
        self.histograms: Dict[str, List[float]] = {}

    def increment(self, name: str, value: float = 1,
                  labels: Dict = None):
        """Add `value` to a counter and log the new running total."""
        key = self._make_key(name, labels)
        running_total = self.counters.get(key, 0) + value
        self.counters[key] = running_total
        self._record(name, MetricType.COUNTER, running_total, labels)

    def set_gauge(self, name: str, value: float,
                  labels: Dict = None):
        """Overwrite a gauge with `value` and log it."""
        self.gauges[self._make_key(name, labels)] = value
        self._record(name, MetricType.GAUGE, value, labels)

    def observe(self, name: str, value: float,
                labels: Dict = None):
        """Append one observation to a histogram and log it."""
        key = self._make_key(name, labels)
        self.histograms.setdefault(key, []).append(value)
        self._record(name, MetricType.HISTOGRAM, value, labels)

    def _make_key(self, name: str, labels: Dict) -> str:
        """Build the storage key: ``name`` or ``name{k1=v1,k2=v2}``.

        Labels are sorted by key, so the same label set always produces
        the same storage key regardless of insertion order.
        """
        if labels:
            pairs = [f"{k}={v}" for k, v in sorted(labels.items())]
            return "{}{{{}}}".format(name, ",".join(pairs))
        return name

    def _record(self, name: str, metric_type: MetricType,
                value: float, labels: Dict):
        """Append a timestamped Metric entry to the log."""
        entry = Metric(
            name=name,
            type=metric_type,
            value=value,
            labels=labels or {},
            timestamp=datetime.now()
        )
        self.metrics.append(entry)

    def get_histogram_stats(self, name: str, labels: Dict = None) -> Dict:
        """Summarize a histogram.

        Returns:
            An empty dict when nothing has been observed; otherwise
            count, mean, std, min, max and the 50th/95th/99th
            percentiles.
        """
        samples = self.histograms.get(self._make_key(name, labels), [])
        if not samples:
            return {}

        stats = {'count': len(samples)}
        aggregates = (
            ('mean', np.mean),
            ('std', np.std),
            ('min', np.min),
            ('max', np.max),
        )
        for label, aggregate in aggregates:
            stats[label] = aggregate(samples)
        for q in (50, 95, 99):
            stats[f'p{q}'] = np.percentile(samples, q)
        return stats

Performance Monitoring

PYTHON
from typing import Dict, List, Optional
from dataclasses import dataclass
import time

def performance_monitoring():
    """Print the performance-monitoring cheat sheet (metrics and challenges)."""
    heading = "\nPERFORMANCE MONITORING"
    underline = "=" * 60
    body = """
Performance Metrics:

1. Accuracy Metrics:
   - Accuracy, Precision, Recall
   - F1 Score, AUC-ROC
   - Custom business metrics

2. Latency Metrics:
   - P50, P95, P99 latency
   - Prediction time
   - End-to-end latency

3. Throughput Metrics:
   - Requests per second
   - Batch processing rate
   - Queue depth

4. Error Metrics:
   - Error rate
   - Timeout rate
   - Failure reasons

Challenges:

Ground Truth Delay:
  - Labels arrive late
  - Proxy metrics needed
  - Sampling strategies

Business Metrics:
  - Click-through rate
  - Conversion rate
  - Revenue impact
"""
    # One print per chunk keeps the output identical to three separate calls.
    for chunk in (heading, underline, body):
        print(chunk)

performance_monitoring()


class ModelMonitor:
    """Track predictions, ground-truth labels and serving metrics for one model.

    Each prediction is stored in ``self.predictions`` together with an
    identifier, and mirrored into the supplied metrics collector. Labels
    recorded later carry the same identifier so they can be matched back
    to the prediction they belong to.
    """
    def __init__(self, model_name: str, metrics_collector: "MetricsCollector"):
        self.model_name = model_name
        self.metrics = metrics_collector
        self.predictions = []
        self.labels = []

    def record_prediction(self, prediction: float,
                         features: Dict,
                         latency_ms: float,
                         prediction_id: str = None) -> str:
        """Record a prediction and return its identifier.

        Args:
            prediction: Model output value.
            features: Feature mapping the prediction was made from.
            latency_ms: Time taken to produce the prediction, in ms.
            prediction_id: Optional caller-supplied identifier; a random
                hex id is generated when omitted.

        Returns:
            The identifier stored with the prediction. Pass it to
            ``record_label`` once the ground truth arrives.
        """
        import uuid

        if prediction_id is None:
            prediction_id = uuid.uuid4().hex

        self.predictions.append({
            'prediction_id': prediction_id,
            'prediction': prediction,
            'features': features,
            'timestamp': datetime.now()
        })

        # Record metrics
        labels = {'model': self.model_name}
        self.metrics.increment('predictions_total', labels=labels)
        self.metrics.observe('prediction_latency_ms', latency_ms, labels=labels)
        self.metrics.observe('prediction_value', prediction, labels=labels)

        return prediction_id

    def record_label(self, prediction_id: str, label: float):
        """Record the ground-truth label for a previously recorded prediction."""
        self.labels.append({
            'prediction_id': prediction_id,
            'label': label,
            'timestamp': datetime.now()
        })

    def compute_metrics(self, window_size: int = 1000) -> Dict:
        """Compute summary statistics over the latest ``window_size`` predictions.

        Returns ``{'error': 'Insufficient data'}`` until at least
        ``window_size`` predictions have been recorded.

        Raises:
            ValueError: If ``window_size`` is not positive. A zero window
                would otherwise slice the *entire* history (``xs[-0:]``).
        """
        if window_size <= 0:
            raise ValueError("window_size must be a positive integer")

        if len(self.predictions) < window_size:
            return {'error': 'Insufficient data'}

        recent = self.predictions[-window_size:]

        # Basic statistics
        preds = [p['prediction'] for p in recent]
        return {
            'mean_prediction': np.mean(preds),
            'std_prediction': np.std(preds),
            'prediction_count': len(preds)
        }


class SLAMonitor:
    """Check observations against SLA limits and keep a log of violations."""
    def __init__(self, sla_config: Dict):
        self.sla_config = sla_config
        self.violations = []

    def _note_violation(self, kind, observed, limit):
        """Append one violation entry stamped with the current time."""
        self.violations.append({
            'type': kind,
            'value': observed,
            'threshold': limit,
            'timestamp': datetime.now()
        })

    def check_latency(self, latency_ms: float) -> bool:
        """Return whether ``latency_ms`` is within ``max_latency_ms`` (default 100)."""
        limit = self.sla_config.get('max_latency_ms', 100)
        within_sla = latency_ms <= limit
        if within_sla:
            return within_sla

        self._note_violation('latency', latency_ms, limit)
        return within_sla

    def check_error_rate(self, errors: int, total: int) -> bool:
        """Return whether errors/total is within ``max_error_rate`` (default 0.01)."""
        limit = self.sla_config.get('max_error_rate', 0.01)
        # A total below 1 is treated as 1 so the division cannot fail.
        denominator = max(1, total)
        rate = errors / denominator
        within_sla = rate <= limit
        if within_sla:
            return within_sla

        self._note_violation('error_rate', rate, limit)
        return within_sla

    def get_sla_report(self) -> Dict:
        """Summarise recorded violations together with the active config."""
        report = {}
        report['total_violations'] = len(self.violations)
        report['violations_by_type'] = self._group_violations()
        report['sla_config'] = self.sla_config
        return report

    def _group_violations(self) -> Dict:
        """Count violations per ``type``, in first-seen order."""
        tally = {}
        for entry in self.violations:
            kind = entry['type']
            tally[kind] = tally.get(kind, 0) + 1
        return tally

Drift Detection

PYTHON
from typing import Dict, List, Optional
from dataclasses import dataclass
import numpy as np

def drift_detection():
    """Print the drift-detection cheat sheet (drift types and detection methods)."""
    heading = "\nDRIFT DETECTION"
    underline = "=" * 60
    body = """
Types of Drift:

1. Data Drift (Covariate Shift):
   - Input distribution changes
   - P(X) changes, P(Y|X) same
   - Example: User demographics shift

2. Concept Drift:
   - Relationship changes
   - P(Y|X) changes
   - Example: Buying patterns change

3. Label Drift:
   - Target distribution changes
   - P(Y) changes
   - Example: Fraud rate increases

Detection Methods:

Statistical Tests:
  - Kolmogorov-Smirnov test
  - Chi-squared test
  - Population Stability Index (PSI)

Distance Metrics:
  - Jensen-Shannon divergence
  - Wasserstein distance
  - Maximum Mean Discrepancy

ML-Based:
  - Train classifier to distinguish
  - If distinguishable → drift
"""
    # One print per chunk keeps the output identical to three separate calls.
    for chunk in (heading, underline, body):
        print(chunk)

drift_detection()


class DriftDetector:
    """Detect per-feature distribution drift against a reference sample.

    ``reference_data`` and the data passed to the ``detect_*`` methods may be
    2-D (rows x features) or 1-D (a single feature). Columns are matched by
    position, so the current data must not have more columns than the
    reference.
    """
    def __init__(self, reference_data: np.ndarray,
                 threshold: float = 0.1):
        self.reference = reference_data
        self.threshold = threshold  # PSI above this value counts as drift
        self.reference_stats = self._compute_stats(reference_data)

    def _compute_stats(self, data: np.ndarray) -> Dict:
        """Compute per-column summary statistics (reduced along axis 0)."""
        return {
            'mean': np.mean(data, axis=0),
            'std': np.std(data, axis=0),
            'min': np.min(data, axis=0),
            'max': np.max(data, axis=0),
            'percentiles': {
                'p25': np.percentile(data, 25, axis=0),
                'p50': np.percentile(data, 50, axis=0),
                'p75': np.percentile(data, 75, axis=0)
            }
        }

    @staticmethod
    def _as_columns(data) -> np.ndarray:
        """Return ``data`` as a 2-D array, treating 1-D input as one column."""
        arr = np.asarray(data)
        if arr.ndim == 1:
            arr = arr.reshape(-1, 1)
        if arr.ndim != 2:
            raise ValueError(
                f"expected 1-D or 2-D data, got {arr.ndim} dimensions")
        return arr

    def _column_pairs(self, current_data) -> list:
        """Return ``(index, reference_column, current_column)`` triples."""
        ref = self._as_columns(self.reference)
        cur = self._as_columns(current_data)
        if cur.shape[0] == 0:
            # An empty batch would turn every statistic into NaN, which then
            # compares as "no drift" - fail loudly instead.
            raise ValueError("current_data contains no rows")
        return [(col, ref[:, col], cur[:, col]) for col in range(cur.shape[1])]

    def detect_psi(self, current_data: np.ndarray,
                   num_bins: int = 10) -> Dict:
        """Population Stability Index per column.

        Bin edges are the reference column's percentiles, with the outer
        edges opened to +/-inf so out-of-range values are still counted.

        Returns:
            ``{'column_<i>': {'psi': float, 'drift_detected': bool}}``.

        Raises:
            ValueError: If ``num_bins`` < 1 or ``current_data`` is empty.
        """
        if num_bins < 1:
            raise ValueError("num_bins must be at least 1")

        results = {}
        for col, ref_col, curr_col in self._column_pairs(current_data):
            # Create bins from reference
            bins = np.percentile(ref_col, np.linspace(0, 100, num_bins + 1))
            bins[0] = -np.inf
            bins[-1] = np.inf

            # Calculate distributions
            ref_dist = np.histogram(ref_col, bins=bins)[0] / len(ref_col)
            curr_dist = np.histogram(curr_col, bins=bins)[0] / len(curr_col)

            # Avoid division by zero / log(0) for empty bins
            ref_dist = np.clip(ref_dist, 1e-10, 1)
            curr_dist = np.clip(curr_dist, 1e-10, 1)

            # Calculate PSI
            psi = float(
                np.sum((curr_dist - ref_dist) * np.log(curr_dist / ref_dist)))

            # Plain float/bool keep the result JSON-serialisable.
            results[f'column_{col}'] = {
                'psi': psi,
                'drift_detected': bool(psi > self.threshold)
            }

        return results

    def detect_ks(self, current_data: np.ndarray,
                  alpha: float = 0.05) -> Dict:
        """Two-sample Kolmogorov-Smirnov test per column.

        Args:
            current_data: Data to compare against the reference.
            alpha: Significance level; drift is flagged when the p-value
                falls below it.

        Returns:
            ``{'column_<i>': {'ks_statistic', 'p_value', 'drift_detected'}}``.
        """
        from scipy import stats

        results = {}
        for col, ref_col, curr_col in self._column_pairs(current_data):
            statistic, p_value = stats.ks_2samp(ref_col, curr_col)

            results[f'column_{col}'] = {
                'ks_statistic': float(statistic),
                'p_value': float(p_value),
                'drift_detected': bool(p_value < alpha)
            }

        return results


class ConceptDriftDetector:
    """Detect concept drift by tracking windowed binary accuracy over time.

    Each ``update`` appends the batch to the histories and, once at least
    ``window_size`` labels exist, records the accuracy of the most recent
    ``window_size`` prediction/label pairs (both thresholded at 0.5).

    NOTE: the histories are append-only and grow with every batch.
    """
    def __init__(self, window_size: int = 1000):
        if window_size <= 0:
            # xs[-0:] would silently select the whole history.
            raise ValueError("window_size must be a positive integer")
        self.window_size = window_size
        self.prediction_history = []
        self.label_history = []
        self.accuracy_history = []

    def update(self, predictions: List[float], labels: List[float]):
        """Add a batch of predictions with their labels.

        Raises:
            ValueError: If the two sequences differ in length; the
                histories would otherwise drift out of alignment.
        """
        if len(predictions) != len(labels):
            raise ValueError(
                f"predictions ({len(predictions)}) and labels "
                f"({len(labels)}) must have the same length")

        self.prediction_history.extend(predictions)
        self.label_history.extend(labels)

        # Compute accuracy for window
        if len(self.label_history) >= self.window_size:
            recent_preds = self.prediction_history[-self.window_size:]
            recent_labels = self.label_history[-self.window_size:]

            # Binary accuracy
            accuracy = float(np.mean(
                (np.array(recent_preds) > 0.5) == (np.array(recent_labels) > 0.5)
            ))
            self.accuracy_history.append(accuracy)

    def detect_drift(self, num_windows: int = 10,
                     degradation_threshold: float = 0.05) -> Dict:
        """Compare the earliest and latest accuracy windows.

        Args:
            num_windows: Number of accuracy entries in each of the baseline
                (oldest) and recent (newest) groups.
            degradation_threshold: Drop in mean accuracy that counts as drift.

        Returns:
            ``{'status': 'insufficient_data'}`` until ``2 * num_windows``
            accuracy entries exist, so the two groups never overlap;
            otherwise baseline/recent accuracy, their difference and a
            ``drift_detected`` flag.
        """
        if num_windows <= 0:
            raise ValueError("num_windows must be a positive integer")

        # Overlapping groups share samples and hide degradation: with exactly
        # num_windows entries they are identical and the difference is 0.
        if len(self.accuracy_history) < 2 * num_windows:
            return {'status': 'insufficient_data'}

        baseline_mean = float(np.mean(self.accuracy_history[:num_windows]))
        recent_mean = float(np.mean(self.accuracy_history[-num_windows:]))
        degradation = baseline_mean - recent_mean

        return {
            'baseline_accuracy': baseline_mean,
            'recent_accuracy': recent_mean,
            'degradation': degradation,
            'drift_detected': degradation > degradation_threshold
        }

Alerting and Incident Response

PYTHON
from typing import Dict, List, Callable
from dataclasses import dataclass
from enum import Enum
from datetime import datetime

def alerting():
    """Print the alerting cheat sheet (alert types, severities, best practices)."""
    heading = "\nALERTING"
    underline = "=" * 60
    body = """
Alert Types:

1. Threshold Alerts:
   - Static thresholds
   - Simple to implement
   - May need tuning

2. Anomaly Alerts:
   - Statistical detection
   - Adapts to patterns
   - Fewer false positives

3. Trend Alerts:
   - Detect gradual changes
   - Early warning
   - Predictive

Alert Severity:
  - Critical: Immediate action
  - Warning: Investigate soon
  - Info: Awareness

Best Practices:
  - Avoid alert fatigue
  - Include context
  - Runbooks for response
  - Escalation paths
"""
    # One print per chunk keeps the output identical to three separate calls.
    for chunk in (heading, underline, body):
        print(chunk)

alerting()


class AlertSeverity(Enum):
    """Severity level attached to an alert; each value is a lowercase label."""
    CRITICAL = "critical"  # "Immediate action" in the alerting overview
    WARNING = "warning"    # "Investigate soon"
    INFO = "info"          # "Awareness"


@dataclass
class Alert:
    """A single fired alert, as built by ``AlertManager.check``."""
    name: str                # name of the rule that fired
    severity: AlertSeverity  # severity configured on the rule
    condition: str           # str() of the rule's condition callable
    value: float             # value the condition returned for the metrics
    threshold: float         # rule threshold the value was compared with
    timestamp: datetime      # datetime.now() at the moment the rule fired
    metadata: Dict           # the metrics mapping that was checked


class AlertManager:
    """Evaluate alert rules against metric snapshots and dispatch handlers.

    A rule is a callable that extracts a number from the metrics mapping,
    a threshold, and a comparison that decides when the rule fires.
    Handlers are registered per severity and invoked for each fired alert.
    """

    # Supported comparisons between the extracted value and the threshold.
    _COMPARATORS = {
        'gt': lambda value, threshold: value > threshold,
        'gte': lambda value, threshold: value >= threshold,
        'lt': lambda value, threshold: value < threshold,
        'lte': lambda value, threshold: value <= threshold,
    }

    def __init__(self):
        self.rules: Dict[str, Dict] = {}
        self.alerts: List[Alert] = []
        self.handlers: Dict[AlertSeverity, List[Callable]] = {
            AlertSeverity.CRITICAL: [],
            AlertSeverity.WARNING: [],
            AlertSeverity.INFO: []
        }

    def add_rule(self, name: str, condition: Callable,
                 severity: AlertSeverity, threshold: float,
                 comparison: str = "gt"):
        """Add (or replace) an alert rule.

        Args:
            name: Rule name; also used as the alert name.
            condition: Callable taking the metrics dict, returning a number.
            severity: Severity assigned to alerts from this rule.
            threshold: Value the condition result is compared against.
            comparison: ``'gt'`` (default), ``'gte'``, ``'lt'`` or ``'lte'``.
                Use ``'lt'`` for metrics that alert when they drop, such
                as accuracy.

        Raises:
            ValueError: If ``comparison`` is not one of the supported names.
        """
        if comparison not in self._COMPARATORS:
            raise ValueError(
                f"Unknown comparison {comparison!r}; "
                f"expected one of {sorted(self._COMPARATORS)}")

        self.rules[name] = {
            'condition': condition,
            'severity': severity,
            'threshold': threshold,
            'comparison': comparison
        }

    def add_handler(self, severity: AlertSeverity,
                    handler: Callable):
        """Register a handler called for every alert of ``severity``."""
        self.handlers[severity].append(handler)

    def check(self, metrics: Dict) -> List[Alert]:
        """Check all rules against ``metrics`` and return the fired alerts.

        Fired alerts are also appended to ``self.alerts`` and passed to the
        handlers registered for their severity.
        """
        triggered = []

        for name, rule in self.rules.items():
            value = rule['condition'](metrics)

            # A condition that yields None has no value to compare; skip it
            # rather than let a TypeError abort the remaining rules.
            if value is None:
                continue

            # Rules inserted directly into self.rules may lack 'comparison'.
            fires = self._COMPARATORS[rule.get('comparison', 'gt')]
            if fires(value, rule['threshold']):
                alert = Alert(
                    name=name,
                    severity=rule['severity'],
                    condition=str(rule['condition']),
                    value=value,
                    threshold=rule['threshold'],
                    timestamp=datetime.now(),
                    # Copy so later mutation of the caller's dict cannot
                    # rewrite what this alert recorded.
                    metadata=dict(metrics)
                )
                triggered.append(alert)
                self.alerts.append(alert)
                self._handle_alert(alert)

        return triggered

    def _handle_alert(self, alert: Alert):
        """Run every handler for the alert's severity; failures are printed."""
        for handler in self.handlers.get(alert.severity, []):
            try:
                handler(alert)
            except Exception as e:
                # Best effort: one failing handler must not block the others.
                print(f"Handler failed: {e}")


# Alert handlers
def slack_handler(alert: Alert):
    """Print the Slack-style line for an alert: severity value and name."""
    level = alert.severity.value
    print(f"[SLACK] {level}: {alert.name}")

def pagerduty_handler(alert: Alert):
    """Print a PagerDuty-style line, but only for critical alerts."""
    if alert.severity != AlertSeverity.CRITICAL:
        return
    print(f"[PAGERDUTY] Critical: {alert.name}")

def log_handler(alert: Alert):
    """Print a log line with the alert's timestamp, name and value."""
    line = f"[LOG] {alert.timestamp} - {alert.name}: {alert.value}"
    print(line)


# Example setup
def alerting_example():
    """Wire up three rules and three handlers, then check one metrics snapshot."""
    manager = AlertManager()

    # (rule name, value extractor, severity, threshold)
    rule_specs = [
        ('high_latency',
         lambda m: m.get('p99_latency_ms', 0),
         AlertSeverity.WARNING, 100),
        ('high_error_rate',
         lambda m: m.get('error_rate', 0),
         AlertSeverity.CRITICAL, 0.05),
        ('drift_detected',
         lambda m: m.get('psi_score', 0),
         AlertSeverity.WARNING, 0.1),
    ]
    for rule_name, extractor, level, limit in rule_specs:
        manager.add_rule(rule_name, extractor, level, threshold=limit)

    # One handler per severity level
    handler_specs = [
        (AlertSeverity.CRITICAL, pagerduty_handler),
        (AlertSeverity.WARNING, slack_handler),
        (AlertSeverity.INFO, log_handler),
    ]
    for level, handler in handler_specs:
        manager.add_handler(level, handler)

    # Snapshot to evaluate
    metrics = {
        'p99_latency_ms': 150,
        'error_rate': 0.02,
        'psi_score': 0.15
    }

    fired = manager.check(metrics)
    print(f"Triggered {len(fired)} alerts")

alerting_example()

Summary

| Aspect | What to Monitor | Key Metrics |
|--------|-----------------|-------------|
| Performance | Model accuracy | F1, AUC, business KPIs |
| Operational | System health | Latency, error rate |
| Data Quality | Input distribution | PSI, KS statistic |
| Resource | Infrastructure | CPU, memory, cost |

Key takeaways:

  • Monitor model performance continuously
  • Detect drift before it impacts business
  • Use multiple drift detection methods
  • Set up alerts with appropriate severity
  • Have runbooks for incident response
  • Balance sensitivity vs alert fatigue

26.5 CI/CD for Machine Learning Advanced

CI/CD for Machine Learning

Continuous Integration and Continuous Deployment (CI/CD) for ML extends traditional software practices to handle data, models, and experiments. This section covers ML-specific CI/CD pipelines and automation.

ML CI/CD Fundamentals

PYTHON
from typing import Dict, List, Any
from dataclasses import dataclass
from enum import Enum

def ml_cicd_overview():
    """Print the ML CI/CD overview (pipelines, challenges and common tools)."""
    heading = "ML CI/CD"
    underline = "=" * 60
    body = """
Traditional CI/CD vs ML CI/CD:

Traditional:
  Code → Build → Test → Deploy

ML CI/CD:
  Code → Build → Test → Deploy
  Data → Validate → Transform → Store
  Model → Train → Evaluate → Deploy

ML-Specific Challenges:

1. Data Dependencies:
   - Data versioning
   - Data validation
   - Feature consistency

2. Training Pipelines:
   - Long-running jobs
   - Resource requirements
   - Reproducibility

3. Model Validation:
   - Beyond unit tests
   - Performance gates
   - Fairness checks

4. Deployment:
   - Model serving
   - A/B testing
   - Rollback capability

CI/CD Tools for ML:
  - GitHub Actions
  - GitLab CI
  - Jenkins
  - Kubeflow Pipelines
  - MLflow Projects
"""
    # One print per chunk keeps the output identical to three separate calls.
    for chunk in (heading, underline, body):
        print(chunk)

ml_cicd_overview()


class PipelineStage(Enum):
    """Stage a CI/CD pipeline step belongs to; each value is a lowercase label."""
    VALIDATE = "validate"
    BUILD = "build"
    TEST = "test"
    TRAIN = "train"
    EVALUATE = "evaluate"
    DEPLOY = "deploy"


@dataclass
class PipelineStep:
    """CI/CD pipeline step.

    ``MLPipelineConfig.to_github_actions`` renders each step as one job:
    ``name`` becomes the job id, ``dependencies`` the ``needs:`` list and
    every ``script`` entry a ``run:`` line. That renderer does not read
    ``stage``, ``artifacts`` or ``when``.
    """
    name: str
    stage: PipelineStage
    script: List[str]               # shell commands, one per entry
    dependencies: List[str] = None  # names of steps to wait for; None = none
    artifacts: List[str] = None     # None = no artifacts declared
    when: str = "always"  # always, on_success, manual


class MLPipelineConfig:
    """ML CI/CD pipeline configuration.

    Collects pipeline steps and environment variables and renders them as
    a GitHub Actions workflow.
    """
    def __init__(self, name: str):
        self.name = name
        self.steps: List[PipelineStep] = []
        self.variables: Dict[str, str] = {}

    def add_step(self, step: "PipelineStep"):
        """Append a step; steps are rendered as jobs in insertion order."""
        self.steps.append(step)

    def set_variable(self, key: str, value: str):
        """Set a workflow-level environment variable."""
        self.variables[key] = value

    def to_github_actions(self) -> str:
        """Generate GitHub Actions workflow YAML.

        Each step becomes a job running on ``ubuntu-latest`` that checks out
        the repository, sets up Python 3.9 and runs the step's script lines.
        ``dependencies`` map to ``needs:``. The ``env:`` section is written
        only when variables are set, because an ``env:`` key with no
        entries is an empty (null) value rather than a mapping.

        Names, variable values and script lines are inserted verbatim, so
        they must already be valid YAML scalars. A step's ``stage``,
        ``artifacts`` and ``when`` fields are not rendered.
        """
        # Collect fragments and join once instead of growing a string.
        parts = [
            f"name: {self.name}\n"
            "\n"
            "on:\n"
            "  push:\n"
            "    branches: [main]\n"
            "  pull_request:\n"
            "    branches: [main]\n"
        ]

        if self.variables:
            parts.append("\nenv:\n")
            parts.extend(
                f"  {key}: {value}\n" for key, value in self.variables.items()
            )

        parts.append("\njobs:\n")

        for step in self.steps:
            parts.append(f"\n  {step.name}:\n    runs-on: ubuntu-latest\n")
            if step.dependencies:
                parts.append(f"    needs: [{', '.join(step.dependencies)}]\n")

            parts.append(
                "    steps:\n"
                "      - uses: actions/checkout@v3\n"
                "      - uses: actions/setup-python@v4\n"
                "        with:\n"
                "          python-version: '3.9'\n"
            )
            parts.extend(f"      - run: {script}\n" for script in step.script)

        return "".join(parts)

Data Pipeline CI/CD

PYTHON
def data_pipeline_cicd():
    """Print the data-pipeline CI/CD cheat sheet (stages and CI/CD steps)."""
    heading = "\nDATA PIPELINE CI/CD"
    underline = "=" * 60
    body = """
Data Pipeline Stages:

1. Data Validation:
   - Schema checks
   - Quality checks
   - Anomaly detection

2. Data Processing:
   - ETL jobs
   - Feature engineering
   - Data transformation

3. Data Testing:
   - Data unit tests
   - Integration tests
   - Quality assertions

CI/CD Steps:

1. On Pull Request:
   - Lint SQL/Python
   - Run unit tests
   - Validate schemas

2. On Merge:
   - Run full pipeline
   - Validate outputs
   - Update data catalog

3. Scheduled:
   - Incremental updates
   - Quality monitoring
   - Drift detection
"""
    # One print per chunk keeps the output identical to three separate calls.
    for chunk in (heading, underline, body):
        print(chunk)

data_pipeline_cicd()


class DataPipelineCI:
    """CI/CD for data pipelines.

    Holds registered validators, transformers and data tests, and runs
    the validators and tests against a dataset.
    """
    def __init__(self):
        self.validators = []
        self.transformers = []
        self.tests = []

    def add_validator(self, validator):
        """Register a callable returning a dict with a ``'passed'`` key."""
        self.validators.append(validator)

    def add_transformer(self, transformer):
        """Register a transformer (stored only; nothing here invokes it)."""
        self.transformers.append(transformer)

    def add_test(self, test):
        """Register a data test: a callable that raises on failure."""
        self.tests.append(test)

    def run_validation(self, data) -> Dict:
        """Run all validators and collect the results that did not pass."""
        results = {'passed': True, 'failures': []}

        for validator in self.validators:
            result = validator(data)
            if not result['passed']:
                results['passed'] = False
                results['failures'].append(result)

        return results

    def run_tests(self, data) -> Dict:
        """Run all data tests and report every outcome.

        A test fails when it raises. ``AssertionError`` is recorded with
        its message; any other exception (for example a ``KeyError`` for a
        missing column) is recorded as ``"<ExceptionType>: <message>"``
        so one broken test cannot abort the remaining ones.

        Returns:
            ``{'total': int, 'passed': int, 'failed': [{'test', 'error'}]}``.
        """
        results = {'total': len(self.tests), 'passed': 0, 'failed': []}

        for test in self.tests:
            # partial objects and callable instances have no __name__.
            label = getattr(test, '__name__', repr(test))
            try:
                test(data)
            except AssertionError as e:
                results['failed'].append({'test': label, 'error': str(e)})
            except Exception as e:
                results['failed'].append({
                    'test': label,
                    'error': f"{type(e).__name__}: {e}"
                })
            else:
                results['passed'] += 1

        return results


# Example data tests
def test_no_missing_values(data):
    """Test for missing values"""
    missing = data.isnull().sum().sum()
    assert missing == 0, f"Found {missing} missing values"

def test_valid_ranges(data):
    """Test value ranges"""
    assert data['age'].min() >= 0, "Negative age found"
    assert data['age'].max() <= 120, "Age exceeds maximum"

def test_unique_ids(data):
    """Test ID uniqueness"""
    assert data['id'].is_unique, "Duplicate IDs found"


# GitHub Actions workflow for data
def data_github_workflow():
    """Return a GitHub Actions workflow (YAML text) for the data pipeline.

    The workflow runs on pushes under ``data/`` or ``features/`` and on a
    daily schedule. ``validate`` checks schema and data tests; ``process``
    runs the ETL, validates its output and uploads the processed data.

    Every job starts on a fresh runner, so ``process`` checks out the
    repository and installs dependencies itself before running scripts.
    """
    return """
name: Data Pipeline

on:
  push:
    paths:
      - 'data/**'
      - 'features/**'
  schedule:
    - cron: '0 0 * * *'  # Daily

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install deps
        run: pip install -r requirements.txt
      - name: Validate schema
        run: python scripts/validate_schema.py
      - name: Run data tests
        run: pytest tests/data/

  process:
    needs: validate
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install deps
        run: pip install -r requirements.txt
      - name: Run ETL
        run: python scripts/run_etl.py
      - name: Validate output
        run: python scripts/validate_output.py
      - name: Upload artifacts
        uses: actions/upload-artifact@v3
        with:
          name: processed-data
          path: data/processed/
"""

Model Training CI/CD

PYTHON
def training_cicd():
    """Print the model-training CI/CD cheat sheet (stages and automation patterns)."""
    heading = "\nMODEL TRAINING CI/CD"
    underline = "=" * 60
    body = """
Training Pipeline Stages:

1. Trigger:
   - New data available
   - Code changes
   - Scheduled retraining

2. Training:
   - Data preparation
   - Model training
   - Hyperparameter tuning

3. Evaluation:
   - Performance metrics
   - Comparison to baseline
   - Quality gates

4. Registration:
   - Save to model registry
   - Version tagging
   - Metadata logging

Automation Patterns:

1. Continuous Training:
   - Automatic retraining
   - Performance monitoring
   - Drift-triggered

2. Gated Deployment:
   - Performance thresholds
   - Manual approval
   - Staged rollout
"""
    # One print per chunk keeps the output identical to three separate calls.
    for chunk in (heading, underline, body):
        print(chunk)

training_cicd()


class TrainingPipeline:
    """ML training CI/CD pipeline.

    Holds quality gates and an optional baseline so a newly trained
    model's metrics can be checked before it moves on.
    """
    def __init__(self, model_name: str):
        self.model_name = model_name
        self.baseline_metrics = None
        self.quality_gates = {}

    def set_baseline(self, metrics: Dict[str, float]):
        """Set baseline metrics for comparison"""
        self.baseline_metrics = metrics

    def add_quality_gate(self, metric: str, threshold: float,
                         comparison: str = "gte"):
        """Add (or replace) the quality gate for ``metric``.

        ``comparison`` is ``'gte'`` (value >= threshold) or ``'lte'``
        (value <= threshold); any other string means exact equality.
        """
        self.quality_gates[metric] = {
            'threshold': threshold,
            'comparison': comparison
        }

    def evaluate_gates(self, metrics: Dict[str, float]) -> Dict:
        """Evaluate every quality gate against ``metrics``.

        Returns:
            ``{'passed': bool, 'gates': {...}}`` with one entry per gate.
            A gate whose metric is absent (or None) fails with
            ``'error': 'Missing'``.
        """
        results = {'passed': True, 'gates': {}}

        for metric, gate in self.quality_gates.items():
            value = metrics.get(metric)
            if value is None:
                results['gates'][metric] = {'passed': False, 'error': 'Missing'}
                results['passed'] = False
                continue

            threshold = gate['threshold']
            if gate['comparison'] == 'gte':
                passed = value >= threshold
            elif gate['comparison'] == 'lte':
                passed = value <= threshold
            else:
                passed = value == threshold

            results['gates'][metric] = {
                'passed': passed,
                'value': value,
                'threshold': threshold
            }

            if not passed:
                results['passed'] = False

        return results

    def compare_to_baseline(self, metrics: Dict[str, float]) -> Dict:
        """Compare ``metrics`` with the baseline, metric by metric.

        Only metrics present in the baseline are compared. ``improved``
        means the value went *up*, so read it accordingly for
        lower-is-better metrics such as error rate.

        Returns:
            ``{'status': 'no_baseline'}`` when no baseline is set,
            otherwise one entry per comparable metric.
        """
        if not self.baseline_metrics:
            return {'status': 'no_baseline'}

        comparison = {}
        for metric, value in metrics.items():
            baseline = self.baseline_metrics.get(metric)
            # Compare with None explicitly: a baseline of 0 is a valid value
            # and must not be dropped just because it is falsy.
            if baseline is not None:
                comparison[metric] = {
                    'current': value,
                    'baseline': baseline,
                    'improvement': value - baseline,
                    'improved': value > baseline
                }

        return comparison


# Training workflow
def training_github_workflow():
    """Return a GitHub Actions workflow (YAML text) for model training.

    The workflow runs weekly or on manual dispatch. ``prepare`` decides
    whether to train, ``train`` trains, evaluates and checks the quality
    gates, and ``register`` publishes the model.

    Every job starts on a fresh runner, so ``register`` checks out the
    repository and installs dependencies itself before running scripts.
    """
    return """
name: Model Training

on:
  workflow_dispatch:
    inputs:
      retrain:
        description: 'Force retrain'
        required: false
        default: 'false'
  schedule:
    - cron: '0 2 * * 0'  # Weekly

jobs:
  prepare:
    runs-on: ubuntu-latest
    outputs:
      should_train: ${{ steps.check.outputs.train }}
    steps:
      - name: Check for new data
        id: check
        run: |
          # Check if new data available
          echo "train=true" >> $GITHUB_OUTPUT

  train:
    needs: prepare
    if: needs.prepare.outputs.should_train == 'true'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install deps
        run: pip install -r requirements.txt
      - name: Train model
        run: python scripts/train.py
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
      - name: Evaluate model
        run: python scripts/evaluate.py
      - name: Check quality gates
        run: python scripts/check_gates.py

  register:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install deps
        run: pip install -r requirements.txt
      - name: Register model
        run: python scripts/register_model.py
      - name: Update model registry
        run: python scripts/update_registry.py
"""

Deployment Automation

PYTHON
def deployment_automation():
    """Print the deployment-automation cheat sheet (stages, triggers, IaC tools)."""
    heading = "\nDEPLOYMENT AUTOMATION"
    underline = "=" * 60
    body = """
Deployment Stages:

1. Build:
   - Package model
   - Build container
   - Run tests

2. Staging:
   - Deploy to staging
   - Integration tests
   - Shadow mode

3. Production:
   - Canary deployment
   - Gradual rollout
   - Monitoring

Deployment Triggers:

1. Automatic:
   - New model passes gates
   - Scheduled promotion
   - Drift detected

2. Manual:
   - Approval required
   - Business validation
   - Rollback

Infrastructure as Code:
  - Terraform
  - Kubernetes manifests
  - Helm charts
"""
    # One print per chunk keeps the output identical to three separate calls.
    for chunk in (heading, underline, body):
        print(chunk)

deployment_automation()


class DeploymentPipeline:
    """Model deployment pipeline (simulated).

    Tracks approvals and the latest deployment per environment. The build
    and test steps only print and return canned results.
    """
    def __init__(self):
        self.environments = ['staging', 'production']
        self.approvals = {}
        self.deployments = {}

    def build_container(self, model_path: str, tag: str) -> str:
        """Build model container and return the image reference.

        ``model_path`` is accepted for interface completeness; the
        simulated build does not read it.
        """
        # Simulated container build
        image = f"model-server:{tag}"
        print(f"Building container: {image}")
        return image

    def deploy_to_staging(self, image: str) -> Dict:
        """Deploy to staging and return the stored deployment record."""
        from datetime import datetime

        self.deployments['staging'] = {
            'image': image,
            'status': 'deployed',
            # Real ISO-8601 time rather than a placeholder string.
            'timestamp': datetime.now().isoformat()
        }
        return self.deployments['staging']

    def run_integration_tests(self) -> bool:
        """Run integration tests"""
        # Simulated tests
        print("Running integration tests...")
        return True

    def request_approval(self, environment: str) -> str:
        """Open a pending approval request and return its id.

        Ids carry a running sequence number so a second request never
        overwrites (and thereby resets) an earlier one.
        """
        sequence = len(self.approvals) + 1
        approval_id = f"approval-{environment}-{sequence:03d}"
        self.approvals[approval_id] = {
            'environment': environment,
            'status': 'pending'
        }
        return approval_id

    def approve(self, approval_id: str):
        """Mark a request as approved; unknown ids are ignored."""
        if approval_id in self.approvals:
            self.approvals[approval_id]['status'] = 'approved'

    def deploy_to_production(self, image: str,
                             strategy: str = 'canary') -> Dict:
        """Deploy to production and return the deployment description.

        ``'canary'`` selects a canary rollout; any other value selects a
        rolling deployment. The result is also stored under
        ``self.deployments['production']``.
        """
        if strategy == 'canary':
            deployment = self._canary_deploy(image)
        else:
            deployment = self._rolling_deploy(image)

        self.deployments['production'] = deployment
        return deployment

    def _canary_deploy(self, image: str) -> Dict:
        """Describe a canary rollout sending 10% of traffic to the new image."""
        return {
            'strategy': 'canary',
            'image': image,
            'traffic_split': {'canary': 0.1, 'stable': 0.9}
        }

    def _rolling_deploy(self, image: str) -> Dict:
        """Describe a rolling deployment with no replicas updated yet."""
        return {
            'strategy': 'rolling',
            'image': image,
            'replicas_updated': 0
        }


# Deployment workflow
def deployment_github_workflow():
    """Return a GitHub Actions workflow (YAML text) for model deployment.

    The workflow is triggered when the "Model Training" workflow completes
    or by manual dispatch, then runs build -> staging -> production.

    Points the YAML takes care of:
      * ``workflow_run`` fires on *any* completion, so ``build`` is gated
        on the training run having succeeded (manual runs always pass).
      * Every job starts on a fresh runner, so ``staging`` and
        ``production`` check out the repository and install dependencies
        before using manifests or scripts.
      * Locust runs headless with a fixed run time so the load test ends
        instead of waiting on its web UI.
    """
    return """
name: Model Deployment

on:
  workflow_run:
    workflows: ["Model Training"]
    types:
      - completed
  workflow_dispatch:
    inputs:
      version:
        description: 'Model version to deploy'
        required: true

jobs:
  build:
    if: ${{ github.event_name == 'workflow_dispatch' || github.event.workflow_run.conclusion == 'success' }}
    runs-on: ubuntu-latest
    outputs:
      image: ${{ steps.build.outputs.image }}
    steps:
      - uses: actions/checkout@v3
      - name: Build container
        id: build
        run: |
          docker build -t model-server:${{ github.sha }} .
          echo "image=model-server:${{ github.sha }}" >> $GITHUB_OUTPUT
      - name: Push to registry
        run: docker push model-server:${{ github.sha }}

  staging:
    needs: build
    runs-on: ubuntu-latest
    environment: staging
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install deps
        run: pip install -r requirements.txt
      - name: Deploy to staging
        run: kubectl apply -f k8s/staging/
      - name: Run integration tests
        run: pytest tests/integration/
      - name: Run load tests
        run: locust -f tests/load/locustfile.py --headless --users 10 --spawn-rate 2 --run-time 1m

  production:
    needs: staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install deps
        run: pip install -r requirements.txt
      - name: Deploy canary
        run: kubectl apply -f k8s/production/canary.yaml
      - name: Monitor canary
        run: python scripts/monitor_canary.py --duration 30m
      - name: Promote or rollback
        run: python scripts/promote_or_rollback.py
"""

Testing Strategies

PYTHON
def ml_testing():
    """Print the ML testing cheat sheet (testing levels and ML-specific tests)."""
    heading = "\nML TESTING STRATEGIES"
    underline = "=" * 60
    body = """
Testing Levels:

1. Unit Tests:
   - Data transformations
   - Feature functions
   - Model utilities

2. Integration Tests:
   - Pipeline execution
   - API endpoints
   - Database interactions

3. Model Tests:
   - Prediction accuracy
   - Performance benchmarks
   - Edge cases

4. System Tests:
   - End-to-end workflows
   - Load testing
   - Chaos testing

ML-Specific Tests:

1. Data Tests:
   - Schema validation
   - Distribution checks
   - Completeness

2. Model Tests:
   - Accuracy thresholds
   - Latency requirements
   - Memory limits

3. Fairness Tests:
   - Bias detection
   - Demographic parity
   - Equal opportunity
"""
    # One print per chunk keeps the output identical to three separate calls.
    for chunk in (heading, underline, body):
        print(chunk)

ml_testing()


class MLTestSuite:
    """ML-specific test suite.

    Tests are callables that take a context dict and raise on failure.
    They are tagged as data, model or performance tests and run in
    registration order.
    """
    def __init__(self):
        self.tests = []
        self.results = []  # per-test outcomes, accumulated across runs

    def add_data_test(self, name: str, test_func):
        """Register a test tagged ``'data'``."""
        self.tests.append({'type': 'data', 'name': name, 'func': test_func})

    def add_model_test(self, name: str, test_func):
        """Register a test tagged ``'model'``."""
        self.tests.append({'type': 'model', 'name': name, 'func': test_func})

    def add_performance_test(self, name: str, test_func):
        """Register a test tagged ``'performance'``."""
        self.tests.append({'type': 'performance', 'name': name, 'func': test_func})

    def run(self, context: Dict) -> Dict:
        """Run all tests and return ``{'passed', 'failed', 'errors'}``.

        An ``AssertionError`` is a normal failure (status ``'failed'``).
        Any other exception - for example a ``KeyError`` because the
        context lacks a metric - also counts as failed but is recorded
        with status ``'error'`` and a ``"<ExceptionType>: <message>"``
        text, so one broken test cannot abort the rest of the suite.
        """
        results = {'passed': 0, 'failed': 0, 'errors': []}

        for test in self.tests:
            try:
                test['func'](context)
            except AssertionError as e:
                status, message = 'failed', str(e)
            except Exception as e:
                status, message = 'error', f"{type(e).__name__}: {e}"
            else:
                results['passed'] += 1
                self.results.append({
                    'name': test['name'],
                    'status': 'passed'
                })
                continue

            results['failed'] += 1
            results['errors'].append({
                'name': test['name'],
                'error': message
            })
            self.results.append({
                'name': test['name'],
                'status': status,
                'error': message
            })

        return results


# Example tests
def test_model_accuracy(context):
    """Test model meets accuracy threshold"""
    accuracy = context['metrics']['accuracy']
    assert accuracy >= 0.85, f"Accuracy {accuracy} below threshold 0.85"

def test_prediction_latency(context):
    """Test prediction latency"""
    latency = context['metrics']['p99_latency_ms']
    assert latency <= 100, f"Latency {latency}ms exceeds 100ms threshold"

def test_model_size(context):
    """Test model size for deployment"""
    size_mb = context['model_size_mb']
    assert size_mb <= 500, f"Model size {size_mb}MB exceeds limit"

def test_fairness(context):
    """Test model fairness"""
    disparity = context['metrics'].get('demographic_parity', 0)
    assert disparity <= 0.1, f"Demographic disparity {disparity} too high"

Summary

| Stage | What to Test | Automation |
|-------|--------------|------------|
| Data | Schema, quality | Daily/On change |
| Training | Metrics, gates | On trigger |
| Deployment | Integration, load | On promotion |
| Production | Monitoring, drift | Continuous |

Key takeaways:

  • Extend CI/CD practices to data and models
  • Use quality gates to prevent bad models
  • Automate testing at all levels
  • Implement gradual deployment strategies
  • Monitor continuously after deployment
  • Have clear rollback procedures