Adventures with parquet: Storing & querying gene expression data

R
parquet
TIL
Author

Thomas Sandmann

Published

August 31, 2023

tl;dr

Today I explored storing gene expression data in parquet files, and querying them with the arrow, duckdb or sparklyr R packages.

Introduction

for (lib in c("arrow", "dplyr", "duckdb", "edgeR", "fs", "glue", "memoise",
              "rnaseqExamples", "sparklyr", "tictoc", "tidyr", "DESeq2", 
              "Homo.sapiens", "Mus.musculus")) {
  suppressPackageStartupMessages(
    library(lib, character.only = TRUE, quietly = TRUE, warn.conflicts = FALSE)
  )
}

A while back, I created the rnaseq-examples R package with serialized SummarizedExperiment objects for three published RNA-seq experiments 1. Here, I will

  1. Load the data from all three experiments into my R session,
  2. Extract the raw counts and TMM-normalized gene expression measurements (counts per million) into (tidy) R data.frames,
  3. Store the data in separate parquet files, one for each experiment, and
  4. Query the parquet files with the arrow, duckdb, and sparklyr R packages.

I was amazed by the speed of duckdb on my local system, and am looking forward to distributing large datasets across nodes of a Spark cluster in the future. Either way, parquet files are a highly portable and language-agnostic file format that I am happy to add to my toolkit.

Loading gene expression datasets

The rnaseqExamples R package is available from GitHub and can be installed via

remotes::install_github("tomsing1/rnaseq-examples")

Once installed, we can load the three SummarizedExperiment objects it contains into our R session.

datasets <- data(package = "rnaseqExamples")
knitr::kable(data.frame(datasets$results[, c("Item", "Title")]))
Item   Title
rnai   Naguib et al: Global effects of SUPT4H1 RNAi on gene expression
sarm1  Zhu et al: Effects of SARM1 deficiency on gene expression
tau    Wang et al: time course of microglia from the rTg4510 mouse model
data("tau")

Next, I define the tidy() helper function, which calculates normalization factors with Robinson and Oshlack’s TMM method and exports raw counts alongside (normalized) counts per million (CPM) in a data.frame.

  • Note that I am not exporting sample annotations in this example. This information could be added to the data.frame (with a join operation; see the sketch after the helper function below) or stored in a different format, e.g. for efficient row-wise queries.
  • The helper function is wrapped in a call to memoise::memoise(), which caches its outputs in memory. That way, repeated calls with the same arguments avoid redundant computations / joins. This functionality is not really required here, but I will try to remember it for future reference!
A helper function to coerce a SummarizedExperiment into a tibble
#' Coerce a DGEList or a SummarizedExperiment into a tibble
#' 
#' @param x Either a `DGEList` or a `SummarizedExperiment`
#' @return A tibble
.tidy <- function(x) {
  
  # remove non-ensembl feature identifiers (e.g. spike-ins)
  x <- x[grepl("^ENS", row.names(x)), ]
  annotation <- dplyr::case_when(
    all(grepl("^ENSG", row.names(x))) ~ "Homo.sapiens",
    all(grepl("^ENSMUS", row.names(x))) ~ "Mus.musculus"
  )
  stopifnot(!is.na(annotation))
  require(annotation, character.only = TRUE)
  
  # calculate TMM normalization factors
  y <- edgeR::calcNormFactors(x)
  
  # extract raw counts
  counts <- y$counts %>%
    as.data.frame() %>%
    tibble::rownames_to_column("feature_id") %>%
    tidyr::pivot_longer(cols = dplyr::all_of(colnames(y)), 
                        names_to = "sample_id", values_to = "count")
  
  # extract cpms
  cpms <- edgeR::cpm(y, normalized.lib.sizes = TRUE, log = FALSE) %>%
    as.data.frame() %>%
    tibble::rownames_to_column("feature_id") %>%
    tidyr::pivot_longer(cols = dplyr::all_of(colnames(y)), 
                        names_to = "sample_id", values_to = "cpm")
  
  # add gene annotations & alternative identifiers
  dplyr::inner_join(
    counts, cpms, by = c("feature_id", "sample_id")
  ) %>%
    dplyr::left_join(
      tibble::rownames_to_column(y$genes, "feature_id"),
      by = "feature_id") %>%
    dplyr::rename(ensembl = "gene_id") %>%
    dplyr::mutate(
      entrez = suppressMessages({
        mapIds(get(annotation), keys = .data$ensembl, 
               column = "ENTREZID", keytype = "ENSEMBL",
               multiVals = "first")
      })
    ) %>%
    dplyr::select(-any_of("spikein"))
}

tidy <- memoise(.tidy)
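
As mentioned above, sample annotations could be joined to this tidy output. A minimal sketch, assuming se is one of our SummarizedExperiment objects (e.g. tau) and df is the output of tidy(se):

# Sketch (assumes hypothetical objects `se` and `df`, see above):
# colData() returns the per-sample annotations of a SummarizedExperiment
sample_anno <- as.data.frame(SummarizedExperiment::colData(se)) %>%
  tibble::rownames_to_column("sample_id")
df_annotated <- dplyr::left_join(df, sample_anno, by = "sample_id")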

Write parquet files

Now we extract the gene expression measurements from each of our SummarizedExperiment objects and write them to separate parquet files in a temporary directory on the local file system. Alternatively, I could store the files on a cloud storage system, e.g. AWS S3, and access them remotely.
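
For example, here is a minimal sketch of writing directly to S3 with arrow, assuming a hypothetical bucket name, an arrow build with S3 support, and valid AWS credentials:

# Hypothetical example (not run): write a parquet file straight to AWS S3
bucket <- arrow::s3_bucket("my-example-bucket", region = "us-west-2")
arrow::write_parquet(df, sink = bucket$path("tau.parquet"))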

# create a temporary directory
out_dir <- file.path(tempdir(), "parquet")
fs::dir_create(out_dir)

for (dataset in c("tau", "rnai", "sarm1")) {
  df <- tidy(get(dataset))
  df$study <- dataset  # add a column with the name of the experiment
  arrow::write_parquet(
    x = df, 
    sink = file.path(out_dir, paste0(dataset, ".parquet"))
  )
}

# list the contents of the temporary directory
fs::dir_info(out_dir) %>%
  dplyr::select(path, size) %>%
  dplyr::mutate(path = basename(path))
# A tibble: 3 × 2
  path                 size
  <chr>         <fs::bytes>
1 rnai.parquet        5.22M
2 sarm1.parquet       5.29M
3 tau.parquet         8.22M
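
As an aside, arrow could also write all three studies into a single Hive-partitioned dataset instead of separate files; a minimal sketch, assuming we combine the tidy data.frames first:

# Sketch: write one dataset, partitioned by the study column; each study
# ends up in its own subdirectory (e.g. study=tau/part-0.parquet)
combined <- dplyr::bind_rows(
  lapply(c("tau", "rnai", "sarm1"), function(dataset) {
    df <- tidy(get(dataset))
    df$study <- dataset
    df
  })
)
arrow::write_dataset(combined, path = file.path(out_dir, "partitioned"),
                     partitioning = "study")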

Querying

Arrow

Even though we created three separate files, the arrow R package can abstract them into a single Dataset. We simply point the open_dataset() function at the directory containing the .parquet files.

ds <- arrow::open_dataset(out_dir)
ds
FileSystemDataset with 3 Parquet files
feature_id: string
sample_id: string
count: int32
cpm: double
ensembl: string
symbol: string
gene_type: string
entrez: string
study: string

See $metadata for additional Schema metadata

We can use dplyr verbs to query this FileSystemDataset:

tic("arrow")
ds %>%
  filter(symbol %in% c("GAPDH", "Gapdh")) %>%
  group_by(symbol, study) %>%
  tally() %>%
  collect()
# A tibble: 3 × 3
# Groups:   symbol [2]
  symbol study     n
  <chr>  <chr> <int>
1 GAPDH  rnai     12
2 Gapdh  sarm1    16
3 Gapdh  tau      32
toc()
arrow: 1.717 sec elapsed

On my system, it takes ~1.7 seconds to retrieve these results, and about one second to retrieve the full rnai dataset:

tic()
ds %>%
  filter(study == "rnai") %>%
  collect()
# A tibble: 694,236 × 9
   feature_id      sample_id count     cpm ensembl symbol gene_type entrez study
 * <chr>           <chr>     <int>   <dbl> <chr>   <chr>  <chr>     <chr>  <chr>
 1 ENSG00000163106 S320          0  0      ENSG00… HPGDS  protein_… 27306  rnai 
 2 ENSG00000163106 S168          2  0.0318 ENSG00… HPGDS  protein_… 27306  rnai 
 3 ENSG00000163106 S255          1  0.0111 ENSG00… HPGDS  protein_… 27306  rnai 
 4 ENSG00000163106 S190          0  0      ENSG00… HPGDS  protein_… 27306  rnai 
 5 ENSG00000163110 S912       1607 26.0    ENSG00… PDLIM5 protein_… 10611  rnai 
 6 ENSG00000163110 S832       2320 29.2    ENSG00… PDLIM5 protein_… 10611  rnai 
 7 ENSG00000163110 S783       1022 22.7    ENSG00… PDLIM5 protein_… 10611  rnai 
 8 ENSG00000163110 S623       1159 28.9    ENSG00… PDLIM5 protein_… 10611  rnai 
 9 ENSG00000163110 S622       1384 20.7    ENSG00… PDLIM5 protein_… 10611  rnai 
10 ENSG00000163110 S487       1023 32.6    ENSG00… PDLIM5 protein_… 10611  rnai 
# ℹ 694,226 more rows
toc()
1.09 sec elapsed
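
Incidentally, an arrow Dataset can also be registered with duckdb without copying the data, via arrow::to_duckdb(). A minimal sketch, reusing the query from above:

# Sketch: hand the arrow Dataset to duckdb and run the same dplyr query there
ds %>%
  arrow::to_duckdb() %>%
  filter(symbol %in% c("GAPDH", "Gapdh")) %>%
  group_by(symbol, study) %>%
  tally() %>%
  collect()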

Duckdb

Alternatively, we can use duckdb to query our parquet files. The duckdb R API supports both dbplyr verbs and raw SQL queries.

First, we establish a connection to the duckdb backend:

con <- dbConnect(duckdb::duckdb())

Using dbplyr

First, we execute the same dplyr query we used above; dbplyr translates it into SQL for us and passes it on to duckdb.

tic("duckdb")
tbl(con, sprintf("read_parquet('%s/*.parquet')", out_dir)) %>%
  filter(symbol %in% c("GAPDH", "Gapdh")) %>%
  group_by(symbol, study) %>%
  tally() %>%
  collect()
# A tibble: 3 × 3
  symbol study     n
  <chr>  <chr> <dbl>
1 GAPDH  rnai     12
2 Gapdh  tau      32
3 Gapdh  sarm1    16
toc()
duckdb: 0.065 sec elapsed

On my system, duckdb returns results more than 20x faster than arrow’s implementation (see above).

Retrieval of the full dataset for the rnai study is also completed in less than half a second:

tic()
tbl(con, glue("read_parquet('{out_dir}/*.parquet')")) %>%
  filter(study == "rnai") %>%
  collect()
# A tibble: 694,236 × 9
   feature_id      sample_id count   cpm ensembl   symbol gene_type entrez study
   <chr>           <chr>     <int> <dbl> <chr>     <chr>  <chr>     <chr>  <chr>
 1 ENSG00000000003 S912       1942  31.4 ENSG0000… TSPAN6 protein_… 7105   rnai 
 2 ENSG00000000003 S832       2942  37.1 ENSG0000… TSPAN6 protein_… 7105   rnai 
 3 ENSG00000000003 S783        905  20.1 ENSG0000… TSPAN6 protein_… 7105   rnai 
 4 ENSG00000000003 S623        960  24.0 ENSG0000… TSPAN6 protein_… 7105   rnai 
 5 ENSG00000000003 S622       2002  29.9 ENSG0000… TSPAN6 protein_… 7105   rnai 
 6 ENSG00000000003 S487        792  25.3 ENSG0000… TSPAN6 protein_… 7105   rnai 
 7 ENSG00000000003 S458       1549  20.9 ENSG0000… TSPAN6 protein_… 7105   rnai 
 8 ENSG00000000003 S420       1021  25.3 ENSG0000… TSPAN6 protein_… 7105   rnai 
 9 ENSG00000000003 S320       1432  21.5 ENSG0000… TSPAN6 protein_… 7105   rnai 
10 ENSG00000000003 S168       1773  28.2 ENSG0000… TSPAN6 protein_… 7105   rnai 
# ℹ 694,226 more rows
toc()
0.244 sec elapsed

Using SQL

Because duckdb is a SQL database, we can also query the parquet files directly with raw SQL, realizing another gain in execution speed:

tic()
dbGetQuery(
  con = con,
  glue_sql(
    "SELECT symbol, study, COUNT(*) AS n 
     FROM read_parquet({paste0(out_dir, '/*.parquet')}) 
     WHERE UPPER(symbol) = 'GAPDH' 
     GROUP BY symbol, study", 
    .con = con)
)
  symbol study  n
1  GAPDH  rnai 12
2  Gapdh sarm1 16
3  Gapdh   tau 32
toc()
0.024 sec elapsed

Similarly, reading all data for the rnai study into memory is faster than with arrow’s implementation:

tic()
dbGetQuery(
  con = con,
  glue_sql(
    "SELECT * 
     FROM read_parquet({paste0(out_dir, '/*.parquet')}) 
     WHERE study = 'rnai'", 
    .con = con)
) %>%
  head()
       feature_id sample_id count      cpm         ensembl symbol
1 ENSG00000000003      S912  1942 31.43893 ENSG00000000003 TSPAN6
2 ENSG00000000003      S832  2942 37.08823 ENSG00000000003 TSPAN6
3 ENSG00000000003      S783   905 20.10531 ENSG00000000003 TSPAN6
4 ENSG00000000003      S623   960 23.97706 ENSG00000000003 TSPAN6
5 ENSG00000000003      S622  2002 29.94743 ENSG00000000003 TSPAN6
6 ENSG00000000003      S487   792 25.26281 ENSG00000000003 TSPAN6
       gene_type entrez study
1 protein_coding   7105  rnai
2 protein_coding   7105  rnai
3 protein_coding   7105  rnai
4 protein_coding   7105  rnai
5 protein_coding   7105  rnai
6 protein_coding   7105  rnai
toc()
0.211 sec elapsed
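
When we are done, we can close the connection; duckdb’s shutdown argument also terminates the embedded database instance:

# close the connection and shut down the embedded duckdb instance
dbDisconnect(con, shutdown = TRUE)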

Spark

Finally, and mainly just for future reference, the sparklyr R package provides an R interface to leverage a Spark cluster and its distributed analysis libraries.

First, we establish a connection to the Spark cluster. Here, I am running a local Spark instance (i.e. a single local node) on my laptop:

sc <- spark_connect(master = "local")

Next, we import the datasets into the cluster by creating a new Spark DataFrame with the name gene_expression.

sdf <- spark_read_parquet(sc = sc, name = "gene_expression",
                          path = paste0(out_dir, '/*.parquet'))

dbplyr

The tbl_spark object returned by sparklyr::spark_read_parquet() can be queried with dplyr verbs, which are translated into Spark SQL statements on the fly:

tic()
sdf %>%
  filter(symbol %in% c("GAPDH", "Gapdh")) %>%
  group_by(symbol, study) %>%
  tally() %>%
  collect()
# A tibble: 3 × 3
  symbol study     n
  <chr>  <chr> <int>
1 Gapdh  sarm1    16
2 GAPDH  rnai     12
3 Gapdh  tau      32
toc()
1.472 sec elapsed
tic()
sdf %>%
  filter(study == "rnai") %>%
  collect()
# A tibble: 694,236 × 9
   feature_id      sample_id count   cpm ensembl   symbol gene_type entrez study
   <chr>           <chr>     <int> <dbl> <chr>     <chr>  <chr>     <chr>  <chr>
 1 ENSG00000000003 S912       1942  31.4 ENSG0000… TSPAN6 protein_… 7105   rnai 
 2 ENSG00000000003 S832       2942  37.1 ENSG0000… TSPAN6 protein_… 7105   rnai 
 3 ENSG00000000003 S783        905  20.1 ENSG0000… TSPAN6 protein_… 7105   rnai 
 4 ENSG00000000003 S623        960  24.0 ENSG0000… TSPAN6 protein_… 7105   rnai 
 5 ENSG00000000003 S622       2002  29.9 ENSG0000… TSPAN6 protein_… 7105   rnai 
 6 ENSG00000000003 S487        792  25.3 ENSG0000… TSPAN6 protein_… 7105   rnai 
 7 ENSG00000000003 S458       1549  20.9 ENSG0000… TSPAN6 protein_… 7105   rnai 
 8 ENSG00000000003 S420       1021  25.3 ENSG0000… TSPAN6 protein_… 7105   rnai 
 9 ENSG00000000003 S320       1432  21.5 ENSG0000… TSPAN6 protein_… 7105   rnai 
10 ENSG00000000003 S168       1773  28.2 ENSG0000… TSPAN6 protein_… 7105   rnai 
# ℹ 694,226 more rows
toc()
3.329 sec elapsed

SQL

Alternatively, we can query the cluster’s gene_expression table directly with raw SQL:

tic()
dbGetQuery(
  con = sc,
  glue(
    "SELECT symbol, study, COUNT(*) AS n 
     FROM gene_expression 
     WHERE UPPER(symbol) = 'GAPDH' 
     GROUP BY symbol, study")
)
  symbol study  n
1  Gapdh sarm1 16
2  GAPDH  rnai 12
3  Gapdh   tau 32
toc()
1.493 sec elapsed

My local Spark instance performs these queries more slowly than e.g. duckdb. But Spark’s real power lies in deploying machine learning models across a (potentially large) cluster, enabling parallel processing of very large datasets by distributing both data and computation across nodes.
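
For future reference, here is a minimal sketch of what that might look like with sparklyr’s ML bindings; the model and formula are purely illustrative:

# Illustrative sketch: fit a linear regression on the Spark cluster,
# e.g. regressing log-transformed CPM values on raw counts
fit <- sdf %>%
  dplyr::mutate(log_cpm = log2(cpm + 1)) %>%
  sparklyr::ml_linear_regression(log_cpm ~ count)
summary(fit)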

spark_disconnect(sc)

Reproducibility

Session Information
sessioninfo::session_info("attached")
─ Session info ───────────────────────────────────────────────────────────────
 setting  value
 version  R version 4.3.1 (2023-06-16)
 os       macOS Ventura 13.5.1
 system   aarch64, darwin20
 ui       X11
 language (EN)
 collate  en_US.UTF-8
 ctype    en_US.UTF-8
 tz       America/Los_Angeles
 date     2023-09-01
 pandoc   3.1.1 @ /Applications/RStudio.app/Contents/Resources/app/quarto/bin/tools/ (via rmarkdown)

─ Packages ───────────────────────────────────────────────────────────────────
 ! package                            * version    date (UTC) lib source
 P AnnotationDbi                      * 1.62.2     2023-07-02 [?] Bioconductor
 P arrow                              * 13.0.0     2023-08-30 [?] CRAN (R 4.3.0)
 P Biobase                            * 2.60.0     2023-05-08 [?] Bioconductor
 P BiocGenerics                       * 0.46.0     2023-06-04 [?] Bioconductor
 P DBI                                * 1.1.3      2022-06-18 [?] CRAN (R 4.3.0)
 P DESeq2                             * 1.40.2     2023-07-02 [?] Bioconductor
 P dplyr                              * 1.1.2      2023-04-20 [?] CRAN (R 4.3.0)
 P duckdb                             * 0.8.1-3    2023-09-01 [?] CRAN (R 4.3.1)
 P edgeR                              * 3.42.4     2023-06-04 [?] Bioconductor
 P fs                                 * 1.6.3      2023-07-20 [?] CRAN (R 4.3.0)
 P GenomeInfoDb                       * 1.36.1     2023-07-02 [?] Bioconductor
 P GenomicFeatures                    * 1.52.2     2023-08-27 [?] Bioconductor
 P GenomicRanges                      * 1.52.0     2023-05-08 [?] Bioconductor
 P glue                               * 1.6.2      2022-02-24 [?] CRAN (R 4.3.0)
 P GO.db                              * 3.17.0     2023-08-25 [?] Bioconductor
 P Homo.sapiens                       * 1.3.1      2023-09-01 [?] Bioconductor
 P IRanges                            * 2.34.1     2023-07-02 [?] Bioconductor
 P limma                              * 3.56.2     2023-06-04 [?] Bioconductor
 P MatrixGenerics                     * 1.12.3     2023-07-30 [?] Bioconductor
 P matrixStats                        * 1.0.0      2023-06-02 [?] CRAN (R 4.3.0)
 P memoise                            * 2.0.1      2021-11-26 [?] CRAN (R 4.3.0)
 P Mus.musculus                       * 1.3.1      2023-09-01 [?] Bioconductor
 P org.Hs.eg.db                       * 3.17.0     2023-08-25 [?] Bioconductor
 P org.Mm.eg.db                       * 3.17.0     2023-08-23 [?] Bioconductor
 P OrganismDbi                        * 1.42.0     2023-05-08 [?] Bioconductor
 P rnaseqExamples                     * 0.0.0.9000 2023-09-01 [?] Github (tomsing1/rnaseq-examples@ac35304)
 P S4Vectors                          * 0.38.1     2023-05-08 [?] Bioconductor
 P sparklyr                           * 1.8.2      2023-07-01 [?] CRAN (R 4.3.0)
 P SummarizedExperiment               * 1.30.2     2023-06-11 [?] Bioconductor
 P tictoc                             * 1.2        2023-04-23 [?] CRAN (R 4.3.0)
 P tidyr                              * 1.3.0      2023-01-24 [?] CRAN (R 4.3.0)
 P TxDb.Hsapiens.UCSC.hg19.knownGene  * 3.2.2      2023-09-01 [?] Bioconductor
 P TxDb.Mmusculus.UCSC.mm10.knownGene * 3.10.0     2023-09-01 [?] Bioconductor

 [1] /Users/sandmann/repositories/blog/renv/library/R-4.3/aarch64-apple-darwin20
 [2] /Users/sandmann/Library/Caches/org.R-project.R/R/renv/sandbox/R-4.3/aarch64-apple-darwin20/ac5c2659

 P ── Loaded and on-disk path mismatch.

──────────────────────────────────────────────────────────────────────────────

Creative Commons License
This work is licensed under a Creative Commons Attribution 4.0 International License.

Footnotes

  1. If you are curious, please consult the vignettes to see examples of differential expression analyses for each dataset.↩︎