An introduction to meta-pipelines

Naren Srinivasan

11/19/2018

knitr::opts_chunk$set(
  eval = TRUE
)
library(analysisPipelines)

Introduction

The meta-pipeline construct allows users to export a pipeline created for a particular use case as a general analysis flow, which can then be applied to a different dataset with a different set of parameters. In a pipeline, the data can change, provided it retains the same schema, while the parameters of the functions stay the same. In a meta-pipeline, only the structure of the analysis is retained, such as the analysis flow and the function dependencies. The specific parameters of each function can be set differently for each new use case.

The objective of a meta-pipeline is to define and execute reusable analysis flows.

Using meta-pipelines

Creating a meta-pipeline

Through this package, a meta-pipeline is created by exporting an existing pipeline. The export retains the functions used, the flow and dependencies between them, and the registry of functions stored with the pipeline.

In the example below, we first create a pipeline, similar to the one described in the other vignettes.
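The chunk that produced the output below is not shown, so the following is only a minimal sketch of what it might look like. The function names `getColor` and `getColumnName` come from the registration log, and `registerFunction` is called in the same way as in the final section of this vignette. The use of `iris` as the input and the bodies of the two helper functions are assumptions.

```r
library(analysisPipelines)

# Create a pipeline object; 'iris' as the input is an assumption
pipeline <- AnalysisPipeline(input = iris)

# Two simple non-data functions, named as in the registration log;
# their bodies are assumed to return the argument unchanged
getColor <- function(color) {
  return(color)
}

getColumnName <- function(columnName) {
  return(columnName)
}

# Register the functions so that they can be used in a pipeline
registerFunction(functionName = "getColor", isDataFunction = FALSE,
                 firstArgClass = "character")
registerFunction(functionName = "getColumnName", isDataFunction = FALSE,
                 firstArgClass = "character")

# List all functions currently available in the registry
getRegistry()
```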

## INFO [2020-06-12 13:08:09] ||  Function 'getColor' was registered successfully  ||
## INFO [2020-06-12 13:08:09] ||  Function 'getColumnName' was registered successfully  ||
## # A tibble: 16 x 7
##    functionName heading engine exceptionHandli… userDefined isDataFunction
##    <chr>        <chr>   <chr>  <chr>            <lgl>       <lgl>         
##  1 univarCatDi… "Univa… r      genericPipeline… FALSE       TRUE          
##  2 outlierPlot  "Univa… r      genericPipeline… FALSE       TRUE          
##  3 multiVarOut… "Multi… r      genericPipeline… FALSE       TRUE          
##  4 ignoreCols   "Ignor… r      genericPipeline… FALSE       TRUE          
##  5 getFeatures… ""      r      genericPipeline… FALSE       TRUE          
##  6 getTargetFo… ""      r      genericPipeline… FALSE       TRUE          
##  7 castKafkaSt… "Cast … spark… genericPipeline… FALSE       TRUE          
##  8 convertKafk… "Conve… spark… genericPipeline… FALSE       TRUE          
##  9 bivariatePl… "Bivar… r      genericPipeline… TRUE        TRUE          
## 10 filterData   ""      r      genericPipeline… TRUE        TRUE          
## 11 summarizeDa… ""      r      genericPipeline… TRUE        TRUE          
## 12 plotLine     ""      r      genericPipeline… TRUE        TRUE          
## 13 plotSummary  ""      r      genericPipeline… TRUE        TRUE          
## 14 sampleFunct… "Sampl… r      sampleException  TRUE        TRUE          
## 15 getColor     ""      r      genericPipeline… TRUE        FALSE         
## 16 getColumnNa… ""      r      genericPipeline… TRUE        FALSE         
## # … with 1 more variable: firstArgClass <chr>

We then generate an output from the pipeline to validate that it works properly. Generating output is not required in order to define a meta-pipeline.
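A sketch of how the four-step pipeline listed below could be defined and run. It assumes that the `pipeline` object and the registered `getColor` and `getColumnName` functions from the start of this section already exist. The order of the functions and `storeOutput = TRUE` on the third step follow the pipeline table; the object names, the argument values, and the use of formulas such as `~f1` to pass the output of function ID 1 to a later function are assumptions.

```r
# Assumes 'pipeline' (an AnalysisPipeline object) and the registered
# functions 'getColor' and 'getColumnName' already exist
pipeline %>>%
  getColor(color = "blue") %>>%
  getColumnName(columnName = "Sepal.Length") %>>%
  univarCatDistPlots(uniCol = "Species", priColor = ~f1,
                     optionalPlots = 0, storeOutput = TRUE) %>>%
  outlierPlot(method = "iqr", columnName = ~f2, cutoffValue = 0.01,
              priColor = ~f1, optionalPlots = 0) -> complexPipeline

# Inspect the functions that make up the pipeline
complexPipeline %>>% getPipeline

# Run the pipeline and fetch the stored output of function ID "3"
complexPipeline %>>% generateOutput -> complexPipelineOutput
complexPipelineOutput %>>% getOutputById("3")
```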

## # A tibble: 4 x 6
##   id    operation     heading              parameters   outAsIn storeOutput
##   <chr> <chr>         <chr>                <list>       <lgl>   <lgl>      
## 1 1     getColor      ""                   <named list… FALSE   FALSE      
## 2 2     getColumnName ""                   <named list… FALSE   FALSE      
## 3 3     univarCatDis… "Univariate Distrib… <named list… FALSE   TRUE       
## 4 4     outlierPlot   "Univariate Outlier" <named list… FALSE   FALSE
## INFO [2020-06-12 13:08:09] ||  Pipeline Prep. STARTED  ||
## INFO [2020-06-12 13:08:09] ||  Pipeline Prep. COMPLETE. Time taken : 0.0223021507263184 seconds||
## INFO [2020-06-12 13:08:09] ||  Engine Assessment for pipeline STARTED  ||
## INFO [2020-06-12 13:08:09] ||  Engine Assessment COMPLETE. Time taken : 0.0144133567810059 seconds||
## INFO [2020-06-12 13:08:09] ||  Pipeline Execution STARTED  ||
## INFO [2020-06-12 13:08:09] ||  Executing Batch Number : 1/2 containing functions 'getColor, getColumnName' ||
## INFO [2020-06-12 13:08:09] ||  Function ID '1' named 'getColor' STARTED on the 'r' engine ||
## INFO [2020-06-12 13:08:09] ||  Function ID '1' named 'getColor' COMPLETED. Time taken : 0.00527262687683105 seconds  ||
## INFO [2020-06-12 13:08:09] ||  Function ID '2' named 'getColumnName' STARTED on the 'r' engine ||
## INFO [2020-06-12 13:08:09] ||  Function ID '2' named 'getColumnName' COMPLETED. Time taken : 0.00551652908325195 seconds  ||
## INFO [2020-06-12 13:08:09] ||  Batch Number 1/2 COMPLETE. Time taken : 0.0244648456573486 seconds  ||
## INFO [2020-06-12 13:08:09] ||  Executing Batch Number : 2/2 containing functions 'univarCatDistPlots, outlierPlot' ||
## INFO [2020-06-12 13:08:09] ||  Cleared intermediate outputs which are not required  ||
## INFO [2020-06-12 13:08:09] ||  Function ID '3' named 'univarCatDistPlots' STARTED on the 'r' engine ||
## INFO [2020-06-12 13:08:09] ||  Function ID '3' named 'univarCatDistPlots' COMPLETED. Time taken : 0.0160894393920898 seconds  ||
## INFO [2020-06-12 13:08:09] ||  Function ID '4' named 'outlierPlot' STARTED on the 'r' engine ||
## INFO [2020-06-12 13:08:09] ||  Function ID '4' named 'outlierPlot' COMPLETED. Time taken : 0.0121641159057617 seconds  ||
## INFO [2020-06-12 13:08:09] ||  Batch Number 2/2 COMPLETE. Time taken : 0.0475993156433105 seconds  ||
## INFO [2020-06-12 13:08:09] ||  Performing final garbage cleaning and collection of outputs  ||
## INFO [2020-06-12 13:08:09] ||  Pipeline Execution COMPLETE. Time taken : 0.0988597869873047 seconds||

Exporting and reusing for a different case

Once a pipeline has been created, be it a batch or a streaming pipeline, it can be exported using the exportAsMetaPipeline method. This returns an object of class MetaAnalysisPipeline which stores the required information.

The meta-pipeline can be visualized in the same way as a normal pipeline object, by calling the visualizePipeline method on the MetaAnalysisPipeline object.
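The export and visualization steps might look like the following sketch. It assumes an existing pipeline object named `complexPipeline` (a hypothetical name); `complexMetaPipeline` is the name used for the meta-pipeline in the final section of this vignette.

```r
# Assumes 'complexPipeline' is an existing pipeline object
complexPipeline %>>% exportAsMetaPipeline -> complexMetaPipeline

# Visualize the meta-pipeline in the same way as a normal pipeline
complexMetaPipeline %>>% visualizePipeline
```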

Setting the new parameters

The next step in using the meta-pipeline is to create another pipeline with a different set of parameters. For this purpose, the user first exports the pipeline prototype, which contains the set of functions used in the pipeline and their respective arguments.

The pipeline prototype is exported as an object of class proto from the ‘proto’ package. This class is a thin skin over environments, with usability advantages such as methods like names to get the names of the objects it contains, and the ‘$’ operator to refer to specific objects. The aim of using this class is to provide an easy-to-use interface for setting the new values of the arguments.

The pipeline prototype has a nested structure. The first level is a list of objects which represent the functions in the pipeline. A specific function can be referred to simply by its name. The second level is the list of arguments for each of those functions (again referred to by their usual names).

The new values of the parameters can be set by using the ‘$’ operator to refer to the values. By default, the exported pipeline prototype contains the values of the parameters defined in the original pipeline. Therefore, the user can change only some of the values, as required, or all of them.
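To make the nested structure concrete, here is a small stand-alone sketch that uses the ‘proto’ package directly. The object `toyProto` is hypothetical and built by hand for illustration; it is not the object returned by the package, although its function and argument names mirror those used in this vignette.

```r
library(proto)

# Hypothetical stand-in for a pipeline prototype: the outer proto object
# holds one proto object per function, and each inner object holds that
# function's arguments
toyProto <- proto(
  getColor = proto(color = "blue"),
  getColumnName = proto(columnName = "Sepal.Length")
)

# First level: the functions in the pipeline
ls(toyProto)

# Second level: the arguments of one function
ls(toyProto$getColor)

# Read an argument value, overwrite it with the '$' operator, and read it again
toyProto$getColor$color
toyProto$getColor$color <- "green"
toyProto$getColor$color
```

Because proto objects are environments, the assignment modifies the prototype in place, so no copy needs to be reassigned.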

In the following example, we reconfigure the pipeline for use with the ‘iris’ dataset.
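A sketch of this step, mirroring the calls used for the loaded meta-pipeline in the final section of this vignette. It assumes that `complexMetaPipeline` is the exported meta-pipeline; `pipelineProto` is a hypothetical object name.

```r
# Assumes 'complexMetaPipeline' is an exported MetaAnalysisPipeline object
pipelineProto <- getPipelinePrototype(complexMetaPipeline)

# Inspect the structure of the prototype
str(pipelineProto)

# Set the new parameter values for the iris dataset
pipelineProto$getColor$color <- "green"
pipelineProto$getColumnName$columnName <- "Sepal.Length"
pipelineProto$univarCatDistPlots$uniCol <- "Species"
```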

## proto object 
##  $ outlierPlot       :proto object  
##  $ getColumnName     :proto object  
##  $ univarCatDistPlots:proto object  
##  $ getColor          :proto object

Execution

Once the parameters have been set, a new, executable pipeline object can be created by calling the createPipelineInstance method and passing it the meta-pipeline object and the pipeline prototype. This creates a pipeline object with the usual properties.

We set the input of the pipeline object to the iris dataset and then execute to generate the output.
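A sketch of these two steps, following the same calls as the final section of this vignette. It assumes that `complexMetaPipeline` and a configured prototype `pipelineProto` already exist; `newPipelineObj` is a hypothetical object name.

```r
# Assumes 'complexMetaPipeline' (the meta-pipeline) and 'pipelineProto'
# (the prototype with the new parameter values) already exist
complexMetaPipeline %>>% createPipelineInstance(pipelineProto) -> newPipelineObj

# Set the input data, run the pipeline and fetch the output of function ID "3"
newPipelineObj %>>% setInput(input = iris) %>>%
  generateOutput %>>% getOutputById("3")
```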

## INFO [2020-06-12 13:08:10] ||  Engine Assessment for pipeline STARTED  ||
## INFO [2020-06-12 13:08:10] ||  Engine Assessment COMPLETE. Time taken : 0.0156815052032471 seconds||
## INFO [2020-06-12 13:08:10] ||  Pipeline Prep. STARTED  ||
## INFO [2020-06-12 13:08:10] ||  Pipeline Prep. COMPLETE. Time taken : 0.02303147315979 seconds||
## INFO [2020-06-12 13:08:10] ||  Pipeline Execution STARTED  ||
## INFO [2020-06-12 13:08:10] ||  Executing Batch Number : 1/2 containing functions 'getColor, getColumnName' ||
## INFO [2020-06-12 13:08:10] ||  Function ID '1' named 'getColor' STARTED on the 'r' engine ||
## INFO [2020-06-12 13:08:10] ||  Function ID '1' named 'getColor' COMPLETED. Time taken : 0.00779986381530762 seconds  ||
## INFO [2020-06-12 13:08:10] ||  Function ID '2' named 'getColumnName' STARTED on the 'r' engine ||
## INFO [2020-06-12 13:08:10] ||  Function ID '2' named 'getColumnName' COMPLETED. Time taken : 0.0108883380889893 seconds  ||
## INFO [2020-06-12 13:08:10] ||  Batch Number 1/2 COMPLETE. Time taken : 0.0327384471893311 seconds  ||
## INFO [2020-06-12 13:08:10] ||  Executing Batch Number : 2/2 containing functions 'univarCatDistPlots, outlierPlot' ||
## INFO [2020-06-12 13:08:10] ||  Cleared intermediate outputs which are not required  ||
## INFO [2020-06-12 13:08:10] ||  Function ID '3' named 'univarCatDistPlots' STARTED on the 'r' engine ||
## INFO [2020-06-12 13:08:10] ||  Function ID '3' named 'univarCatDistPlots' COMPLETED. Time taken : 0.0150909423828125 seconds  ||
## INFO [2020-06-12 13:08:10] ||  Function ID '4' named 'outlierPlot' STARTED on the 'r' engine ||
## INFO [2020-06-12 13:08:10] ||  Function ID '4' named 'outlierPlot' COMPLETED. Time taken : 0.0134298801422119 seconds  ||
## INFO [2020-06-12 13:08:10] ||  Batch Number 2/2 COMPLETE. Time taken : 0.0506129264831543 seconds  ||
## INFO [2020-06-12 13:08:10] ||  Performing final garbage cleaning and collection of outputs  ||
## INFO [2020-06-12 13:08:10] ||  Pipeline Execution COMPLETE. Time taken : 0.112348318099976 seconds||

Saving and loading meta-pipelines

Similar to pipelines, meta-pipelines can be saved and loaded using the savePipeline method and the loadMetaPipeline function. As with pipelines, when a meta-pipeline is loaded, it overwrites the existing registry with the registry stored with the meta-pipeline.

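# Save the meta-pipeline to disk
complexMetaPipeline %>>% savePipeline("metaPipeline.RDS")

# Register two extra functions to check that loading overwrites the registry
getC <- function(color) {
  return(color)
}

getCol <- function(columnName) {
  return(columnName)
}

registerFunction(functionName = "getC", isDataFunction = FALSE, firstArgClass = "character")
registerFunction(functionName = "getCol", isDataFunction = FALSE, firstArgClass = "character")

# Compare the registry before and after loading the meta-pipeline;
# loading replaces it with the registry stored with the meta-pipeline
getRegistry()
loadMetaPipeline(path = "metaPipeline.RDS") -> loadedMetaPipeline
getRegistry()

# Export the prototype of the loaded meta-pipeline and inspect its structure
pipelineProtoLoaded <- getPipelinePrototype(loadedMetaPipeline)
str(pipelineProtoLoaded)

# Set the parameters for the iris dataset
pipelineProtoLoaded$getColor$color <- "green"
pipelineProtoLoaded$getColumnName$columnName <- "Sepal.Length"
pipelineProtoLoaded$univarCatDistPlots$uniCol <- "Species"

# Create an executable pipeline from the loaded meta-pipeline
loadedMetaPipeline %>>% createPipelineInstance(pipelineProtoLoaded) -> newPipelineObjLoaded

# Set the input, run the pipeline and fetch the output of function ID "3"
newPipelineObjLoaded %>>% setInput(input = iris) %>>%
  generateOutput %>>% getOutputById("3")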