Machine Learning Operations (MLOps) has emerged as a critical discipline for organizations looking to operationalize their ML models at scale. This comprehensive guide walks through building a complete MLOps pipeline that ensures reproducible, reliable, and scalable machine learning deployments.
Traditional software development practices don't directly translate to machine learning systems due to:
Our MLOps pipeline consists of several key components:
# Install core dependencies.
# The Kubeflow Pipelines SDK is published on PyPI as "kfp". The pipeline example
# uses the v1 API (kfp.components.create_component_from_func), which KFP v2 removed.
# NOTE(review): the data-validation example uses great_expectations' legacy
# PandasDataset API; pin a great_expectations release that still ships it.
pip install mlflow "kfp<2" tensorflow scikit-learn pandas scipy hyperopt great_expectations flask joblib
# Set up MLflow tracking server (SQLite backend store, artifacts under ./artifacts)
mlflow server --backend-store-uri sqlite:///mlflow.db --default-artifact-root ./artifacts
mlops-pipeline/
├── data/
│ ├── raw/
│ ├── processed/
│ └── validation/
├── models/
├── notebooks/
├── src/
│ ├── data/
│ ├── features/
│ ├── models/
│ └── deployment/
├── pipelines/
├── monitoring/
└── infrastructure/
import pandas as pd
import great_expectations as ge
from great_expectations.dataset import PandasDataset
def validate_data(df: pd.DataFrame) -> bool:
    """Check ``df`` against the pipeline's data-quality expectations.

    Expectations applied:
      * column ``feature_1`` exists,
      * column ``target`` has no nulls,
      * every value of ``feature_2`` lies in [0, 100].

    Returns:
        True only if the Great Expectations validation reports success.
    """
    dataset = PandasDataset(df)

    # Each expect_* call registers the expectation on the wrapped frame;
    # validate() then evaluates all registered expectations together.
    dataset.expect_column_to_exist("feature_1")
    dataset.expect_column_values_to_not_be_null("target")
    dataset.expect_column_values_to_be_between(
        "feature_2", min_value=0, max_value=100
    )

    report = dataset.validate()
    return report.success
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
import joblib
import numpy as np


def _dense_one_hot_encoder():
    """Return a OneHotEncoder that emits dense arrays and ignores unseen categories."""
    try:
        # scikit-learn >= 1.2 names this option ``sparse_output``; the old
        # ``sparse`` keyword was removed in 1.4.
        return OneHotEncoder(sparse_output=False, handle_unknown='ignore')
    except TypeError:
        # Older scikit-learn releases only accept ``sparse``.
        return OneHotEncoder(sparse=False, handle_unknown='ignore')


