Study Notes
Pipelines appear in many areas of software and data engineering.
In Azure Machine Learning, experiment tasks are organized into pipelines as steps; a task can be split into one or more steps.
Steps can be processed in sequence or in parallel.
Working with pipelines in Azure Machine Learning
- Connect to workspace
- Prepare data (have dataset registered)
- Create scripts for pipeline steps
- Prepare a compute environment for the pipeline steps
- Create and run a pipeline
  Once submitted, you can see it in Azure ML Studio. After training, the model is registered and appears under Models.
- Publish the pipeline
  The published pipeline appears under Pipelines > Pipeline endpoints.
- Call the pipeline endpoint
- Schedule the pipeline
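A minimal sketch of the first and fourth items (connect to the workspace, then get or create a compute cluster for the steps); the cluster name, VM size, and node count are illustrative assumptions:
from azureml.core import Workspace
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException

# Connect to the workspace (assumes a config.json downloaded from the Azure portal)
ws = Workspace.from_config()

# Get the compute cluster for the pipeline steps, or create it if it does not exist
cluster_name = 'aml-cluster'   # illustrative name
try:
    pipeline_cluster = ComputeTarget(workspace=ws, name=cluster_name)
except ComputeTargetException:
    compute_config = AmlCompute.provisioning_configuration(vm_size='STANDARD_DS11_V2', max_nodes=2)
    pipeline_cluster = ComputeTarget.create(ws, cluster_name, compute_config)
    pipeline_cluster.wait_for_completion(show_output=True)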
Common kinds of step in an Azure Machine Learning pipeline:
- PythonScriptStep: runs a specified Python script.
- DataTransferStep: uses Azure Data Factory to copy data between data stores.
- DatabricksStep: runs a notebook, script, or compiled JAR on a Databricks cluster.
- AdlaStep: runs a U-SQL job in Azure Data Lake Analytics.
- ParallelRunStep: runs a Python script as a distributed task on multiple compute nodes.
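A hedged sketch of a ParallelRunStep, for illustration only; it assumes a registered file dataset 'batch_data', a registered environment 'batch_env', a scoring script scripts/batch_score.py, and an 'aml-cluster' compute target (all of these names are assumptions):
from azureml.core import Dataset, Environment
from azureml.data import OutputFileDatasetConfig
from azureml.pipeline.steps import ParallelRunConfig, ParallelRunStep

batch_ds = Dataset.get_by_name(ws, 'batch_data')          # assumed registered dataset
batch_env = Environment.get(ws, 'batch_env')              # assumed registered environment
output_dir = OutputFileDatasetConfig(name='inferences')   # where the combined results are written

parallel_run_config = ParallelRunConfig(source_directory='scripts',
                                        entry_script='batch_score.py',   # hypothetical scoring script
                                        mini_batch_size='5',
                                        error_threshold=10,
                                        output_action='append_row',
                                        environment=batch_env,
                                        compute_target='aml-cluster',
                                        node_count=2)

parallel_step = ParallelRunStep(name='batch-score',
                                parallel_run_config=parallel_run_config,
                                inputs=[batch_ds.as_named_input('batch_data')],
                                output=output_dir,
                                allow_reuse=True)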
Example: a simple training pipeline with two PythonScriptStep steps:
- prepare data (Python script)
- train a model (Python script)
from azureml.pipeline.steps import PythonScriptStep
# Step to run a Python script
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'SCRIPT_FILE_TO_PREPARE_DATA.py',
                         compute_target = 'CLUSTER_NAME')
# Step to train a model
step2 = PythonScriptStep(name = 'train model',
                         source_directory = 'scripts',
                         script_name = 'SCRIPT_FILE_NAME_TO_TRAIN_MODEL.py',
                         compute_target = 'CLUSTER_NAME')
To run the pipeline as an experiment:
from azureml.pipeline.core import Pipeline
from azureml.core import Experiment
# Define pipeline
train_pipeline = Pipeline(workspace = ws, steps = [step1, step2])
# Create experiment and run the pipeline
experiment = Experiment(workspace = ws, name = 'training-pipeline')
pipeline_run = experiment.submit(train_pipeline)
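Optionally, you can block until the run finishes and stream its output:
# Wait for the pipeline run to complete, streaming log output to the console
pipeline_run.wait_for_completion(show_output=True)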
OutputFileDatasetConfig - used to pass data between pipeline steps.
- References a location in a datastore for interim storage of data (the default datastore is used if none is specified)
- The object is passed from one step to the next, which creates a data dependency between the steps
In the first step, data is written to the datastore; in the second step, it is read back from the datastore.
1. Set initial data
raw_ds = Dataset.get_by_name(ws, 'raw_dataset')
Define the OutputFileDatasetConfig object used to pass data between steps (see the destination sketch after step 2):
data_store = ws.get_default_datastore()
prepped_data = OutputFileDatasetConfig('prepped')  # reference to the intermediate data location
2. Pass the dataset and the OutputFileDatasetConfig to the pipeline step as script arguments
# Step to run a Python script
step1 = PythonScriptStep(name = 'prepare data',
                         source_directory = 'scripts',
                         script_name = 'data_prep.py',   # script to execute
                         compute_target = 'aml-cluster',
                         # Two script arguments: the input dataset and the output folder
                         arguments = ['--raw-ds', raw_ds.as_named_input('raw_data'),   # raw data is read from here
                                      '--out_folder', prepped_data])                   # processed data is written here
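If you want to control where the intermediate data from step 1 lands, OutputFileDatasetConfig also accepts an explicit destination; a minimal sketch, where the 'prepped/{run-id}' path pattern is only an illustrative choice:
# Write the intermediate data to an explicit datastore path instead of the managed default location
prepped_data = OutputFileDatasetConfig(name='prepped',
                                       destination=(data_store, 'prepped/{run-id}'))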
The data preparation script:
# data_prep.py
from azureml.core import Run
import argparse
import os
# Get the experiment run context
run = Run.get_context()
# Get arguments
parser = argparse.ArgumentParser()
parser.add_argument('--raw-ds', type=str, dest='raw_dataset_id')   # dataset ID; the data itself is exposed as the named input 'raw_data'
parser.add_argument('--out_folder', type=str, dest='folder')
args = parser.parse_args()
output_folder = args.folder
# Get input dataset as dataframe
raw_df = run.input_datasets['raw_data'].to_pandas_dataframe()   # read the raw data
# code to prep data (in this case, just select specific columns)
prepped_df = raw_df[['col1', 'col2', 'col3']] # do some transformation on dataframe
# Save prepped data to the PipelineData location
os.makedirs(output_folder, exist_ok=True)
output_path = os.path.join(output_folder, 'prepped_data.csv')
prepped_df.to_csv(output_path)
Now prepped_data.csv has been written to the location referenced by the OutputFileDatasetConfig object; in effect, the object names a folder that contains the CSV file with the processed data.
# Step to train the model
step2 = PythonScriptStep(name = 'train model',
                         source_directory = 'scripts',
                         script_name = 'train_model.py',
                         compute_target = 'aml-cluster',
                         # Pass the prepped data location as a script argument
                         arguments=['--training-data', prepped_data.as_input()])
This step passes to train_model.py the location from which the processed data will be read.
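The training script itself is not shown in these notes; here is a minimal sketch of how train_model.py could read the prepped data (the model-training code is omitted, and the file name follows the data_prep.py example above):
# train_model.py (sketch)
from azureml.core import Run
import argparse
import os
import pandas as pd

run = Run.get_context()

parser = argparse.ArgumentParser()
parser.add_argument('--training-data', type=str, dest='training_data')
args = parser.parse_args()

# The argument holds the mounted folder written by the previous step
file_path = os.path.join(args.training_data, 'prepped_data.csv')
train_df = pd.read_csv(file_path)

# ... train and register a model using train_df ...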
Publish pipeline = create a REST endpoint so the pipeline can be run on demand. Two ways to publish:
- publish() on the Pipeline object
published_pipeline = train_pipeline.publish(name='training_pipeline',
                                            description='Model training pipeline',
                                            version='1.0')
- publish_pipeline() on the most recent run of the pipeline
pipeline_experiment = ws.experiments.get('training-pipeline')
run = list(pipeline_experiment.get_runs())[0]  # Get the most recent run of the pipeline
# Publish the pipeline from the run
published_pipeline = run.publish_pipeline(name='training_pipeline',
                                          description='Model training pipeline',
                                          version='1.0')
How to use a published pipeline:
- make an HTTP request to its REST endpoint
- pass authorization header with a token for a service principal with permission to run the pipeline
- response from a successful REST call includes the run ID
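One way to get the endpoint URL and an authorization header before making the call; InteractiveLoginAuthentication is fine for experimentation, while automation would typically use a service principal instead:
from azureml.core.authentication import InteractiveLoginAuthentication

# REST endpoint of the published pipeline
rest_endpoint = published_pipeline.endpoint

# Authorization header with a bearer token for the current identity
interactive_auth = InteractiveLoginAuthentication()
auth_header = interactive_auth.get_authentication_header()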
import requests
# make an HTTP request to the pipeline's REST endpoint
response = requests.post(rest_endpoint,
                         # pass an authorization header with a token for a service principal with permission to run the pipeline
                         headers=auth_header,
                         json={"ExperimentName": "run_training_pipeline"})
json={"ExperimentName": "run_training_pipeline"})
#response from a successful REST call includes the run ID
run_id = response.json()["Id"]
print(run_id)
Run the published pipeline with a parameter.
from azureml.pipeline.core.graph import PipelineParameter
# Set a parameter to be passed to the pipeline
reg_param = PipelineParameter(name='reg_rate', default_value=0.01)
...
..
step2 = PythonScriptStep(name = 'train model',
                         source_directory = 'scripts',
                         script_name = 'train_model.py',
                         compute_target = 'aml-cluster',
                         # Pass the parameter as a script argument, alongside the intermediate data location
                         arguments=['--in_folder', prepped_data.as_input(),
                                    '--reg', reg_param])
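Inside train_model.py, the pipeline parameter arrives as an ordinary command-line argument; a small sketch (the argument name matches the step definition above, and the default mirrors the PipelineParameter default):
# In train_model.py (sketch): read the regularization rate passed as a pipeline parameter
parser.add_argument('--reg', type=float, dest='reg_rate', default=0.01)
args = parser.parse_args()
reg_rate = args.reg_rate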
After you publish a parameterized pipeline, you can pass parameter values in the JSON payload for the REST interface:
response = requests.post(rest_endpoint,
                         headers=auth_header,
                         json={"ExperimentName": "run_training_pipeline",
                               "ParameterAssignments": {"reg_rate": 0.1}})
Schedule pipeline to run
- recurrence
from azureml.pipeline.core import ScheduleRecurrence, Schedule
daily = ScheduleRecurrence(frequency='Day', interval=1)
pipeline_schedule = Schedule.create(.....,
recurrence=daily)
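A fuller sketch of the recurrence-based schedule, reusing the daily recurrence above and assuming the published pipeline from the previous section; the schedule name and description are illustrative:
pipeline_schedule = Schedule.create(ws, name='daily-training',
                                    description='Train the model every day',
                                    pipeline_id=published_pipeline.id,
                                    experiment_name='training-pipeline',
                                    recurrence=daily)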
- trigger on change
from azureml.core import Datastore
from azureml.pipeline.core import Schedule
training_datastore = Datastore(workspace=ws, name='blob_data')
pipeline_schedule = Schedule.create(.....,
datastore=training_datastore,
path_on_datastore='data/training') # Monitor this folder for changes
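And a fuller sketch of the data-change trigger, again with illustrative names; polling_interval is in minutes:
pipeline_schedule = Schedule.create(ws, name='retrain-on-new-data',
                                    description='Retrain when new data arrives',
                                    pipeline_id=published_pipeline.id,
                                    experiment_name='training-pipeline',
                                    datastore=training_datastore,
                                    path_on_datastore='data/training',   # monitor this folder for changes
                                    polling_interval=5)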
References:
Introduction to pipelines - Training | Microsoft Learn