Apache Arrow lets you work efficiently with large, multi-file datasets.
The arrow R package provides a dplyr interface to Arrow Datasets,
as well as other tools for interactive exploration of Arrow data.
This vignette introduces Datasets and shows how to use dplyr to analyze them.
It describes both what is possible to do with Arrow now
and what is on the immediate development roadmap.
The New York City taxi trip record data is widely used in big data exercises and competitions. For demonstration purposes, we have hosted a Parquet-formatted version of about 10 years of the trip data in a public S3 bucket.
The total file size is around 37 gigabytes, even in the efficient Parquet file format. That's bigger than memory on most people's computers, so we can't just read it all in and stack it into a single data frame.
In a future release, you'll be able to point your R session at S3 and query the dataset from there. For now, datasets need to be on your local file system. To download the files, run:
bucket <- "https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com"
dir.create("nyc-taxi")
for (year in 2009:2019) {
  dir.create(file.path("nyc-taxi", year))
  if (year == 2019) {
    # We only have through June 2019 there
    months <- 1:6
  } else {
    months <- 1:12
  }
  for (month in months) {
    # Zero-pad single-digit months to match the bucket layout ("01", "02", ...)
    if (month < 10) {
      month <- paste0("0", month)
    }
    dir.create(file.path("nyc-taxi", year, month))
    download.file(
      paste(bucket, year, month, "data.parquet", sep = "/"),
      file.path("nyc-taxi", year, month, "data.parquet"),
      mode = 'wb'
    )
  }
}
Note that the vignette will not execute that code chunk: if you want to run with live data, you'll have to do it yourself separately. Given the size, if you're running this locally and don't have a fast connection, feel free to grab only a year or two of data.
If you don't have the taxi data downloaded, the vignette will still run and will yield previously cached output for reference. To be explicit about which version is running, let's check whether we're running with live data:
dir.exists("nyc-taxi")
## [1] FALSE
Because dplyr is not necessary for many Arrow workflows,
it is an optional (Suggests) dependency. So, to work with Datasets,
we need to load both arrow and dplyr.
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
The first step is to create our Dataset object, pointing at the directory of data.
ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
The default file format for open_dataset() is Parquet; if we had a directory
of Arrow format files, we could include format = "arrow" in the call.
Future versions will support more file formats, including CSV/delimited text data
and JSON.
The partitioning argument lets us specify how the file paths provide information
about how the dataset is chunked into different files. Our files in this example
have file paths like
2009/01/data.parquet
2009/02/data.parquet
...
By providing a character vector to partitioning, we're saying that the first
path segment gives the value for “year” and the second segment is “month”.
Every row in 2009/01/data.parquet has a value of 2009 for “year”
and 1 for “month”, even though those columns may not actually be present in the file.
Indeed, when we look at the dataset, we see that in addition to the columns present in every file, there are also columns “year” and “month”.
ds
## FileSystemDataset with 125 Parquet files
## vendor_id: string
## pickup_at: timestamp[us]
## dropoff_at: timestamp[us]
## passenger_count: int8
## trip_distance: float
## pickup_longitude: float
## pickup_latitude: float
## rate_code_id: string
## store_and_fwd_flag: string
## dropoff_longitude: float
## dropoff_latitude: float
## payment_type: string
## fare_amount: float
## extra: float
## mta_tax: float
## tip_amount: float
## tolls_amount: float
## total_amount: float
## improvement_surcharge: float
## pickup_location_id: int32
## dropoff_location_id: int32
## congestion_surcharge: float
## year: int32
## month: int32
##
## See $metadata for additional Schema metadata
The other form of partitioning currently supported is Hive-style, in which the partition variable names are included in the path segments. If we had saved our files in paths like
year=2009/month=01/data.parquet
year=2009/month=02/data.parquet
...
we would not have had to provide the names in partitioning:
we could have just called ds <- open_dataset("nyc-taxi") and the partitions
would have been detected automatically.
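To make the two path conventions concrete, here is a minimal base R sketch of how partition values could be read off a file path. It is an illustration only: parse_partitions() is a hypothetical helper written for this example, not a function in the arrow package, and it assumes every partition value is an integer, as in the taxi data.

```r
# Illustrative only: mimic how path segments map to partition columns.
# parse_partitions() is a hypothetical helper, not part of the arrow package.
parse_partitions <- function(path, names = NULL) {
  # Drop the file name and keep only the directory segments
  segments <- strsplit(dirname(path), "/", fixed = TRUE)[[1]]
  if (all(grepl("=", segments, fixed = TRUE))) {
    # Hive-style: each segment is "name=value", so names come from the path
    parts <- strsplit(segments, "=", fixed = TRUE)
    values <- vapply(parts, function(p) p[2], character(1))
    names(values) <- vapply(parts, function(p) p[1], character(1))
  } else {
    # Directory-style: names must be supplied, in path order
    stopifnot(length(names) == length(segments))
    values <- stats::setNames(segments, names)
  }
  # Assumption for this sketch: all partition values are integers
  as.list(vapply(values, as.integer, integer(1)))
}

plain <- parse_partitions("2009/01/data.parquet", names = c("year", "month"))
hive <- parse_partitions("year=2009/month=01/data.parquet")
str(plain)
str(hive)
```

Both calls yield the same year and month values; the only difference is whether the column names are passed in or read from the path itself.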
Up to this point, we haven't loaded any data: we have walked directories to find files, we've parsed file paths to identify partitions, and we've read the headers of the Parquet files to inspect their schemas so that we can make sure they all line up.
In the current release, arrow supports methods for selecting a window of data:
select(), rename(), and filter(). Aggregation is not yet supported,
nor is deriving or projecting new columns, so before you call summarize() or
mutate(), you'll need to collect() the data first,
which pulls your selected window of data into an in-memory R data frame.
While we could have made those methods collect() the data they needed
automatically and invisibly to the end user,
we thought it best to make it explicit when you're pulling data into memory
so that you can construct your queries most efficiently
and not be surprised when some query consumes way more resources than expected.
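As a minimal sketch of that collect()-then-compute pattern, the snippet below builds a tiny stand-in dataset in a temporary directory so it can run without the taxi files. The directory name, the three rows of data, and the tip_pct column are all made up for illustration.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Build a tiny stand-in dataset in a temporary directory (illustrative data)
tmp <- file.path(tempdir(), "toy-taxi")
dir.create(file.path(tmp, "2015", "01"), recursive = TRUE, showWarnings = FALSE)
write_parquet(
  data.frame(tip_amount = c(0, 20, 30), total_amount = c(50, 120, 150)),
  file.path(tmp, "2015", "01", "data.parquet")
)
toy <- open_dataset(tmp, partitioning = c("year", "month"))

result <- toy %>%
  filter(total_amount > 100) %>%        # recorded now, evaluated at collect()
  select(tip_amount, total_amount) %>%  # keep only the columns we need
  collect() %>%                         # pull the selected window into R
  mutate(tip_pct = 100 * tip_amount / total_amount)  # ordinary dplyr on a data frame
result
```

Everything before collect() describes the window of data; everything after it is ordinary dplyr on an in-memory data frame.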
Here's an example. Suppose I were curious about tipping behavior among the longest taxi rides. Let's find the median tip percentage for rides with a total amount greater than $100 in 2015, broken down by the number of passengers:
system.time(ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  group_by(passenger_count) %>%
  collect() %>%
  summarize(
    tip_pct = median(100 * tip_amount / total_amount),
    n = n()
  ) %>%
  print())
## # A tibble: 10 x 3
##    passenger_count tip_pct      n
##              <int>   <dbl>  <int>
##  1               0    9.84    380
##  2               1   16.7  143087
##  3               2   16.6   34418
##  4               3   14.4    8922
##  5               4   11.4    4771
##  6               5   16.7    5806
##  7               6   16.7    3338
##  8               7   16.7      11
##  9               8   16.7      32
## 10               9   16.7      42
##
##    user  system elapsed
##   4.436   1.012   1.402
We just selected a window out of a dataset with around 2 billion rows and aggregated on it in under 2 seconds on my laptop. How does this work?
First, select()/rename(), filter(), and group_by()
record their actions but don't evaluate on the data until you run collect().
ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  group_by(passenger_count)
## FileSystemDataset (query)
## tip_amount: float
## total_amount: float
## passenger_count: int8
##
## * Filter: ((total_amount > 100:double) and (year == 2015:double))
## * Grouped by passenger_count
## See $.data for the source Arrow object
This returns instantly and shows the window selection you've made, without loading data from the files. Because the evaluation of these queries is deferred, you can build up a query that selects down to a small window without generating intermediate datasets that would potentially be large.
Second, all work is pushed down to the individual data files and, depending on the file format, to chunks of data within the files. As a result, we can select a window of data from a much larger dataset by collecting the smaller slices from each file; we don't have to load the whole dataset into memory in order to slice from it.
Third, because of partitioning, we can ignore some files entirely.
In this example, because we filter on year == 2015, all files corresponding to other years
are immediately excluded: we don't have to load them in order to find that no
rows match the filter. Relatedly, since Parquet files contain row groups with
statistics on the data within, there may be entire chunks of data we can
avoid scanning because they have no rows where total_amount > 100.
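The partition-based part of this can be pictured with a few lines of base R. This is a sketch of the idea only, not Arrow's implementation: it builds a made-up list of file paths for three years in the year/month layout used above and keeps those whose first path segment matches the filter.

```r
# Illustrative only: which files survive a `year == 2015` filter?
# The three-year layout below is made up for the example.
grid <- expand.grid(
  month = sprintf("%02d", 1:12),
  year = 2014:2016,
  stringsAsFactors = FALSE
)
paths <- file.path(grid$year, grid$month, "data.parquet")

# The first path segment carries the "year" partition value
year <- as.integer(sub("/.*$", "", paths))
kept <- paths[year == 2015]

length(paths)  # files in the toy layout
length(kept)   # files that would need to be opened at all
head(kept, 3)
```

Files for the other years are ruled out from their paths alone, before any file contents are read.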
There are a few ways you can control the Dataset creation to adapt to special use cases.
For one, you can specify a schema argument to declare the columns and their data types.
This is useful if you have data files that have different storage schemas
(for example, a column could be int32 in one and int8 in another)
and you want to ensure that the resulting Dataset has a specific type.
To be clear, it's not necessary to specify a schema, even in this example of
mixed integer types, because the Dataset constructor will reconcile differences like these.
The schema specification just lets you declare what you want the result to be.
Similarly, you can provide a Schema in the partitioning argument of open_dataset()
in order to declare the types of the virtual columns that define the partitions.
This would be useful, in our taxi dataset example, if you wanted to keep
“month” as a string instead of an integer for some reason.
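A minimal sketch of that case follows, using a one-file stand-in dataset in a temporary directory rather than the taxi data. The directory name and the fare_amount values are made up; schema(), int32(), and utf8() are the arrow package's type constructors.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# One illustrative file laid out as <year>/<month>/data.parquet
tmp <- file.path(tempdir(), "toy-schema")
dir.create(file.path(tmp, "2009", "01"), recursive = TRUE, showWarnings = FALSE)
write_parquet(
  data.frame(fare_amount = c(5.5, 12)),
  file.path(tmp, "2009", "01", "data.parquet")
)

# Declare the partition column types instead of only naming them
typed <- open_dataset(
  tmp,
  partitioning = schema(year = int32(), month = utf8())
)

partition_values <- typed %>%
  select(year, month) %>%
  collect()
partition_values
```

With month declared as a string, the path segment is kept as written ("01") rather than being converted to the integer 1.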
Another feature of Datasets is that they can be composed of multiple data sources.
That is, you may have a directory of partitioned Parquet files in one location,
and in another directory, files that haven't been partitioned.
In the future, when there is support for cloud storage and other file formats,
this would mean you could point to an S3 bucket of Parquet data and a directory
of CSVs on the local file system and query them together as a single dataset.
To create a multi-source dataset, provide a list of datasets to open_dataset()
instead of a file path, or simply concatenate them like big_dataset <- c(ds1, ds2).
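Both forms can be tried on two small stand-in sources. In this sketch the directory names and the single column x are made up, and the two sources share the same schema to keep the example simple.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Two separate, illustrative source directories
dir_a <- file.path(tempdir(), "source-a")
dir_b <- file.path(tempdir(), "source-b")
dir.create(dir_a, showWarnings = FALSE)
dir.create(dir_b, showWarnings = FALSE)
write_parquet(data.frame(x = 1:3), file.path(dir_a, "data.parquet"))
write_parquet(data.frame(x = 4:5), file.path(dir_b, "data.parquet"))

ds1 <- open_dataset(dir_a)
ds2 <- open_dataset(dir_b)

# Either form combines the sources into one queryable dataset
combined <- open_dataset(list(ds1, ds2))
also_combined <- c(ds1, ds2)

n_combined <- combined %>% select(x) %>% collect() %>% nrow()
n_also <- also_combined %>% select(x) %>% collect() %>% nrow()
c(n_combined, n_also)
```

Queries against the combined dataset see the rows from both sources.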