1 Introduction

SparkR is a very valuable tool when working with big data. Unfortunately it’s documentation is rather sparse. Here I have collected a number of solutions and explanations I have been struggling myself with and hope others may find useful.

The source of this document is on Github (see file sparkr_notes.rmd), all comments and issues are welcome there.

1.1 Preliminaries

In this text I assume you are familiar with magrittr pipes %>%, %<>% and %T>%. Pipes are an excellent complement to SparkR (and to many other R tasks). A good source to read about pipes is Garret Grolemund and Hadley Wickham’s R for data science section Pipes.

1.2 SparkR and dplyr

SparkR takes a similar approach as dplyr for transforming data, so I strongly recommend you to familiarize yourself with dplyr before you start with spark. An excellent source for this is Garret Grolemund and Hadley Wickham’s R for data science, section Data Transformations. The similarity if further stressed by a number of functions (“verbs” in Grolemund and Wickham’s parlance) having exactly the same name and similar syntax. In case of SparkR, there are often two versions, one with dplyr-similar syntax, and one with it’s own distinct syntax.

The fact that the syntax is the same can cause issues, however. For instance, selection of certain variables can be done with select:

select(data.frame, col, ...) # dplyr
select(SparkDataFrame, col, ...) # SparkR

There are two potential issues here:

  1. The syntax is not exactly the same: where dplyr expects a list of unquoted variable names, SparkR expects a list of quoted names (there are other options too, see Selecting variables).
  2. Apparently, the generic select, initiated by dplyr, is unable to invoke the SparkDataFrame method. Instead you see an obscure error like Error in UseMethod(“select_”) : no applicable method for ‘select_’ applied to an object of class “SparkDataFrame”. If you want to use both dplyr and SparkR, load dplyr before you load SparkR.

Besides select, the common functions include mutate and rename. There are also a plethora of other packages that export identically named functions, e.g. hour in lubridate overrides the same function in SparkR.

2 General concepts

2.1 RDD: the basic dataset

The central data structure in spark is Resilient Distributed Dataset (RDD). Resilient means that if something breaks in the workflow, for instance a cluster node dies, the data can be recovered from an earlier stage and the RDD lineage information. So RDD is not just a dataset: a number in RDD is not just a number, but a number plus information where this number comes from.

Dataframes in spark are made of RDD-s and hence inherit the RDD properties, in particular the lineage.

2.1.1 Checkpointing

As one operates repeatedly on the same data, the RDD lineage may grow without limits. This happens, for instance, if you add data to an RDD in a loop, and each time also filter out certain observations. This manifests with RDD operations getting slower and slower, one may also get java.lang.StackOverflowError-s. A very valuable resource for seeing and debugging the RDD lineages is the web monitoring port (default value 4040).

A way to break the lineage is to use checkpointing. Checkpointing saves the current dataframe as “just data”, without any historical information. If should be saved on a fault-tolerant file system, such as HDFS, to ensure the whole process is fault-tolerant. All the following execution plans now take the resulting dataframe as a blank sheet (well, blank in terms of lineage, not data content!). Checkpointing can be done like this:

setCheckpointDir("/tmp")
                           # note: have to set checkpoint dir
df2 <- checkpoint(df1)

Now df2 is a “lineage-free” dataframe.

However, checkpointing is not without it’s downsides. It saves data on the disk, and this may be quite slow. Second, it may take quite a bit storage as you are essentially writing all your data on disk in uncompressed format.

2.2 Jobs, stages, tasks

Spark splits its work into jobs, stages, and tasks.

  • A job is what broadly corresponds to a spark function, e.g. read.json or select("id", "name"). But as spark uses lazy evaluation and optimizes the queries, the jobs may not exactly correspond to the original code.
  • Jobs are further divided into stages that contain the subtasks. In case of simple jobs, e.g. when reading a file, a single stage may correspond to a single job.
  • Finally, stages are split into tasks, these are normally just similar data processing tasks that can run in parallel on different executors, or may be run sequentially on the same executor.

Spark displays it’s progress on console when: a bar like

[Stage 0:====================================================>    (55 + 1) / 60]

indicates that 55 tasks out of total 60 in stage 0 are done.

The best way to understand what are the jobs and stages is to check the DAG visualization in the Web monitoring window.

3 Installing and Invoking Spark

3.1 Installation

3.1.1 Installing Spark

As spark is running on Java VM, you need java. Spark 2.2–2.4 only work on java 1.8. If you already have a correct version of java installed, the rest on linux and mac is just a straight sailing:

  1. download the latest version
  2. decompress it into a suitable location (e.g. $HOME/local)
  3. If you intend to use spark from outside of R, you should also set
SPARK_HOME=$HOME/local/spark-x.x.x-bin-hadoop

or whatever is your exact installation directory. When only relying on R, the location of spark home can be set when initializing spark session.

Windows installation is more involved: you have to set a few paths and permissions.

3.1.2 Installing correct version of java

If you have never previously installed java, you can just add the most recent Java 1.8.

If you have an incompatible version, you can add another java fairly easily. On debian-based linux (such as ubuntu), you can select the default java by

sudo update-alternatives --config javac

this updates a few symlinks to ensure the correct java version is taken as default.

Another way is just to specify the JAVA_HOME environment variable. This approach also works on Mac, and it allows you to pick a specific version of java just for Spark while leaving the system defaults untouched. When running Spark from inside R, the easiest approach is to set the environment variable by Sys.setenv with something like (on a mac)

Sys.setenv(JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_52.jdk/Contents/Home")

where the path must correspond the correct Java 1.8 path on your computer.

Correct java must be specified before you initiate the spark session.

3.2 Starting Spark

3.2.1 Just get it to run

Here we only describe how to start spark from inside R. You have to a) load the library from a non-standard location, and b) initiate spark session. This can be done along the following lines:

SPARK_HOME <- file.path(Sys.getenv("HOME"), "local", "spark")
                           # this is the directory where your spark is located
library("SparkR", lib.loc=file.path(SPARK_HOME, "R", "lib"))
                           # note: SparkR with capital S
ss <- sparkR.session(master = "local[1]",
                           # note: sparkR with lower case s
                     sparkHome = SPARK_HOME,
                     appName = "spark demo")

the SPARK_HOME points to the folder where spark is installed. If you downloaded and decompressed it into ~/Downloads, the correct path may look like SPARK_HOME <- "~/Downloads/spark-2.4.0-hadoop2.7/R/lib". lib.loc argument points to “R/lib” folder inside it. Note: including sparkHome argument is only needed if you did not specify SPARK_HOME environment variable as indicated above under installation.

If you have to choose a specific java version (see above), do it before you start spark session. You may as well do it before loading the library.

3.2.2 Options

sparkR.session accepts a number of options:

  • master is the location the spark master (driver). "local[n]" mean spark will be run locally, on your computer, utilizing n cpus. You can also use "local[*]" if you want to utilize all logical cpus (one process per cpu thread).
  • sparkConfig is a list of configuration options. The more interesting ones are:
    • spark.driver.memory: how much memory to allocate to the spark driver (or the complete spark process if run locally). Use character strings, such as "500m" or "4g". If this is too small, you get java out-of-memory errors related to java heap space.
    • spark.sql.parquet.compression.codec: by default, spark compresses parquet files by snappy codec which creates rather large temporary files. You can specify a different codec, such as "gzip", during the session initialization. Later calls to sparkR.conf do not work. It’s value is a string.

Here is an example of initiating the spark session while also setting the configuration options:

ss <- sparkR.session(master = "local[2]",
                     appName = "sparkNotes",
                     sparkConfig=list(spark.sql.parquet.compression.codec="gzip",
                                      spark.driver.memory="8g",
                                      log4j.logger.org.apache.parquet="INFO"
                                      )
                     )

3.2.3 SparkR and knitr