class FeatureEngineering(BaseEstimator, TransformerMixin):
    """Standardize numeric columns and one-hot encode object (string) columns.

    The numeric/categorical column split is decided once in ``fit`` and reused
    in ``transform``, so the output layout (scaled numerics first, then one-hot
    categoricals) matches what the scaler and encoder were fitted on.
    """

    def __init__(self):
        self.scaler = StandardScaler()
        self.encoder = _dense_one_hot_encoder()

    @staticmethod
    def _split_columns(X):
        """Return (numeric column names, object column names) of DataFrame ``X``."""
        numeric = list(X.select_dtypes(include=[np.number]).columns)
        categorical = list(X.select_dtypes(include=['object']).columns)
        return numeric, categorical

    def fit(self, X, y=None):
        """Learn the column split, scaling statistics and category vocabularies.

        Args:
            X: pandas DataFrame of raw features.
            y: ignored; accepted for scikit-learn Pipeline compatibility.

        Returns:
            self
        """
        self.numeric_features_, self.categorical_features_ = self._split_columns(X)
        # scikit-learn transformers reject zero-column input, so skip empty groups.
        if self.numeric_features_:
            self.scaler.fit(X[self.numeric_features_])
        if self.categorical_features_:
            self.encoder.fit(X[self.categorical_features_])
        return self

    def transform(self, X):
        """Return a dense 2-D array: scaled numeric columns, then one-hot columns.

        Args:
            X: pandas DataFrame containing the columns seen during ``fit``.
        """
        if hasattr(self, 'numeric_features_'):
            numeric, categorical = self.numeric_features_, self.categorical_features_
        else:
            # Instance fitted by an earlier version that did not record the split.
            numeric, categorical = self._split_columns(X)

        parts = []
        if numeric:
            parts.append(self.scaler.transform(X[numeric]))
        if categorical:
            parts.append(self.encoder.transform(X[categorical]))
        if not parts:
            return np.empty((len(X), 0))
        return np.hstack(parts)
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score
def train_model(X_train, y_train, X_val, y_val, params):
    """Fit a RandomForestClassifier and record the run in MLflow.

    Args:
        X_train, y_train: training features and labels.
        X_val, y_val: held-out features and labels used for the logged metrics.
        params: keyword arguments forwarded to ``RandomForestClassifier``; also
            logged as the run's MLflow parameters.

    Returns:
        ``(model, run_id)``: the fitted estimator and the ID of the MLflow run
        it was logged under.
    """
    # mlflow.start_run() raises if another run is already active unless
    # nested=True, e.g. when called from a hyperparameter-search trial or
    # under a caller's parent run. Nest in that case; otherwise start a
    # top-level run as before.
    with mlflow.start_run(nested=mlflow.active_run() is not None) as run:
        mlflow.log_params(params)

        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)

        y_pred = model.predict(X_val)
        # Weighted averaging makes precision/recall well-defined for
        # multi-class labels.
        mlflow.log_metrics({
            "accuracy": accuracy_score(y_val, y_pred),
            "precision": precision_score(y_val, y_pred, average='weighted'),
            "recall": recall_score(y_val, y_pred, average='weighted'),
        })

        mlflow.sklearn.log_model(model, "model")
        return model, run.info.run_id
from hyperopt import hp, fmin, tpe, Trials
import mlflow
def optimize_hyperparameters(X_train, y_train, X_val, y_val, max_evals=50):
    """Search RandomForest hyperparameters with Hyperopt's TPE algorithm.

    Each trial is trained and logged to MLflow by ``train_model`` and scored
    by validation accuracy.

    Args:
        X_train, y_train: training data passed to ``train_model``.
        X_val, y_val: validation data used to score each trial.
        max_evals: number of Hyperopt trials to run (default 50).

    Returns:
        dict of the best hyperparameter *values* (not ``hp.choice`` indices),
        usable directly as ``RandomForestClassifier(**best)``.
    """
    from hyperopt import space_eval

    space = {
        'n_estimators': hp.choice('n_estimators', [50, 100, 200]),
        'max_depth': hp.choice('max_depth', [5, 10, 15, None]),
        # Floats are interpreted by scikit-learn as fractions of the samples.
        'min_samples_split': hp.uniform('min_samples_split', 0.01, 0.2),
        'min_samples_leaf': hp.uniform('min_samples_leaf', 0.01, 0.1),
    }

    def objective(params):
        # train_model starts and logs its own MLflow run for each trial, so no
        # run is opened here.
        model, _ = train_model(X_train, y_train, X_val, y_val, params)
        # Hyperopt minimizes the loss, so return negative accuracy.
        return -accuracy_score(y_val, model.predict(X_val))

    trials = Trials()
    best = fmin(fn=objective,
                space=space,
                algo=tpe.suggest,
                max_evals=max_evals,
                trials=trials)
    # fmin reports hp.choice parameters as indices into their option lists;
    # space_eval maps them back to the actual values.
    return space_eval(space, best)
import kfp
from kfp import dsl
from kfp.components import create_component_from_func
def data_preprocessing_op(input_path: str, output_path: str):
    """Data preprocessing component."""
    # Imports live inside the function because a lightweight component runs
    # only this function's source in its own container.
    import pandas as pd

    # Load and preprocess data
    df = pd.read_csv(input_path)
    # TODO: preprocessing logic here
    df.to_csv(output_path, index=False)


