Intermediate · 90 min read

Chapter 27: AI Ethics and Responsible Development

Bias, fairness, interpretability, privacy, and governance.

Libraries covered: Fairlearn, SHAP

Learning Objectives

["Detect bias in models", "Apply fairness metrics", "Explain model decisions"]


27.1 Introduction to AI Ethics (Intermediate)


As AI systems become more prevalent, understanding the ethical implications becomes essential. This section introduces key ethical concepts, principles, and frameworks for responsible AI development.

AI Ethics Fundamentals

PYTHON
from typing import Dict, List, Any
from dataclasses import dataclass
from enum import Enum

def ethics_overview():
    """AI ethics fundamentals"""
    print("AI ETHICS")
    print("=" * 60)

    print("""
Why AI Ethics Matters:

AI Systems Can:
  - Affect millions of people
  - Make consequential decisions
  - Perpetuate or amplify biases
  - Be opaque and unexplainable
  - Create new societal challenges

Historical Examples:

1. Hiring Algorithms:
   - Bias against women in tech
   - Historical data reflected past discrimination

2. Criminal Justice:
   - Risk assessment tools
   - Racial bias in predictions

3. Healthcare:
   - Algorithm prioritized white patients
   - Used cost as proxy for health needs

4. Content Moderation:
   - Censorship concerns
   - Disproportionate impact

Key Questions:
  - Who benefits and who is harmed?
  - Who makes decisions and how?
  - What are the unintended consequences?
  - How do we ensure accountability?
""")

ethics_overview()


class EthicalPrinciple(Enum):
    FAIRNESS = "fairness"
    TRANSPARENCY = "transparency"
    PRIVACY = "privacy"
    ACCOUNTABILITY = "accountability"
    SAFETY = "safety"
    BENEFICENCE = "beneficence"
    NON_MALEFICENCE = "non_maleficence"


def core_principles():
    """Core ethical principles for AI"""
    print("\nCORE PRINCIPLES")
    print("-" * 50)

    print("""
1. Fairness:
   - Equal treatment across groups
   - Non-discrimination
   - Equitable outcomes

2. Transparency:
   - Explainable decisions
   - Clear documentation
   - Honest communication

3. Privacy:
   - Data protection
   - Consent and control
   - Minimal data collection

4. Accountability:
   - Clear responsibility
   - Audit trails
   - Redress mechanisms

5. Safety:
   - Reliable operation
   - Risk mitigation
   - Human oversight

6. Beneficence:
   - Positive impact
   - Social good
   - Human flourishing

7. Non-Maleficence:
   - Avoid harm
   - Risk assessment
   - Precautionary approach
""")

core_principles()


@dataclass
class EthicsGuideline:
    """Ethics guideline definition"""
    principle: EthicalPrinciple
    description: str
    requirements: List[str]
    examples: List[str]
    evaluation_criteria: List[str]

Ethical Frameworks

PYTHON
from typing import Dict

def ethical_frameworks():
    """Ethical frameworks for AI"""
    print("\nETHICAL FRAMEWORKS")
    print("=" * 60)

    print("""
Major Ethical Frameworks:

1. Consequentialism:
   - Focus on outcomes
   - Greatest good for greatest number
   - Cost-benefit analysis
   Applied: Evaluate total impact of AI system

2. Deontology:
   - Focus on duties and rights
   - Universal moral rules
   - Regardless of consequences
   Applied: Certain actions are never permissible

3. Virtue Ethics:
   - Focus on character
   - What would a virtuous person do?
   - Building good habits
   Applied: Organizational culture of ethics

4. Care Ethics:
   - Focus on relationships
   - Context-dependent
   - Responsibility to vulnerable
   Applied: Protecting marginalized groups

5. Justice Theory:
   - Focus on fairness
   - Equal distribution
   - Veil of ignorance
   Applied: Who benefits and who is harmed?

Applying Frameworks:
  - No single framework is complete
  - Use multiple perspectives
  - Context matters
  - Stakeholder input essential
""")

ethical_frameworks()


class EthicsFramework:
    """Base ethics evaluation framework"""
    def __init__(self, name: str):
        self.name = name
        self.evaluations = []

    def evaluate(self, action: str, context: Dict) -> Dict:
        """Evaluate action against framework"""
        raise NotImplementedError


class ConsequentialistEvaluation(EthicsFramework):
    """Consequentialist ethics evaluation"""
    def __init__(self):
        super().__init__("Consequentialism")

    def evaluate(self, action: str, context: Dict) -> Dict:
        """Evaluate based on consequences"""
        stakeholders = context.get('stakeholders', [])
        impacts = context.get('impacts', {})

        total_benefit = 0
        total_harm = 0

        for stakeholder in stakeholders:
            impact = impacts.get(stakeholder, {})
            total_benefit += impact.get('benefit', 0)
            total_harm += impact.get('harm', 0)

        net_impact = total_benefit - total_harm

        return {
            'framework': self.name,
            'action': action,
            'total_benefit': total_benefit,
            'total_harm': total_harm,
            'net_impact': net_impact,
            'recommendation': 'proceed' if net_impact > 0 else 'reconsider'
        }


class DeontologicalEvaluation(EthicsFramework):
    """Deontological ethics evaluation"""
    def __init__(self):
        super().__init__("Deontology")
        self.prohibited_actions = [
            'deception',
            'privacy_violation',
            'discrimination',
            'coercion'
        ]

    def evaluate(self, action: str, context: Dict) -> Dict:
        """Evaluate based on duties and rules"""
        action_types = context.get('action_types', [])

        violations = [
            a for a in action_types
            if a in self.prohibited_actions
        ]

        return {
            'framework': self.name,
            'action': action,
            'violations': violations,
            'recommendation': 'prohibit' if violations else 'permissible'
        }


class JusticeEvaluation(EthicsFramework):
    """Justice-based ethics evaluation"""
    def __init__(self):
        super().__init__("Justice")

    def evaluate(self, action: str, context: Dict) -> Dict:
        """Evaluate based on fairness and distribution"""
        impacts = context.get('group_impacts', {})

        # Check for disparate impact
        impact_values = list(impacts.values())
        if not impact_values:
            return {'framework': self.name, 'error': 'No impact data'}

        max_impact = max(impact_values)
        min_impact = min(impact_values)
        disparity = max_impact - min_impact

        disadvantaged = [
            group for group, impact in impacts.items()
            if impact < 0
        ]

        return {
            'framework': self.name,
            'action': action,
            'disparity': disparity,
            'disadvantaged_groups': disadvantaged,
            'recommendation': 'review' if disparity > 0.5 or disadvantaged else 'fair'
        }
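

# Example (hedged): evaluating one hypothetical decision with the three
# framework classes above. The stakeholder names and impact numbers are
# invented purely for illustration.
def framework_example():
    """Apply multiple ethical frameworks to one decision"""
    context = {
        'stakeholders': ['applicants', 'lender'],
        'impacts': {
            'applicants': {'benefit': 3, 'harm': 2},
            'lender': {'benefit': 4, 'harm': 1},
        },
        'action_types': ['automation'],
        'group_impacts': {'group_a': 0.4, 'group_b': -0.2},
    }

    for framework in (ConsequentialistEvaluation(),
                      DeontologicalEvaluation(),
                      JusticeEvaluation()):
        result = framework.evaluate("Deploy automated credit scoring", context)
        print(f"{result['framework']}: {result['recommendation']}")

framework_example()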

Stakeholder Analysis

PYTHON
from typing import Dict, List
from dataclasses import dataclass

def stakeholder_analysis():
    """Stakeholder analysis for AI systems"""
    print("\nSTAKEHOLDER ANALYSIS")
    print("=" * 60)

    print("""
Identifying Stakeholders:

Direct Stakeholders:
  - Users of the system
  - Subjects of predictions
  - System operators
  - Developers

Indirect Stakeholders:
  - Affected communities
  - Competitors
  - Regulators
  - Society at large

Vulnerable Groups:
  - Marginalized communities
  - Children
  - Elderly
  - Low-income populations

Analysis Steps:

1. Identify all stakeholders
2. Understand their interests
3. Assess potential impacts
4. Evaluate power dynamics
5. Determine engagement strategy
""")

stakeholder_analysis()


@dataclass
class Stakeholder:
    """Stakeholder definition"""
    name: str
    category: str  # direct, indirect, vulnerable
    interests: List[str]
    potential_benefits: List[str]
    potential_harms: List[str]
    power_level: str  # high, medium, low
    engagement_priority: str


class StakeholderAnalysis:
    """Analyze stakeholders for AI system"""
    def __init__(self, system_name: str):
        self.system_name = system_name
        self.stakeholders: List[Stakeholder] = []

    def add_stakeholder(self, stakeholder: Stakeholder):
        """Add stakeholder"""
        self.stakeholders.append(stakeholder)

    def analyze_impacts(self) -> Dict:
        """Analyze impacts across stakeholders"""
        impacts = {
            'benefits': {},
            'harms': {},
            'net_impact': {}
        }

        for s in self.stakeholders:
            benefits = len(s.potential_benefits)
            harms = len(s.potential_harms)

            impacts['benefits'][s.name] = benefits
            impacts['harms'][s.name] = harms
            impacts['net_impact'][s.name] = benefits - harms

        return impacts

    def identify_vulnerable(self) -> List[Stakeholder]:
        """Identify vulnerable stakeholders"""
        return [
            s for s in self.stakeholders
            if s.category == 'vulnerable' or len(s.potential_harms) > len(s.potential_benefits)
        ]

    def prioritize_engagement(self) -> List[Stakeholder]:
        """Prioritize stakeholder engagement"""
        def priority_score(s):
            scores = {'high': 3, 'medium': 2, 'low': 1}
            harm_weight = len(s.potential_harms)
            power_weight = scores.get(s.power_level, 1)
            vulnerable = 2 if s.category == 'vulnerable' else 1
            return harm_weight * power_weight * vulnerable

        return sorted(self.stakeholders, key=priority_score, reverse=True)

    def generate_report(self) -> Dict:
        """Generate stakeholder report"""
        return {
            'system': self.system_name,
            'total_stakeholders': len(self.stakeholders),
            'by_category': self._count_by_category(),
            'impacts': self.analyze_impacts(),
            'vulnerable': [s.name for s in self.identify_vulnerable()],
            'engagement_priority': [s.name for s in self.prioritize_engagement()[:5]]
        }

    def _count_by_category(self) -> Dict:
        from collections import Counter
        return dict(Counter(s.category for s in self.stakeholders))


# Example analysis
def stakeholder_example():
    """Example stakeholder analysis"""
    analysis = StakeholderAnalysis("Credit Scoring Model")

    analysis.add_stakeholder(Stakeholder(
        name="Loan Applicants",
        category="direct",
        interests=["Fair assessment", "Quick decisions", "Privacy"],
        potential_benefits=["Faster approval", "Consistent criteria"],
        potential_harms=["Denial based on bias", "Privacy concerns"],
        power_level="low",
        engagement_priority="high"
    ))

    analysis.add_stakeholder(Stakeholder(
        name="Low-Income Applicants",
        category="vulnerable",
        interests=["Access to credit", "Fair treatment"],
        potential_benefits=["Alternative data consideration"],
        potential_harms=["Historical bias", "Higher denial rates"],
        power_level="low",
        engagement_priority="high"
    ))

    analysis.add_stakeholder(Stakeholder(
        name="Financial Institution",
        category="direct",
        interests=["Profit", "Risk management", "Compliance"],
        potential_benefits=["Efficiency", "Reduced defaults"],
        potential_harms=["Regulatory penalties", "Reputation damage"],
        power_level="high",
        engagement_priority="medium"
    ))

    report = analysis.generate_report()
    print("Stakeholder Report:", report)

stakeholder_example()

Ethics Review Process

PYTHON
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
from enum import Enum

def ethics_review():
    """Ethics review process"""
    print("\nETHICS REVIEW PROCESS")
    print("=" * 60)

    print("""
When to Review:

1. Before Development:
   - Project scoping
   - Use case evaluation
   - Risk assessment

2. During Development:
   - Design decisions
   - Data selection
   - Algorithm choices

3. Before Deployment:
   - Full ethics review
   - Stakeholder feedback
   - Mitigation plans

4. After Deployment:
   - Ongoing monitoring
   - Incident review
   - Regular audits

Review Components:

1. Impact Assessment:
   - Who is affected?
   - What are the risks?
   - How severe?

2. Mitigation Plan:
   - How to reduce risks?
   - What safeguards?
   - Monitoring strategy

3. Documentation:
   - Decision rationale
   - Trade-offs considered
   - Dissenting views
""")

ethics_review()


class ReviewStatus(Enum):
    PENDING = "pending"
    IN_REVIEW = "in_review"
    APPROVED = "approved"
    CONDITIONAL = "conditional"
    REJECTED = "rejected"


@dataclass
class EthicsReview:
    """Ethics review record"""
    project_name: str
    review_id: str
    submitted_by: str
    submitted_at: datetime
    status: ReviewStatus
    findings: List[Dict]
    recommendations: List[str]
    conditions: List[str]
    approved_by: Optional[str] = None
    approved_at: Optional[datetime] = None


class EthicsReviewBoard:
    """Ethics review board"""
    def __init__(self):
        self.reviews: Dict[str, EthicsReview] = {}
        self.members: List[str] = []
        self.criteria = []

    def submit_review(self, project_name: str,
                     submission: Dict) -> EthicsReview:
        """Submit project for ethics review"""
        review_id = f"ERB-{len(self.reviews) + 1:04d}"

        review = EthicsReview(
            project_name=project_name,
            review_id=review_id,
            submitted_by=submission.get('submitter'),
            submitted_at=datetime.now(),
            status=ReviewStatus.PENDING,
            findings=[],
            recommendations=[],
            conditions=[]
        )

        self.reviews[review_id] = review
        return review

    def conduct_review(self, review_id: str,
                       assessment: Dict) -> EthicsReview:
        """Conduct ethics review"""
        review = self.reviews.get(review_id)
        if not review:
            raise ValueError(f"Review {review_id} not found")

        review.status = ReviewStatus.IN_REVIEW
        review.findings = assessment.get('findings', [])
        review.recommendations = assessment.get('recommendations', [])

        return review

    def make_decision(self, review_id: str, decision: str,
                      conditions: List[str] = None,
                      approver: str = None) -> EthicsReview:
        """Make review decision"""
        review = self.reviews.get(review_id)
        if not review:
            raise ValueError(f"Review {review_id} not found")

        status_map = {
            'approve': ReviewStatus.APPROVED,
            'conditional': ReviewStatus.CONDITIONAL,
            'reject': ReviewStatus.REJECTED
        }

        review.status = status_map.get(decision, ReviewStatus.PENDING)
        review.conditions = conditions or []
        review.approved_by = approver
        review.approved_at = datetime.now()

        return review

    def get_review_summary(self, review_id: str) -> Dict:
        """Get review summary"""
        review = self.reviews.get(review_id)
        if not review:
            return {}

        return {
            'review_id': review.review_id,
            'project': review.project_name,
            'status': review.status.value,
            'findings_count': len(review.findings),
            'has_conditions': len(review.conditions) > 0
        }

Summary

| Principle | Key Question | Implementation |
|-----------|--------------|----------------|
| Fairness | Who benefits/harms? | Bias testing |
| Transparency | Can we explain? | Documentation |
| Privacy | Is data protected? | Data governance |
| Accountability | Who is responsible? | Audit trails |

Key takeaways:

  • Ethics is essential in AI development
  • Use multiple ethical frameworks
  • Identify and protect vulnerable stakeholders
  • Implement formal ethics review processes
  • Document decisions and trade-offs
  • Ethics is ongoing, not one-time

27.2 Fairness and Bias in ML (Intermediate)


Machine learning systems can perpetuate or amplify societal biases. This section covers types of bias, fairness definitions, and techniques for building fairer ML systems.

Understanding Bias in ML

PYTHON
from typing import Dict, List, Any
from dataclasses import dataclass
from enum import Enum
import numpy as np

def bias_overview():
    """Understanding bias in ML"""
    print("BIAS IN MACHINE LEARNING")
    print("=" * 60)

    print("""
Sources of Bias:

1. Historical Bias:
   - Past discrimination encoded in data
   - Example: Historical hiring patterns

2. Representation Bias:
   - Some groups underrepresented
   - Example: Face recognition datasets

3. Measurement Bias:
   - Proxies don't measure equally
   - Example: Using arrests as crime proxy

4. Aggregation Bias:
   - One model for diverse groups
   - Example: Medical diagnosis across demographics

5. Evaluation Bias:
   - Biased benchmarks
   - Example: Test set not representative

6. Deployment Bias:
   - System used differently than intended
   - Example: Algorithm for different population

Types of Harm:

1. Allocative Harm:
   - Unequal distribution of resources
   - Jobs, loans, housing

2. Representational Harm:
   - Stereotyping
   - Denigration
   - Under-representation
""")

bias_overview()


class BiasType(Enum):
    HISTORICAL = "historical"
    REPRESENTATION = "representation"
    MEASUREMENT = "measurement"
    AGGREGATION = "aggregation"
    EVALUATION = "evaluation"
    DEPLOYMENT = "deployment"


@dataclass
class BiasAssessment:
    """Bias assessment result"""
    bias_type: BiasType
    description: str
    severity: str  # high, medium, low
    affected_groups: List[str]
    mitigation_options: List[str]


def bias_examples():
    """Real-world bias examples"""
    print("\nBIAS EXAMPLES")
    print("-" * 50)

    print("""
Case Studies:

1. Amazon Hiring Tool (2018):
   - Trained on historical hiring data
   - Penalized resumes containing the word "women's"
   - Downgraded graduates of women's colleges
   → Historical bias

2. COMPAS Recidivism (2016):
   - Higher false positive rate for Black defendants
   - Lower false positive rate for white defendants
   - Used for bail/sentencing decisions
   → Measurement and evaluation bias

3. Healthcare Algorithm (2019):
   - Used cost as proxy for health needs
   - Black patients have less access to care
   - Lower costs, lower priority scores
   → Measurement bias

4. Face Recognition (2018-):
   - Higher error rates for dark-skinned women
   - Training data predominantly light-skinned
   - Deployed in high-stakes contexts
   → Representation bias
""")

bias_examples()

Fairness Definitions

PYTHON
from typing import Dict, List
import numpy as np

def fairness_definitions():
    """Fairness definitions in ML"""
    print("\nFAIRNESS DEFINITIONS")
    print("=" * 60)

    print("""
Key Fairness Metrics:

1. Demographic Parity:
   P(Y_hat=1|A=0) = P(Y_hat=1|A=1)
   - Equal positive prediction rates
   - Regardless of actual outcomes

2. Equalized Odds:
   P(Y_hat=1|Y=1,A=0) = P(Y_hat=1|Y=1,A=1)  # TPR
   P(Y_hat=1|Y=0,A=0) = P(Y_hat=1|Y=0,A=1)  # FPR
   - Equal TPR and FPR across groups
   - Conditions on the true label, unlike demographic parity

3. Equal Opportunity:
   P(Y_hat=1|Y=1,A=0) = P(Y_hat=1|Y=1,A=1)
   - Equal true positive rates
   - Focus on qualified individuals

4. Predictive Parity:
   P(Y=1|Y_hat=1,A=0) = P(Y=1|Y_hat=1,A=1)
   - Equal precision across groups
   - Positive predictions equally accurate

5. Individual Fairness:
   Similar individuals treated similarly
   - Based on relevant features
   - Harder to define similarity

Important: These definitions can conflict!
""")

fairness_definitions()


class FairnessMetrics:
    """Calculate fairness metrics"""
    def __init__(self, y_true: np.ndarray, y_pred: np.ndarray,
                 sensitive_attr: np.ndarray):
        self.y_true = y_true
        self.y_pred = y_pred
        self.sensitive = sensitive_attr

    def demographic_parity(self) -> Dict:
        """Calculate demographic parity"""
        groups = np.unique(self.sensitive)
        rates = {}

        for group in groups:
            mask = self.sensitive == group
            rate = np.mean(self.y_pred[mask])
            rates[f'group_{group}'] = rate

        disparity = max(rates.values()) - min(rates.values())
        return {
            'rates': rates,
            'disparity': disparity,
            'satisfied': disparity < 0.1
        }

    def equalized_odds(self) -> Dict:
        """Calculate equalized odds"""
        groups = np.unique(self.sensitive)
        tpr = {}
        fpr = {}

        for group in groups:
            mask = self.sensitive == group
            y_true_g = self.y_true[mask]
            y_pred_g = self.y_pred[mask]

            # True positive rate
            pos_mask = y_true_g == 1
            tpr[f'group_{group}'] = np.mean(y_pred_g[pos_mask]) if pos_mask.any() else 0

            # False positive rate
            neg_mask = y_true_g == 0
            fpr[f'group_{group}'] = np.mean(y_pred_g[neg_mask]) if neg_mask.any() else 0

        tpr_diff = max(tpr.values()) - min(tpr.values())
        fpr_diff = max(fpr.values()) - min(fpr.values())

        return {
            'tpr': tpr,
            'fpr': fpr,
            'tpr_disparity': tpr_diff,
            'fpr_disparity': fpr_diff,
            'satisfied': tpr_diff < 0.1 and fpr_diff < 0.1
        }

    def equal_opportunity(self) -> Dict:
        """Calculate equal opportunity"""
        groups = np.unique(self.sensitive)
        tpr = {}

        for group in groups:
            mask = self.sensitive == group
            pos_mask = (self.sensitive == group) & (self.y_true == 1)
            if pos_mask.any():
                tpr[f'group_{group}'] = np.mean(self.y_pred[pos_mask])
            else:
                tpr[f'group_{group}'] = 0

        disparity = max(tpr.values()) - min(tpr.values())
        return {
            'tpr': tpr,
            'disparity': disparity,
            'satisfied': disparity < 0.1
        }

    def predictive_parity(self) -> Dict:
        """Calculate predictive parity"""
        groups = np.unique(self.sensitive)
        precision = {}

        for group in groups:
            mask = (self.sensitive == group) & (self.y_pred == 1)
            if mask.any():
                precision[f'group_{group}'] = np.mean(self.y_true[mask])
            else:
                precision[f'group_{group}'] = 0

        disparity = max(precision.values()) - min(precision.values())
        return {
            'precision': precision,
            'disparity': disparity,
            'satisfied': disparity < 0.1
        }

    def full_report(self) -> Dict:
        """Generate full fairness report"""
        return {
            'demographic_parity': self.demographic_parity(),
            'equalized_odds': self.equalized_odds(),
            'equal_opportunity': self.equal_opportunity(),
            'predictive_parity': self.predictive_parity()
        }


# Example
def fairness_example():
    """Example fairness analysis"""
    np.random.seed(42)
    n = 1000

    # Simulated data
    y_true = np.random.binomial(1, 0.5, n)
    sensitive = np.random.binomial(1, 0.5, n)

    # Biased predictions (higher for group 1)
    y_pred = np.where(
        sensitive == 1,
        np.random.binomial(1, 0.7, n),
        np.random.binomial(1, 0.5, n)
    )

    metrics = FairnessMetrics(y_true, y_pred, sensitive)
    report = metrics.full_report()
    print("Fairness Report:")
    for metric, values in report.items():
        print(f"  {metric}: satisfied={values['satisfied']}")

fairness_example()
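

# Example (hedged): the same disparities measured with Fairlearn, one of the
# libraries listed for this chapter. This is a sketch that assumes fairlearn
# and scikit-learn are installed; the synthetic data mirrors fairness_example().
def fairlearn_example():
    """Fairness metrics with the Fairlearn library"""
    from fairlearn.metrics import (
        MetricFrame, selection_rate, demographic_parity_difference
    )
    from sklearn.metrics import accuracy_score

    np.random.seed(42)
    n = 1000
    y_true = np.random.binomial(1, 0.5, n)
    sensitive = np.random.binomial(1, 0.5, n)
    y_pred = np.where(
        sensitive == 1,
        np.random.binomial(1, 0.7, n),
        np.random.binomial(1, 0.5, n)
    )

    frame = MetricFrame(
        metrics={'accuracy': accuracy_score, 'selection_rate': selection_rate},
        y_true=y_true,
        y_pred=y_pred,
        sensitive_features=sensitive
    )
    print("Per-group metrics:\n", frame.by_group)
    print("Demographic parity difference:",
          demographic_parity_difference(y_true, y_pred,
                                        sensitive_features=sensitive))

fairlearn_example()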

Bias Mitigation Techniques

PYTHON
from typing import Dict, List, Callable
import numpy as np

def mitigation_techniques():
    """Bias mitigation techniques"""
    print("\nBIAS MITIGATION")
    print("=" * 60)

    print("""
Mitigation Stages:

1. Pre-processing:
   - Modify training data
   - Remove bias before training
   - Techniques: reweighting, resampling

2. In-processing:
   - Modify training algorithm
   - Add fairness constraints
   - Techniques: adversarial debiasing

3. Post-processing:
   - Modify predictions
   - Adjust thresholds
   - Techniques: calibration

Pre-processing Methods:
  - Reweighting samples
  - Oversampling minorities
  - Removing sensitive features
  - Learning fair representations

In-processing Methods:
  - Fairness constraints
  - Adversarial learning
  - Regularization

Post-processing Methods:
  - Threshold adjustment
  - Calibration
  - Reject option classification
""")

mitigation_techniques()


class PreprocessingMitigation:
    """Pre-processing bias mitigation"""

    @staticmethod
    def reweight(X: np.ndarray, y: np.ndarray,
                 sensitive: np.ndarray) -> np.ndarray:
        """Calculate sample weights for fairness"""
        groups = np.unique(sensitive)
        labels = np.unique(y)
        weights = np.ones(len(y))

        # Calculate expected and observed proportions
        n = len(y)
        for g in groups:
            for l in labels:
                mask = (sensitive == g) & (y == l)
                observed = mask.sum() / n
                expected = (sensitive == g).sum() / n * (y == l).sum() / n
                if observed > 0:
                    weights[mask] = expected / observed

        return weights

    @staticmethod
    def resample(X: np.ndarray, y: np.ndarray,
                 sensitive: np.ndarray) -> tuple:
        """Resample to balance groups"""
        from collections import Counter

        # Find minority group-label combination
        combinations = list(zip(sensitive, y))
        counts = Counter(combinations)
        target_count = max(counts.values())

        # Oversample
        new_indices = []
        for combo, count in counts.items():
            indices = [i for i, c in enumerate(combinations) if c == combo]
            oversampled = np.random.choice(
                indices, size=target_count, replace=True
            )
            new_indices.extend(oversampled)

        np.random.shuffle(new_indices)
        return X[new_indices], y[new_indices], sensitive[new_indices]


class InprocessingMitigation:
    """In-processing bias mitigation"""

    @staticmethod
    def add_fairness_constraint(loss_fn: Callable,
                                predictions: np.ndarray,
                                sensitive: np.ndarray,
                                fairness_weight: float = 1.0) -> float:
        """Add fairness penalty to loss"""
        base_loss = loss_fn(predictions)

        # Demographic parity penalty
        group_0_rate = predictions[sensitive == 0].mean()
        group_1_rate = predictions[sensitive == 1].mean()
        fairness_penalty = (group_0_rate - group_1_rate) ** 2

        return base_loss + fairness_weight * fairness_penalty


class PostprocessingMitigation:
    """Post-processing bias mitigation"""

    @staticmethod
    def equalize_odds_threshold(y_pred_proba: np.ndarray,
                                y_true: np.ndarray,
                                sensitive: np.ndarray) -> Dict:
        """Find thresholds to equalize odds"""
        best_thresholds = {}
        groups = np.unique(sensitive)

        # Find optimal threshold per group
        for group in groups:
            mask = sensitive == group
            group_proba = y_pred_proba[mask]
            group_true = y_true[mask]

            best_threshold = 0.5
            best_f1 = 0

            for threshold in np.arange(0.1, 0.9, 0.05):
                pred = (group_proba >= threshold).astype(int)
                tp = ((pred == 1) & (group_true == 1)).sum()
                fp = ((pred == 1) & (group_true == 0)).sum()
                fn = ((pred == 0) & (group_true == 1)).sum()

                precision = tp / (tp + fp) if (tp + fp) > 0 else 0
                recall = tp / (tp + fn) if (tp + fn) > 0 else 0
                f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

                if f1 > best_f1:
                    best_f1 = f1
                    best_threshold = threshold

            best_thresholds[group] = best_threshold

        return best_thresholds

    @staticmethod
    def calibrated_equalized_odds(y_pred_proba: np.ndarray,
                                   y_true: np.ndarray,
                                   sensitive: np.ndarray) -> np.ndarray:
        """Apply calibrated equalized odds"""
        groups = np.unique(sensitive)
        thresholds = PostprocessingMitigation.equalize_odds_threshold(
            y_pred_proba, y_true, sensitive
        )

        # Apply group-specific thresholds
        y_pred_fair = np.zeros_like(y_pred_proba)
        for group in groups:
            mask = sensitive == group
            y_pred_fair[mask] = (
                y_pred_proba[mask] >= thresholds[group]
            ).astype(int)

        return y_pred_fair
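

# Example (hedged): exercising the mitigation helpers above on synthetic data.
# The feature values, outcome rates, and simulated scores are invented for
# illustration only.
def mitigation_example():
    """Apply pre- and post-processing mitigation"""
    np.random.seed(0)
    n = 500
    X = np.random.rand(n, 3)
    sensitive = np.random.binomial(1, 0.3, n)
    y = np.random.binomial(1, 0.4 + 0.2 * sensitive)  # unequal base rates

    # Pre-processing: reweight samples toward statistical independence
    weights = PreprocessingMitigation.reweight(X, y, sensitive)
    print(f"Sample weights range: {weights.min():.2f} to {weights.max():.2f}")

    # Post-processing: group-specific thresholds on simulated scores
    y_scores = np.clip(0.6 * y + 0.4 * np.random.rand(n), 0, 1)
    thresholds = PostprocessingMitigation.equalize_odds_threshold(
        y_scores, y, sensitive
    )
    print("Per-group thresholds:", thresholds)

mitigation_example()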

Fairness Auditing

PYTHON
from typing import Dict, List
from dataclasses import dataclass
import numpy as np

def fairness_auditing():
    """Fairness auditing process"""
    print("\nFAIRNESS AUDITING")
    print("=" * 60)

    print("""
Audit Process:

1. Define Protected Attributes:
   - Legal requirements
   - Ethical considerations
   - Intersectionality

2. Collect Disaggregated Data:
   - Break down by groups
   - Multiple attributes
   - Intersections

3. Calculate Metrics:
   - Multiple fairness metrics
   - Error rates by group
   - Disparate impact

4. Statistical Testing:
   - Significance of disparities
   - Confidence intervals
   - Sample size considerations

5. Document Findings:
   - Quantitative results
   - Qualitative context
   - Recommendations

6. Remediation:
   - Mitigation strategies
   - Trade-off analysis
   - Ongoing monitoring
""")

fairness_auditing()


@dataclass
class AuditResult:
    """Fairness audit result"""
    metric_name: str
    groups: Dict[str, float]
    disparity: float
    threshold: float
    passed: bool
    statistical_significance: float


class FairnessAuditor:
    """Comprehensive fairness auditor"""
    def __init__(self, significance_level: float = 0.05):
        self.significance_level = significance_level
        self.results: List[AuditResult] = []

    def audit(self, y_true: np.ndarray, y_pred: np.ndarray,
              sensitive: np.ndarray,
              protected_attribute: str) -> Dict:
        """Run full fairness audit"""
        metrics = FairnessMetrics(y_true, y_pred, sensitive)

        # Calculate all metrics
        dp = metrics.demographic_parity()
        eo = metrics.equalized_odds()
        eop = metrics.equal_opportunity()
        pp = metrics.predictive_parity()

        # Record results
        self.results = [
            AuditResult(
                metric_name="demographic_parity",
                groups=dp['rates'],
                disparity=dp['disparity'],
                threshold=0.1,
                passed=dp['satisfied'],
                statistical_significance=self._compute_significance(
                    y_pred, sensitive
                )
            ),
            AuditResult(
                metric_name="equal_opportunity",
                groups=eop['tpr'],
                disparity=eop['disparity'],
                threshold=0.1,
                passed=eop['satisfied'],
                statistical_significance=0.0
            )
        ]

        return self.generate_report(protected_attribute)

    def _compute_significance(self, y_pred: np.ndarray,
                              sensitive: np.ndarray) -> float:
        """Compute statistical significance of disparity"""
        from scipy import stats

        group_0 = y_pred[sensitive == 0]
        group_1 = y_pred[sensitive == 1]

        _, p_value = stats.ttest_ind(group_0, group_1)
        return p_value

    def generate_report(self, protected_attribute: str) -> Dict:
        """Generate audit report"""
        return {
            'protected_attribute': protected_attribute,
            'metrics_evaluated': len(self.results),
            'metrics_passed': sum(1 for r in self.results if r.passed),
            'results': [
                {
                    'metric': r.metric_name,
                    'disparity': r.disparity,
                    'passed': r.passed,
                    'significance': r.statistical_significance
                }
                for r in self.results
            ],
            'recommendation': self._get_recommendation()
        }

    def _get_recommendation(self) -> str:
        """Generate recommendation based on results"""
        failures = [r for r in self.results if not r.passed]

        if not failures:
            return "All fairness metrics passed. Continue monitoring."
        elif len(failures) == 1:
            return f"Address disparity in {failures[0].metric_name}."
        else:
            metrics = [f.metric_name for f in failures]
            return f"Multiple fairness concerns: {', '.join(metrics)}."

Summary

| Metric | Measures | Use When |
|--------|----------|----------|
| Demographic Parity | Equal selection rates | Diverse outcomes needed |
| Equalized Odds | Equal TPR and FPR | Errors equally costly |
| Equal Opportunity | Equal TPR | False negatives costly |
| Predictive Parity | Equal precision | Positive predictions matter |

Key takeaways:

  • Bias can enter at any stage
  • Multiple fairness definitions exist
  • Fairness metrics can conflict
  • Mitigation possible at all stages
  • Regular auditing is essential
  • Context determines appropriate fairness definition

27.3 Transparency and Explainability (Intermediate)


Understanding why AI systems make decisions is crucial for trust, debugging, and accountability. This section covers explainability techniques and transparency practices.

Explainability Overview

PYTHON
from typing import Dict, List, Any
from dataclasses import dataclass
from enum import Enum
import numpy as np

def explainability_overview():
    """Explainability in ML"""
    print("EXPLAINABILITY IN ML")
    print("=" * 60)

    print("""
Why Explainability Matters:

1. Trust:
   - Users need to trust predictions
   - Understand system behavior
   - Identify limitations

2. Debugging:
   - Find model errors
   - Understand failures
   - Improve performance

3. Compliance:
   - GDPR right to explanation
   - Fair lending regulations
   - Healthcare requirements

4. Scientific Discovery:
   - Learn from models
   - Generate hypotheses
   - Validate findings

Types of Explainability:

1. Global Explanations:
   - How model works overall
   - Feature importance
   - Model behavior patterns

2. Local Explanations:
   - Why this prediction?
   - Instance-specific
   - Counterfactuals

Interpretability vs Explainability:
  Interpretable: Model itself is understandable
  Explainable: Post-hoc explanations of black box
""")

explainability_overview()


class ExplanationType(Enum):
    GLOBAL = "global"
    LOCAL = "local"
    EXAMPLE = "example"
    COUNTERFACTUAL = "counterfactual"


@dataclass
class Explanation:
    """Model explanation"""
    explanation_type: ExplanationType
    method: str
    features: Dict[str, float]
    confidence: float
    metadata: Dict[str, Any]

Feature Importance Methods

PYTHON
from typing import Dict, List
import numpy as np

def feature_importance():
    """Feature importance methods"""
    print("\nFEATURE IMPORTANCE")
    print("=" * 60)

    print("""
Global Feature Importance Methods:

1. Built-in Importance:
   - Tree-based: Gini/entropy reduction
   - Linear: Coefficient magnitudes
   - Fast but model-specific

2. Permutation Importance:
   - Shuffle feature, measure drop
   - Model-agnostic
   - Captures feature interactions

3. SHAP Values:
   - Based on game theory
   - Additive explanations
   - Consistent and fair

4. Partial Dependence:
   - Average effect of feature
   - Shows relationship direction
   - Can miss interactions

Interpretation Guidelines:
  - High importance ≠ causal
  - Correlated features share importance
  - Consider feature scales
  - Use multiple methods
""")

feature_importance()


class FeatureImportance:
    """Calculate feature importance"""
    def __init__(self, model, X: np.ndarray, y: np.ndarray,
                 feature_names: List[str]):
        self.model = model
        self.X = X
        self.y = y
        self.feature_names = feature_names

    def permutation_importance(self, n_repeats: int = 10) -> Dict:
        """Calculate permutation importance"""
        baseline_score = self._score(self.X, self.y)
        importances = {}

        for i, name in enumerate(self.feature_names):
            scores = []
            for _ in range(n_repeats):
                X_permuted = self.X.copy()
                np.random.shuffle(X_permuted[:, i])
                score = self._score(X_permuted, self.y)
                scores.append(baseline_score - score)

            importances[name] = {
                'mean': np.mean(scores),
                'std': np.std(scores)
            }

        return dict(sorted(
            importances.items(),
            key=lambda x: -x[1]['mean']
        ))

    def _score(self, X: np.ndarray, y: np.ndarray) -> float:
        """Calculate model score"""
        predictions = self.model.predict(X)
        return np.mean(predictions == y)


class PartialDependence:
    """Partial dependence plots"""
    def __init__(self, model, X: np.ndarray, feature_names: List[str]):
        self.model = model
        self.X = X
        self.feature_names = feature_names

    def compute(self, feature_idx: int, num_points: int = 50) -> Dict:
        """Compute partial dependence for feature"""
        feature_values = np.linspace(
            self.X[:, feature_idx].min(),
            self.X[:, feature_idx].max(),
            num_points
        )

        pd_values = []
        for value in feature_values:
            X_modified = self.X.copy()
            X_modified[:, feature_idx] = value

            # Average prediction
            predictions = self.model.predict_proba(X_modified)[:, 1]
            pd_values.append(np.mean(predictions))

        return {
            'feature': self.feature_names[feature_idx],
            'values': feature_values.tolist(),
            'pd_values': pd_values
        }
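

# Example (hedged): using the helpers above with a scikit-learn
# LogisticRegression on synthetic data. The model choice and feature names
# are assumptions; any model exposing predict and predict_proba would work.
def importance_example():
    """Permutation importance and partial dependence"""
    from sklearn.linear_model import LogisticRegression

    np.random.seed(0)
    X = np.random.rand(500, 3)
    y = (2 * X[:, 0] + X[:, 1] > 1.3).astype(int)
    names = ["income", "tenure", "noise"]

    model = LogisticRegression().fit(X, y)

    fi = FeatureImportance(model, X, y, names)
    for name, stats in fi.permutation_importance(n_repeats=5).items():
        print(f"  {name}: drop={stats['mean']:.3f} (+/- {stats['std']:.3f})")

    pd_calc = PartialDependence(model, X, names)
    result = pd_calc.compute(0, num_points=5)
    print("Partial dependence for", result['feature'],
          [round(v, 3) for v in result['pd_values']])

importance_example()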

SHAP and LIME

PYTHON
from typing import Dict, List
import numpy as np

def shap_lime():
    """SHAP and LIME explanations"""
    print("\nSHAP AND LIME")
    print("=" * 60)

    print("""
SHAP (SHapley Additive exPlanations):

Theory:
  - Based on Shapley values from game theory
  - Fair attribution of prediction to features
  - Guarantees: local accuracy, consistency, missingness

Types:
  - KernelSHAP: Model-agnostic, slower
  - TreeSHAP: For tree models, fast
  - DeepSHAP: For neural networks

Usage:
  f(x) = base_value + sum(SHAP_values)

LIME (Local Interpretable Model-agnostic Explanations):

Approach:
  1. Perturb instance
  2. Get predictions
  3. Fit simple model locally
  4. Use simple model to explain

Pros/Cons:

SHAP:
  + Theoretically grounded
  + Consistent
  - Can be slow

LIME:
  + Fast
  + Intuitive
  - Unstable explanations
  - Sensitive to perturbations
""")

shap_lime()
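

# Example (hedged): the shap library itself (listed for this chapter) on a
# small tree model. This is a sketch that assumes shap and scikit-learn are
# installed; exact output shapes vary across shap versions. Simplified
# educational implementations follow below.
def shap_library_example():
    """TreeSHAP via the shap library"""
    import shap
    from sklearn.ensemble import RandomForestClassifier

    np.random.seed(0)
    X = np.random.rand(200, 4)
    y = (X[:, 0] + X[:, 1] > 1).astype(int)
    model = RandomForestClassifier(n_estimators=50, random_state=0).fit(X, y)

    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X[:5])  # attributions for 5 instances
    print("Base value(s):", explainer.expected_value)
    print("Computed SHAP values for 5 instances")

shap_library_example()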


class SimpleSHAP:
    """Simplified SHAP implementation"""
    def __init__(self, model, X_background: np.ndarray):
        self.model = model
        self.background = X_background
        self.base_value = np.mean(model.predict_proba(X_background)[:, 1])

    def explain(self, instance: np.ndarray,
                num_samples: int = 100) -> np.ndarray:
        """Compute SHAP values for instance"""
        n_features = len(instance)
        shap_values = np.zeros(n_features)

        for _ in range(num_samples):
            # Random permutation
            perm = np.random.permutation(n_features)

            # Compute marginal contributions
            x_before = self.background[np.random.randint(len(self.background))].copy()
            x_after = x_before.copy()

            for idx in perm:
                x_after[idx] = instance[idx]

                # Marginal contribution
                pred_before = self.model.predict_proba([x_before])[0, 1]
                pred_after = self.model.predict_proba([x_after])[0, 1]
                shap_values[idx] += (pred_after - pred_before)

                x_before[idx] = instance[idx]

        return shap_values / num_samples


class SimpleLIME:
    """Simplified LIME implementation"""
    def __init__(self, model, feature_names: List[str]):
        self.model = model
        self.feature_names = feature_names

    def explain(self, instance: np.ndarray,
                num_samples: int = 1000,
                num_features: int = 5) -> Dict:
        """Generate LIME explanation"""
        from sklearn.linear_model import Ridge

        # Generate perturbations
        perturbations = self._generate_perturbations(instance, num_samples)

        # Get predictions
        predictions = self.model.predict_proba(perturbations)[:, 1]

        # Compute distances
        distances = np.sqrt(np.sum((perturbations - instance) ** 2, axis=1))
        weights = np.exp(-distances / distances.std())

        # Fit weighted linear model
        lime_model = Ridge(alpha=1.0)
        lime_model.fit(perturbations, predictions, sample_weight=weights)

        # Get top features
        importance = np.abs(lime_model.coef_)
        top_indices = np.argsort(-importance)[:num_features]

        return {
            'intercept': lime_model.intercept_,
            'features': {
                self.feature_names[i]: lime_model.coef_[i]
                for i in top_indices
            },
            'prediction': self.model.predict_proba([instance])[0, 1]
        }

    def _generate_perturbations(self, instance: np.ndarray,
                                 num_samples: int) -> np.ndarray:
        """Generate perturbed samples around instance"""
        perturbations = np.tile(instance, (num_samples, 1))
        noise = np.random.normal(0, 0.1, perturbations.shape)
        return perturbations + noise
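

# Example (hedged): using the simplified explainers above with a scikit-learn
# LogisticRegression on synthetic data; any model with predict_proba would do.
def explanation_example():
    """Local explanations with SimpleSHAP and SimpleLIME"""
    from sklearn.linear_model import LogisticRegression

    np.random.seed(0)
    X = np.random.rand(300, 4)
    y = (X[:, 0] + 0.5 * X[:, 2] > 0.8).astype(int)
    feature_names = [f"feature_{i}" for i in range(4)]

    model = LogisticRegression().fit(X, y)
    instance = X[0]

    shap_vals = SimpleSHAP(model, X[:50]).explain(instance, num_samples=50)
    print("Approximate SHAP values:", np.round(shap_vals, 3))

    lime_exp = SimpleLIME(model, feature_names).explain(instance,
                                                        num_samples=500)
    print("LIME top features:", lime_exp['features'])

explanation_example()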

Counterfactual Explanations

PYTHON
from typing import Dict, List
import numpy as np

def counterfactuals():
    """Counterfactual explanations"""
    print("\nCOUNTERFACTUAL EXPLANATIONS")
    print("=" * 60)

    print("""
What are Counterfactuals?

"If X had been different, the outcome would be Y"

Example:
  Original: Loan denied
  Counterfactual: "If your income were $5K higher,
                   you would be approved"

Properties of Good Counterfactuals:

1. Validity:
   - Actually changes prediction

2. Proximity:
   - Minimal change from original

3. Sparsity:
   - Few features changed

4. Plausibility:
   - Realistic values

5. Actionability:
   - User can actually change

Use Cases:
  - Explain rejections
  - Provide actionable feedback
  - Debug model decisions
  - Regulatory compliance
""")

counterfactuals()


class CounterfactualExplainer:
    """Generate counterfactual explanations"""
    def __init__(self, model, feature_names: List[str],
                 feature_ranges: Dict[str, tuple]):
        self.model = model
        self.feature_names = feature_names
        self.feature_ranges = feature_ranges

    def find_counterfactual(self, instance: np.ndarray,
                            target_class: int,
                            max_iter: int = 1000) -> Dict:
        """Find counterfactual using gradient descent"""
        counterfactual = instance.copy().astype(float)
        learning_rate = 0.01

        for iteration in range(max_iter):
            # Get prediction
            pred = self.model.predict_proba([counterfactual])[0]

            # Check if target reached
            if np.argmax(pred) == target_class:
                break

            # Compute gradient (numerical)
            gradient = self._compute_gradient(
                counterfactual, target_class
            )

            # Update counterfactual
            counterfactual += learning_rate * gradient

            # Clip to valid ranges
            counterfactual = self._clip_to_ranges(counterfactual)

        return {
            'original': instance,
            'counterfactual': counterfactual,
            'changes': self._get_changes(instance, counterfactual),
            'success': np.argmax(
                self.model.predict_proba([counterfactual])[0]
            ) == target_class
        }

    def _compute_gradient(self, x: np.ndarray, target: int) -> np.ndarray:
        """Compute numerical gradient"""
        gradient = np.zeros_like(x)
        eps = 1e-4

        for i in range(len(x)):
            x_plus = x.copy()
            x_plus[i] += eps
            x_minus = x.copy()
            x_minus[i] -= eps

            pred_plus = self.model.predict_proba([x_plus])[0, target]
            pred_minus = self.model.predict_proba([x_minus])[0, target]

            gradient[i] = (pred_plus - pred_minus) / (2 * eps)

        return gradient

    def _clip_to_ranges(self, x: np.ndarray) -> np.ndarray:
        """Clip values to valid ranges"""
        for i, name in enumerate(self.feature_names):
            if name in self.feature_ranges:
                low, high = self.feature_ranges[name]
                x[i] = np.clip(x[i], low, high)
        return x

    def _get_changes(self, original: np.ndarray,
                      counterfactual: np.ndarray) -> Dict:
        """Get features that changed"""
        changes = {}
        for i, name in enumerate(self.feature_names):
            if abs(original[i] - counterfactual[i]) > 0.01:
                changes[name] = {
                    'from': original[i],
                    'to': counterfactual[i],
                    'change': counterfactual[i] - original[i]
                }
        return changes
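

# Example (hedged): finding a counterfactual for a rejected applicant using a
# scikit-learn LogisticRegression on synthetic, pre-scaled features. Feature
# names, ranges, and the decision boundary are invented for illustration.
def counterfactual_example():
    """Generate a counterfactual explanation"""
    from sklearn.linear_model import LogisticRegression

    np.random.seed(0)
    X = np.random.rand(500, 2)  # income and credit score, scaled to [0, 1]
    y = ((X[:, 0] + X[:, 1]) > 1.1).astype(int)
    model = LogisticRegression().fit(X, y)

    explainer = CounterfactualExplainer(
        model,
        feature_names=["income_scaled", "credit_scaled"],
        feature_ranges={"income_scaled": (0.0, 1.0),
                        "credit_scaled": (0.0, 1.0)}
    )
    denied = np.array([0.45, 0.40])  # predicted as class 0 (denied)
    result = explainer.find_counterfactual(denied, target_class=1)
    print("Counterfactual found:", result['success'])
    print("Suggested changes:", result['changes'])

counterfactual_example()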

Model Cards and Documentation

PYTHON
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime

def model_documentation():
    """Model documentation practices"""
    print("\nMODEL DOCUMENTATION")
    print("=" * 60)

    print("""
Model Cards:

Standardized documentation including:
  - Model details
  - Intended use
  - Metrics
  - Limitations
  - Ethical considerations

Sections:

1. Model Details:
   - Developer
   - Model type
   - Training data

2. Intended Use:
   - Primary use cases
   - Out-of-scope uses
   - Users

3. Factors:
   - Relevant attributes
   - Evaluation factors

4. Metrics:
   - Performance measures
   - Decision thresholds
   - Variation across groups

5. Ethical Considerations:
   - Risks and harms
   - Mitigations

6. Caveats and Recommendations:
   - Known limitations
   - Recommendations
""")

model_documentation()


@dataclass
class ModelCard:
    """Model card for documentation"""
    # Model details
    model_name: str
    model_version: str
    model_type: str
    developers: List[str]
    model_date: datetime

    # Intended use
    primary_uses: List[str]
    out_of_scope_uses: List[str]
    users: List[str]

    # Training
    training_data: str
    training_procedure: str
    preprocessing: str

    # Evaluation
    evaluation_data: str
    metrics: Dict[str, float]
    group_metrics: Dict[str, Dict[str, float]]

    # Ethics
    ethical_considerations: List[str]
    limitations: List[str]
    recommendations: List[str]

    def to_markdown(self) -> str:
        """Generate markdown model card"""
        md = f"""# Model Card: {self.model_name}

## Model Details
- **Version**: {self.model_version}
- **Type**: {self.model_type}
- **Developers**: {', '.join(self.developers)}
- **Date**: {self.model_date.strftime('%Y-%m-%d')}

## Intended Use
### Primary Uses
{chr(10).join('- ' + use for use in self.primary_uses)}

### Out-of-Scope Uses
{chr(10).join('- ' + use for use in self.out_of_scope_uses)}

## Training Data
{self.training_data}

## Evaluation
### Overall Metrics
{chr(10).join(f'- **{k}**: {v:.4f}' for k, v in self.metrics.items())}

### Performance by Group
"""
        for group, metrics in self.group_metrics.items():
            md += f"\n**{group}**:\n"
            md += chr(10).join(f'- {k}: {v:.4f}' for k, v in metrics.items())

        md += f"""

## Ethical Considerations
{chr(10).join('- ' + c for c in self.ethical_considerations)}

## Limitations
{chr(10).join('- ' + l for l in self.limitations)}

## Recommendations
{chr(10).join('- ' + r for r in self.recommendations)}
"""
        return md


# Example model card
def model_card_example():
    card = ModelCard(
        model_name="Credit Risk Classifier",
        model_version="2.0.0",
        model_type="Gradient Boosted Trees",
        developers=["ML Team"],
        model_date=datetime.now(),
        primary_uses=["Loan approval decisions", "Risk assessment"],
        out_of_scope_uses=["Employment decisions", "Insurance pricing"],
        users=["Loan officers", "Automated systems"],
        training_data="Historical loan applications (2018-2023)",
        training_procedure="XGBoost with hyperparameter tuning",
        preprocessing="Standard scaling, categorical encoding",
        evaluation_data="Held-out test set (2023)",
        metrics={'accuracy': 0.85, 'auc': 0.92, 'f1': 0.78},
        group_metrics={
            'age_under_25': {'accuracy': 0.82, 'fpr': 0.12},
            'age_over_65': {'accuracy': 0.87, 'fpr': 0.08}
        },
        ethical_considerations=[
            "Model may perpetuate historical lending biases",
            "Age and zip code may serve as proxies for protected attributes"
        ],
        limitations=[
            "Performance degrades for applicants with thin credit files",
            "Not validated for business loans"
        ],
        recommendations=[
            "Human review required for borderline cases",
            "Regular fairness audits recommended"
        ]
    )
    print(card.to_markdown()[:500] + "...")

model_card_example()

Summary

| Method | Type | Use Case |
|--------|------|----------|
| Permutation Importance | Global | Feature ranking |
| SHAP | Both | Detailed attribution |
| LIME | Local | Instance explanation |
| Counterfactuals | Local | Actionable feedback |
| Model Cards | Documentation | Transparency |

Key takeaways:

  • Explainability builds trust
  • Use multiple explanation methods
  • Global and local explanations serve different needs
  • Counterfactuals provide actionable insights
  • Document models thoroughly with model cards
  • Consider your audience when explaining

27.4 Privacy and Security in ML (Advanced)


Machine learning systems handle sensitive data and can be vulnerable to attacks. This section covers privacy-preserving techniques and security considerations for ML systems.

Privacy in Machine Learning

PYTHON
from typing import Dict, List, Any
from dataclasses import dataclass
from enum import Enum
import numpy as np

def privacy_overview():
    """Privacy in machine learning"""
    print("PRIVACY IN ML")
    print("=" * 60)

    print("""
Privacy Concerns in ML:

1. Training Data Privacy:
   - Models may memorize data
   - Sensitive information in training
   - Data breaches

2. Model Privacy:
   - Model inversion attacks
   - Membership inference
   - Model stealing

3. Inference Privacy:
   - User queries exposed
   - Prediction leakage
   - Behavioral tracking

Regulations:

GDPR (EU):
  - Right to be forgotten
  - Data minimization
  - Purpose limitation
  - Right to explanation

CCPA (California):
  - Know what data collected
  - Delete personal data
  - Opt-out of sale

HIPAA (Healthcare):
  - Protected health information
  - Minimum necessary standard
  - Security requirements
""")

privacy_overview()


class PrivacyTechnique(Enum):
    DIFFERENTIAL_PRIVACY = "differential_privacy"
    FEDERATED_LEARNING = "federated_learning"
    SECURE_COMPUTATION = "secure_computation"
    ANONYMIZATION = "anonymization"
    ENCRYPTION = "encryption"


def privacy_techniques():
    """Privacy-preserving techniques"""
    print("\nPRIVACY TECHNIQUES")
    print("-" * 50)

    print("""
1. Differential Privacy:
   - Add noise to data/model
   - Mathematical privacy guarantee
   - Epsilon-delta privacy bound

2. Federated Learning:
   - Train on decentralized data
   - Data stays on device
   - Only gradients shared

3. Secure Computation:
   - Encrypted computation
   - Multi-party computation
   - Homomorphic encryption

4. Data Anonymization:
   - Remove identifiers
   - K-anonymity, L-diversity
   - Generalization

5. Synthetic Data:
   - Generate fake realistic data
   - Preserves statistical properties
   - No real individuals
""")

privacy_techniques()

Differential Privacy

PYTHON
import numpy as np
from typing import Callable

def differential_privacy():
    """Differential privacy concepts"""
    print("\nDIFFERENTIAL PRIVACY")
    print("=" * 60)

    print("""
Differential Privacy (DP):

Definition:
  An algorithm A is ε-differentially private if:
  P(A(D) ∈ S) ≤ e^ε × P(A(D') ∈ S)

  Where D and D' differ by one record.

Intuition:
  - One person's presence doesn't change output much
  - Provides plausible deniability
  - Quantifiable privacy guarantee

Key Parameters:
  - Epsilon (ε): Privacy budget
    - Smaller = more privacy
    - Typical: 0.1 to 10

  - Delta (δ): Failure probability
    - Probability of violating ε
    - Typically very small (1/n²)

Mechanisms:
  - Laplace mechanism (numerical)
  - Gaussian mechanism (numerical)
  - Exponential mechanism (categorical)
  - Randomized response
""")

differential_privacy()


class DifferentialPrivacy:
    """Differential privacy mechanisms"""

    @staticmethod
    def laplace_mechanism(value: float, sensitivity: float,
                          epsilon: float) -> float:
        """Add Laplace noise for DP"""
        scale = sensitivity / epsilon
        noise = np.random.laplace(0, scale)
        return value + noise

    @staticmethod
    def gaussian_mechanism(value: float, sensitivity: float,
                           epsilon: float, delta: float) -> float:
        """Add Gaussian noise for DP"""
        sigma = sensitivity * np.sqrt(2 * np.log(1.25 / delta)) / epsilon
        noise = np.random.normal(0, sigma)
        return value + noise

    @staticmethod
    def randomized_response(true_value: bool,
                            epsilon: float) -> bool:
        """Randomized response for binary data"""
        p = np.exp(epsilon) / (1 + np.exp(epsilon))
        if np.random.random() < p:
            return true_value
        else:
            return not true_value


class DPQuery:
    """Differentially private queries"""
    def __init__(self, epsilon: float, delta: float = 1e-5):
        self.epsilon = epsilon
        self.delta = delta
        self.budget_used = 0.0

    def private_count(self, data: np.ndarray,
                      predicate: Callable) -> float:
        """DP count query"""
        true_count = sum(1 for x in data if predicate(x))
        # Sensitivity is 1 for count
        noisy_count = DifferentialPrivacy.laplace_mechanism(
            true_count, sensitivity=1, epsilon=self.epsilon
        )
        self.budget_used += self.epsilon
        return max(0, noisy_count)  # Count can't be negative

    def private_mean(self, data: np.ndarray,
                     lower: float, upper: float) -> float:
        """DP mean query"""
        true_mean = np.mean(data)
        # Sensitivity for mean is (upper - lower) / n
        sensitivity = (upper - lower) / len(data)
        noisy_mean = DifferentialPrivacy.laplace_mechanism(
            true_mean, sensitivity=sensitivity, epsilon=self.epsilon
        )
        self.budget_used += self.epsilon
        return np.clip(noisy_mean, lower, upper)

    def private_histogram(self, data: np.ndarray,
                          num_bins: int) -> np.ndarray:
        """DP histogram"""
        true_hist, _ = np.histogram(data, bins=num_bins)
        # Add noise to each bin (sensitivity = 1)
        noisy_hist = np.array([
            DifferentialPrivacy.laplace_mechanism(
                count, sensitivity=1, epsilon=self.epsilon / num_bins
            )
            for count in true_hist
        ])
        self.budget_used += self.epsilon
        return np.maximum(0, noisy_hist)


# Example
def dp_example():
    """Differential privacy example"""
    data = np.random.normal(50000, 15000, 1000)  # Salaries

    dp_query = DPQuery(epsilon=1.0)

    # Private mean
    private_mean = dp_query.private_mean(data, 0, 200000)
    true_mean = np.mean(data)
    print(f"True mean: {true_mean:.2f}")
    print(f"Private mean: {private_mean:.2f}")
    print(f"Budget used: {dp_query.budget_used}")

dp_example()
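

# Example (hedged): recovering an aggregate from randomized responses. With
# truth-telling probability p = e^eps / (1 + e^eps), the observed "yes" rate
# is p * true + (1 - p) * (1 - true), which can be inverted to estimate the
# true rate without learning any individual's answer. The 30% base rate here
# is invented for illustration.
def randomized_response_example():
    """Estimate a sensitive proportion under local DP"""
    np.random.seed(0)
    epsilon = 1.0
    true_answers = np.random.random(10000) < 0.3  # 30% truly answer "yes"

    noisy = np.array([
        DifferentialPrivacy.randomized_response(bool(a), epsilon)
        for a in true_answers
    ])

    p = np.exp(epsilon) / (1 + np.exp(epsilon))
    estimate = (noisy.mean() - (1 - p)) / (2 * p - 1)
    print(f"True rate: {true_answers.mean():.3f}")
    print(f"Estimated rate: {estimate:.3f}")

randomized_response_example()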

Federated Learning

PYTHON
from typing import Dict, List
import numpy as np

def federated_learning():
    """Federated learning concepts"""
    print("\nFEDERATED LEARNING")
    print("=" * 60)

    print("""
Federated Learning:

Definition:
  Train ML models across decentralized devices
  without sharing raw data.

Process:
  1. Server sends model to clients
  2. Clients train on local data
  3. Clients send updates to server
  4. Server aggregates updates
  5. Repeat

Types:
  - Horizontal FL: Same features, different samples
  - Vertical FL: Different features, same samples
  - Transfer FL: Different features and samples

Challenges:
  - Non-IID data
  - Communication efficiency
  - System heterogeneity
  - Privacy attacks still possible

Aggregation Methods:
  - FedAvg: Average model weights
  - FedProx: Proximal term for heterogeneity
  - Secure aggregation: Encrypted updates
""")

federated_learning()


class FederatedClient:
    """Federated learning client"""
    def __init__(self, client_id: str, local_data: np.ndarray,
                 local_labels: np.ndarray):
        self.client_id = client_id
        self.data = local_data
        self.labels = local_labels
        self.model_weights = None

    def receive_model(self, weights: np.ndarray):
        """Receive global model"""
        self.model_weights = weights.copy()

    def train_local(self, epochs: int = 1,
                    learning_rate: float = 0.01) -> np.ndarray:
        """Train on local data"""
        for _ in range(epochs):
            # Simple gradient descent
            predictions = self.data @ self.model_weights
            errors = predictions - self.labels
            gradient = self.data.T @ errors / len(self.data)
            self.model_weights -= learning_rate * gradient

        return self.model_weights

    def get_update(self) -> np.ndarray:
        """Get model update (weights)"""
        return self.model_weights


class FederatedServer:
    """Federated learning server"""
    def __init__(self, num_features: int):
        self.global_weights = np.random.randn(num_features) * 0.01
        self.clients: List[FederatedClient] = []

    def register_client(self, client: FederatedClient):
        """Register client"""
        self.clients.append(client)

    def broadcast_model(self):
        """Send model to all clients"""
        for client in self.clients:
            client.receive_model(self.global_weights)

    def aggregate_fedavg(self, client_weights: List[np.ndarray],
                         client_sizes: List[int]) -> np.ndarray:
        """FedAvg aggregation"""
        total_size = sum(client_sizes)
        weighted_sum = np.zeros_like(self.global_weights)

        for weights, size in zip(client_weights, client_sizes):
            weighted_sum += weights * (size / total_size)

        return weighted_sum

    def train_round(self) -> float:
        """Execute one training round"""
        # Broadcast current model
        self.broadcast_model()

        # Clients train locally
        updates = []
        sizes = []
        for client in self.clients:
            weights = client.train_local()
            updates.append(weights)
            sizes.append(len(client.data))

        # Aggregate updates
        self.global_weights = self.aggregate_fedavg(updates, sizes)

        return self._evaluate()

    def _evaluate(self) -> float:
        """Evaluate global model (simplified proxy: the weight-vector norm,
        not a true loss)"""
        return np.linalg.norm(self.global_weights)


# Example
def federated_example():
    """Federated learning example"""
    # Create clients with local data
    np.random.seed(42)
    clients = []

    for i in range(3):
        X = np.random.randn(100, 5)
        y = X @ np.array([1, 2, 3, 4, 5]) + np.random.randn(100) * 0.1
        client = FederatedClient(f"client_{i}", X, y)
        clients.append(client)

    # Create server
    server = FederatedServer(num_features=5)
    for client in clients:
        server.register_client(client)

    # Train
    for round_num in range(5):
        metric = server.train_round()
        print(f"Round {round_num + 1}: weight norm = {metric:.4f}")

federated_example()
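

# A minimal sketch of the "secure aggregation" idea mentioned above, using
# pairwise additive masks (an illustration only, not the full cryptographic
# protocol): each pair of clients shares a random mask that one adds and the
# other subtracts, so the server sees only masked updates, yet their sum is
# exactly the true sum.
def secure_aggregation_sketch():
    rng = np.random.default_rng(0)
    updates = [rng.normal(size=5) for _ in range(3)]

    # mask[(i, j)] is added by client i and subtracted by client j
    masks = {(i, j): rng.normal(size=5)
             for i in range(3) for j in range(3) if i < j}

    masked = []
    for i in range(3):
        m = updates[i].copy()
        for (a, b), mask in masks.items():
            if a == i:
                m += mask
            elif b == i:
                m -= mask
        masked.append(m)

    # The server only ever sees `masked`, but the aggregate is unchanged
    print("True sum:   ", np.round(sum(updates), 4))
    print("Masked sum: ", np.round(sum(masked), 4))

secure_aggregation_sketch()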

ML Security Attacks

PYTHON
from typing import Dict, List
import numpy as np

def ml_security():
    """ML security threats"""
    print("\nML SECURITY")
    print("=" * 60)

    print("""
Attack Types:

1. Data Poisoning:
   - Inject malicious training data
   - Corrupt model behavior
   - Backdoor attacks

2. Model Inversion:
   - Reconstruct training data
   - Extract sensitive features
   - Privacy breach

3. Membership Inference:
   - Determine if sample was in training
   - Privacy attack
   - Exploits overfitting

4. Model Extraction:
   - Steal model via queries
   - Intellectual property theft
   - Create surrogate model

5. Adversarial Examples:
   - Inputs designed to fool model
   - Small perturbations
   - Transferable across models

6. Prompt Injection:
   - Manipulate LLM behavior
   - Bypass safety measures
   - Extract sensitive info
""")

ml_security()


class MembershipInference:
    """Membership inference attack"""
    def __init__(self, target_model, shadow_model):
        self.target = target_model
        self.shadow = shadow_model
        self.attack_model = None

    def train_attack(self, shadow_train_data: np.ndarray,
                     shadow_test_data: np.ndarray):
        """Train attack model on shadow model"""
        from sklearn.linear_model import LogisticRegression

        # Get shadow model confidences
        train_confs = self._get_confidences(self.shadow, shadow_train_data)
        test_confs = self._get_confidences(self.shadow, shadow_test_data)

        # Create attack dataset
        X_attack = np.vstack([train_confs, test_confs])
        y_attack = np.array([1] * len(train_confs) + [0] * len(test_confs))

        # Train attack model
        self.attack_model = LogisticRegression()
        self.attack_model.fit(X_attack, y_attack)

    def attack(self, samples: np.ndarray) -> np.ndarray:
        """Perform membership inference"""
        confs = self._get_confidences(self.target, samples)
        return self.attack_model.predict_proba(confs)[:, 1]

    def _get_confidences(self, model, data: np.ndarray) -> np.ndarray:
        """Get model confidence scores"""
        probs = model.predict_proba(data)
        # Features: max prob, entropy, etc.
        max_prob = np.max(probs, axis=1, keepdims=True)
        entropy = -np.sum(probs * np.log(probs + 1e-10), axis=1, keepdims=True)
        return np.hstack([max_prob, entropy, probs])
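

# A hedged usage sketch for MembershipInference, with small synthetic data and
# sklearn LogisticRegression classifiers standing in for the target and shadow
# models (both the data and the models are assumptions for illustration; on a
# well-generalizing model like this, member and non-member scores look similar).
def membership_inference_example():
    from sklearn.linear_model import LogisticRegression

    rng = np.random.default_rng(0)

    def make_split():
        X = rng.normal(size=(200, 4))
        y = (X[:, 0] + X[:, 1] + rng.normal(scale=0.5, size=200) > 0).astype(int)
        return X[:100], y[:100], X[100:], y[100:]

    # Target model and its (private) training data
    Xt_train, yt_train, Xt_test, _ = make_split()
    target = LogisticRegression().fit(Xt_train, yt_train)

    # Shadow model trained on data the attacker controls
    Xs_train, ys_train, Xs_test, _ = make_split()
    shadow = LogisticRegression().fit(Xs_train, ys_train)

    attack = MembershipInference(target, shadow)
    attack.train_attack(Xs_train, Xs_test)

    # Higher scores suggest "was in the training set"
    member_scores = attack.attack(Xt_train)
    nonmember_scores = attack.attack(Xt_test)
    print(f"Mean score (members):     {member_scores.mean():.3f}")
    print(f"Mean score (non-members): {nonmember_scores.mean():.3f}")

membership_inference_example()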


class AdversarialAttack:
    """Generate adversarial examples"""

    @staticmethod
    def fgsm(model, x: np.ndarray, y: int,
             epsilon: float = 0.1) -> np.ndarray:
        """Fast Gradient Sign Method"""
        # Get gradient
        gradient = AdversarialAttack._compute_gradient(model, x, y)

        # Create perturbation
        perturbation = epsilon * np.sign(gradient)

        # Apply perturbation
        x_adv = x + perturbation
        return np.clip(x_adv, 0, 1)

    @staticmethod
    def pgd(model, x: np.ndarray, y: int,
            epsilon: float = 0.1, alpha: float = 0.01,
            num_iter: int = 40) -> np.ndarray:
        """Projected Gradient Descent"""
        x_adv = x.copy()

        for _ in range(num_iter):
            gradient = AdversarialAttack._compute_gradient(model, x_adv, y)
            x_adv = x_adv + alpha * np.sign(gradient)
            # Project back to epsilon ball
            x_adv = np.clip(x_adv, x - epsilon, x + epsilon)
            x_adv = np.clip(x_adv, 0, 1)

        return x_adv

    @staticmethod
    def _compute_gradient(model, x: np.ndarray, y: int) -> np.ndarray:
        """Compute gradient of loss w.r.t. input"""
        eps = 1e-4
        gradient = np.zeros_like(x)

        for i in range(len(x)):
            x_plus = x.copy()
            x_plus[i] += eps
            x_minus = x.copy()
            x_minus[i] -= eps

            # Loss is negative log probability of true class
            loss_plus = -np.log(model.predict_proba([x_plus])[0, y] + 1e-10)
            loss_minus = -np.log(model.predict_proba([x_minus])[0, y] + 1e-10)

            gradient[i] = (loss_plus - loss_minus) / (2 * eps)

        return gradient
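

# A hedged usage sketch for the FGSM attack above, with a sklearn
# LogisticRegression as the victim model (the data and model are assumptions;
# the finite-difference gradient makes this slow for high-dimensional inputs).
def adversarial_example_demo():
    from sklearn.linear_model import LogisticRegression

    rng = np.random.default_rng(0)
    X = rng.uniform(0, 1, size=(500, 4))
    y = (X[:, 0] + X[:, 1] > 1.0).astype(int)
    model = LogisticRegression().fit(X, y)

    x = X[0]
    true_label = int(y[0])
    x_adv = AdversarialAttack.fgsm(model, x, true_label, epsilon=0.3)

    # The small perturbation typically pushes the input across the boundary;
    # at minimum, confidence in the true class drops noticeably
    print(f"Original prediction:    {model.predict([x])[0]} "
          f"(p_true={model.predict_proba([x])[0, true_label]:.3f})")
    print(f"Adversarial prediction: {model.predict([x_adv])[0]} "
          f"(p_true={model.predict_proba([x_adv])[0, true_label]:.3f})")
    print(f"Max perturbation:       {np.max(np.abs(x_adv - x)):.3f}")

adversarial_example_demo()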

Security Best Practices

PYTHON
from dataclasses import dataclass


def security_practices():
    """ML security best practices"""
    print("\nSECURITY BEST PRACTICES")
    print("=" * 60)

    print("""
Defense Strategies:

1. Data Security:
   - Encrypt data at rest and in transit
   - Access controls
   - Audit logging
   - Secure deletion

2. Model Security:
   - Model access controls
   - Rate limiting queries
   - Monitor for extraction
   - Watermarking

3. Training Security:
   - Data validation
   - Anomaly detection
   - Robust training methods
   - Certified defenses

4. Inference Security:
   - Input validation
   - Adversarial detection
   - Uncertainty estimation
   - Human review for edge cases

5. Infrastructure:
   - Secure compute environment
   - Network isolation
   - Regular updates
   - Incident response plan
""")

security_practices()
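

# A minimal sketch of per-client query rate limiting, one of the defenses
# against model extraction listed above (the window size and limits here are
# illustrative assumptions, not recommendations).
import time
from collections import defaultdict, deque


class QueryRateLimiter:
    """Sliding-window rate limiter for model API queries"""
    def __init__(self, max_queries: int = 100, window_seconds: float = 60.0):
        self.max_queries = max_queries
        self.window = window_seconds
        self.history = defaultdict(deque)  # client_id -> query timestamps

    def allow(self, client_id: str) -> bool:
        now = time.monotonic()
        q = self.history[client_id]
        # Drop timestamps that have fallen out of the window
        while q and now - q[0] > self.window:
            q.popleft()
        if len(q) >= self.max_queries:
            return False
        q.append(now)
        return True


# Example: a client that exceeds the limit gets rejected
limiter = QueryRateLimiter(max_queries=5, window_seconds=60.0)
results = [limiter.allow("client_a") for _ in range(7)]
print(f"Allowed: {sum(results)} / {len(results)}")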


@dataclass
class SecurityChecklist:
    """ML security checklist"""
    data_encrypted: bool = False
    access_controls: bool = False
    audit_logging: bool = False
    rate_limiting: bool = False
    input_validation: bool = False
    adversarial_defense: bool = False
    model_monitoring: bool = False
    incident_response: bool = False

    def score(self) -> float:
        """Calculate security score"""
        checks = [
            self.data_encrypted,
            self.access_controls,
            self.audit_logging,
            self.rate_limiting,
            self.input_validation,
            self.adversarial_defense,
            self.model_monitoring,
            self.incident_response
        ]
        return sum(checks) / len(checks)

    def report(self) -> str:
        """Generate security report"""
        score = self.score()
        status = ("Good" if score >= 0.8
                  else "Needs Improvement" if score >= 0.5
                  else "Critical")

        def mark(passed: bool) -> str:
            return '✓' if passed else '✗'

        return f"""
Security Assessment: {status} ({score:.0%})

{mark(self.data_encrypted)} Data Encryption: {'Yes' if self.data_encrypted else 'No'}
{mark(self.access_controls)} Access Controls: {'Yes' if self.access_controls else 'No'}
{mark(self.audit_logging)} Audit Logging: {'Yes' if self.audit_logging else 'No'}
{mark(self.rate_limiting)} Rate Limiting: {'Yes' if self.rate_limiting else 'No'}
{mark(self.input_validation)} Input Validation: {'Yes' if self.input_validation else 'No'}
{mark(self.adversarial_defense)} Adversarial Defense: {'Yes' if self.adversarial_defense else 'No'}
{mark(self.model_monitoring)} Model Monitoring: {'Yes' if self.model_monitoring else 'No'}
{mark(self.incident_response)} Incident Response: {'Yes' if self.incident_response else 'No'}
"""

Summary

| Technique | Protects Against | Trade-off |
|-----------|------------------|-----------|
| Differential Privacy | Data extraction | Accuracy loss |
| Federated Learning | Data exposure | Communication cost |
| Adversarial Training | Adversarial examples | Training cost |
| Rate Limiting | Model extraction | User experience |

Key takeaways:

  • Privacy and security are critical in ML
  • Differential privacy provides mathematical guarantees
  • Federated learning keeps data decentralized
  • Multiple attack vectors exist
  • Defense in depth is essential
  • Regular security audits are important

27.5 Responsible AI Practices Intermediate

Responsible AI Practices

Building AI responsibly requires integrating ethics throughout the development lifecycle. This section covers governance frameworks, implementation practices, and organizational approaches to responsible AI.

Responsible AI Framework

PYTHON
from typing import Dict, List, Any
from dataclasses import dataclass
from enum import Enum
from datetime import datetime

def responsible_ai_overview():
    """Responsible AI framework"""
    print("RESPONSIBLE AI")
    print("=" * 60)

    print("""
Responsible AI Principles:

1. Human-Centered:
   - AI augments humans
   - Human oversight maintained
   - User well-being priority

2. Inclusive:
   - Accessible to all
   - Diverse perspectives
   - Reduce digital divide

3. Accountable:
   - Clear ownership
   - Audit trails
   - Redress mechanisms

4. Transparent:
   - Explainable decisions
   - Disclosed AI usage
   - Open about limitations

5. Safe and Secure:
   - Reliable operation
   - Protected from misuse
   - Privacy preserved

6. Environmentally Sustainable:
   - Energy efficiency
   - Carbon footprint awareness
   - Resource optimization

Implementation Stages:
  Design → Develop → Deploy → Monitor → Retire
""")

responsible_ai_overview()


class RAIPrinciple(Enum):
    HUMAN_CENTERED = "human_centered"
    INCLUSIVE = "inclusive"
    ACCOUNTABLE = "accountable"
    TRANSPARENT = "transparent"
    SAFE = "safe"
    SUSTAINABLE = "sustainable"


@dataclass
class RAIAssessment:
    """Responsible AI assessment"""
    project_name: str
    assessment_date: datetime
    principles: Dict[RAIPrinciple, float]  # Score 0-1
    risks: List[Dict]
    mitigations: List[Dict]
    recommendations: List[str]
    overall_score: float

    def is_approved(self, threshold: float = 0.7) -> bool:
        return self.overall_score >= threshold
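

# Example (assumed scores and project details, for illustration only): scoring
# a project against the six principles and checking the approval threshold
def rai_assessment_example():
    assessment = RAIAssessment(
        project_name="Customer Churn Prediction",
        assessment_date=datetime.now(),
        principles={
            RAIPrinciple.HUMAN_CENTERED: 0.8,
            RAIPrinciple.INCLUSIVE: 0.7,
            RAIPrinciple.ACCOUNTABLE: 0.9,
            RAIPrinciple.TRANSPARENT: 0.6,
            RAIPrinciple.SAFE: 0.8,
            RAIPrinciple.SUSTAINABLE: 0.5
        },
        risks=[{'name': 'Bias in churn features', 'level': 'medium'}],
        mitigations=[{'risk': 'Bias in churn features',
                      'action': 'Fairness testing before release'}],
        recommendations=["Improve sustainability reporting"],
        overall_score=0.72
    )
    print(f"Approved: {assessment.is_approved()}")

rai_assessment_example()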

AI Governance

PYTHON
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime

def ai_governance():
    """AI governance frameworks"""
    print("\nAI GOVERNANCE")
    print("=" * 60)

    print("""
Governance Components:

1. Policies:
   - AI use policies
   - Data governance
   - Model lifecycle management
   - Acceptable use guidelines

2. Processes:
   - Ethics review
   - Risk assessment
   - Approval workflows
   - Incident response

3. People:
   - AI ethics board
   - Data stewards
   - Model owners
   - Training programs

4. Technology:
   - Monitoring tools
   - Audit systems
   - Documentation platforms
   - Compliance automation

Governance Levels:

Organizational:
  - AI strategy alignment
  - Resource allocation
  - Culture and training

Project:
  - Use case evaluation
  - Risk assessment
  - Approval gates

Operational:
  - Monitoring
  - Incident response
  - Continuous improvement
""")

ai_governance()


@dataclass
class AIPolicy:
    """AI governance policy"""
    name: str
    version: str
    effective_date: datetime
    scope: str
    requirements: List[str]
    prohibited_uses: List[str]
    review_frequency: str


class AIGovernance:
    """AI governance framework"""
    def __init__(self, organization: str):
        self.organization = organization
        self.policies: Dict[str, AIPolicy] = {}
        self.risk_register: List[Dict] = []
        self.approved_uses: List[Dict] = []

    def add_policy(self, policy: AIPolicy):
        """Add governance policy"""
        self.policies[policy.name] = policy

    def register_risk(self, risk: Dict):
        """Register AI risk"""
        risk['registered_at'] = datetime.now()
        risk['status'] = 'open'
        self.risk_register.append(risk)

    def approve_use_case(self, use_case: Dict, approver: str):
        """Approve AI use case"""
        use_case['approved_by'] = approver
        use_case['approved_at'] = datetime.now()
        self.approved_uses.append(use_case)

    def check_compliance(self, project: Dict) -> Dict:
        """Check project compliance with policies"""
        violations = []

        for policy_name, policy in self.policies.items():
            # Check prohibited uses
            for prohibited in policy.prohibited_uses:
                if prohibited.lower() in project.get('description', '').lower():
                    violations.append({
                        'policy': policy_name,
                        'violation': prohibited
                    })

        return {
            'compliant': len(violations) == 0,
            'violations': violations
        }


# Example governance setup
def governance_example():
    """Example AI governance setup"""
    gov = AIGovernance("Example Corp")

    # Add policy
    policy = AIPolicy(
        name="AI Ethics Policy",
        version="1.0",
        effective_date=datetime.now(),
        scope="All AI/ML projects",
        requirements=[
            "Ethics review required for high-risk AI",
            "Bias testing mandatory before deployment",
            "Human oversight for automated decisions"
        ],
        prohibited_uses=[
            "Social scoring",
            "Emotion recognition for hiring",
            "Autonomous weapons"
        ],
        review_frequency="annual"
    )
    gov.add_policy(policy)

    # Check compliance
    project = {
        'name': 'Customer Churn Prediction',
        'description': 'Predict customer churn for retention'
    }

    result = gov.check_compliance(project)
    print(f"Compliance: {result['compliant']}")

governance_example()

Risk Assessment

PYTHON
from typing import Dict, List
from dataclasses import dataclass
from enum import Enum

def risk_assessment():
    """AI risk assessment"""
    print("\nRISK ASSESSMENT")
    print("=" * 60)

    print("""
Risk Categories:

1. Technical Risks:
   - Model failure
   - Data quality issues
   - Security vulnerabilities
   - Performance degradation

2. Ethical Risks:
   - Bias and discrimination
   - Privacy violations
   - Lack of transparency
   - Manipulation

3. Social Risks:
   - Job displacement
   - Digital divide
   - Misinformation
   - Dependency

4. Legal/Regulatory Risks:
   - Non-compliance
   - Liability
   - Intellectual property
   - Cross-border issues

5. Reputational Risks:
   - Public backlash
   - Trust erosion
   - Brand damage

Risk Assessment Process:
  1. Identify risks
  2. Assess likelihood and impact
  3. Prioritize risks
  4. Develop mitigations
  5. Monitor and review
""")

risk_assessment()


class RiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class AIRisk:
    """AI system risk"""
    name: str
    category: str
    description: str
    likelihood: int  # 1-5
    impact: int  # 1-5
    mitigation: str
    owner: str
    status: str = "open"

    @property
    def score(self) -> int:
        return self.likelihood * self.impact

    @property
    def level(self) -> RiskLevel:
        if self.score >= 20:
            return RiskLevel.CRITICAL
        elif self.score >= 12:
            return RiskLevel.HIGH
        elif self.score >= 6:
            return RiskLevel.MEDIUM
        return RiskLevel.LOW
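

# Quick illustration of the likelihood x impact scoring above (the risk and
# its ratings are assumptions for illustration): 4 x 5 = 20 maps to CRITICAL.
def risk_scoring_example():
    drift_risk = AIRisk(
        name="Unmonitored model drift",
        category="technical",
        description="Model quality degrades silently after deployment",
        likelihood=4,
        impact=5,
        mitigation="Automated drift monitoring and alerting",
        owner="ML Platform Team"
    )
    print(f"{drift_risk.name}: score={drift_risk.score}, "
          f"level={drift_risk.level.value}")

risk_scoring_example()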


class RiskAssessment:
    """AI risk assessment framework"""
    def __init__(self, project_name: str):
        self.project_name = project_name
        self.risks: List[AIRisk] = []

    def identify_risks(self, use_case: str, data_types: List[str],
                      affected_groups: List[str]) -> List[AIRisk]:
        """Identify potential risks"""
        identified = []

        # Check for high-risk indicators
        if 'personal' in data_types or 'sensitive' in data_types:
            identified.append(AIRisk(
                name="Privacy Risk",
                category="ethical",
                description="Processing of personal/sensitive data",
                likelihood=4,
                impact=4,
                mitigation="Implement privacy controls and consent",
                owner="Data Protection Officer"
            ))

        if 'decisions' in use_case.lower() or 'automated' in use_case.lower():
            identified.append(AIRisk(
                name="Bias Risk",
                category="ethical",
                description="Automated decisions may embed bias",
                likelihood=3,
                impact=5,
                mitigation="Regular bias audits and fairness testing",
                owner="ML Ethics Team"
            ))

        if any('vulnerable' in g.lower() for g in affected_groups):
            identified.append(AIRisk(
                name="Vulnerable Population Risk",
                category="social",
                description="Impact on vulnerable populations",
                likelihood=3,
                impact=5,
                mitigation="Enhanced protections and human review",
                owner="Ethics Board"
            ))

        self.risks.extend(identified)
        return identified

    def prioritize_risks(self) -> List[AIRisk]:
        """Prioritize risks by score"""
        return sorted(self.risks, key=lambda r: -r.score)

    def generate_report(self) -> Dict:
        """Generate risk report"""
        prioritized = self.prioritize_risks()

        return {
            'project': self.project_name,
            'total_risks': len(self.risks),
            'critical_risks': sum(1 for r in self.risks if r.level == RiskLevel.CRITICAL),
            'high_risks': sum(1 for r in self.risks if r.level == RiskLevel.HIGH),
            'top_risks': [
                {
                    'name': r.name,
                    'level': r.level.value,
                    'score': r.score,
                    'mitigation': r.mitigation
                }
                for r in prioritized[:5]
            ]
        }


# Example
def risk_example():
    """Risk assessment example"""
    assessment = RiskAssessment("Loan Approval System")

    risks = assessment.identify_risks(
        use_case="Automated loan decisions",
        data_types=["personal", "financial", "sensitive"],
        affected_groups=["low-income applicants", "vulnerable populations"]
    )

    report = assessment.generate_report()
    print("Risk Report:")
    print(f"  Total: {report['total_risks']}")
    print(f"  Critical: {report['critical_risks']}")
    print(f"  High: {report['high_risks']}")

risk_example()

Implementation Best Practices

PYTHON
def implementation_practices():
    """Responsible AI implementation"""
    print("\nIMPLEMENTATION PRACTICES")
    print("=" * 60)

    print("""
Development Practices:

1. Design Phase:
   - Stakeholder mapping
   - Ethics review
   - Risk assessment
   - Inclusive design

2. Data Phase:
   - Data documentation
   - Consent verification
   - Bias analysis
   - Privacy assessment

3. Model Phase:
   - Fairness testing
   - Explainability methods
   - Performance across groups
   - Robustness testing

4. Deployment Phase:
   - Human oversight setup
   - Monitoring configuration
   - Feedback mechanisms
   - Incident response

5. Operational Phase:
   - Continuous monitoring
   - Regular audits
   - User feedback
   - Model updates

Documentation Requirements:
  - Data cards
  - Model cards
  - Impact assessments
  - Audit reports
""")

implementation_practices()


class ResponsibleAIChecklist:
    """Responsible AI implementation checklist"""
    def __init__(self, project_name: str):
        self.project_name = project_name
        self.checks = {
            'design': {
                'stakeholder_analysis': False,
                'ethics_review': False,
                'risk_assessment': False,
                'inclusive_design': False
            },
            'data': {
                'data_documentation': False,
                'consent_verified': False,
                'bias_analysis': False,
                'privacy_assessment': False
            },
            'model': {
                'fairness_testing': False,
                'explainability': False,
                'group_performance': False,
                'robustness_testing': False
            },
            'deployment': {
                'human_oversight': False,
                'monitoring_setup': False,
                'feedback_mechanism': False,
                'incident_response': False
            }
        }

    def complete_check(self, phase: str, check: str):
        """Mark check as complete"""
        if phase in self.checks and check in self.checks[phase]:
            self.checks[phase][check] = True

    def phase_score(self, phase: str) -> float:
        """Get phase completion score"""
        if phase not in self.checks:
            return 0.0
        checks = self.checks[phase]
        return sum(checks.values()) / len(checks)

    def overall_score(self) -> float:
        """Get overall completion score"""
        total = sum(len(c) for c in self.checks.values())
        completed = sum(
            sum(checks.values())
            for checks in self.checks.values()
        )
        return completed / total

    def report(self) -> str:
        """Generate checklist report"""
        report = f"Responsible AI Checklist: {self.project_name}\n"
        report += "=" * 50 + "\n\n"

        for phase, checks in self.checks.items():
            score = self.phase_score(phase)
            report += f"{phase.upper()} ({score:.0%})\n"
            for check, completed in checks.items():
                status = "✓" if completed else "○"
                report += f"  {status} {check.replace('_', ' ').title()}\n"
            report += "\n"

        report += f"Overall: {self.overall_score():.0%}"
        return report


# Example
def checklist_example():
    """Checklist example"""
    checklist = ResponsibleAIChecklist("Customer Segmentation")

    # Complete some checks
    checklist.complete_check('design', 'stakeholder_analysis')
    checklist.complete_check('design', 'ethics_review')
    checklist.complete_check('data', 'data_documentation')
    checklist.complete_check('model', 'fairness_testing')

    print(checklist.report())

checklist_example()

Building an Ethical Culture

PYTHON
from typing import Dict, List
from dataclasses import dataclass


def ethical_culture():
    """Building ethical AI culture"""
    print("\nBUILDING ETHICAL CULTURE")
    print("=" * 60)

    print("""
Cultural Elements:

1. Leadership Commitment:
   - Executive sponsorship
   - Resource allocation
   - Public commitment
   - Leading by example

2. Education and Training:
   - Ethics training for all
   - Technical training
   - Case studies
   - Continuous learning

3. Diverse Teams:
   - Varied backgrounds
   - Multiple perspectives
   - Inclusive hiring
   - Psychological safety

4. Open Communication:
   - Safe reporting channels
   - Ethics discussions
   - Challenge culture
   - No retaliation

5. Incentive Alignment:
   - Ethics in performance
   - Recognition for raising concerns
   - Balanced metrics
   - Long-term thinking

Practical Steps:

1. Ethics Champions:
   - Embedded in teams
   - First point of contact
   - Bridge to ethics board

2. Ethics Office Hours:
   - Regular sessions
   - Open to all
   - Confidential guidance

3. Ethics Case Library:
   - Real examples
   - Lessons learned
   - Decision frameworks

4. Retrospectives:
   - Include ethics review
   - Learn from incidents
   - Celebrate good practices
""")

ethical_culture()


@dataclass
class EthicsProgram:
    """Organization ethics program"""
    organization: str
    ethics_board: List[str]
    training_modules: List[str]
    reporting_channels: List[str]
    review_cadence: str

    def assess_maturity(self, scores: Dict[str, int]) -> Dict:
        """Assess ethics program maturity"""
        # Scores 1-5 for each dimension
        dimensions = [
            'leadership', 'training', 'diversity',
            'communication', 'incentives', 'processes'
        ]

        total = sum(scores.get(d, 0) for d in dimensions)
        max_score = len(dimensions) * 5

        maturity = total / max_score

        if maturity >= 0.8:
            level = "Advanced"
        elif maturity >= 0.6:
            level = "Established"
        elif maturity >= 0.4:
            level = "Developing"
        else:
            level = "Initial"

        return {
            'maturity_score': maturity,
            'maturity_level': level,
            'scores': scores,
            'recommendations': self._get_recommendations(scores)
        }

    def _get_recommendations(self, scores: Dict[str, int]) -> List[str]:
        """Get improvement recommendations"""
        recommendations = []
        for dim, score in scores.items():
            if score < 3:
                recommendations.append(f"Improve {dim}: score {score}/5")
        return recommendations
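

# Example (assumed organization and scores, for illustration): assessing the
# maturity of an ethics program across the six dimensions above
def ethics_program_example():
    program = EthicsProgram(
        organization="Example Corp",
        ethics_board=["CTO", "Legal Counsel", "Ethics Lead", "External Advisor"],
        training_modules=["AI Ethics 101", "Bias and Fairness", "Privacy by Design"],
        reporting_channels=["ethics mailbox", "anonymous hotline"],
        review_cadence="quarterly"
    )

    result = program.assess_maturity({
        'leadership': 4, 'training': 3, 'diversity': 3,
        'communication': 4, 'incentives': 2, 'processes': 3
    })
    print(f"Maturity: {result['maturity_level']} ({result['maturity_score']:.0%})")
    for rec in result['recommendations']:
        print(f"  - {rec}")

ethics_program_example()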

Summary

| Area | Key Action | Outcome |
|------|------------|---------|
| Governance | Establish policies | Clear guidelines |
| Risk | Regular assessment | Proactive mitigation |
| Process | Checklists and gates | Consistent quality |
| Culture | Training and leadership | Ethical mindset |

Key takeaways:

  • Responsible AI requires organizational commitment
  • Governance provides structure and accountability
  • Risk assessment should be ongoing
  • Implementation checklists ensure consistency
  • Culture is as important as processes
  • Ethics is everyone's responsibility