Apache Arrow lets you work efficiently with large, multi-file datasets. The arrow R package provides a dplyr interface to Arrow Datasets, as well as other tools for interactive exploration of Arrow data.

This vignette introduces Datasets and shows how to use dplyr to analyze them. It describes both what is possible to do with Arrow now and what is on the immediate development roadmap.

Example: NYC taxi data

The New York City taxi trip record data is widely used in big data exercises and competitions. For demonstration purposes, we have hosted a Parquet-formatted version of about 10 years of the trip data in a public S3 bucket.

The total file size is around 37 gigabytes, even in the efficient Parquet file format. That's bigger than memory on most people's computers, so we can't just read it all in and stack it into a single data frame.

In a future release, you'll be able to point your R session at S3 and query the dataset from there. For now, datasets need to be on your local file system. To download the files, run:

bucket <- "https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com"
dir.create("nyc-taxi")
for (year in 2009:2019) {
  dir.create(file.path("nyc-taxi", year))
  if (year == 2019) {
    # We only have through June 2019 there
    months <- 1:6
  } else {
    months <- 1:12
  }
  for (month in months) {
    if (month < 10) {
      month <- paste0("0", month)
    }
    dir.create(file.path("nyc-taxi", year, month))
    download.file(
      paste(bucket, year, month, "data.parquet", sep = "/"),
      file.path("nyc-taxi", year, month, "data.parquet"),
      mode = 'wb'
    )
  }
}

Note that the vignette will not execute that code chunk: if you want to run with live data, you'll have to do it yourself separately. Given the size, if you're running this locally and don't have a fast connection, feel free to grab only a year or two of data.
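
If you only want a subset, it can help to build the list of files first and inspect it before downloading anything. The following is a minimal sketch rather than part of the vignette's workflow: the helper name taxi_files() is hypothetical (it is not an arrow function), and it assumes the same bucket layout as the loop above, with 2019 data available only through June.

```r
# Hypothetical helper (not part of arrow): list the source URL and the local
# destination for every monthly file in the requested years.
taxi_files <- function(years,
                       bucket = "https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com",
                       root = "nyc-taxi") {
  per_year <- lapply(years, function(year) {
    # Assumption carried over from the download loop: 2019 stops at June
    months <- if (year == 2019) 1:6 else 1:12
    mm <- sprintf("%02d", months)  # zero-pad: 1 -> "01"
    data.frame(
      url = paste(bucket, year, mm, "data.parquet", sep = "/"),
      dest = file.path(root, year, mm, "data.parquet"),
      stringsAsFactors = FALSE
    )
  })
  do.call(rbind, per_year)
}

files <- taxi_files(2018:2019)
head(files$dest, 3)

# To actually download the listed files (not run here):
# for (i in seq_len(nrow(files))) {
#   dir.create(dirname(files$dest[i]), recursive = TRUE, showWarnings = FALSE)
#   download.file(files$url[i], files$dest[i], mode = "wb")
# }
```

Building the table first lets you check how many files a given range of years implies before committing to the download.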

If you don't have the taxi data downloaded, the vignette will still run and will yield previously cached output for reference. To be explicit about which version is running, let's check whether we're running with live data:

dir.exists("nyc-taxi")
## [1] FALSE

Getting started

Because dplyr is not necessary for many Arrow workflows, it is an optional (Suggests) dependency. So, to work with Datasets, we need to load both arrow and dplyr.

library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

The first step is to create our Dataset object, pointing at the directory of data.

ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))

The default file format for open_dataset() is Parquet; if we had a directory of Arrow format files, we could include format = "arrow" in the call. Future versions will support more file formats, including CSV/delimited text data and JSON.

The partitioning argument lets us specify how the file paths provide information about how the dataset is chunked into different files. Our files in this example have file paths like

2009/01/data.parquet
2009/02/data.parquet
...

By providing a character vector to partitioning, we're saying that the first path segment gives the value for “year” and the second segment is “month”. Every row in 2009/01/data.parquet has a value of 2009 for “year” and 1 for “month”, even though those columns may not actually be present in the file.

Indeed, when we look at the dataset, we see that in addition to the columns present in every file, there are also columns “year” and “month”.

ds
## FileSystemDataset with 125 Parquet files
## vendor_id: string
## pickup_at: timestamp[us]
## dropoff_at: timestamp[us]
## passenger_count: int8
## trip_distance: float
## pickup_longitude: float
## pickup_latitude: float
## rate_code_id: string
## store_and_fwd_flag: string
## dropoff_longitude: float
## dropoff_latitude: float
## payment_type: string
## fare_amount: float
## extra: float
## mta_tax: float
## tip_amount: float
## tolls_amount: float
## total_amount: float
## improvement_surcharge: float
## pickup_location_id: int32
## dropoff_location_id: int32
## congestion_surcharge: float
## year: int32
## month: int32
## 
## See $metadata for additional Schema metadata
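
To make the idea of "virtual" partition columns concrete, here is a toy illustration in base R of how path segments can be matched to field names by position. It is only a sketch of the concept, not how arrow implements it; parse_dir_partitions() is a hypothetical helper, and converting every segment to an integer is an assumption that happens to suit this example.

```r
# Toy illustration only: map path segments to partition fields by position.
# `parse_dir_partitions()` is a hypothetical helper, not an arrow function.
parse_dir_partitions <- function(paths, fields) {
  segments <- strsplit(dirname(paths), "/", fixed = TRUE)
  values <- lapply(seq_along(fields), function(i) {
    # as.integer("01") is 1, which is why "month" can appear as an integer
    as.integer(vapply(segments, `[`, character(1), i))
  })
  names(values) <- fields
  data.frame(path = paths, values, stringsAsFactors = FALSE)
}

parts <- parse_dir_partitions(
  c("2009/01/data.parquet", "2009/02/data.parquet"),
  fields = c("year", "month")
)
parts
```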

The other form of partitioning currently supported is Hive-style, in which the partition variable names are included in the path segments. If we had saved our files in paths like

year=2009/month=01/data.parquet
year=2009/month=02/data.parquet
...

we would not have had to provide the names in partitioning: we could have just called ds <- open_dataset("nyc-taxi") and the partitions would have been detected automatically.
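
The reason no names are needed in the Hive-style case is that each path segment carries its own key. A toy base R sketch of that parsing follows; parse_hive_partitions() is a hypothetical helper for illustration and not an arrow function, and it assumes integer-valued partitions.

```r
# Toy illustration only: read "key=value" path segments.
# `parse_hive_partitions()` is a hypothetical helper, not an arrow function.
parse_hive_partitions <- function(path) {
  segments <- strsplit(dirname(path), "/", fixed = TRUE)[[1]]
  pairs <- strsplit(segments, "=", fixed = TRUE)
  keys <- vapply(pairs, `[`, character(1), 1)
  values <- vapply(pairs, `[`, character(1), 2)
  # The field names come from the path itself
  out <- as.list(as.integer(values))
  names(out) <- keys
  out
}

hive_parts <- parse_hive_partitions("year=2009/month=01/data.parquet")
str(hive_parts)
```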

Querying the dataset

Up to this point, we haven't loaded any data: we have walked directories to find files, we've parsed file paths to identify partitions, and we've read the headers of the Parquet files to inspect their schemas so that we can make sure they all line up.

In the current release, arrow supports methods for selecting a window of data: select(), rename(), and filter(). Aggregation is not yet supported, nor is deriving or projecting new columns, so before you call summarize() or mutate(), you'll need to collect() the data first, which pulls your selected window of data into an in-memory R data frame. While we could have made those methods collect() the data they needed automatically and invisibly to the end user, we thought it best to make it explicit when you're pulling data into memory so that you can construct your queries most efficiently and not be surprised when some query consumes way more resources than expected.

Here's an example. Suppose I were curious about tipping behavior among the most expensive taxi rides. Let's find the median tip percentage for rides with a total amount greater than $100 in 2015, broken down by the number of passengers:

system.time(ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  group_by(passenger_count) %>%
  collect() %>%
  summarize(
    tip_pct = median(100 * tip_amount / total_amount),
    n = n()
  ) %>%
  print())
## # A tibble: 10 x 3
##    passenger_count tip_pct      n
##              <int>   <dbl>  <int>
##  1               0    9.84    380
##  2               1   16.7  143087
##  3               2   16.6   34418
##  4               3   14.4    8922
##  5               4   11.4    4771
##  6               5   16.7    5806
##  7               6   16.7    3338
##  8               7   16.7      11
##  9               8   16.7      32
## 10               9   16.7      42
## 
##    user  system elapsed
##   4.436   1.012   1.402

We just selected a window out of a dataset with around 2 billion rows and aggregated on it in under 2 seconds on my laptop. How does this work?

First, select()/rename(), filter(), and group_by() record their actions but don't evaluate on the data until you run collect().

ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  group_by(passenger_count)
## FileSystemDataset (query)
## tip_amount: float
## total_amount: float
## passenger_count: int8
## 
## * Filter: ((total_amount > 100:double) and (year == 2015:double))
## * Grouped by passenger_count
## See $.data for the source Arrow object

This returns instantly and shows the window selection you've made, without loading data from the files. Because the evaluation of these queries is deferred, you can build up a query that selects down to a small window without generating intermediate datasets that would potentially be large.
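
The "record now, evaluate later" pattern can be sketched in a few lines of base R. This is a toy model meant only to illustrate deferred evaluation; it is not how arrow implements its queries, and every name in it (lazy_query(), toy_filter(), toy_select(), toy_collect()) is hypothetical.

```r
# Toy model of deferred evaluation (not arrow's implementation).
# A "query" is just the source data plus a record of the requested steps.
lazy_query <- function(data) {
  structure(
    list(data = data, filters = list(), columns = names(data)),
    class = "toy_query"
  )
}

# Record a row filter as a function of the data; nothing is evaluated yet
toy_filter <- function(q, predicate) {
  q$filters <- c(q$filters, list(predicate))
  q
}

# Record which columns to keep
toy_select <- function(q, columns) {
  q$columns <- columns
  q
}

# Only here do we touch the data: apply every recorded step at once
toy_collect <- function(q) {
  keep <- rep(TRUE, nrow(q$data))
  for (f in q$filters) keep <- keep & f(q$data)
  q$data[keep, q$columns, drop = FALSE]
}

rides <- data.frame(
  total_amount = c(20, 150, 120, 8),
  tip_amount = c(3, 25, 0, 1),
  year = c(2015, 2015, 2014, 2015)
)

q <- toy_select(
  toy_filter(lazy_query(rides), function(d) d$total_amount > 100 & d$year == 2015),
  c("tip_amount", "total_amount")
)
result <- toy_collect(q)
result
```

Because the filter is applied against the source columns at collection time, it can refer to year even though year is not among the selected columns, which mirrors the query shown above.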

Second, all work is pushed down to the individual data files and, depending on the file format, to chunks of data within the files. As a result, we can select a window of data from a much larger dataset by collecting the smaller slices from each file; we don't have to load the whole dataset into memory in order to slice from it.

Third, because of partitioning, we can ignore some files entirely. In this example, by filtering year == 2015, all files corresponding to other years are immediately excluded: we don't have to load them in order to find that no rows match the filter. Relatedly, since Parquet files contain row groups with statistics on the data within, there may be entire chunks of data we can avoid scanning because they have no rows where total_amount > 100.
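
As a rough illustration of the partition-pruning idea, the base R sketch below decides which files to scan using nothing but their paths. It is a toy model, not arrow's implementation: the file listing is made up for the example (ten full years, 2009 through 2018), and row-group statistics are not modeled at all.

```r
# Toy illustration of partition pruning (not arrow's implementation):
# decide which files to scan using only their paths.
all_files <- file.path(
  rep(2009:2018, each = 12),
  sprintf("%02d", rep(1:12, times = 10)),
  "data.parquet"
)

# The year is the first path segment, so no file needs to be opened
file_year <- as.integer(sub("/.*$", "", all_files))
to_scan <- all_files[file_year == 2015]

length(all_files)  # 120 candidate files in this toy listing
length(to_scan)    # 12 files left after pruning on year == 2015
```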

Going further

There are a few ways you can control the Dataset creation to adapt to special use cases. For one, you can specify a schema argument to declare the columns and their data types. This is useful if you have data files that have different storage schemas (for example, a column could be int32 in one and int8 in another) and you want to ensure that the resulting Dataset has a specific type. To be clear, it's not necessary to specify a schema, even in this example of mixed integer types, because the Dataset constructor will reconcile differences like these. The schema specification just lets you declare what you want the result to be.

Similarly, you can provide a Schema in the partitioning argument of open_dataset() in order to declare the types of the virtual columns that define the partitions. This would be useful, in our taxi dataset example, if you wanted to keep “month” as a string instead of an integer for some reason.
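
A minimal sketch of that partitioning Schema follows. So that it runs without the taxi files, it builds a tiny throwaway dataset in a temporary directory; the directory name, the toy_ds object, and the single fare_amount column are all made up for illustration. The whole block is skipped if the arrow package is not installed, and the exact printed output may differ between arrow versions.

```r
# Sketch: declare partition column types with a Schema.
# Uses a tiny throwaway dataset so it runs without the taxi files;
# skipped entirely if the arrow package is not installed.
if (requireNamespace("arrow", quietly = TRUE)) {
  root <- file.path(tempdir(), "toy-taxi")
  for (month in c("01", "02")) {
    dir.create(file.path(root, "2009", month), recursive = TRUE, showWarnings = FALSE)
    arrow::write_parquet(
      data.frame(fare_amount = c(10.5, 7)),
      file.path(root, "2009", month, "data.parquet")
    )
  }

  # Keep "month" as a string instead of letting it be read as an integer
  toy_ds <- arrow::open_dataset(
    root,
    partitioning = arrow::schema(year = arrow::int32(), month = arrow::utf8())
  )
  print(toy_ds$schema)  # "month" should be reported as a string column
}
```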

Another feature of Datasets is that they can be composed of multiple data sources. That is, you may have a directory of partitioned Parquet files in one location and, in another directory, files that haven't been partitioned. In the future, when there is support for cloud storage and other file formats, this would mean you could point to an S3 bucket of Parquet data and a directory of CSVs on the local file system and query them together as a single dataset. To create a multi-source dataset, provide a list of datasets to open_dataset() instead of a file path, or simply concatenate them like big_dataset <- c(ds1, ds2).