Pipelines in Azure Machine Learning

Study Notes

Pipelines are a common concept in many areas.
In Azure Machine Learning, the tasks of an experiment are organized into a pipeline as steps. A task can be split into one or more steps.
Steps can be processed in sequence or in parallel.

Working with pipelines in Azure Machine Learning
  1. Connect to the workspace
  2. Prepare data (have the dataset registered) - a sketch for steps 1 and 2 follows this list
  3. Create scripts for the pipeline steps
  4. Prepare a compute environment for the pipeline steps
  5. Create and run a pipeline.
    Once submitted, you can see it in Azure ML Studio.

    After training, the model is registered and you can see it under Models.


  6. Publish the pipeline.
    You can see it under Pipelines > Pipeline endpoints.

  7. Call the pipeline endpoint.
  8. Schedule the pipeline.
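
A minimal sketch for steps 1 and 2, assuming a config.json workspace configuration file and a dataset already registered under the placeholder name 'raw_dataset':

from azureml.core import Workspace, Dataset

# Connect to the workspace (uses the config.json downloaded from the Azure portal)
ws = Workspace.from_config()

# Get a dataset that is already registered in the workspace
raw_ds = Dataset.get_by_name(ws, name='raw_dataset')
print(raw_ds.name, raw_ds.version)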


Common kinds of steps in an Azure Machine Learning pipeline:
  • PythonScriptStep
    Runs a specified Python script.
  • DataTransferStep
    Uses Azure Data Factory to copy data between data stores.
  • DatabricksStep
    Runs a notebook, script, or compiled JAR on a Databricks cluster.
  • AdlaStep
    Runs a U-SQL job in Azure Data Lake Analytics.
  • ParallelRunStep
    Runs a Python script as a distributed task on multiple compute nodes.
Example: a pipeline with two steps:
  1. prepare data (python script)
  2. train a model (python script)
from azureml.pipeline.steps import PythonScriptStep

# Step to run a Python script
step1 = PythonScriptStep(name='prepare data',
                         source_directory='scripts',
                         script_name='SCRIPT_FILE_TO_PREPARE_DATA.py',
                         compute_target='CLUSTER_NAME')

# Step to train a model
step2 = PythonScriptStep(name='train model',
                         source_directory='scripts',
                         script_name='SCRIPT_FILE_NAME_TO_TRAIN_MODEL.py',
                         compute_target='CLUSTER_NAME')

To run the pipeline as an experiment, run:

from azureml.pipeline.core import Pipeline
from azureml.core import Experiment

# Define the pipeline
train_pipeline = Pipeline(workspace=ws, steps=[step1, step2])

# Create an experiment and run the pipeline
experiment = Experiment(workspace=ws, name='training-pipeline')
pipeline_run = experiment.submit(train_pipeline)
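
The run can also be monitored from the notebook, for example (a small sketch):

# Wait for the pipeline run to finish, streaming log output to the console
pipeline_run.wait_for_completion(show_output=True)
print(pipeline_run.get_status())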


OutputFileDatasetConfig - used to pass data between pipeline steps.
  • References a location in a datastore for interim storage of data
    - the referenced location is in a datastore (the default datastore is used if none is specified)
    - the object is passed to the next step
  • Creates a data dependency between pipeline steps.
    In the first step, data is written to the datastore.
    In the second step, data is read from the datastore.

1. Set the initial data
raw_ds = Dataset.get_by_name(ws, 'raw_dataset')

Define the OutputFileDatasetConfig object used to pass data between steps
from azureml.data import OutputFileDatasetConfig

data_store = ws.get_default_datastore()
prepped_data = OutputFileDatasetConfig('prepped') # reference to the interim location in the datastore

2. Define the pipeline steps
# Step to run a Python script
step1 = PythonScriptStep(name='prepare data',
                         source_directory='scripts',
                         script_name='data_prep.py', # script to execute
                         compute_target='aml-cluster',
                         # Script arguments include the input dataset and the OutputFileDatasetConfig
                         # (two arguments passed to the script)
                         arguments=['--raw-ds', raw_ds.as_named_input('raw_data'), # raw data is read from here
                                    '--out_folder', prepped_data])                 # processed data is written here

The script that prepares the data:
# data_prep.py
from azureml.core import Run
import argparse
import os

# Get the experiment run context
run = Run.get_context()

# Get arguments
parser = argparse.ArgumentParser()
parser.add_argument('--raw-ds', type=str, dest='raw_dataset_id') # ID of the input dataset (named input 'raw_data')
parser.add_argument('--out_folder', type=str, dest='folder')
args = parser.parse_args()
output_folder = args.folder

# Get the input dataset as a dataframe (the named input 'raw_data' defined in step1)
raw_df = run.input_datasets['raw_data'].to_pandas_dataframe()

# code to prep data (in this case, just select specific columns)
prepped_df = raw_df[['col1', 'col2', 'col3']] # do some transformation on dataframe

# Save the prepped data to the OutputFileDatasetConfig location
os.makedirs(output_folder, exist_ok=True)
output_path = os.path.join(output_folder, 'prepped_data.csv')
prepped_df.to_csv(output_path)

Now prepped_data.csv is written to the OutputFileDatasetConfig location; the object in fact refers to a folder in the datastore that holds the CSV file with the processed data.

# Step to train the model
step2 = PythonScriptStep(name='train model',
                         source_directory='scripts',
                         script_name='train_model.py',
                         compute_target='aml-cluster',
                         # Pass the prepped data location as a script argument
                         arguments=['--training-data', prepped_data.as_input()])

This step passes to train_model.py the location from which the processed data will be read.
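
A minimal sketch of what train_model.py might look like under these assumptions; the feature columns (col1, col2), the label column (col3), the scikit-learn model, and the registered model name 'trained_model' are illustrative, not from the source:

# train_model.py (illustrative sketch)
import argparse
import os
import pandas as pd
import joblib
from sklearn.linear_model import LogisticRegression
from azureml.core import Run, Model

# Get the experiment run context
run = Run.get_context()

# Get the folder where the previous step wrote the prepped data
parser = argparse.ArgumentParser()
parser.add_argument('--training-data', type=str, dest='training_data')
args = parser.parse_args()

# Read the CSV file produced by data_prep.py
train_df = pd.read_csv(os.path.join(args.training_data, 'prepped_data.csv'))

# Hypothetical training code: col1 and col2 as features, col3 as the label
X, y = train_df[['col1', 'col2']].values, train_df['col3'].values
model = LogisticRegression().fit(X, y)

# Save the model and register it in the workspace (it then appears under Models in the studio)
os.makedirs('outputs', exist_ok=True)
model_file = os.path.join('outputs', 'model.pkl')
joblib.dump(value=model, filename=model_file)
Model.register(workspace=run.experiment.workspace,
               model_path=model_file,
               model_name='trained_model')

run.complete()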

Publishing a pipeline creates a REST endpoint, so the pipeline can be run on demand.
  • publish()
published_pipeline = pipeline.publish(name='training_pipeline',
                                      description='Model training pipeline',
                                      version='1.0')
  • publish_pipeline() - publish from the most recent run of the pipeline
pipeline_experiment = ws.experiments.get('training-pipeline')
run = list(pipeline_experiment.get_runs())[0] # Get the most recent run of the pipeline

# Publish the pipeline from the run
published_pipeline = run.publish_pipeline(name='training_pipeline',
                                          description='Model training pipeline',
                                          version='1.0')
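
The REST endpoint URI of a published pipeline can be read from the returned object; it is used as rest_endpoint in the calls below:

# Endpoint URI of the published pipeline
rest_endpoint = published_pipeline.endpoint
print(rest_endpoint)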

How to use a published pipeline:
  • make an HTTP request to its REST endpoint
  • pass authorization header with a token for a service principal with permission to run the pipeline
  • response from a successful REST call includes the run ID
import requests

# Make an HTTP request to the REST endpoint
response = requests.post(rest_endpoint,
                         # Authorization header with a token for a service principal
                         # that has permission to run the pipeline
                         headers=auth_header,
                         json={"ExperimentName": "run_training_pipeline"})

# The response from a successful REST call includes the run ID
run_id = response.json()["Id"]
print(run_id)
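
One way to obtain auth_header is interactive authentication (a sketch; for automation, a service principal via ServicePrincipalAuthentication would typically be used instead, as the notes mention):

from azureml.core.authentication import InteractiveLoginAuthentication

# Interactive sign-in; returns a header of the form {'Authorization': 'Bearer <token>'}
interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()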

Run a published pipeline with a parameter.

from azureml.pipeline.core.graph import PipelineParameter

# Set a parameter to be passed to the pipeline
reg_param = PipelineParameter(name='reg_rate', default_value=0.01)
...

step2 = PythonScriptStep(name='train model',
                         source_directory='scripts',
                         script_name='train_model.py',
                         compute_target='aml-cluster',
                         # Pass the parameter as a script argument
                         arguments=['--in_folder', prepped_data,
                                    '--reg', reg_param],
                         inputs=[prepped_data])
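
Inside train_model.py, the pipeline parameter arrives as an ordinary script argument; a minimal sketch of reading it:

import argparse

# The pipeline parameter is passed to the script as '--reg'
parser = argparse.ArgumentParser()
parser.add_argument('--reg', type=float, dest='reg_rate', default=0.01)
args, _ = parser.parse_known_args()
print('Regularization rate:', args.reg_rate)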

After you publish a parameterized pipeline, you can pass parameter values in the JSON payload for the REST interface:
response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": "run_training_pipeline",
                               "ParameterAssignments": {"reg_rate": 0.1}})


Schedule the pipeline to run
  • recurrence (a fuller sketch follows this section)
from azureml.pipeline.core import ScheduleRecurrence, Schedule
daily = ScheduleRecurrence(frequency='Day', interval=1)
pipeline_schedule = Schedule.create(.....,
                                    recurrence=daily)

  • trigger on change
from azureml.core import Datastore
from azureml.pipeline.core import Schedule

training_datastore = Datastore(workspace=ws, name='blob_data')
pipeline_schedule = Schedule.create(.....,
                                    datastore=training_datastore,
                                    path_on_datastore='data/training') # Monitor this folder for changes
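
A fuller sketch of the recurrence-based schedule with the elided arguments filled in; the schedule name and description are illustrative, and published_pipeline is the pipeline published above:

from azureml.pipeline.core import ScheduleRecurrence, Schedule

daily = ScheduleRecurrence(frequency='Day', interval=1)
pipeline_schedule = Schedule.create(ws,
                                    name='daily-training-schedule',      # illustrative name
                                    description='Runs the training pipeline once a day',
                                    pipeline_id=published_pipeline.id,
                                    experiment_name='training-pipeline',
                                    recurrence=daily)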

References:
Introduction to pipelines - Training | Microsoft Learn