Apache Spark can be leveraged to process large volumes of distributed data that are typically impossible to handle on a standalone R server. This vignette describes how to define and execute Spark-only pipelines using the analysisPipelines package.
Using Spark as an engine requires the SparkR package to be installed. SparkR ships natively with Apache Spark and is not available on CRAN. The SparkR version needs to map directly to the Spark version (hence the native distribution), and care needs to be taken to ensure that this is configured properly.
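Once SparkR is installed and a session has been created (as described below), the mapping can be sanity-checked by comparing the two versions. This is a minimal sketch; sparkR.version() requires an active SparkR session.

```r
# Version of the installed SparkR package
packageVersion("SparkR")
# Version of the Spark distribution backing the current session
SparkR::sparkR.version()
```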
To install from GitHub, run a command like the following, substituting the Spark version you are running:
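```r
# A sketch using devtools; the tag (here v2.3.1) should match your Spark version
devtools::install_github("apache/spark@v2.3.1", subdir = "R/pkg")
```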
The other option, if Spark is already installed locally, is to use the SparkR build bundled with the Spark distribution, for example by adding it to R's library paths as sketched below.
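A minimal sketch, assuming the SPARK_HOME environment variable points to the local Spark installation:

```r
# Make the SparkR build shipped under $SPARK_HOME/R/lib available to this R session
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
```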
```r
library(analysisPipelines)
# Point SPARK_HOME at your Spark installation, if it is not already set
Sys.setenv(SPARK_HOME = "/path/to/spark/directory")
sparkPackages <- c("org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1")
# Create a SparkR session through the package, if one is not already active
# (a local master is shown; adjust for your cluster)
sparkRSessionCreateIfNotPresent(master = "local[*]", sparkPackages = sparkPackages)
```
Spark can connect to data sources such as Hive and Kafka. It can also read Parquet, JSON and CSV files. In this example, we read a CSV file.
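A minimal sketch of reading a CSV into a Spark DataFrame and creating a pipeline object on it. The file path is hypothetical, the column names are assumed to be Spark-friendly versions of the iris columns used later (e.g. Sepal_Length), and the AnalysisPipeline constructor call is a sketch of how the pipelineObj used below could be initialized.

```r
# Hypothetical path to an iris CSV with Spark-friendly column names
irisSparkDf <- SparkR::read.df("/path/to/iris.csv", source = "csv",
                               header = "true", inferSchema = "true")

# Initialize the pipeline object on the Spark DataFrame (a sketch)
pipelineObj <- AnalysisPipeline(input = irisSparkDf)
```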
The example below is a sketch of a user-defined function that performs a simple aggregation on a Spark DataFrame; the argument names match those used in the pipeline calls further below.
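```r
# Mean of one column, grouped by another, using SparkR's aggregation API
meanByGroup <- function(inputDataset, groupByColumn, colToSummarize) {
  groupedData <- SparkR::groupBy(inputDataset, inputDataset[[groupByColumn]])
  SparkR::agg(groupedData, meanValue = SparkR::mean(inputDataset[[colToSummarize]]))
}
```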
Each user-defined function needs to be registered with the package's function registry. For non-R engines such as Spark and Python, a suffix with the engine name is added to the function name on registration, so functions with this suffix are the ones to use when adding steps to an Analysis Pipeline object. The engine is added as a suffix for better readability, and a suffix is used (rather than a prefix) to make auto-completion easier.
Once registered, the function can be used to construct a pipeline. A pipeline is a set of functions called in a particular sequence.
```r
# Register user-defined functions
registerFunction("meanByGroup", "Mean By Group", engine = "spark")
```
```r
# List all registered functions
getRegistry()
```
```r
# Define pipeline from list of registered functions
pipelineObj %>>% meanByGroup_spark(groupByColumn = "Species", colToSummarize = "Sepal_Length",
                                   storeOutput = TRUE) %>>%
                 meanByGroup_spark(groupByColumn = "Species", colToSummarize = "Petal_Length",
                                   storeOutput = TRUE) -> pipelineObj
```
```r
pipelineObj %>>% getPipeline
pipelineObj %>>% visualizePipeline
```
The pipeline is run by calling the generateOutput() function. A particular output in the sequence of evaluations can be accessed by calling the getOutputById() function.
```r
# Run the pipeline
pipelineObj %>>% generateOutput -> pipelineObj

# Access stored outputs by their position in the pipeline
sepalLengthBySpecies <- pipelineObj %>>% getOutputById(1)
sepalLengthBySpeciesDf <- as.data.frame(sepalLengthBySpecies)
DT::datatable(head(sepalLengthBySpeciesDf), options = list(scrollX = TRUE, scrollY = TRUE))

petalLengthBySpecies <- pipelineObj %>>% getOutputById(2)
petalLengthBySpeciesDf <- as.data.frame(petalLengthBySpecies)
DT::datatable(head(petalLengthBySpeciesDf), options = list(scrollX = TRUE, scrollY = TRUE))
```
The analysisPipelines package internally uses the SparkR package to interface with Spark. SparkR masks many common data manipulation functions from base R as well as from packages such as dplyr, so make sure you use explicit namespace scoping (the package::function form) when calling such functions.
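For example (sparkDf here is a hypothetical SparkDataFrame, such as the one read earlier):

```r
# filter is defined by both SparkR and dplyr; scope the call explicitly
SparkR::filter(sparkDf, sparkDf$Species == "setosa")  # operates on a Spark DataFrame
dplyr::filter(iris, Species == "setosa")              # operates on a local data.frame
```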