Back to Blog
MLOps

MLOps Pipeline: From Model Development to Production Deployment

August 5, 2024
7 min read
By Oscar M. Cabrisses
MLOpsMachine LearningMLflowKubeflowPythonCI/CDKubernetes

MLOps Pipeline: From Model Development to Production Deployment

Machine Learning Operations (MLOps) has emerged as a critical discipline for organizations looking to operationalize their ML models at scale. This comprehensive guide walks through building a complete MLOps pipeline that ensures reproducible, reliable, and scalable machine learning deployments.

The MLOps Challenge

Traditional software development practices don't directly translate to machine learning systems due to:

  • Data Dependencies: Models are sensitive to data quality and drift
  • Experimentation: Multiple model versions and hyperparameter combinations
  • Model Lifecycle: Training, validation, deployment, and monitoring phases
  • Resource Management: Compute-intensive training and inference workloads

Architecture Overview

Our MLOps pipeline consists of several key components:

  1. Data Pipeline: Automated data ingestion and preprocessing
  2. Model Training: Scalable training infrastructure
  3. Model Registry: Centralized model versioning and metadata
  4. Deployment Pipeline: Automated model deployment and serving
  5. Monitoring: Model performance and data drift detection

Setting Up the Development Environment

Prerequisites

# Install core dependencies
pip install mlflow kubeflow-pipelines tensorflow scikit-learn pandas

# Set up MLflow tracking server
mlflow server --backend-store-uri sqlite:///mlflow.db --default-artifact-root ./artifacts

Project Structure

mlops-pipeline/
├── data/
│   ├── raw/
│   ├── processed/
│   └── validation/
├── models/
├── notebooks/
├── src/
│   ├── data/
│   ├── features/
│   ├── models/
│   └── deployment/
├── pipelines/
├── monitoring/
└── infrastructure/

Data Pipeline Implementation

Data Validation

import pandas as pd
import great_expectations as ge
from great_expectations.dataset import PandasDataset

def validate_data(df: pd.DataFrame) -> bool:
    """Validate incoming data against expectations."""
    
    # Convert to Great Expectations dataset
    ge_df = PandasDataset(df)
    
    # Define expectations
    ge_df.expect_column_to_exist("feature_1")
    ge_df.expect_column_values_to_not_be_null("target")
    ge_df.expect_column_values_to_be_between("feature_2", min_value=0, max_value=100)
    
    # Validate
    results = ge_df.validate()
    return results.success

Feature Engineering Pipeline

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
import joblib

class FeatureEngineering(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.scaler = StandardScaler()
        self.encoder = OneHotEncoder(sparse=False, handle_unknown='ignore')
        
    def fit(self, X, y=None):
        numeric_features = X.select_dtypes(include=[np.number]).columns
        categorical_features = X.select_dtypes(include=['object']).columns
        
        self.scaler.fit(X[numeric_features])
        self.encoder.fit(X[categorical_features])
        return self
    
    def transform(self, X):
        numeric_features = X.select_dtypes(include=[np.number]).columns
        categorical_features = X.select_dtypes(include=['object']).columns
        
        X_numeric = self.scaler.transform(X[numeric_features])
        X_categorical = self.encoder.transform(X[categorical_features])
        
        return np.hstack([X_numeric, X_categorical])

Model Training with MLflow

Experiment Tracking

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score

def train_model(X_train, y_train, X_val, y_val, params):
    """Train model with MLflow tracking."""
    
    with mlflow.start_run():
        # Log parameters
        mlflow.log_params(params)
        
        # Train model
        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)
        
        # Make predictions
        y_pred = model.predict(X_val)
        
        # Calculate metrics
        accuracy = accuracy_score(y_val, y_pred)
        precision = precision_score(y_val, y_pred, average='weighted')
        recall = recall_score(y_val, y_pred, average='weighted')
        
        # Log metrics
        mlflow.log_metrics({
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall
        })
        
        # Log model
        mlflow.sklearn.log_model(model, "model")
        
        return model, mlflow.active_run().info.run_id

Hyperparameter Optimization

from hyperopt import hp, fmin, tpe, Trials
import mlflow

def optimize_hyperparameters(X_train, y_train, X_val, y_val):
    """Optimize hyperparameters using Hyperopt and MLflow."""
    
    space = {
        'n_estimators': hp.choice('n_estimators', [50, 100, 200]),
        'max_depth': hp.choice('max_depth', [5, 10, 15, None]),
        'min_samples_split': hp.uniform('min_samples_split', 0.01, 0.2),
        'min_samples_leaf': hp.uniform('min_samples_leaf', 0.01, 0.1)
    }
    
    def objective(params):
        with mlflow.start_run(nested=True):
            model, _ = train_model(X_train, y_train, X_val, y_val, params)
            y_pred = model.predict(X_val)
            accuracy = accuracy_score(y_val, y_pred)
            return -accuracy  # Minimize negative accuracy
    
    trials = Trials()
    best = fmin(fn=objective,
                space=space,
                algo=tpe.suggest,
                max_evals=50,
                trials=trials)
    
    return best

Kubeflow Pipelines for Orchestration

Pipeline Definition

import kfp
from kfp import dsl
from kfp.components import create_component_from_func