Spark can easily be used with knitr and the corresponding chunks are basically just R code. However, as spark objects are not R objects–they are pointers to the corresponding java objects–spark objects cannot be cached. You have to re-run your objects again (or build your own cache).

3.3 Configuration

3.3.1 Getting Run-Time Configuration

The basic run-time configuration can be returned by sparkR.conf(). Without any arguments it returns all set options:

sparkR.conf()
## $log4j.logger.org.apache.parquet
## [1] "INFO"
## 
## $spark.app.id
## [1] "local-1671222752549"
## 
## $spark.app.name
## [1] "sparkr notes"
## 
## $spark.app.startTime
## [1] "1671222751705"
## 
## $spark.app.submitTime
## [1] "1671222751168"
## 
## $spark.driver.extraJavaOptions
## [1] "-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED"
## 
## $spark.driver.host
## [1] "192.168.8.147"
## 
## $spark.driver.memory
## [1] "8g"
## 
## $spark.driver.port
## [1] "41333"
## 
## $spark.executor.extraJavaOptions
## [1] "-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED"
## 
## $spark.executor.id
## [1] "driver"
## 
## $spark.executorEnv.LD_LIBRARY_PATH
## [1] "$LD_LIBRARY_PATH:/usr/lib/R/lib:/usr/lib/x86_64-linux-gnu:/usr/lib/jvm/default-java/lib/server:/home/siim/.local/lib:"
## 
## $spark.home
## [1] "/home/siim/local/spark-3.3.1-bin-hadoop3"
## 
## $spark.master
## [1] "local[2]"
## 
## $spark.r.sql.derby.temp.dir
## [1] "/tmp/RtmpwwrcA3"
## 
## $spark.sql.catalogImplementation
## [1] "hive"
## 
## $spark.sql.parquet.compression.codec
## [1] "gzip"
## 
## $spark.sql.warehouse.dir
## [1] "file:/home/siim/proge/R-lookup-tables/spark-warehouse"
## 
## $spark.submit.deployMode
## [1] "client"
## 
## $spark.submit.pyFiles
## [1] ""
## 
## $spark.ui.showConsoleProgress
## [1] "true"

Note that there is no obvious entry for parallelism. When invoked locally, it is visible as the entry of spark.master, here “local[2]”, but no dedicated number is provided (2.3, 2.4). Neither can you see an entry for spark version, although the spark.home directory name contains the version number in this installation.

3.3.2 Temporary Directories

Spark stores quite a bit of temporary data on disk so you should ensure you have plenty of disk space. The location of temporary folders can be set in the environment variable SPARK_LOCAL_DIRS as a comma-separated list of directories:

SPARK_LOCAL_DIRS="$HOME/tmp,/tmp"

This creates temporary folders in two locations, one in $HOME/tmp and another in /tmp. The folders appear in pairs, names like blockmgr-6e392c2c... and spark-f0e4221e....

Spark splits data to be saved roughly equally onto both of these devices. This means, in particular, that when one of the devices is full, spark fails even if there is still plenty of space on the other (2.1.2).

3.4 Interacting with spark

One can invoke sparkR in an ordinary R session like any other interactive command. This is great for testing/debugging and trying to understand spark. Note that the commands on spark are much slower compared to the base R when using small dataframes.

For anything resembling a more serious work I’d recommend to run SparkR in batch mode, exactly as any other R script.

3.4.1 Web monitoring ports

Spark also sets up a small web server that allows you to monitor some of the spark’s internal workings.

Port 4040 shows the basics of the working: jobs, stages, and tasks. Here is an example of how what you can see in web monitoring.

Web monitoring example

8080 is for master

if you start spark locally (–master local[n]), then the 8080 and 8081 ports are not there, only 4040.

A very useful information you get from the web monitor is DAG visualization. This shows how the pieces of data are linked, which stages are done, and what kind of lookups spark has to do for the processing. You can see DAG for jobs when clicking on a particular job and selecting DAG Visualization, or a more detailed DAG if you select stages and check the stage DAG visualization instead.

If DAG gets too complicated then spark gets slow and it is advisable to break the lineage using checkpointing.

4 Data input and output

4.1 From R to spark and back

An R dataframe can be transformed to a spark dataframe using createDataFrame function, the reverse is done by collect. For instance:

dfa <- data.frame(a=1:2) %>%
   createDataFrame()
dfa
## SparkDataFrame[a:int]
dfa %>% collect()
##   a
## 1 1
## 2 2

However, note you cannot transform 0-row R dataframes into spark in this way

dfa <- data.frame(a=integer()) %>%
   createDataFrame()

will give an unhelpful error regarding to subscript out of bounds.

4.2 How to read csv and json files

4.2.1 CSV files

CSV files are one of the most common and versatile forms of data. SparkR reads these pretty well but unfortunately, the documentation is not good enough to get started. The basic syntax is:

loadDF(fName,
       source="csv",
       header="true",
       sep=",")