# The default base image for lightweight components is a plain Python image,
# so pandas must be installed at runtime for the import above to succeed.
data_preprocessing_op = create_component_from_func(
    data_preprocessing_op, packages_to_install=['pandas']
)
def model_training_op(data_path: str, model_path: str, target_column: str = 'target'):
    """Model training component."""
    # Imports live inside the function because a lightweight component runs
    # only this function's source in its own container.
    import joblib
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier

    # Load data: every column except target_column is used as a feature.
    # NOTE(review): RandomForestClassifier needs numeric features; this assumes
    # the preprocessing step has already encoded any categorical columns.
    df = pd.read_csv(data_path)
    features = df.drop(columns=[target_column])
    labels = df[target_column]

    model = RandomForestClassifier()
    model.fit(features, labels)

    joblib.dump(model, model_path)


# The default base image lacks the libraries imported above; install them at runtime.
model_training_op = create_component_from_func(
    model_training_op, packages_to_install=['pandas', 'scikit-learn', 'joblib']
)
@dsl.pipeline(
    name='ML Training Pipeline',
    description='End-to-end ML training pipeline'
)
def ml_training_pipeline(input_data_path: str):
    """Define the ML training pipeline: preprocess the input CSV, then train."""
    processed_data_path = '/tmp/processed_data.csv'

    # Data preprocessing step
    preprocess_task = data_preprocessing_op(
        input_path=input_data_path,
        output_path=processed_data_path,
    )

    # Model training step.
    # output_path is a plain string *input* of the preprocessing component (it
    # declares no outputs), so preprocess_task.outputs['output_path'] does not
    # exist. Pass the same path explicitly and order the steps with .after().
    # NOTE(review): each step runs in its own container; a /tmp path is only
    # visible to the training step if both share storage. Confirm, or switch
    # the components to InputPath/OutputPath artifacts.
    training_task = model_training_op(
        data_path=processed_data_path,
        model_path='/tmp/trained_model.pkl',
    )
    training_task.after(preprocess_task)

    return training_task
from flask import Flask, request, jsonify
import joblib
import pandas as pd
import os

app = Flask(__name__)

# Load model artifacts once at startup so every request reuses them.
# Paths default to the working directory and can be overridden with the
# MODEL_PATH / FEATURE_PIPELINE_PATH environment variables.
# NOTE: joblib.load unpickles the file, which can execute arbitrary code;
# only point these at trusted artifacts.
model = joblib.load(os.environ.get('MODEL_PATH', 'model.pkl'))
feature_pipeline = joblib.load(
    os.environ.get('FEATURE_PIPELINE_PATH', 'feature_pipeline.pkl')
)
@app.route('/predict', methods=['POST'])
def predict():
    """Score one record posted as a JSON object of feature values.

    Returns JSON ``{"prediction", "probability", "status": "success"}``, where
    ``probability`` is the highest class probability. On failure it returns
    ``{"error", "status": "error"}`` with HTTP 400.
    """
    # silent=True yields None (instead of raising) for a missing or malformed
    # JSON body, so the client gets the explicit message below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({
            'error': 'request body must be a JSON object of feature values',
            'status': 'error'
        }), 400

    try:
        # One-row frame, so the feature pipeline sees the same layout as in training.
        df = pd.DataFrame([data])
        features = feature_pipeline.transform(df)

        prediction = model.predict(features)[0]
        probability = model.predict_proba(features)[0].max()
    except Exception as e:
        # Record the traceback server-side instead of silently swallowing it.
        app.logger.exception('prediction failed')
        return jsonify({
            'error': str(e),
            'status': 'error'
        }), 400

    return jsonify({
        'prediction': int(prediction),
        'probability': float(probability),
        'status': 'success'
    })
@app.route('/health', methods=['GET'])
def health():
    """Report that the service is up (polled by the liveness/readiness probes).

    The model is loaded at import time, so any process able to answer this
    request has already loaded it.
    """
    return jsonify(status='healthy')
