Machine learning is an ever-evolving field, and complex models and algorithms are becoming easier to implement and use as knowledge about them spreads. Libraries like scikit-learn in Python need only a few lines of code (excluding pre-processing) to fit and make predictions with high-level ensemble techniques like Random Forest. The real challenge lies in using machine learning efficiently and dynamically in real-world use cases, where we may need to try hundreds of combinations of models and optimizers to obtain the best results possible.
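As a quick illustration of how little code that takes, here is a minimal sketch; the tiny feature matrix and targets are made up purely for the example:
import numpy as np
from sklearn.ensemble import RandomForestRegressor
# toy data, purely illustrative
X = np.array([[1, 2], [2, 3], [3, 4], [4, 5]])
y = np.array([10, 20, 30, 40])
# fit the ensemble and predict on a new sample
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X, y)
print(model.predict([[5, 6]]))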
This is where ML Operations (MLOps) comes in. MLOps brings structure to complex machine learning pipelines and processes, and it is a young field whose rules and best practices are still being written. In this article, I will take a small example of a machine learning workflow and show how we can deploy it efficiently on a Kubernetes cluster using a local Kubernetes cluster (minikube), Argo, AWS S3, and Docker. Note that installing and setting up these tools is a prerequisite for this example.
A complex machine learning workflow typically involves several stages, and to be time-efficient we will run those stages in separate containers. In this small example, the first container performs data pre-processing, and the second and third containers train two different models.
1. Python code
The code is divided into three scripts: the first handles data pre-processing, and the second and third handle model training.
Note: The data needs to be hosted somewhere the containers can reach it.
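For local experiments, one simple option (an assumption for this example, chosen to match the localhost URL used in the script below) is to serve the CSV with Python's built-in HTTP server, run from the directory that contains the data/ folder:
python3 -m http.server 8000
Keep in mind that, from inside a container, localhost refers to the container itself, so you may need an address of the host that the container can actually reach, or host the file on S3 or another accessible location instead.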
File 1: Pre-Processing script
The first script is a data processing script that imports the necessary libraries, pandas and sklearn. It reads the data from a CSV file using the read_csv method from pandas, preprocesses it by dropping one of the columns, and splits it into training and testing sets using the train_test_split method from sklearn. Finally, it saves the resulting splits to separate CSV files.
import pandas as pd
from sklearn.model_selection import train_test_split
# the data can be any CSV; I have hosted this one over HTTP
df = pd.read_csv('http://localhost:8000/data/sales.csv')
# for this small example, dropping a column stands in for real pre-processing
df = df.drop('size', axis=1)
# separate features and target, then create the train/test split
x = df.drop('sales', axis=1)
y = df['sales']
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3)
# save the splits so they can be exported as artifacts
x_train.to_csv('x_train.csv', index=False)
x_test.to_csv('x_test.csv', index=False)
y_train.to_csv('y_train.csv', index=False)
y_test.to_csv('y_test.csv', index=False)
File 2: Random Forest regression
The second script imports the necessary libraries such as pandas, sklearn.ensemble, and sklearn.metrics. It reads the training and testing data from the CSV files created in the pre-processing script. It then instantiates a Random Forest Regressor model with 1000 decision trees and trains the model on the training data using the fit method. It uses the predict method on the test data to generate predictions, calculates the Mean Squared Error (MSE) using the mean_squared_error method from sklearn.metrics, and prints out the MSE.
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
# reading the pre-processed data
x_train = pd.read_csv('x_train.csv')
x_test = pd.read_csv('x_test.csv')
y_train = pd.read_csv('y_train.csv')
y_test = pd.read_csv('y_test.csv')
# Instantiate model with 1000 decision trees
rf = RandomForestRegressor(n_estimators=1000, random_state=42)
# Train the model on the training data
# (ravel() turns the single-column target frame into the 1-D array sklearn expects)
rf.fit(x_train, y_train.values.ravel())
# Use the forest's predict method on the test data
predictions = rf.predict(x_test)
# Calculate and print the Mean Squared Error (MSE)
mse = mean_squared_error(y_test, predictions)
print('Mean Squared Error:', mse)
File 3: Lasso Regression
The third script imports the necessary libraries such as pandas, sklearn.linear_model, and sklearn.metrics. It reads the training and testing data from the CSV files created in the pre-processing script. It then instantiates a LassoCV model and fits it on the training data using the fit method. It uses the predict method on the test data to generate predictions, calculates the Mean Squared Error (MSE) using the mean_squared_error method from sklearn.metrics, and prints out the MSE.
import pandas as pd
from sklearn.linear_model import LassoCV
from sklearn.metrics import mean_squared_error
# reading the pre-processed data
x_train = pd.read_csv('x_train.csv')
x_test = pd.read_csv('x_test.csv')
y_train = pd.read_csv('y_train.csv')
y_test = pd.read_csv('y_test.csv')
# initialising and fitting the model
model = LassoCV()
model.fit(x_train, y_train.values.ravel())
# Use the model's predict method on the test data
predictions = model.predict(x_test)
# Calculate and print the Mean Squared Error (MSE)
mse = mean_squared_error(y_test, predictions)
print('Mean Squared Error:', mse)
2. Creating the images
The first step is to save the code for the machine learning pipeline in a single directory. Once the code is ready, a Docker image can be built from a Dockerfile with the following content:
# start from an official Python base image
FROM python:3.6
# copy the scripts into a codes/ directory inside the image
RUN mkdir codes
COPY . codes/
# install the Python dependencies
RUN pip3 install -r codes/requirements.txt
This Dockerfile is based on Python 3.6; it creates a directory called codes, copies the code into it, and installs the required dependencies listed in requirements.txt.
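The requirements.txt itself is not shown in the original listing; for these scripts a minimal version only needs pandas and scikit-learn. The exact pins below are an assumption, chosen to be compatible with a Python 3.6 base image:
pandas==1.1.5
scikit-learn==0.24.2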
After creating the Dockerfile, it's time to create and push the Docker image. To do that, run the following commands:
docker image build -t ml_pipeline .
docker image push ml_pipeline
The first command builds an image tagged ml_pipeline using the Dockerfile in the current directory. The second command pushes the image to a registry; for Docker Hub, the tag must be prefixed with your Docker Hub username (the image referenced later in this article is docker.io/manikmal/ml_pipline).
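For example, assuming you have a Docker Hub account (the username below is a placeholder to substitute with your own), the full sequence would look roughly like this:
docker login
docker image build -t <your-dockerhub-username>/ml_pipeline .
docker image push <your-dockerhub-username>/ml_pipeline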
This process of creating and pushing Docker images is very useful in machine learning projects, as it allows you to easily share your models and pipelines with others, while ensuring consistency and reproducibility across different environments.
3. Defining the ML pipeline using Argo
To set up Argo Workflows on your Kubernetes cluster, you can follow the quick-start guide in the official Argo documentation. Additionally, you need to set up an artifact repository. I use AWS S3 as the artifact repository in this example, but other options are available as well.
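For reference, S3 can be configured as the default artifact repository through the workflow-controller-configmap in the argo namespace. A minimal sketch, assuming a bucket named my-ml-artifacts and a Kubernetes secret my-s3-credentials holding your AWS access and secret keys (substitute your own names and region), looks roughly like this:
apiVersion: v1
kind: ConfigMap
metadata:
  name: workflow-controller-configmap
  namespace: argo
data:
  artifactRepository: |
    s3:
      bucket: my-ml-artifacts
      endpoint: s3.amazonaws.com
      region: us-east-1
      accessKeySecret:
        name: my-s3-credentials
        key: accessKey
      secretKeySecret:
        name: my-s3-credentials
        key: secretKey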
Argo:
Argo provides a workflow engine that lets you run each step of a workflow as a container on Kubernetes, with pipelines defined in YAML files. Another popular alternative is Kubeflow Pipelines, which uses Argo's engine under the hood and provides a Python API for defining pipelines.
Let’s start building our pipeline; I will call it pipeline.yml.
# Our pipeline
# We build a DAG so that pre-processing runs first
# and the two models are then trained in parallel.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: ml-pipeline-
spec:
  entrypoint: ml-pipeline
  templates:
  # defining the pipeline flow
  - name: ml-pipeline
    dag:
      tasks:
      - name: preprocessing
        template: preproc
      - name: training-rf
        dependencies: [preprocessing]
        template: randomforrest
        arguments:
          artifacts:
          - name: x_train
            from: "{{tasks.preprocessing.outputs.artifacts.x_train}}"
          - name: x_test
            from: "{{tasks.preprocessing.outputs.artifacts.x_test}}"
          - name: y_train
            from: "{{tasks.preprocessing.outputs.artifacts.y_train}}"
          - name: y_test
            from: "{{tasks.preprocessing.outputs.artifacts.y_test}}"
      - name: training-lasso
        dependencies: [preprocessing]
        template: lasso
        arguments:
          artifacts:
          - name: x_train
            from: "{{tasks.preprocessing.outputs.artifacts.x_train}}"
          - name: x_test
            from: "{{tasks.preprocessing.outputs.artifacts.x_test}}"
          - name: y_train
            from: "{{tasks.preprocessing.outputs.artifacts.y_train}}"
          - name: y_test
            from: "{{tasks.preprocessing.outputs.artifacts.y_test}}"
  # defining the individual steps of our pipeline
  - name: preproc
    container:
      image: docker.io/manikmal/ml_pipline
      command: [sh, -c]
      args: ["python3 codes/preproc.py"]
    outputs:
      artifacts:
      - name: x_train
        path: x_train.csv
      - name: x_test
        path: x_test.csv
      - name: y_train
        path: y_train.csv
      - name: y_test
        path: y_test.csv
  - name: randomforrest
    inputs:
      artifacts:
      - name: x_train
        path: x_train.csv
      - name: x_test
        path: x_test.csv
      - name: y_train
        path: y_train.csv
      - name: y_test
        path: y_test.csv
    container:
      image: docker.io/manikmal/ml_pipline
      command: [sh, -c]
      args: ["python3 codes/rf.py"]
  - name: lasso
    inputs:
      artifacts:
      - name: x_train
        path: x_train.csv
      - name: x_test
        path: x_test.csv
      - name: y_train
        path: y_train.csv
      - name: y_test
        path: y_test.csv
    container:
      image: docker.io/manikmal/ml_pipline
      command: [sh, -c]
      args: ["python3 codes/lasso.py"]
In the above code, the pipeline consists of three steps: preprocessing, training a random forest model, and training a Lasso model. The pipeline is defined as a directed acyclic graph (DAG), where the preprocessing step is executed first and then the two models are trained in parallel using the preprocessed data.
Each step in the pipeline is defined as a template in the spec.templates section. The ml-pipeline template is the entry point of the pipeline, and it defines the flow of the pipeline by specifying the order of the tasks in the DAG. The preproc, randomforrest, and lasso templates are the individual steps of the pipeline.
The preproc step runs a containerized Python script (preproc.py) that performs data pre-processing and generates four output artifacts: x_train.csv, x_test.csv, y_train.csv, and y_test.csv. These are stored as output artifacts of the step and can be accessed by subsequent steps in the pipeline.
The randomforrest and lasso steps each run a containerized Python script (rf.py and lasso.py, respectively) that trains a random forest or Lasso model on the pre-processed data. Both steps take the preproc step's output artifacts (x_train.csv, x_test.csv, y_train.csv, and y_test.csv) as inputs, which are specified in the inputs section of each step.
4. Deploying our ML pipeline on the Kubernetes cluster
Argo is a workflow management system that allows you to automate the execution of complex tasks and workflows. It provides a command-line interface (CLI) that you can use to submit workflows and monitor their progress.
To submit a workflow using the Argo CLI, use the argo submit command followed by the path to the YAML file that describes your workflow, in this case pipeline.yml. The --watch flag keeps watching the workflow after submission and displays its live status until it completes or fails.
argo submit pipeline.yml --watch
Once the workflow has been completed, you can use the argo logs command followed by the name of the workflow to retrieve the logs from the workflow execution. This will allow you to review the output generated by each task in the workflow and identify any errors or issues that need to be addressed.
argo logs <name of the workflow>
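While the workflow is running (or afterwards), you can also list your workflows and inspect the status of each step in the DAG with the Argo CLI:
argo list
argo get <name of the workflow>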
And you will get your results!
What we see is that we were able to successfully deploy a machine learning workflow and gain a good amount of time and resource efficiency by training the different models in parallel. These pipelines are capable of handling far more complex workflows; this was a very basic example of their capabilities. Hence the growing need for MLOps practices to give you the edge.