The most important arguments are:

  • fName: the file name. The unix home directory marker ~ does not seem to work (it is taken as a literal character in the file name), unlike in ordinary R. However, there are a number of good news:
    • compressed files, at least .bz2, are decompressed transparently (2.2.0).
    • fName can be a shell pattern corresponding to multiple files, such as dir/*.csv. In that case all these files are read and stacked into the resulting data frame.
    • fName can also be the folder’s name (both with and without the trailing slash) in which case all files in that folder are read. Note: in this case loadDF loads all the files in that folder, including non-csv ones.
  • source: should be “csv” here
  • header: whether the file contains the first line as column names. Defaults to “false” which reads the eventual header names as the first line, and names columns to _c0, _c1 etc. (2.2.0). Note: you have to specify java-ish "true" and "false" (in quotes) here, not R-ish TRUE and FALSE.
  • sep: field separator, default “,”.
  • there are more arguments.

loadDF loads everything as string type, you may want to convert the columns afterwards (this can probably also be achieved by schemas).

There is an alternative function, read.df, that differs from loadDF by the default arguments.

4.2.2 JSON files

JSON files can read with

load.json(fName)
  • fName can be a vector of character file names.

4.3 Parquet files

Reading parquet files goes with

read.parquet(file)

which returns a spark data frame. However, the handling of file argument is non-trivial.

  • file: file name, actually the name of the folder where parquet holds it’s files
    • file accepts certain shell patterns: *, ?, and brace expansion. However, only a single set of brace expansion is allowed, and only with commas, not .. ranges (linux, 2.3.1). Brace expansion can be combined with patters, so a name like /data/{home,work}/file* is valid.
    • unlike write.parquet, file for reading also does tilde expansion. (linux, 3.0.1)
    • If you want to read all parquet file (or rather folders) in a directory, you have to specify file as path/to/directory/*. If you leave out the trailing wildcard *, you get an error Unable to infer schema for Parquet (2.3.1).

Writing parquet files is similar:

write.parquet(df, path, mode)
  • df is your (spark) data frame
  • path is the path/file name
    • unlike read.parquet, path does not do tilde expansion! So, e.g. storing data frame into a file "~/foo" stores file foo into a folder named ~ in the current directory. However, read.parquet attempts to read file foo from your home folder. (linux, 3.0.1)
  • mode: use "overwrite" to overwrite existing file.

5 Data selection and subsetting

5.1 Selecting variables

Spark supports several ways to select (and order) columns from a dataframe. The main methods are select function and R-style [] indexing:

dfa <- data.frame(family=c("Hu", "Xi"),
                  name=c("Jintao", "Jinping"),
                  yob=c(1942, 1953)) %>%
   createDataFrame()
dfa[,c("yob", "family")] %>%
   collect()
##    yob family
## 1 1942     Hu
## 2 1953     Xi
dfa %>% select("yob", "family") %>%
   collect()
##    yob family
## 1 1942     Hu
## 2 1953     Xi

select is rather permissive in it’s arguments, in addition to the above, one can also use select(c("yob", "family")) and select(list("yob", "family")).

5.2 Dropping variables

Dropping variables can be achieved with drop. Drop can handle a vector of variable names:

vars <- c("name", "yob")
dfa %>%
   drop(vars) %>%
   collect()
##   family
## 1     Hu
## 2     Xi

However, it cannot handle multiple variable names as separate arguments–drop("name", "yob") will not work.

5.3 Filtering and sampling observations

The way to filter observations based on a logical expression is filter. It expects a column expression.

Example:

dfa %>% filter(column("yob") > 1950) %>%
   collect()
##   family    name  yob
## 1     Xi Jinping 1953

Filter will not work with multiple condidions, you have to use logical operators.

Filter wants your values to be of a single class, including a single value for S3 classes. This sometimes creates warnings, e.g. with dates that have class of c("POSIXct", "POSIXt"):

df <- data.frame(date = ISOdatetime(2018,10,17,14,34,12)) %>%
   createDataFrame()
df %>%
   filter(column("date") == ISOdatetime(2018,10,17,14,34,12)) %>%
   collect()
## Warning in if (class(e2) == "Column") {: the condition has length > 1 and only the first element will be used
##                  date
## 1 2018-10-17 14:34:12

It still works (Spark 3.3.1), but the warning is unpleasant. It may not work in Spark 3.2. Note that you cannot use suppressWarnings with piping.

As a workaround, you can use as.POSIXct when writing the conditions:

df <- data.frame(x = c(6,9,12,15,18),
                 date = as.POSIXct(c("2022-12-16 6:30",
                                     "2022-12-17 9:31",
                                     "2022-12-18 12:32",
                                     "2022-12-19 15:33",
                                     "2022-12-20 18:34")),
                 day = c("Fri", "Sat", "Sun", "Mon", "Tue")) %>%
   createDataFrame()
df %>%
   collect()
##    x                date day
## 1  6 2022-12-16 06:30:00 Fri
## 2  9 2022-12-17 09:31:00 Sat
## 3 12 2022-12-18 12:32:00 Sun
## 4 15 2022-12-19 15:33:00 Mon
## 5 18 2022-12-20 18:34:00 Tue
df %>%
   filter(hour(column("date")) >= 9 &
          hour(column("date")) < 17 &  # working hours only
          dayofweek(column("date")) < 5  # all days, except Fri, Sat
          ) %>%
   collect()
##    x                date day
## 1 12 2022-12-18 12:32:00 Sun
## 2 15 2022-12-19 15:33:00 Mon

One can sample a certain percentage of observations with

sample(df, withReplacement, fraction)

where fraction is the percentage (between 0 and 1) of the original data to be sampled. The fraction is claimed to be approximate, and there seems to be no way to sample an exact number of observations.

6 Working with data: the map part

6.1 Accessing columns

One can access single columns with the $-operator and the column function. The $-operator works in a fairly similar manner than for ordinary data frames. Note that in case of piping where your intermediate dataframe does not have a name, you may access the columns with .$ way like .$foo.

Function column is an alternative, it takes column name as a character: column("foo").

6.2 Creating new columns: column expressions, withColumn and mutate

There are three functions for computing on columns: withColumn, mutate and transform. All of these can either create new, or overwrite existing columns.

The arguments for withColumn are:

withColumn(dataframe, columnName, expression)

where columnName is a character column name (not a Column object), and expression is a column expression: something you can create from existing columns and constants using functions and arithmetic operators.
It may include mathematical and string operations and other column-oriented functions. Take the example dataframe

dfa %>%
   collect()
##   family    name  yob
## 1     Hu  Jintao 1942
## 2     Xi Jinping 1953
dfa %>%
   withColumn("age2018", lit(2018) - column("yob")) %>%
   collect()
##   family    name  yob age2018
## 1     Hu  Jintao 1942      76
## 2     Xi Jinping 1953      65

The column expression, lit(2018) - column("yob"), computes the age in 2018. As column("yob") is not a number (but a “Column” object), one cannot simply do 2018 - column("yob"). Arithmetic with numbers and columns is limited (there is no “-” function with signature “numeric”, “Column”). lit(2018) creates a new constant column, and now we perform arithmetic of two columns instead of a numeric and a column. However, arithmetic operators for columns can handle “numeric” further down in the signature:

dfa %>%
   withColumn("age2018", (column("yob") - 2018)*(-1)) %>%
   collect()
##   family    name  yob age2018
## 1     Hu  Jintao 1942      76
## 2     Xi Jinping 1953      65

Now R selects “-” with signature “Column” and “numeric”, and this works well.

mutate offers largely overlapping functionality, but it can take more than one computation at a time, and it’s expressions are in the form name = expression. We can achieve the above, and more, in the following example:

dfa %>%
   mutate(age2018 = lit(2018) - column("yob"),
          age2000 = lit(2000) - column("yob")
         ) %>%
   collect()
##   family    name  yob age2018 age2000
## 1     Hu  Jintao 1942      76      58
## 2     Xi Jinping 1953      65      47

Note that name must be unquoted. We need two withColumn commands to achieve equivalent results.

Mutate has a few quirks:

  • short variable names, such as x, sometimes do not work and lead to obscure errors.
  • if you use mutate to create more than one variable, you cannot use the previously created variables in the next expressions. You need a new mutate call.
  • mutate seems to be less able to do computations inside column expressions, see examples in date_format.

Finally, transform seems to be pretty much an alias for mutate.

6.2.1 Constant columns

Constant columns can be added with the lit function:

foo <- data.frame(a=1:2) %>%
   createDataFrame()
foo %>%
   withColumn("b", lit(77)) %>%
   collect()
##   a  b
## 1 1 77
## 2 2 77

will add a constant column b = 77 to the dataframe, withColumn will most likely figure out the correct type.

You can add missing values in the same fashion. However, if you just do withColumn("c", lit(NA)), you get a column of type null which may cause trouble downstream (cannot save it in a parquet file, for instance). You should cast it into the correct type, for instance to “integer”:

foo %>%
   withColumn("b", cast(lit(NA), "integer")) %T>%
   printSchema() %>%
   collect()
## root
##  |-- a: integer (nullable = true)
##  |-- b: integer (nullable = true)
##   a  b
## 1 1 NA
## 2 2 NA

Indeed, as the schema indicates, b is now an integer.

6.3 Renaming columns

There are two built-in functions for renaming. rename expects a list of arguments in the form of newName = existingColumn (after the data frame argument). Note: this is quoted or unquoted name = column, the right hand side must not be just a column name. This is perhaps the easiest way to rename variables:

data <- createDataFrame(data.frame(a=1:2, b=c("Huang", "Goyal")))
data %>%
   rename(id = column("a"), "name" = column("b")) %>%
   collect()
##   id  name
## 1  1 Huang
## 2  2 Goyal

withColumnRenamed expects two arguments (after the data frame argument): old column name and new column name. Note: these are column names, not columns, and the order is the other way around:

data <- createDataFrame(data.frame(a=1:2, b=c("Huang", "Goyal")))
data %>%
   withColumnRenamed("a", "id") %>%
   withColumnRenamed("b", "name") %>%
   collect()
##   id  name
## 1  1 Huang
## 2  2 Goyal

However, certain short column names do not work (2.2.1) and exit with an uninformative error message:

data %>%
   rename("x" = column("a")) %>%
   collect()
## Error in h(simpleError(msg, call)): error in evaluating the argument 'x' in selecting a method for function 'collect': unable to find an inherited method for function 'rename' for signature '"Column"'

Name “xx” works, however:

data %>%
   rename("xx" = column("a")) %>%
   collect()
##   xx     b
## 1  1 Huang
## 2  2 Goyal

6.4 Converting columns

6.4.1 Casting with cast

cast(x, dataType) is the general way of converting column x to a new data type. dataType must be one of the spark datatypes, not R data type. In particular

R data type spark data type
character string
numeric double

6.4.2 Numbers to strings

A tailor-made way to make strings out of numbers is format_string(format, column). It appears to use standard C formatting specifications:

data %>%
   withColumn("c", format_string("%10d", column("a"))) %>%
   collect()
##   a     b          c
## 1 1 Huang          1
## 2 2 Goyal          2

Note the space before the numbers in string format.

When feeding a string column for %d format, spark fails with IllegalFormatConversionException: d != org.apache.spark.unsafe.types.UTF8String.

7 Working with data: the reduce part

7.1 aggregating with agg

agg is the general aggregation function. It takes the data frame and aggregation arguments. For instance, let’s count the number of non-missing entries in a data frame:

df <- data.frame(a=c(4, 1,NA)) %>%
   createDataFrame()
collect(df)
##    a
## 1  4
## 2  1
## 3 NA
df %>%
   withColumn("b", cast(isNotNull(column("a")), "integer")) %>%
   agg(n = sum(column("b")),
       mean = avg(column("a"))) %>%
   collect()
##   n mean
## 1 2  2.5

This will result in a dataframe that contain one column, n, value of which is the desired value. Here aggregation is determined by the sum function, an agg-s helper function, that takes a numeric column (this is why we have to cast the logical column to an integer).

Agg is very much similar to dplyr’s summarize function: it aggregates data, but one must supply the aggregation rule.

There is a long list of agg-s aggregation functions (x is a column):

  • avg(x): average value. Missings are ignored.
  • count(x): count number of objects (alias n). Returns an integer.
  • first(x, na.rm=FALSE): return the first element in the group
  • n(x): count number of objects (alias count)
  • sum(x): add the values

8 Messing data around: merge, order and pivot/reshape

8.1 Merging

8.1.1 rbind: concatenating by rows

Adding nother dataframe underneath a spark dataframe is done with rbind, exactly as in case of base R:

names1 <- data.frame(name="sune", age=17) %>%
   createDataFrame()
names2 <- data.frame(name="jestin", age=33) %>%
   createDataFrame()
rbind(names1, names2) %>%
   collect()
##     name age
## 1   sune  17
## 2 jestin  33

Both data frames must have the same columns in exactly the same order. If the order does not match, you get an error Names of input data frames are different, even if the names are the same and the problem lies just in the order.

8.1.2 Merging and joining

SparkR features two similar functions: merge and join. They do not behave in quite the same manner.

Merge syntax is rather similar to that of base::merge:

merge(x, y, by, suffixes)
  • x, y are data frame to be merged.
  • by is a vector of key columns, by default columns with similar name in both data frames.
    • by columns can be of different type, for instance integer and double columns still match.
  • suffixes is a character vector of 2 specifying different suffixes for x and y to make unique names.
    • Even if you want to retain just one version of the variable with it’s original name, you cannot specify something like suffices=c("", "_y"). You get an error. (2.2.0)

8.1.2.1 Merge renames variables (2.2.0)

If merging two data frames on a column of similar name, merge creates new variables like foo_x and foo_y according to the respective origin data frame. For instance:

foo <- createDataFrame(data.frame(a=1:2, b=11:12, c=c("a", "b")))
bar <- createDataFrame(data.frame(a=1:2, b=21:22, d=c("x", "y")))
merge(foo, bar, by="a") %>% collect()
##   a_x  b c a_y  b d
## 1   1 11 a   1 21 x
## 2   2 12 b   2 22 y

If, instead of by, you specify both by.x and by.y, then it also renames the b-s:

merge(foo, bar, by.x="a", by.y="a") %>% collect()
##   a_x b_x c a_y b_y d
## 1   1  11 a   1  21 x
## 2   2  12 b   2  22 y

Join, in turn, does not rename but still keeps both version of the join column in the data:

join(foo, bar, foo$a == bar$a) %>% collect()
##   a  b c a  b d
## 1 1 11 a 1 21 x
## 2 2 12 b 2 22 y

8.1.2.2 Merging and “trivial” join conditions

As stressed above, RDD is not just data but also information about how the data was created. merge can use some of this lineage information. In particular, if the key column is created through lit() constant, it refuses to merge. See the following examples:

foo <- data.frame(a=1:2, b=3:4) %>% createDataFrame()
foo %>% collect()
##   a b
## 1 1 3
## 2 2 4
bar <- data.frame(c=5:6, a=c(0L,0L)) %>% createDataFrame()
bar %>% collect()
##   c a
## 1 5 0
## 2 6 0
merge(foo, bar, all.x=TRUE) %>% collect()
##   a_x b  c a_y
## 1   1 3 NA  NA
## 2   2 4 NA  NA

Here we create two dataframes, foo and bar and merge these on column a. As there are no common key values, the variables inherited from bar are all missing. This is to be expected.

Now let’s modify this example with literal constants:

baz <- data.frame(c=5:6) %>% createDataFrame() %>% withColumn("a", lit(0))
baz %>% collect()
##   c a
## 1 5 0
## 2 6 0

So the baz dataframe looks the same as bar above. However, now merge knows that the keys are just constants:

merge(foo, baz, all.x=TRUE) %>% collect()

fails with Join condition is missing or trivial error (2.3.1). Remember: dataframe is not just data but also the lineage information!

8.1.3 Merging and lazy evaluation

Spark’s lazy evaluation makes debugging hard, the code may fail before it even reaches the problematic line.

For instance, when ordering before merging where the merge introduces ambiguous columns you may see an error like:

org.apache.spark.sql.AnalysisException: Reference 'b' is ambiguous, could be: b#3, b#10.;

For instance:

foo <- createDataFrame(data.frame(a=1:2, b=11:12))
bar <- createDataFrame(data.frame(a=1:2, b=21:22))
foo %>%
   groupBy("a", "b") %>%
   count() %>%
   merge(bar, by="a") %>%
   rename(a = column("a_x")) %>%
   collect()

fails with error Caused by: org.apache.spark.sql.AnalysisException: Reference 'b' is ambiguous, could be: b#3, b#10.;. The problem is multiple instance of b introduced through merge, but failure seems to occur at groupBy("a", "b"). Indeed, this is the only place in the code where we use the variable b.

However, a slightly simpler plan—just removing the rename part that seems unrelated to the problem—still works and demonstrates what happens at merge:

foo <- createDataFrame(data.frame(a=1:2, b=11:12))
bar <- createDataFrame(data.frame(a=1:2, b=21:22))
foo %>%
   groupBy("a", "b") %>%
   count() %>%
   merge(bar, by="a") %>%
   collect()
##   a_x  b count a_y  b
## 1   1 11     1   1 21
## 2   2 12     1   2 22

By removing rename(a = column("a_x")) at the end of the previous example the code works, I guess this is related to how well can the code translated to spark execution plans. See alse arrange by a renamed column where something similar happens. (Spark 2.2.1, 2.4.4)

8.2 Ordering by Group

Many common tasks require ordering by group. SparkR has multi-column ordering function (arrange) and multi-column grouping (groupBy). Unfortunately, these two do not play together. Assume we have temperature data

temp <- data.frame(month = c("Nov", "Nov", "Dec", "Dec"), day=c(1,15,1,15), temp=c(14,12,10,12)) %>%
   createDataFrame()
temp %>%
   collect()
##   month day temp
## 1   Nov   1   14
## 2   Nov  15   12
## 3   Dec   1   10
## 4   Dec  15   12

and we want to find the hottest day each month. Unlike in dplyr, you cannot just do

temp %>%
   groupBy("month") %>%
   arrange("temp") %>%
   agg(maxDay = day[1])

as grouped data cannot be arranged in spark.

8.2.1 Window Functions

A correct solution seems to be to use window functions. Windows in SQL parlance are blocks of rows where one can do certain operations. Windows can be defined in a different ways, here we are interested partitioning data by keys into windows. This can be done as windowPartitionBy("month"). This tells spark that data must be “partitioned” by month (I guess the ‘partition’ here is not the same as RDD partition), and we can operate separately on these partitions.

Now we want to order the partitions by temperature. This can be achieved by ws <- orderBy(windowPartitionBy("month"), "temp"). Note we first define the window partition, and thereafter it’s ordering. These operations form our window specification ws. This is just specification, we haven’t touched the data yet.

When done with window specification, one can use window functions. These can be invoked by over(x, window) where x is a column, mostly likely computed via window functions, and window is the window spec we defined above. To solve the task above, we can add the row number (i.e. temperature rank) for each day in month. So we can do:

ws <- orderBy(windowPartitionBy("month"), "temp")
temp %>%
   mutate(n = over(row_number(), ws)) %>%
   filter(column("n") == 1) %>%
   collect()
##   month day temp n
## 1   Dec   1   10 1
## 2   Nov  15   12 1

(note the syntax column("n") to refer to the column “n”).

This approach works otherwise great, but unfortunately it picks the lowest temperature day. I haven’t found a way to insert desc operator for orderBy sorting–it only accepts string column names, not column("temp"), no desc("temp"). So we have to resort on a stupid trick by reversing the sign of temp:

ws <- orderBy(windowPartitionBy("month"), "temp1")
temp %>%
   mutate(temp1 = -column("temp")) %>%
   mutate(n = over(row_number(), ws)) %>%
   filter(column("n") == 1) %>%
   collect()
##   month day temp temp1 n
## 1   Dec  15   12   -12 1
## 2   Nov   1   14   -14 1

Obviously, in the final version we may drop obsolete variables and order the data according to month.

8.2.2 arrange - groupBy

Note: it is tempting to do go the following way instead (see):

temp %>%
   arrange(column("month"), desc(column("temp"))) %>%
   groupBy("month") %>%
   agg(maxDay = first("day")) %>%
   collect()
##   month maxDay
## 1   Dec     15
## 2   Nov      1

This is simple, and seems to work, at least locally. Unfortunately there is no guarantee that groupBy preserves the order (see SPARK-16207).

8.3 Ordering with arrange

8.3.1 arrange

Spark dataframes can be ordered by arrange function. It orders the data by “natural order”, i.e. alphabetically for character columns and in numeric order for numerics. The syntax is similar to that of dplyr: arrange(col, col, ...). However, there are two separate functions:

arrange("col1", "col2", ...)

and

arrange(col1, col2, ...)

In the former case, arrange expects a list of column names, in the latter case a list of columns. The latter version also works with order-reversal function desc, the former version does not. As an example, use the temperature data from above:

temp %>%
   arrange("month", "temp") %>%
   collect()
##   month day temp
## 1   Dec   1   10
## 2   Dec  15   12
## 3   Nov  15   12
## 4   Nov   1   14
temp %>%
   arrange(column("month"), desc(column("temp"))) %>%
   collect()
##   month day temp
## 1   Dec  15   12
## 2   Dec   1   10
## 3   Nov   1   14
## 4   Nov  15   12

One cannot mix the two approaches: arrange("x", column("y")) will not work. (2.2.1)

8.3.2 arrange by a renamed column

Apparently you can rename a column and still arrange according to the old name. For instance:

data.frame(a=3:1, b=rnorm(3)) %>%
   createDataFrame() %>%
   rename(c=column("a")) %>%
   arrange(column("a")) %>%
   collect()
##   c          b
## 1 1 -0.8356286
## 2 2  0.1836433
## 3 3 -0.6264538

The dataframe does not contain column a, it is ordered by column c, nee a, instead. See alse merging and lazy evaluation where something similar happens when renaming columns. (2.4.4)

8.4 Pivoting

Pivoting is a version of crosstable, a way to transform dataframe from long to wide format, and perform aggregation in the process.

The arguments for pivot are:

pivot(x, colname, values=list())

It takes in a grouped dataframe x and pivoting column colname. The result will be grouped by distinct values of the groups vertically (this is what the grouped data is about), and by distinct values of the pivot column horizontally (this is what colname does). The new column names are created from the pivoting column values, potentially combined with the name of the aggregation function. One may supply a pre-defined list of distinct pivoting values, it is claimed to be more efficient than to let spark figure these out itself. This vertical-horizontal grouping should be aggregated using the agg function. The entries for non-existing combinations will be NA in the aggregated table.

Pivot has it’s quirks:

  • distinct integer values will be converted to floats and you see values like 100.0 instead of 100 in column names. Convert the pivoting columns to characters. (2.4.4)

8.4.1 Example: Create an aggregated crosstable

Let’s create a simple long-form dataframe of three columns: quarter, id, and result:

dfp <- data.frame(quarter=1 + rbinom(12, 3, 0.3),
                  id = (100 + rbinom(12,4,0.5)) %>%
                     as.character(),
                  result=runif(12)) %>%
   createDataFrame()
dfp %>%
   collect() %>%
   head()
##   quarter  id    result
## 1       3 102 0.4820801
## 2       2 103 0.5995658
## 3       2 103 0.4935413
## 4       1 101 0.1862176
## 5       1 102 0.8273733
## 6       1 101 0.6684667

and pivot it into a summary table: each row is quarter, each column is id, and each cell in the table is sum of the corresponding results:

dfp %>%
   groupBy("quarter") %>%
   pivot("quarter") %>%
   agg(sum(column("result"))) %>%
   collect()
##   quarter      1.0      2.0       3.0       4.0
## 1       1 1.682058       NA        NA        NA
## 2       4       NA       NA        NA 0.6470602
## 3       3       NA       NA 0.4820801        NA
## 4       2       NA 3.951222        NA        NA

Now we have a wide form data frame where for each quarter (row) we have sum of results for each id (columns). Note the many missing values: for instance, id 101 did not have any results in quarter 2.

8.4.2 Example: find rankings

As the next example, let’s find the largest and second-largest result for each quarter-id combination using the same data as above.

Our first task is to rank the results by group. We use window functions to do this:

ws <- orderBy(windowPartitionBy("quarter", "id"), "negResult")

As we want to find the largest values (i.e. rank in descending order), we have to create negResult, the negative of the result, as spark orderBy can only do it in ascending order. Thereafter we create the corresponding rank variable, and retain only the two largest values. Finally, we create a new column idrank that contains the id and rank in a form that is suitable for the final column name after pivoting (remember, pivot creates column names from values):

dfp %<>%
   mutate(negResult = -column("result")) %>%
   mutate(rank = over(row_number(), ws)) %>%
                           # compute the descending rank 'rank'
   filter(column("rank") <= 2) %>%
                           # retain only the two largest results
   mutate(idrank = concat(lit("id-"), column("id"),
                          lit("-"), column("rank")))
                           # note: have to use 'concat',
                           # not 'paste'
                           # character constants must be
                           # in 'lit'
dfp %>%
   collect()
##    quarter  id    result  negResult rank   idrank
## 1        1 101 0.6684667 -0.6684667    1 id-101-1
## 2        1 101 0.1862176 -0.1862176    2 id-101-2
## 3        1 102 0.8273733 -0.8273733    1 id-102-1
## 4        2 100 0.7237109 -0.7237109    1 id-100-1
## 5        2 101 0.7942399 -0.7942399    1 id-101-1
## 6        2 102 0.4112744 -0.4112744    1 id-102-1
## 7        2 102 0.1079436 -0.1079436    2 id-102-2
## 8        2 103 0.8209463 -0.8209463    1 id-103-1
## 9        2 103 0.5995658 -0.5995658    2 id-103-2
## 10       3 102 0.4820801 -0.4820801    1 id-102-1
## 11       4 102 0.6470602 -0.6470602    1 id-102-1

Now we have everything prepared. We group the result by quarter (quarters will remain in rows) and pivot by idrank, these will be in columns. Finally we aggregate by just picking the first value of result. Aggregation mechanism is not very important here as all the quarter-id-rank groups should only contain a single value.

dfp %>%
   groupBy("quarter") %>%
   pivot("idrank") %>%
   agg(first(column("result"))) %>%
   collect()
##   quarter  id-100-1  id-101-1  id-101-2  id-102-1  id-102-2  id-103-1  id-103-2
## 1       1        NA 0.6684667 0.1862176 0.8273733        NA        NA        NA
## 2       4        NA        NA        NA 0.6470602        NA        NA        NA
## 3       3        NA        NA        NA 0.4820801        NA        NA        NA
## 4       2 0.7237109 0.7942399        NA 0.4112744 0.1079436 0.8209463 0.5995658

Note the number of missing values in the result. For instance, as id 101 does not have any record for quarter 2, the corresponding entries are missing.

See also reshaping to wide form.

8.5 Reshaping

8.5.1 Wide to long

Spark does not have built-in reshaping function for transforming wide to long format, but explode pretty much does the job. It is slightly more circuituous though: first you have to create an array column of the desired individual columns, and thereafter you explode that column. Let’s create a wide form data frame below:

dfm <- data.frame(id=1:2, ua=runif(2), ub=rnorm(2)) %>%
   createDataFrame()
dfm %>%
   collect()
##   id        ua          ub
## 1  1 0.7829328  0.07456498
## 2  2 0.5530363 -1.98935170

The task is to transform this to the long form. First we can create a new array column by create_array and thereafter explode it into the desired shape:

dfm %>%
   mutate(tmp = create_array(column("ua"), column("ub"))) %>%
   mutate(value = explode(column("tmp"))) %>%
   collect()
##   id        ua          ub                    tmp       value
## 1  1 0.7829328  0.07456498 0.78293276, 0.07456498  0.78293276
## 2  1 0.7829328  0.07456498 0.78293276, 0.07456498  0.07456498
## 3  2 0.5530363 -1.98935170  0.5530363, -1.9893517  0.55303631
## 4  2 0.5530363 -1.98935170  0.5530363, -1.9893517 -1.98935170

The result will still contain ua and ub column, and the array column tmp. We may want to delete those.

There is no direct approach to use a list of column names for creating the array column. Instead, we can use do.call and the list of columns (not column names!) as it’s argument. The exact same result as above can be achieved through

cols <- c("ua", "ub")
withColumn(dfm, "tmp",
           do.call("create_array", lapply(cols, function(x) column(x)))) %>%
   mutate(value=explode(column("tmp"))) %>%
   drop(c(cols, "tmp")) %>%
   collect()
##   id       value
## 1  1  0.78293276
## 2  1  0.07456498
## 3  2  0.55303631
## 4  2 -1.98935170

Here we have deleted the original and temporary columns so the result is more likely what you had in mind as the long form data.

However, the solution does not show explicitly which original variable each row corrsponds to. In this simple example it iis obvious, these are just “a”, “b”, “a”, “b” in this order, but in a more complex case we may need much more guidance. In such case one can use posexplode function. It explodes the array column into two columns: pos corresponds to the order in the array, and col to the value:

dfm %>%
   mutate(tmp = create_array(column("ua"), column("ub"))) %>%
   select(c("id", posexplode(column("tmp")))) %>%
   collect()
##   id pos         col
## 1  1   0  0.78293276
## 2  1   1  0.07456498
## 3  2   0  0.55303631
## 4  2   1 -1.98935170

The code does the following: first it creates an array column tmp. Thereafter if posexplodes this into a pair of columns: pos 0 corresponds the value “a” and pos 1 to value “b”. And col are the corresponding values. As posexplode creates two columns, you cannot use mutate as that function only creates a single column at time. A more complete example may look as follows:

cols <- c("ua", "ub")
withColumn(dfm, "tmp",
           do.call("create_array", lapply(cols, function(x) column(x)))) %>%
   select(c("id", posexplode(column("tmp")))) %>%
   mutate(type = ifelse(column("pos") == 0, "a", "b")) %>%
   drop("pos") %>%
   rename(value = column("col")) %>%
   collect()
##   id       value type
## 1  1  0.78293276    a
## 2  1  0.07456498    b
## 3  2  0.55303631    a
## 4  2 -1.98935170    b

8.5.2 Long to wide

This can be achieved with pivoting. Here is an example:

df <- data.frame(id=c(72,72,811,811), type=c("a", "b", "a", "b"), val=rnorm(4)) %>%
   createDataFrame()
df %>%
   collect()
##    id type         val
## 1  72    a  0.61982575
## 2  72    b -0.05612874
## 3 811    a -0.15579551
## 4 811    b -1.47075238

Transform it into the wide form:

df %>%
   groupBy("id") %>%
   pivot("type") %>%
   agg(first(column("val"))) %>%
   collect()
##    id          a           b
## 1  72  0.6198257 -0.05612874
## 2 811 -0.1557955 -1.47075238

Note that the column names are the same as type-s. See pivoting for more information.

9 Working with strings

The following examples use a tiny dataframe of names:

names <- data.frame(name=c("Barnier", "Beto")) %>%
   createDataFrame()

9.1 substrings with substr

Substrings can be created with substr. It’s syntax is simple:

substr(x, start, end)
  • x is a column
  • start, end are the (1-based) start and end positions in the string. start and end must be specified, you cannot leave one out to take everything. Inf is not allowed either.

Example:

names %>%
   withColumn("x", substr(column("name"), 3, 30)) %>%
   collect()
##      name     x
## 1 Barnier rnier
## 2    Beto    to

This extracts characters 3 to 30 (i.e. from 3 till end) from these names.

Warning: in SparkR < 2.3.0, start position was to be given one past the first character to be extracted. So in the example above, one has to use substr(column("names"), 4, 20)

9.2 startsWith, endsWith

These are utility functions that test if a string column starts or ends with a given substring. These return a logical column.

startsWith(column, substring)
endsWith(column, substring)

Example:

names %>%
   withColumn("x", endsWith(column("name"), "to")) %>%
   collect()
##      name     x
## 1 Barnier FALSE
## 2    Beto  TRUE

9.3 Pattern in strings: contain

contain(column, pattern) tests simple (non-regexp) patterns in string columns:

names %>%
   withColumn("x", contains(column("name"), "ar")) %>%
   collect()
##      name     x
## 1 Barnier  TRUE
## 2    Beto FALSE

Regexp patterns are treated literally (?) and do not work (2.3.1).

9.4 Where is pattern in string: instr

instr(y, x) finds the location of the first occurrence of substring (not regexp) x in column y. Positions inside of string are counted from 1, returns 0 if the substring not found:

names %>%
   mutate(xs = instr(column("name"), "ar"),
          xr = instr(column("name"), "a.")) %>%
   collect()
##      name xs xr
## 1 Barnier  2  0
## 2    Beto  0  0

Regexp patterns are treated literally (?) and do not work (2.3.1).

9.5 Concatenating strings: concat

concat(x, ...) concatenates column x with other columns. Numeric columns are automatically converted to string colums. If you want to insert a character constant you have to put into lit(..) function to create a constant column:

names %>%
   mutate(yob = lit(1980L)) %>%
   mutate(mName = concat(lit("M. "), column("name"), lit(" "), column("yob"))) %>%
   collect()
##      name  yob           mName
## 1 Barnier 1980 M. Barnier 1980
## 2    Beto 1980    M. Beto 1980

No space is inserted between the two concatenated columns.

concat can only concat columns, not other expressions, even not cast expressions where converting other columns to strings.

9.6 Regexp replacement: regexp_replace

Syntax: regexp_replace(x, pattern, replacement) where x is to column to operate on. Pattern is a regexp, and replacement can contain java regexp groups $1, $2 etc.

names %>%
   withColumn("x", regexp_replace(column("name"), "r(.*)", "R")) %>%
   collect()
##      name    x
## 1 Barnier  BaR
## 2    Beto Beto

Here is another example using java replacement group $1:

names %>%
   withColumn("x", regexp_replace(column("name"), "r(.*)", "R$1")) %>%
   collect()
##      name       x
## 1 Barnier BaRnier
## 2    Beto    Beto

10 Handling Dates

10.1 Dates and Timestamps

There are two time formats of choice: dates and timestamps. Dates only represent a date (a day) while timestamps record the point of time in seconds. You probably want to use the former if the event spans the whole day (like birthday) and the latter if the event is about a specific point of time (say the beginning of a phone call). Both dates and timestamps can be converted to each other. Both dates and timestamps contain timezone information by default. Timestamps are stored internally as microseconds since UNIX epoch, including the timezone information, and when you collect() timestamp data, the result will be printed in the current (as set in your operating system) timezone by R. The similar conversion occurs with dates, it will be converted to the date in your current timezone (as set in your operating system) at 00:00:00 midnight in the date’s timezone (see below).

There are three types of timestamps:

  • timestamp: ordinary timestamp, microseconds since UNIX epoch, including timezone epoch
  • utc_timestamp: stored internally in the same way as timestamp, just using UTC as timezone
  • unix_timestamp: a number of seconds since the UNIX epoch (1970-01-01).

When comparing and converting dates and timestamps, spark assumes the date is a timestamp that occurs 00:00:00 at the relevant time zone.

10.1.1 Workflow tutorial

We demonstrate the date handling below using this data frame:

dateDF <- data.frame(string=c("9/21 1949 18:44:33", "5/3 2018 11:04:11")) %>%
   createDataFrame()
collect(dateDF)
##               string
## 1 9/21 1949 18:44:33
## 2  5/3 2018 11:04:11

We start with the string representation (variable string) and convert it into various other formats. Unlike timestamps, strings are displayed unambiguosly independent of your timezone settings.

In CSV files the date data is usually in some kind of yyyy-mm-dd form which will be read as character string. This can be converted to timestamps with to_timestamp:

dateDF2 <- dateDF %>%
   withColumn("date", to_timestamp(column("string"), "M/d yyyy HH:mm:ss"))
collect(dateDF2)
##               string                date
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33
## 2  5/3 2018 11:04:11 2018-05-03 11:04:11

As you can see, the result printed in your current time zone, correspond exactly to the string representation: to_timestamp puts the date column in your current time zone. Sometimes this is not what you want.

If the string that represents time is not given in your current time zone (TZ), you can add the time zone descriptor by concat to the string. For instance, if the original time was given in Kabul time (UTC+4:30), you can add the TZ descriptor “+04:30” (note the leading “0”) while also adding TZ format symbol “XXX” (see more in Spark Datetime Pattern):

dateDF %>%
   withColumn("date",
              to_timestamp(concat(column("string"), lit("+04:30")),
                           "M/d yyyy HH:mm:ssXXX")) %>%
   collect()
##               string                date
## 1 9/21 1949 18:44:33 1949-09-21 06:14:33
## 2  5/3 2018 11:04:11 2018-05-02 23:34:11

Now the date column is different from the string representation (unless your OS is set to the Kabul TZ) and corresponds to your clock time at the moment when the clock in Kabul show what is stored in string. In this example, the text is complied in America/Los_Angeles time zone. to_timestamp and time zone formatting have their quirks:

  • TZ format “XXX” assume you supply the TZ information as “+dd:dd” or “-dd:dd”, so in this example it must be “+04:00”. “+4:00” will result in NA-s.

The fact that time output is always converted to your local time zone when printed may feel confusing and make the same code to produce different results in different time zones.

The timestamp columns can be easily compared with R POSIXct objects in a given TZ. For instance, we can extend the example above and check which case corresponds to Kabul time 18:44:33 September 21th, 1949:

dateDF %>%
   mutate(date = to_timestamp(concat(column("string"), lit("+04:30")),
                           "M/d yyyy HH:mm:ssXXX")) %>%
   mutate("6pm" = column("date") == ISOdate(1949,9,21, 18,44,33, tz="Asia/Kabul")) %>%
   collect()
## Warning in if (class(e2) == "Column") {: the condition has length > 1 and only the first element will be used
##               string                date   6pm
## 1 9/21 1949 18:44:33 1949-09-21 06:14:33  TRUE
## 2  5/3 2018 11:04:11 2018-05-02 23:34:11 FALSE

Note that the in the first row date is considered equal to that point of time, although in the current timezone the clock shows different time.

One can convert between time zones with to_utc_timestamp and from_utc_timestamp. The former can be used to convert the timestamp from the current TZ into UTC, and the latter for converting it from UTC to the desired TZ. For instance:

One may also keep time in utc_timestamp:

dateDF %>%
   mutate(date = to_timestamp(column("string"), "M/d yyyy HH:mm:ss")) %>%
   mutate(dateUTC = to_utc_timestamp(column("date"), Sys.timezone())) %>%
   collect()
##               string                date             dateUTC
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 1949-09-22 02:44:33
## 2  5/3 2018 11:04:11 2018-05-03 11:04:11 2018-05-03 18:04:11

This converts the string date into an UTC timestamp. The result is intuitive if date is in your current TZ and you convert it to UTC usin this TZ. The result is just the clock in UTC when your clock shows date. to_utc_timestamp has it’s own issues:

  • if the supplied time zone is not recognized by the operating system, it is silently ignored.

From R point of view, the utc_timestamp is just another POSIXct object, exactly as timestamp.

10.1.2 Converting dates to characters

When we want to convert a timestamp to a string, we can use date_format(column, format string) (note: we need Spark Datetime Pattern symbols). The sring representation will use the current TZ as set by the operating system. For instance:

dateDF2 %>%
   withColumn("date2", date_format(column("date"), "yyyyMMdd")) %>%
   collect()
##               string                date    date2
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 19490921
## 2  5/3 2018 11:04:11 2018-05-03 11:04:11 20180503

It appears date_format does not allow expressions in column if invoked with mutate and gives a really unintuitive error:

dateDF2 %>%
   mutate(date2 = date_format(column("date") %>%
                              from_utc_timestamp("Asia/Kabul"), "yyyyMMdd")) %>%
   collect()
##               string                date    date2
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 19490921
## 2  5/3 2018 11:04:11 2018-05-03 11:04:11 20180503

However, this works when run through withColumn:

dateDF2 %>%
   withColumn("date2", date_format(column("date") %>%
                                   from_utc_timestamp("Asia/Kabul"), "yyyyMMdd")) %>%
   collect()
##               string                date    date2
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 19490921
## 2  5/3 2018 11:04:11 2018-05-03 11:04:11 20180503

(spark 2.4.0)

Alternatively, one can achieve a similar result with extractor functions year, month, dayofmonth, hour, minute, second. For instance:

dateDF2 %>%
   withColumn("date2", year(column("date"))*10000 +
                       month(column("date"))*100 +
                       dayofmonth(column("date"))) %>%
   collect()
##               string                date    date2
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 19490921
## 2  5/3 2018 11:04:11 2018-05-03 11:04:11 20180503

Warning: you must put the column in the first position in withColumn arithmetic expression. A literal number in the first position, such as 10000*year(column("date")) gives an error.

(spark 2.2.0)

10.1.3 Converting between R and spark

These rules apply when converting R dataframes into spark with createDataFrame.

R Date-s will be converted to date type. This only applies to the objects of true Date class, not to the dates with timezones in lubridate which are actually POSIXct objects (except if the timezone is “UTC” or similar).

R timestamps, i.e. POSIXct objects will be converted to timestamps. This also applies to dates with timezones as long as these are internally POSIXct objects.

df <- data.frame(date=as.Date("2018-10-19"), dateTZ=lubridate::ymd("2018-10-19", tz="Asia/Kabul"))
sapply(df, class)
## $date
## [1] "Date"
## 
## $dateTZ
## [1] "POSIXct" "POSIXt"

The first column, date is of class Date, the second is POSIXct.

df %>% createDataFrame() %>%
   printSchema()
## root
##  |-- date: date (nullable = true)
##  |-- dateTZ: timestamp (nullable = true)

As you can see, date is of class date, while dateTZ is timestamp. Dates with time zone set are not true dates in R.

When converting the spark time objects back to R with collect(), one get’s POSIXct objects from timestamps and Date-s from dates. Both are normally printed for the current time zone (as set in your operating system).

Dates with timezones in spark are converted to Date assuming the represent the instant 00:00:00 at the corresponding day and time zone (i.e. the TZ for this particular date), and the day of that instant in the current timezone is converted to R Date object. So the result depends on your timezone settings!

10.2 Date-specific functions

10.2.1 Converting data

10.2.1.1 to_utc_timestamp and from_utc_timestamp

to_utc_timestamp(y, x) convert column y into utc_timestamp in time zone x.

  • y: column. Columns of type timestamp are supported. It can be interpreted as time in UTC.
  • x: string, time zone. A time zone the system understands. Unknown time zones are silently ignored. It must be provided as text (like US/Pacific), numeric form (as -08:00) does not work.

For instance, if date is given in America/Los_Angeles TZ, the conversion might look like

dateDF2 %>%
   mutate(dateUTC = to_utc_timestamp(column("date"), Sys.timezone())) %>%
   collect()
##               string                date             dateUTC
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 1949-09-22 02:44:33
## 2  5/3 2018 11:04:11 2018-05-03 11:04:11 2018-05-03 18:04:11

The dateUTC column shows time in UTC when the local time in America/Los_Angeles is date.

The opposite conversion can be done with from_utc_timestamp(y, x) which converts UTC time in y into timestamp in time zone x.

For example, if date is in UTC then to convert this time into Shanghai time, we can do

dateDF2 %>%
   withColumn("shanghaitime", from_utc_timestamp(column("date"), "Asia/Shanghai")) %>%
   collect()
##               string                date        shanghaitime
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 1949-09-22 02:44:33
## 2  5/3 2018 11:04:11 2018-05-03 11:04:11 2018-05-03 19:04:11

Indeed, Shanghai time is 8 hours ahead of UTC.

10.2.1.2 Get date from timestamp or string: to_date

timestamp includes time in seconds, date only concerns dates. You can convert the former to the latter as with to_date(x, format) where x is the column, and format is the format string, only applicable if x is a character column. By default it uses ISO dates with format = "yyyy-MM-dd".

Example:

dateDF2 %>%
   withColumn("ymd", to_date(column("date"))) %>%
   collect()
##               string                date        ymd
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 1949-09-21
## 2  5/3 2018 11:04:11 2018-05-03 11:04:11 2018-05-03

10.2.1.3 timestamp from character: to_timestamp()

to_timestamp(x, format) converts column x to timestamp using Spark Datetime Pattern specifications.

An example script that performs all these tasks:

dateDF %>%
   mutate(date = to_timestamp(column("string"), "M/d yyyy HH:mm:ss")) %>%
   collect()
##               string                date
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33
## 2  5/3 2018 11:04:11 2018-05-03 11:04:11

10.2.2 Extracting date/time information

10.2.2.1 Day of week dayofweek

dayofweek(x): day of week. Sunday=1, Saturday=7. Introduced in spark 2.3.0.

  • x: column. Columns of type utc_timestamp are supported.

Example:

dateDF2 %>%
   withColumn("dayOfWeek", dayofweek(column("date"))) %>%
   collect()
##               string                date dayOfWeek
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33         4
## 2  5/3 2018 11:04:11 2018-05-03 11:04:11         5

10.2.2.2 Hour hour

Extract hour in the current (as set in os) timezone.

hour(x): hour (integer), in 24h format.

  • x: column. Columns of type utc_timestamp are supported.

Example:

dateDF2 %>%
   withColumn("hour", hour(column("date"))) %>%
   collect()
##               string                date hour
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33   18
## 2  5/3 2018 11:04:11 2018-05-03 11:04:11   11

Note: hour is extracted from the timestamp in the current time zone. If we are interested in hour in another TZ, we have to a) convert the timestamp to UTC timestamp with the current TZ, and b) convert it back to timestamp using the desired TZ. For instance, assume our string date is given in Kabul time, and converted to timestamp with TZ information as above:

dateDF1 <- dateDF %>%
   withColumn("date",
              to_timestamp(concat(column("string"), lit("+04:30")),
                           "M/d yyyy HH:mm:ssXXX"))
dateDF1 %>%
   collect()
##               string                date
## 1 9/21 1949 18:44:33 1949-09-21 06:14:33
## 2  5/3 2018 11:04:11 2018-05-02 23:34:11

If we just use hour function, this would result in hour in the current timezone:

dateDF1 %>%
   withColumn("hour", hour(column("date"))) %>%
   collect()
##               string                date hour
## 1 9/21 1949 18:44:33 1949-09-21 06:14:33    6
## 2  5/3 2018 11:04:11 2018-05-02 23:34:11   23

If we are interested in the hour in Kabul, we have to go through double conversion:

dateDF1 %>%
   withColumn("hour", column("date") %>%
                      to_utc_timestamp(Sys.timezone()) %>%
                      from_utc_timestamp("Asia/Kabul") %>%
                      hour()) %>%
   collect()
##               string                date hour
## 1 9/21 1949 18:44:33 1949-09-21 06:14:33   18
## 2  5/3 2018 11:04:11 2018-05-02 23:34:11   11

The result corresponds to the hour in our original string representation of the timestamp.

11 Other notes (draft)

11.1 Merging by a large number of keys

during merges (and possibly sorts) you may end up with being out of memory with error message like

java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes

This is claimed to be related to grouping by key. Spark tries to allocate equal amount of memory for each key, and if the distribution is skewed, it attempts to allocate too much.

11.2 finding NA-s

I cannot understand how is.nan works. Use instead

isNull(col)

or

isNotNull(col)

they can be used for filtering and they mark NA-s as TRUE

11.3 User-Defined Functions

when doing UDF-s and return data.frame, you may get “Unsupported type for serialization” error. This may be related to the fact that returned data.frame converts strings to factors and sparkR cannot serialize factors. Use stringsAsFactors=FALSE

11.4 Parquet file for temporary data

you cannot easily overwrite parquet file with new data. The old cache remains in memory and it causes errors.

11.5 Reading Data

read.parquet(): expects a list of files as the first argument read.df() expects a path where to read all the files as the first argument

(spark 2.2)

11.6 select

cannot select columns of grouped data

11.7 dapply

the chunks fed to dapply are roungly of the size #obs/#partitions. nParallel dapply processes are run in parallel.

If you get error message error in parallel:::mcfork unable to create a pipe then you may have run out of processes. Spark 2.1.2 and R 3.5.0 did not play nicely with dapply leaving zombies, until you run out of the available processes. pIn this case fewer partitions may help. Issue seems to be gone when switching to spark 2.3.1.

One runs a separate dapply process for each partition? At least the number of dapply invocations seems to equal the number of partitions.

11.8 checkpointing

Have to check if unpersist helps to delete checkpoints.