if __name__ == '__main__':
    # Flask's built-in server for direct runs; binding 0.0.0.0 listens on all
    # interfaces so port 8080 is reachable from outside a container.
    app.run(port=8080, host='0.0.0.0')
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model-service
  template:
    metadata:
      labels:
        app: ml-model-service
    spec:
      containers:
        - name: model-service
          # NOTE(review): a mutable ":latest" tag makes rollouts non-reproducible;
          # prefer an immutable version tag or image digest.
          image: myregistry/ml-model:latest
          ports:
            - containerPort: 8080
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          # Both probes hit the Flask /health endpoint on the container port.
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
          readinessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model-service
  ports:
    # External port 80 forwards to the container's 8080.
    - port: 80
      targetPort: 8080
  type: LoadBalancer
import numpy as np
from scipy import stats
import mlflow
class ModelMonitor:
    """Compare incoming data with a reference sample and log production metrics.

    Args:
        reference_data: DataFrame of reference feature values (e.g. a training
            sample). Its columns define which features are checked for drift.
        model_name: stored and written as the ``model_name`` MLflow tag.
    """

    def __init__(self, reference_data, model_name):
        self.reference_data = reference_data
        self.model_name = model_name

    def detect_data_drift(self, new_data, threshold=0.05):
        """Detect per-column drift with the two-sample Kolmogorov-Smirnov test.

        Missing values are dropped before testing, because NaNs make the test
        return a nan p-value that would silently read as "no drift".

        Args:
            new_data: DataFrame of recent data; only columns also present in
                ``reference_data`` are tested.
            threshold: significance level; drift is flagged when p < threshold.

        Returns:
            dict mapping column name to ``{'ks_statistic', 'p_value',
            'drift_detected'}``. Columns absent from ``new_data``, or with no
            non-null values on either side, are omitted.
        """
        drift_results = {}
        for column in self.reference_data.columns:
            if column not in new_data.columns:
                continue
            reference = self.reference_data[column].dropna()
            current = new_data[column].dropna()
            # ks_2samp rejects empty samples; there is nothing to compare.
            if reference.empty or current.empty:
                continue
            ks_stat, p_value = stats.ks_2samp(reference, current)
            drift_results[column] = {
                'ks_statistic': ks_stat,
                'p_value': p_value,
                'drift_detected': p_value < threshold
            }
        return drift_results

    def log_prediction_metrics(self, predictions, actuals):
        """Log accuracy of ``predictions`` against ``actuals`` to MLflow.

        The run is tagged with ``model_name``. It nests under an already
        active run instead of failing.
        """
        from sklearn.metrics import accuracy_score

        # mlflow.start_run() raises if a run is already active unless nested=True.
        with mlflow.start_run(nested=mlflow.active_run() is not None):
            accuracy = accuracy_score(actuals, predictions)
            mlflow.log_metric("production_accuracy", accuracy)
            mlflow.set_tag("model_name", self.model_name)
name: MLOps Pipeline
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      # v2 of these actions runs on a deprecated Node runtime; use current majors.
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          # Quoted so YAML keeps it a string (an unquoted 3.10 would parse as 3.1).
          python-version: "3.9"
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
      - name: Run tests
        run: |
          pytest tests/
      - name: Data validation
        run: |
          python src/data/validate_data.py
      - name: Model training
        run: |
          python src/models/train_model.py
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_TRACKING_URI }}
      - name: Model evaluation
        run: |
          python src/models/evaluate_model.py
      - name: Deploy model
        # Only pushes to main deploy; on pull_request events github.ref is
        # refs/pull/<n>/merge, so this step is skipped.
        # NOTE(review): no step configures cluster credentials for kubectl;
        # confirm how the runner obtains a kubeconfig.
        if: github.ref == 'refs/heads/main'
        run: |
          kubectl apply -f k8s/model-deployment.yaml
Building a robust MLOps pipeline requires careful consideration of the entire machine learning lifecycle. The combination of MLflow for experiment tracking, Kubeflow for orchestration, and Kubernetes for deployment provides a solid foundation for production ML systems.
Key success factors:
The investment in proper MLOps infrastructure pays dividends in terms of model reliability, team productivity, and business impact.
Ready to implement MLOps in your organization? Contact me to discuss your machine learning operations strategy.