@create_component_from_func
def data_preprocessing_op(input_path: str, output_path: str):
    """Data preprocessing component."""
    import pandas as pd
    
    # Load and preprocess data
    df = pd.read_csv(input_path)
    # Preprocessing logic here
    df.to_csv(output_path, index=False)

@create_component_from_func
def model_training_op(data_path: str, model_path: str):
    """Model training component."""
    import joblib
    from sklearn.ensemble import RandomForestClassifier
    
    # Load data and train model
    # Training logic here
    joblib.dump(model, model_path)

@dsl.pipeline(
    name='ML Training Pipeline',
    description='End-to-end ML training pipeline'
)
def ml_training_pipeline(input_data_path: str):
    """Define the ML training pipeline."""
    
    # Data preprocessing step
    preprocess_task = data_preprocessing_op(
        input_path=input_data_path,
        output_path='/tmp/processed_data.csv'
    )
    
    # Model training step
    training_task = model_training_op(
        data_path=preprocess_task.outputs['output_path'],
        model_path='/tmp/trained_model.pkl'
    )
    
    return training_task

Model Deployment Strategies

REST API Service

from flask import Flask, request, jsonify
import joblib
import pandas as pd

app = Flask(__name__)

# Load model on startup
model = joblib.load('model.pkl')
feature_pipeline = joblib.load('feature_pipeline.pkl')

@app.route('/predict', methods=['POST'])
def predict():
    """Prediction endpoint."""
    try:
        # Get input data
        data = request.get_json()
        df = pd.DataFrame([data])
        
        # Preprocess features
        features = feature_pipeline.transform(df)
        
        # Make prediction
        prediction = model.predict(features)[0]
        probability = model.predict_proba(features)[0].max()
        
        return jsonify({
            'prediction': int(prediction),
            'probability': float(probability),
            'status': 'success'
        })
    
    except Exception as e:
        return jsonify({
            'error': str(e),
            'status': 'error'
        }), 400

@app.route('/health', methods=['GET'])
def health():
    return jsonify({'status': 'healthy'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model-service
  template:
    metadata:
      labels:
        app: ml-model-service
    spec:
      containers:
      - name: model-service
        image: myregistry/ml-model:latest
        ports:
        - containerPort: 8080
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model-service
  ports:
  - port: 80
    targetPort: 8080
  type: LoadBalancer

Model Monitoring and Drift Detection

Performance Monitoring

import numpy as np
from scipy import stats
import mlflow

class ModelMonitor:
    def __init__(self, reference_data, model_name):
        self.reference_data = reference_data
        self.model_name = model_name
        
    def detect_data_drift(self, new_data, threshold=0.05):
        """Detect data drift using Kolmogorov-Smirnov test."""
        drift_results = {}
        
        for column in self.reference_data.columns:
            if column in new_data.columns:
                ks_stat, p_value = stats.ks_2samp(
                    self.reference_data[column], 
                    new_data[column]
                )
                drift_results[column] = {
                    'ks_statistic': ks_stat,
                    'p_value': p_value,
                    'drift_detected': p_value < threshold
                }
        
        return drift_results
    
    def log_prediction_metrics(self, predictions, actuals):
        """Log prediction metrics to MLflow."""
        with mlflow.start_run():
            accuracy = accuracy_score(actuals, predictions)
            mlflow.log_metric("production_accuracy", accuracy)
            mlflow.set_tag("model_name", self.model_name)

CI/CD Integration

GitHub Actions Workflow

name: MLOps Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v2
    
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: 3.9
    
    - name: Install dependencies
      run: |
        pip install -r requirements.txt
    
    - name: Run tests
      run: |
        pytest tests/
    
    - name: Data validation
      run: |
        python src/data/validate_data.py
    
    - name: Model training
      run: |
        python src/models/train_model.py
      env:
        MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
    
    - name: Model evaluation
      run: |
        python src/models/evaluate_model.py
    
    - name: Deploy model
      if: github.ref == 'refs/heads/main'
      run: |
        kubectl apply -f k8s/model-deployment.yaml

Best Practices and Lessons Learned

1. Data Quality First

  • Implement comprehensive data validation
  • Monitor data drift continuously
  • Maintain data lineage and versioning

2. Reproducibility

  • Version everything: code, data, models, and environments
  • Use container images for consistent environments
  • Document all experimental decisions

3. Gradual Rollouts

  • Use canary deployments for new models
  • Implement A/B testing frameworks
  • Monitor business metrics, not just technical metrics

4. Automation

  • Automate model retraining pipelines
  • Implement automated testing for models
  • Use infrastructure as code for reproducible deployments

Conclusion

Building a robust MLOps pipeline requires careful consideration of the entire machine learning lifecycle. The combination of MLflow for experiment tracking, Kubeflow for orchestration, and Kubernetes for deployment provides a solid foundation for production ML systems.

Key success factors:

  • Start with clear requirements and success metrics
  • Implement monitoring from day one
  • Focus on automation and reproducibility
  • Plan for model lifecycle management

The investment in proper MLOps infrastructure pays dividends in terms of model reliability, team productivity, and business impact.


Ready to implement MLOps in your organization? Contact me to discuss your machine learning operations strategy.