Introduction to MLOps
MLOps (Machine Learning Operations) bridges the gap between ML development and production deployment. This section covers MLOps fundamentals, workflows, and the infrastructure needed for reliable ML systems.
MLOps Fundamentals
from typing import List, Dict, Any
from dataclasses import dataclass
from enum import Enum
def mlops_overview():
"""MLOps fundamentals"""
print("MLOPS FUNDAMENTALS")
print("=" * 60)
print("""
What is MLOps?
Definition:
A set of practices for reliable and efficient
deployment of ML models to production
MLOps = ML + DevOps + Data Engineering
Why MLOps?
Traditional ML Challenges:
- Models work in notebooks but fail in production
- No reproducibility
- Manual deployment processes
- No monitoring or feedback loops
- Data and model drift undetected
MLOps Benefits:
- Reproducible experiments
- Automated pipelines
- Continuous training/deployment
- Model monitoring
- Faster iteration cycles
MLOps vs DevOps:
DevOps:
Code → Build → Test → Deploy → Monitor
MLOps:
Data → Preprocess → Train → Validate → Deploy → Monitor
↑__________________________________|
""")
mlops_overview()
class MLLifecycleStage(Enum):
DATA_COLLECTION = "data_collection"
DATA_PROCESSING = "data_processing"
FEATURE_ENGINEERING = "feature_engineering"
MODEL_TRAINING = "model_training"
MODEL_EVALUATION = "model_evaluation"
MODEL_DEPLOYMENT = "model_deployment"
MONITORING = "monitoring"
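# The stages above mirror the loop in the MLOps diagram: after MONITORING the
# cycle feeds back into DATA_COLLECTION. A small illustrative helper (not part
# of any library) makes that feedback edge explicit.
def next_lifecycle_stage(current: MLLifecycleStage) -> MLLifecycleStage:
    """Return the stage that follows `current`, wrapping back to the start."""
    stages = list(MLLifecycleStage)  # members in definition order
    return stages[(stages.index(current) + 1) % len(stages)]

print(next_lifecycle_stage(MLLifecycleStage.MONITORING).value)  # data_collection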
def mlops_components():
"""Core MLOps components"""
print("\nMLOPS COMPONENTS")
print("-" * 50)
print("""
1. Data Management:
- Data versioning
- Data validation
- Feature stores
2. Experiment Tracking:
- Track parameters
- Log metrics
- Compare runs
3. Model Registry:
- Version models
- Stage management
- Metadata storage
4. ML Pipelines:
- Orchestration
- Automation
- Reproducibility
5. Serving Infrastructure:
- Model deployment
- Scaling
- A/B testing
6. Monitoring:
- Performance tracking
- Drift detection
- Alerting
""")
mlops_components()
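# Components 1 and 5 are easiest to see concretely. Below is a toy, in-memory
# stand-in for a feature store (not Feast or Tecton, just a dict keyed by
# entity ID): training and serving read features through the same interface,
# which is the property that prevents train/serve skew.
class ToyFeatureStore:
    def __init__(self):
        self._rows: Dict[str, Dict[str, float]] = {}

    def write(self, entity_id: str, features: Dict[str, float]) -> None:
        self._rows[entity_id] = features

    def read(self, entity_id: str, names: List[str]) -> Dict[str, float]:
        row = self._rows.get(entity_id, {})
        return {name: row.get(name, 0.0) for name in names}

feature_store = ToyFeatureStore()
feature_store.write("user_42", {"avg_order_value": 37.5, "orders_last_30d": 4.0})
print(feature_store.read("user_42", ["avg_order_value", "orders_last_30d"]))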
@dataclass
class MLProject:
"""ML project structure"""
name: str
version: str
data_sources: List[str]
features: List[str]
model_type: str
metrics: Dict[str, float]
artifacts: Dict[str, str]
def to_config(self) -> Dict:
"""Export project config"""
return {
'name': self.name,
'version': self.version,
'data': {
'sources': self.data_sources,
'features': self.features
},
'model': {
'type': self.model_type,
'metrics': self.metrics
},
'artifacts': self.artifacts
        }

Experiment Tracking
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
from datetime import datetime
import json
import hashlib
def experiment_tracking():
"""Experiment tracking concepts"""
print("\nEXPERIMENT TRACKING")
print("=" * 60)
print("""
Why Track Experiments?
Problems without tracking:
- Which hyperparameters worked best?
- What data was used for that model?
- Why did performance drop?
- Can we reproduce results?
What to Track:
1. Parameters:
- Hyperparameters
- Model architecture
- Data preprocessing
2. Metrics:
- Training loss
- Validation accuracy
- Custom metrics
3. Artifacts:
- Model weights
- Plots and figures
- Data samples
4. Metadata:
- Git commit
- Environment
- Timestamps
Popular Tools:
- MLflow
- Weights & Biases
- Neptune
- Comet ML
""")
experiment_tracking()
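# Before building a tracker from scratch below, this sketch shows roughly the
# same run logged with MLflow, the first tool on the list above (assumption:
# mlflow is installed; it writes to a local ./mlruns directory by default).
# Defined as a function and not called, so the rest of this file runs without it.
def mlflow_tracking_sketch():
    import mlflow
    mlflow.set_experiment("sentiment-analysis")
    with mlflow.start_run(run_name="baseline_bert"):
        mlflow.log_params({"model": "bert-base-uncased", "learning_rate": 2e-5,
                           "batch_size": 32, "epochs": 3})
        for epoch in range(3):
            # Placeholder values standing in for real training results.
            mlflow.log_metrics({"train_loss": 0.5 - 0.1 * epoch,
                                "val_accuracy": 0.8 + 0.05 * epoch}, step=epoch)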
class ExperimentTracker:
"""Simple experiment tracking system"""
def __init__(self, project_name: str, storage_path: str = "./mlruns"):
self.project_name = project_name
self.storage_path = storage_path
self.current_run = None
self.runs = []
def start_run(self, run_name: str = None) -> 'Run':
"""Start a new experiment run"""
run_id = self._generate_run_id()
self.current_run = Run(
run_id=run_id,
run_name=run_name or f"run_{run_id}",
project=self.project_name,
start_time=datetime.now()
)
return self.current_run
def end_run(self):
"""End current run"""
if self.current_run:
self.current_run.end_time = datetime.now()
self.runs.append(self.current_run)
self._save_run(self.current_run)
self.current_run = None
def log_params(self, params: Dict[str, Any]):
"""Log parameters"""
if self.current_run:
self.current_run.params.update(params)
def log_metrics(self, metrics: Dict[str, float], step: int = None):
"""Log metrics"""
if self.current_run:
for name, value in metrics.items():
if name not in self.current_run.metrics:
self.current_run.metrics[name] = []
self.current_run.metrics[name].append({
'value': value,
'step': step,
'timestamp': datetime.now().isoformat()
})
def log_artifact(self, name: str, path: str):
"""Log artifact path"""
if self.current_run:
self.current_run.artifacts[name] = path
def _generate_run_id(self) -> str:
timestamp = datetime.now().isoformat()
return hashlib.md5(timestamp.encode()).hexdigest()[:8]
def _save_run(self, run: 'Run'):
"""Save run to storage"""
import os
run_dir = os.path.join(self.storage_path, run.run_id)
os.makedirs(run_dir, exist_ok=True)
with open(os.path.join(run_dir, 'run.json'), 'w') as f:
json.dump(run.to_dict(), f, indent=2)
@dataclass
class Run:
"""Experiment run"""
run_id: str
run_name: str
project: str
start_time: datetime
end_time: datetime = None
params: Dict[str, Any] = None
metrics: Dict[str, List] = None
artifacts: Dict[str, str] = None
tags: Dict[str, str] = None
def __post_init__(self):
self.params = self.params or {}
self.metrics = self.metrics or {}
self.artifacts = self.artifacts or {}
self.tags = self.tags or {}
def to_dict(self) -> Dict:
return {
'run_id': self.run_id,
'run_name': self.run_name,
'project': self.project,
'start_time': self.start_time.isoformat(),
'end_time': self.end_time.isoformat() if self.end_time else None,
'params': self.params,
'metrics': self.metrics,
'artifacts': self.artifacts,
'tags': self.tags
}
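# The tracking concepts above include comparing runs, but the tracker only
# writes them. An illustrative helper (not part of any library) that reads the
# run.json files saved under storage_path and picks the best run by the final
# value of a logged metric.
import os

def best_run(storage_path: str, metric: str) -> Optional[Dict[str, Any]]:
    """Return the saved run dict with the highest final value of `metric`."""
    best = None
    if not os.path.isdir(storage_path):
        return None
    for run_id in os.listdir(storage_path):
        run_file = os.path.join(storage_path, run_id, "run.json")
        if not os.path.exists(run_file):
            continue
        with open(run_file) as f:
            run = json.load(f)
        history = run.get("metrics", {}).get(metric, [])
        if history and (best is None or history[-1]["value"] > best[0]):
            best = (history[-1]["value"], run)
    return best[1] if best else None

# Example (after some runs have been saved):
# print(best_run("./mlruns", "val_accuracy"))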
# Usage example
def training_with_tracking():
"""Example: Training with experiment tracking"""
tracker = ExperimentTracker("sentiment-analysis")
# Start experiment
run = tracker.start_run("baseline_bert")
# Log hyperparameters
tracker.log_params({
'model': 'bert-base-uncased',
'learning_rate': 2e-5,
'batch_size': 32,
'epochs': 3,
'max_length': 128
})
# Simulate training loop
for epoch in range(3):
# Training...
train_loss = 0.5 - (epoch * 0.1)
val_accuracy = 0.8 + (epoch * 0.05)
tracker.log_metrics({
'train_loss': train_loss,
'val_accuracy': val_accuracy
}, step=epoch)
# Log model artifact
tracker.log_artifact('model', './models/bert_sentiment.pt')
tracker.end_run()
training_with_tracking()

Model Registry
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
def model_registry_overview():
"""Model registry concepts"""
print("\nMODEL REGISTRY")
print("=" * 60)
print("""
What is a Model Registry?
Central repository for:
- Storing trained models
- Versioning models
- Managing model lifecycle
- Tracking model lineage
Model Lifecycle Stages:
1. Development: Experimental models
2. Staging: Candidates for production
3. Production: Live serving models
4. Archived: Deprecated models
Key Features:
1. Version Control:
- Semantic versioning
- Immutable versions
- Rollback capability
2. Metadata:
- Training metrics
- Data lineage
- Environment specs
3. Governance:
- Approval workflows
- Access control
- Audit trails
4. Integration:
- CI/CD pipelines
- Serving systems
- Monitoring
""")
model_registry_overview()
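# For comparison with the toy registry built below, this sketch shows the same
# register-and-promote flow against MLflow's Model Registry (assumption: mlflow
# is installed; stage transitions are the classic API, and newer MLflow
# releases steer toward model aliases instead). Not called here.
def mlflow_registry_sketch():
    import mlflow
    from mlflow.tracking import MlflowClient
    # "runs:/abc123/model" is a placeholder URI for a model logged in some run.
    result = mlflow.register_model("runs:/abc123/model", "fraud-detector")
    client = MlflowClient()
    client.transition_model_version_stage(
        name="fraud-detector",
        version=result.version,
        stage="Production",
    )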
class ModelStage(Enum):
DEVELOPMENT = "development"
STAGING = "staging"
PRODUCTION = "production"
ARCHIVED = "archived"
@dataclass
class ModelVersion:
"""Model version in registry"""
name: str
version: str
stage: ModelStage
source: str # Path or URI
run_id: str # Link to experiment
metrics: Dict[str, float]
signature: Dict # Input/output schema
created_at: datetime
description: str = ""
tags: Dict[str, str] = None
def __post_init__(self):
self.tags = self.tags or {}
class ModelRegistry:
"""Simple model registry"""
def __init__(self):
self.models: Dict[str, List[ModelVersion]] = {}
def register_model(self, model_version: ModelVersion) -> str:
"""Register a new model version"""
name = model_version.name
if name not in self.models:
self.models[name] = []
# Check version doesn't exist
existing = [m for m in self.models[name]
if m.version == model_version.version]
if existing:
raise ValueError(f"Version {model_version.version} exists")
self.models[name].append(model_version)
return f"{name}/{model_version.version}"
def get_model(self, name: str,
version: str = None,
stage: ModelStage = None) -> Optional[ModelVersion]:
"""Get model by name and version/stage"""
if name not in self.models:
return None
versions = self.models[name]
if version:
matches = [m for m in versions if m.version == version]
return matches[0] if matches else None
if stage:
matches = [m for m in versions if m.stage == stage]
return matches[-1] if matches else None
# Return latest
return versions[-1] if versions else None
def transition_stage(self, name: str, version: str,
new_stage: ModelStage) -> bool:
"""Transition model to new stage"""
model = self.get_model(name, version)
if not model:
return False
        # If promoting to production, archive the current production version
if new_stage == ModelStage.PRODUCTION:
current_prod = self.get_model(name, stage=ModelStage.PRODUCTION)
if current_prod:
current_prod.stage = ModelStage.ARCHIVED
model.stage = new_stage
return True
def list_models(self, name: str = None) -> List[Dict]:
"""List all models or versions of a model"""
if name:
return [m.__dict__ for m in self.models.get(name, [])]
return [
{'name': n, 'versions': len(v)}
for n, v in self.models.items()
]
def compare_versions(self, name: str,
v1: str, v2: str) -> Dict:
"""Compare two model versions"""
m1 = self.get_model(name, v1)
m2 = self.get_model(name, v2)
if not m1 or not m2:
return {}
# Compare metrics
comparison = {}
all_metrics = set(m1.metrics.keys()) | set(m2.metrics.keys())
for metric in all_metrics:
val1 = m1.metrics.get(metric)
val2 = m2.metrics.get(metric)
            if val1 is not None and val2 is not None:
comparison[metric] = {
'v1': val1,
'v2': val2,
'diff': val2 - val1,
'improved': val2 > val1
}
return comparison
# Example usage
def registry_example():
"""Example model registry usage"""
registry = ModelRegistry()
# Register initial model
v1 = ModelVersion(
name="fraud-detector",
version="1.0.0",
stage=ModelStage.DEVELOPMENT,
source="s3://models/fraud-detector/v1",
run_id="abc123",
metrics={'accuracy': 0.92, 'f1': 0.88, 'auc': 0.95},
signature={
'inputs': {'features': 'float[batch, 50]'},
'outputs': {'probability': 'float[batch, 1]'}
},
created_at=datetime.now()
)
registry.register_model(v1)
# Promote to production
registry.transition_stage("fraud-detector", "1.0.0",
ModelStage.PRODUCTION)
# Register improved version
v2 = ModelVersion(
name="fraud-detector",
version="1.1.0",
stage=ModelStage.STAGING,
source="s3://models/fraud-detector/v2",
run_id="def456",
metrics={'accuracy': 0.94, 'f1': 0.91, 'auc': 0.97},
signature={
'inputs': {'features': 'float[batch, 50]'},
'outputs': {'probability': 'float[batch, 1]'}
},
created_at=datetime.now()
)
registry.register_model(v2)
# Compare versions
comparison = registry.compare_versions("fraud-detector", "1.0.0", "1.1.0")
print("Version comparison:", comparison)
registry_example()

ML Pipeline Basics
from typing import List, Dict, Any, Callable
from dataclasses import dataclass
from enum import Enum
import time
def ml_pipelines():
"""ML pipeline fundamentals"""
print("\nML PIPELINES")
print("=" * 60)
print("""
What is an ML Pipeline?
Automated workflow that:
- Orchestrates ML stages
- Ensures reproducibility
- Enables CI/CD for ML
Pipeline Components:
1. Data Ingestion:
- Fetch data sources
- Validate data quality
2. Preprocessing:
- Clean and transform
- Feature engineering
3. Training:
- Model training
- Hyperparameter tuning
4. Evaluation:
- Compute metrics
- Compare to baseline
5. Deployment:
- Package model
- Deploy to serving
Pipeline Tools:
- Kubeflow Pipelines
- Apache Airflow
- Prefect
- MLflow Pipelines
- ZenML
""")
ml_pipelines()
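# The orchestrator below is deliberately minimal. For contrast, this sketch
# shows the same load -> train idea in Prefect 2.x, one of the tools listed
# above (assumption: prefect is installed). Everything is defined inside a
# function so this file still runs without Prefect.
def prefect_pipeline_sketch():
    from prefect import flow, task

    @task
    def load_data() -> list:
        return [[1, 2], [3, 4], [5, 6]]

    @task
    def train_model(data: list) -> dict:
        return {"model": "trained_model", "accuracy": 0.95}

    @flow
    def training_pipeline() -> dict:
        # Prefect records each task run's state, retries, and logs.
        return train_model(load_data())

    return training_pipeline()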
class StepStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class PipelineStep:
"""Pipeline step definition"""
name: str
function: Callable
inputs: List[str] = None
outputs: List[str] = None
dependencies: List[str] = None
status: StepStatus = StepStatus.PENDING
result: Any = None
error: str = None
def __post_init__(self):
self.inputs = self.inputs or []
self.outputs = self.outputs or []
self.dependencies = self.dependencies or []
class SimplePipeline:
"""Simple ML pipeline orchestrator"""
def __init__(self, name: str):
self.name = name
self.steps: Dict[str, PipelineStep] = {}
self.context: Dict[str, Any] = {}
def add_step(self, step: PipelineStep):
"""Add step to pipeline"""
self.steps[step.name] = step
def run(self, initial_context: Dict = None) -> Dict:
"""Execute pipeline"""
self.context = initial_context or {}
execution_order = self._get_execution_order()
print(f"Running pipeline: {self.name}")
        print(f"Steps: {execution_order}")
for step_name in execution_order:
step = self.steps[step_name]
# Check dependencies
if not self._deps_satisfied(step):
step.status = StepStatus.SKIPPED
continue
# Execute step
step.status = StepStatus.RUNNING
try:
# Get inputs from context
inputs = {k: self.context[k] for k in step.inputs
if k in self.context}
# Run step
start = time.time()
result = step.function(**inputs)
duration = time.time() - start
# Store outputs
if isinstance(result, dict):
for output in step.outputs:
if output in result:
self.context[output] = result[output]
else:
if step.outputs:
self.context[step.outputs[0]] = result
step.result = result
step.status = StepStatus.COMPLETED
print(f" ✓ {step_name} ({duration:.2f}s)")
except Exception as e:
step.error = str(e)
step.status = StepStatus.FAILED
print(f" ✗ {step_name}: {e}")
return self._get_results()
def _get_execution_order(self) -> List[str]:
"""Topological sort of steps"""
order = []
visited = set()
def visit(name):
if name in visited:
return
visited.add(name)
step = self.steps[name]
for dep in step.dependencies:
visit(dep)
order.append(name)
for name in self.steps:
visit(name)
return order
def _deps_satisfied(self, step: PipelineStep) -> bool:
"""Check if dependencies completed"""
for dep in step.dependencies:
if dep in self.steps:
if self.steps[dep].status != StepStatus.COMPLETED:
return False
return True
def _get_results(self) -> Dict:
"""Get pipeline results"""
return {
'pipeline': self.name,
'steps': {
name: {
'status': step.status.value,
'error': step.error
}
for name, step in self.steps.items()
},
'outputs': self.context
}
# Example pipeline
def example_pipeline():
"""Example ML pipeline"""
# Define step functions
def load_data():
print(" Loading data...")
return {'data': [[1, 2], [3, 4], [5, 6]]}
def preprocess(data):
print(" Preprocessing...")
return {'processed_data': [x for x in data]}
def train_model(processed_data):
print(" Training model...")
return {'model': 'trained_model', 'accuracy': 0.95}
def evaluate(model, processed_data):
print(" Evaluating...")
return {'metrics': {'accuracy': 0.93, 'f1': 0.91}}
# Build pipeline
pipeline = SimplePipeline("training_pipeline")
pipeline.add_step(PipelineStep(
name="load_data",
function=load_data,
outputs=["data"]
))
pipeline.add_step(PipelineStep(
name="preprocess",
function=preprocess,
inputs=["data"],
outputs=["processed_data"],
dependencies=["load_data"]
))
pipeline.add_step(PipelineStep(
name="train",
function=train_model,
inputs=["processed_data"],
outputs=["model", "accuracy"],
dependencies=["preprocess"]
))
pipeline.add_step(PipelineStep(
name="evaluate",
function=evaluate,
inputs=["model", "processed_data"],
outputs=["metrics"],
dependencies=["train"]
))
# Run pipeline
results = pipeline.run()
print("\nPipeline results:", results)
example_pipeline()

Summary
| Component | Purpose | Tools |
|-----------|---------|-------|
| Experiment Tracking | Reproducibility | MLflow, W&B |
| Model Registry | Model management | MLflow, Vertex AI |
| Pipelines | Automation | Kubeflow, Airflow |
| Feature Store | Feature management | Feast, Tecton |
Key takeaways:
- MLOps bridges ML development and production
- Experiment tracking ensures reproducibility
- Model registry manages model lifecycle
- Pipelines automate ML workflows
- Version everything: data, code, models (see the data-versioning sketch below)
- Start simple, add complexity as needed
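One concrete way to act on "version everything" without extra tooling is to record a content hash of the exact dataset a run trained on alongside its parameters. This is a minimal illustration; tools like DVC generalize the idea to full data versioning.
# Minimal data-versioning illustration: identify a dataset by its content hash.
import hashlib

def dataset_fingerprint(path: str) -> str:
    """Return a short SHA-256 digest of the file at `path`."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            digest.update(chunk)
    return digest.hexdigest()[:12]

# Stored next to params and metrics, e.g.:
# run_config = {"data_version": dataset_fingerprint("data/train.csv"), ...}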