SparkR is a very valuable tool when working with big data. Unfortunately, its documentation is rather sparse. Here I have collected a number of solutions and explanations that I have struggled with myself and that others may find useful.
The source of this document is on Github (see file sparkr_notes.rmd), all comments and issues are welcome there.
In this text I assume you are familiar with magrittr pipes %>%
, %<>%
and %T>%
. Pipes are an excellent complement to SparkR (and to many other R tasks). A good source to read about pipes is Garret Grolemund and Hadley Wickham’s R for data science section Pipes.
SparkR takes a similar approach to dplyr for transforming data, so I strongly recommend familiarizing yourself with dplyr before you start with spark. An excellent source for this is Garret Grolemund and Hadley Wickham’s R for data science, section Data Transformations. The similarity is further stressed by a number of functions (“verbs” in Grolemund and Wickham’s parlance) having exactly the same name and similar syntax. In the case of SparkR, there are often two versions, one with dplyr-like syntax, and one with its own distinct syntax.
The fact that the syntax is the same can cause issues, however. For instance, selection of certain variables can be done with select
:
select(data.frame, col, ...) # dplyr
select(SparkDataFrame, col, ...) # SparkR
There are two potential issues here:
If dplyr is loaded after SparkR, then select is taken from dplyr, which is unable to invoke the SparkDataFrame method. Instead you see an obscure error like Error in UseMethod("select_") : no applicable method for 'select_' applied to an object of class "SparkDataFrame". If you want to use both dplyr and SparkR, load dplyr before you load SparkR.
Besides select, the common functions include mutate and rename. There is also a plethora of other packages that export identically named functions, e.g. hour in lubridate overrides the same function in SparkR.
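Whatever the loading order, you can always disambiguate explicitly with the :: operator. A minimal sketch (df stands for any SparkDataFrame):
SparkR::select(df, "name")   # force the SparkR method
lubridate::hour(Sys.time())  # force the lubridate version of hour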
The central data structure in spark is Resilient Distributed Dataset (RDD). Resilient means that if something breaks in the workflow, for instance a cluster node dies, the data can be recovered from an earlier stage and the RDD lineage information. So RDD is not just a dataset: a number in RDD is not just a number, but a number plus information where this number comes from.
Dataframes in spark are made of RDD-s and hence inherit the RDD properties, in particular the lineage.
As one operates repeatedly on the same data, the RDD lineage may grow without limit. This happens, for instance, if you add data to an RDD in a loop, and each time also filter out certain observations. This manifests in RDD operations getting slower and slower; one may also get java.lang.StackOverflowError-s. A very valuable resource for seeing and debugging the RDD lineages is the web monitoring port (default value 4040).
A way to break the lineage is to use checkpointing. Checkpointing saves the current dataframe as “just data”, without any historical information. It should be saved on a fault-tolerant file system, such as HDFS, to ensure the whole process is fault-tolerant. All the following execution plans now take the resulting dataframe as a blank sheet (well, blank in terms of lineage, not data content!). Checkpointing can be done like this:
setCheckpointDir("/tmp")   # note: have to set the checkpoint dir first
df2 <- checkpoint(df1)
Now df2
is a “lineage-free” dataframe.
However, checkpointing is not without its downsides. First, it saves data on the disk, and this may be quite slow. Second, it may take quite a bit of storage, as you are essentially writing all your data to disk in an uncompressed format.
Spark splits its work into jobs, stages, and tasks. A job corresponds roughly to one spark operation in your code, such as read.json or select("id", "name"). But as spark uses lazy evaluation and optimizes the queries, the jobs may not exactly correspond to the original code.
Spark displays its progress on the console: a bar like
[Stage 0:====================================================> (55 + 1) / 60]
indicates that 55 tasks out of total 60 in stage 0 are done.
The best way to understand what are the jobs and stages is to check the DAG visualization in the Web monitoring window.
As spark runs on the Java VM, you need java. Spark 2.2–2.4 only works on java 1.8. If you already have a correct version of java installed, the rest on linux and mac is plain sailing: download spark, unpack it to a suitable location (e.g. $HOME/local), and set the environment variable
SPARK_HOME=$HOME/local/spark-x.x.x-bin-hadoop
or whatever is your exact installation directory. When relying only on R, the location of spark home can instead be set when initializing the spark session.
Windows installation is more involved: you have to set a few paths and permissions.
If you have never previously installed java, you can just add the most recent Java 1.8.
If you have an incompatible version, you can add another java fairly easily. On debian-based linux (such as ubuntu), you can select the default java by
sudo update-alternatives --config javac
this updates a few symlinks to ensure the correct java version is taken as default.
Another way is just to specify the JAVA_HOME
environment variable. This approach also works on Mac, and it allows you to pick a specific version of java just for Spark while leaving the system defaults untouched. When running Spark from inside R, the easiest approach is to set the environment variable by Sys.setenv
with something like (on a mac)
Sys.setenv(JAVA_HOME="/Library/Java/JavaVirtualMachines/jdk1.8.0_52.jdk/Contents/Home")
where the path must correspond to the correct Java 1.8 path on your computer.
Correct java must be specified before you initiate the spark session.
Here we only describe how to start spark from inside R. You have to a) load the library from a non-standard location, and b) initiate spark session. This can be done along the following lines:
# this is the directory where your spark is located
SPARK_HOME <- file.path(Sys.getenv("HOME"), "local", "spark")
# note: SparkR with capital S
library("SparkR", lib.loc=file.path(SPARK_HOME, "R", "lib"))
# note: sparkR with lower case s
ss <- sparkR.session(master = "local[1]",
                     sparkHome = SPARK_HOME,
                     appName = "spark demo")
Here SPARK_HOME points to the folder where spark is installed. If you downloaded and decompressed it into ~/Downloads, the correct path may look like SPARK_HOME <- "~/Downloads/spark-2.4.0-bin-hadoop2.7". The lib.loc argument points to the “R/lib” folder inside it. Note: including the sparkHome argument is only needed if you did not specify the SPARK_HOME environment variable as indicated above under installation.
If you have to choose a specific java version (see above), do it before you start spark session. You may as well do it before loading the library.
sparkR.session
accepts a number of options:
master is the location of the spark master (driver). "local[n]" means spark will be run locally, on your computer, utilizing n cpus. You can also use "local[*]" if you want to utilize all logical cpus (one process per cpu thread).
sparkConfig is a list of configuration options. The more interesting ones are:
spark.driver.memory: how much memory to allocate to the spark driver (or the complete spark process if run locally). Use character strings, such as "500m" or "4g". If this is too small, you get java out-of-memory errors related to java heap space.
spark.sql.parquet.compression.codec: by default, spark compresses parquet files with the snappy codec, which creates rather large temporary files. You can specify a different codec, such as "gzip", during the session initialization. Later calls to sparkR.conf do not work. Its value is a string.
Here is an example of initiating the spark session while also setting the configuration options:
ss <- sparkR.session(master = "local[2]",
                     appName = "sparkNotes",
                     sparkConfig = list(spark.sql.parquet.compression.codec = "gzip",
                                        spark.driver.memory = "8g",
                                        log4j.logger.org.apache.parquet = "INFO"))
Spark can easily be used with knitr, and the corresponding chunks are basically just R code. However, as spark objects are not R objects (they are pointers to the corresponding java objects), spark objects cannot be cached. You have to re-run the corresponding code again (or build your own cache).
The basic run-time configuration can be returned by sparkR.conf()
. Without any arguments it returns all set options:
sparkR.conf()
## $log4j.logger.org.apache.parquet
## [1] "INFO"
##
## $spark.app.id
## [1] "local-1671222752549"
##
## $spark.app.name
## [1] "sparkr notes"
##
## $spark.app.startTime
## [1] "1671222751705"
##
## $spark.app.submitTime
## [1] "1671222751168"
##
## $spark.driver.extraJavaOptions
## [1] "-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED"
##
## $spark.driver.host
## [1] "192.168.8.147"
##
## $spark.driver.memory
## [1] "8g"
##
## $spark.driver.port
## [1] "41333"
##
## $spark.executor.extraJavaOptions
## [1] "-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED"
##
## $spark.executor.id
## [1] "driver"
##
## $spark.executorEnv.LD_LIBRARY_PATH
## [1] "$LD_LIBRARY_PATH:/usr/lib/R/lib:/usr/lib/x86_64-linux-gnu:/usr/lib/jvm/default-java/lib/server:/home/siim/.local/lib:"
##
## $spark.home
## [1] "/home/siim/local/spark-3.3.1-bin-hadoop3"
##
## $spark.master
## [1] "local[2]"
##
## $spark.r.sql.derby.temp.dir
## [1] "/tmp/RtmpwwrcA3"
##
## $spark.sql.catalogImplementation
## [1] "hive"
##
## $spark.sql.parquet.compression.codec
## [1] "gzip"
##
## $spark.sql.warehouse.dir
## [1] "file:/home/siim/proge/R-lookup-tables/spark-warehouse"
##
## $spark.submit.deployMode
## [1] "client"
##
## $spark.submit.pyFiles
## [1] ""
##
## $spark.ui.showConsoleProgress
## [1] "true"
Note that there is no obvious entry for parallelism. When invoked locally, it is visible in the spark.master entry, here “local[2]”, but no dedicated number is provided (2.3, 2.4). Neither can you see an entry for the spark version, although the spark.home directory name contains the version number in this installation.
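You can, however, query a single option directly, and there is a separate function for the version. A small sketch:
sparkR.conf("spark.master")   # just this one setting
sparkR.version()              # the spark version, e.g. "3.3.1"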
Spark stores quite a bit of temporary data on disk so you should ensure you have plenty of disk space. The location of temporary folders can be set in the environment variable SPARK_LOCAL_DIRS
as a comma-separated list of directories:
SPARK_LOCAL_DIRS="$HOME/tmp,/tmp"
This creates temporary folders in two locations, one in $HOME/tmp
and another in /tmp
. The folders appear in pairs, names like blockmgr-6e392c2c...
and spark-f0e4221e...
.
Spark splits data to be saved roughly equally onto both of these devices. This means, in particular, that when one of the devices is full, spark fails even if there is still plenty of space on the other (2.1.2).
One can invoke sparkR in an ordinary R session like any other interactive command. This is great for testing/debugging and trying to understand spark. Note that the commands on spark are much slower compared to the base R when using small dataframes.
For anything resembling a more serious work I’d recommend to run SparkR in batch mode, exactly as any other R script.
Spark also sets up a small web server that allows you to monitor some of the spark’s internal workings.
Port 4040 shows the basics of the workings: jobs, stages, and tasks. Here is an example of what you can see in the web monitoring window.
Port 8080 is for the master, 8081 for the workers.
If you start spark locally (--master local[n]), then the 8080 and 8081 ports are not there, only 4040.
A very useful information you get from the web monitor is DAG visualization. This shows how the pieces of data are linked, which stages are done, and what kind of lookups spark has to do for the processing. You can see DAG for jobs when clicking on a particular job and selecting DAG Visualization, or a more detailed DAG if you select stages and check the stage DAG visualization instead.
If DAG gets too complicated then spark gets slow and it is advisable to break the lineage using checkpointing.
An R dataframe can be transformed to a spark dataframe using createDataFrame
function, the reverse is done by collect
. For instance:
dfa <- data.frame(a=1:2) %>%
   createDataFrame()
dfa
## SparkDataFrame[a:int]
dfa %>% collect()
## a
## 1 1
## 2 2
However, note you cannot transform 0-row R dataframes into spark in this way
dfa <- data.frame(a=integer()) %>%
   createDataFrame()
will give an unhelpful error about subscript out of bounds.
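A workaround I find handy (just a sketch, assuming you want an empty dataframe with an integer column a) is to create a one-row dataframe of the right types and filter everything out:
dfa <- data.frame(a = 0L) %>%
   createDataFrame() %>%
   filter(column("a") != 0L)   # zero rows, but the schema is in place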
CSV files are one of the most common and versatile forms of data. SparkR reads these pretty well but unfortunately, the documentation is not good enough to get started. The basic syntax is:
loadDF(fName,
source="csv",
header="true",
sep=",")
The most important arguments are:
fName: the file name. The unix home directory marker ~ does not seem to work (it is taken as a literal character in the file name), unlike in ordinary R. However, there is some good news:
Compressed files, e.g. .bz2, are decompressed transparently (2.2.0).
fName can be a shell pattern corresponding to multiple files, such as dir/*.csv. In that case all these files are read and stacked into the resulting data frame.
fName can also be a folder name (both with and without the trailing slash), in which case all files in that folder are read. Note: in this case loadDF loads all the files in that folder, including non-csv ones.
source: should be "csv" here.
header: whether the first line of the file contains the column names. Defaults to "false", which reads the eventual header names as the first data line and names the columns _c0, _c1 etc. (2.2.0). Note: you have to specify java-ish "true" and "false" (in quotes) here, not R-ish TRUE and FALSE.
sep: field separator, default ",".
loadDF loads everything as string type; you may want to convert the columns afterwards (this can probably also be achieved by schemas).
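A small sketch of such a conversion, assuming a hypothetical file data.csv with columns price and n:
df <- loadDF("data.csv", source="csv", header="true", sep=",") %>%
   mutate(price = cast(column("price"), "double"),
          n = cast(column("n"), "integer"))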
There is an alternative function, read.df
, that differs from loadDF by the default arguments.
JSON files can be read with
read.json(fName)
where fName can be a vector of character file names.
Reading parquet files is done with
read.parquet(file)
which returns a spark data frame. However, the handling of the file argument is non-trivial.
file: file name, actually the name of the folder where parquet holds its files.
file accepts certain shell patterns: *, ?, and brace expansion. However, only a single set of brace expansion is allowed, and only with commas, not .. ranges (linux, 2.3.1). Brace expansion can be combined with patterns, so a name like /data/{home,work}/file* is valid.
Unlike write.parquet, file for reading also does tilde expansion. (linux, 3.0.1)
You can specify file as path/to/directory/*. If you leave out the trailing wildcard *, you get an error Unable to infer schema for Parquet (2.3.1).
Writing parquet files is similar:
write.parquet(df, path, mode)
df is your (spark) data frame.
path is the path/file name. Unlike read.parquet, path does not do tilde expansion! So, e.g., storing a data frame into a file "~/foo" stores file foo into a folder named ~ in the current directory. However, read.parquet attempts to read file foo from your home folder. (linux, 3.0.1)
mode: use "overwrite" to overwrite an existing file.
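To make the asymmetric path handling concrete, here is a small round-trip sketch (the path /tmp/people.parquet is just an example):
write.parquet(df, "/tmp/people.parquet", mode="overwrite")
df2 <- read.parquet("/tmp/people.parquet")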
Spark supports several ways to select (and order) columns from a dataframe. The main methods are the select
function and R-style []
indexing:
dfa <- data.frame(family=c("Hu", "Xi"),
                  name=c("Jintao", "Jinping"),
                  yob=c(1942, 1953)) %>%
   createDataFrame()
dfa[, c("yob", "family")] %>%
   collect()
## yob family
## 1 1942 Hu
## 2 1953 Xi
dfa %>% select("yob", "family") %>%
   collect()
## yob family
## 1 1942 Hu
## 2 1953 Xi
select is rather permissive in its arguments; in addition to the above, one can also use select(c("yob", "family")) and select(list("yob", "family")).
Dropping variables can be achieved with drop
. Drop can handle a vector of variable names:
vars <- c("name", "yob")
dfa %>%
   drop(vars) %>%
   collect()
## family
## 1 Hu
## 2 Xi
However, it cannot handle multiple variable names as separate arguments: drop("name", "yob") will not work.
The way to filter observations based on a logical expression is filter
. It expects a column expression.
Example:
dfa %>% filter(column("yob") > 1950) %>%
   collect()
## family name yob
## 1 Xi Jinping 1953
Filter will not work with multiple conditions as separate arguments; you have to combine them with logical operators.
Filter wants your values to be of a single class, including a single value for S3 classes. This sometimes creates warnings, e.g. with dates that have class of c("POSIXct", "POSIXt")
:
df <- data.frame(date = ISOdatetime(2018,10,17,14,34,12)) %>%
   createDataFrame()
df %>%
   filter(column("date") == ISOdatetime(2018,10,17,14,34,12)) %>%
   collect()
## Warning in if (class(e2) == "Column") {: the condition has length > 1 and only the first element will be used
## date
## 1 2018-10-17 14:34:12
It still works (Spark 3.3.1), but the warning is unpleasant. It may not work in Spark 3.2. Note that you cannot use suppressWarnings
with piping.
As a workaround, you can use as.POSIXct
when writing the conditions:
df <- data.frame(x = c(6,9,12,15,18),
                 date = as.POSIXct(c("2022-12-16 6:30",
                                     "2022-12-17 9:31",
                                     "2022-12-18 12:32",
                                     "2022-12-19 15:33",
                                     "2022-12-20 18:34")),
                 day = c("Fri", "Sat", "Sun", "Mon", "Tue")) %>%
   createDataFrame()
df %>%
   collect()
## x date day
## 1 6 2022-12-16 06:30:00 Fri
## 2 9 2022-12-17 09:31:00 Sat
## 3 12 2022-12-18 12:32:00 Sun
## 4 15 2022-12-19 15:33:00 Mon
## 5 18 2022-12-20 18:34:00 Tue
df %>%
   filter(hour(column("date")) >= 9 &
          hour(column("date")) < 17 &     # working hours only
          dayofweek(column("date")) < 5   # all days except Thu, Fri, Sat
          ) %>%
   collect()
## x date day
## 1 12 2022-12-18 12:32:00 Sun
## 2 15 2022-12-19 15:33:00 Mon
One can sample a certain percentage of observations with
sample(df, withReplacement, fraction)
where fraction is the proportion (between 0 and 1) of the original data to be sampled. The fraction is documented to be approximate, and there seems to be no way to sample an exact number of observations.
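For instance, to sample roughly half of a dataframe and check how many rows you actually got (df is just a placeholder name here):
df %>%
   sample(withReplacement=FALSE, fraction=0.5) %>%
   count()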
One can access single columns with the $-operator and the column function. The $-operator works in a fairly similar manner as for ordinary data frames. Note that in case of piping, where your intermediate dataframe does not have a name, you may access the columns with the .$ syntax, like .$foo.
The function column is an alternative; it takes the column name as a character: column("foo").
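A small illustration with the dfa dataframe from above:
dfa$yob        # a Column object, not a vector of numbers
dfa %>%
   filter(dfa$yob > 1950) %>%   # same effect as filter(column("yob") > 1950)
   collect()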
withColumn
and mutate
There are three functions for computing on columns: withColumn, mutate and transform. All of these can either create new columns or overwrite existing ones.
The arguments for withColumn
are:
withColumn(dataframe, columnName, expression)
where columnName
is a character column name (not a Column
object), and expression is a column expression: something you can create from existing columns and constants using functions and arithmetic operators.
It may include mathematical and string operations and other column-oriented functions. Take the example dataframe
dfa %>%
   collect()
## family name yob
## 1 Hu Jintao 1942
## 2 Xi Jinping 1953
dfa %>%
   withColumn("age2018", lit(2018) - column("yob")) %>%
   collect()
## family name yob age2018
## 1 Hu Jintao 1942 76
## 2 Xi Jinping 1953 65
The column expression, lit(2018) - column("yob")
, computes the age in 2018. As column("yob")
is not a number (but a “Column” object), one cannot simply do 2018 - column("yob")
. Arithmetic with numbers and columns is limited (there is no “-” function with signature “numeric”, “Column”). lit(2018)
creates a new constant column, and now we perform arithmetic of two columns instead of a numeric and a column. However, arithmetic operators for columns can handle “numeric” further down in the signature:
dfa %>%
   withColumn("age2018", (column("yob") - 2018)*(-1)) %>%
   collect()
## family name yob age2018
## 1 Hu Jintao 1942 76
## 2 Xi Jinping 1953 65
Now R selects “-” with signature “Column” and “numeric”, and this works well.
mutate offers largely overlapping functionality, but it can take more than one computation at a time, and its expressions are in the form name = expression. We can achieve the above, and more, in the following example:
dfa %>%
   mutate(age2018 = lit(2018) - column("yob"),
          age2000 = lit(2000) - column("yob")
          ) %>%
   collect()
## family name yob age2018 age2000
## 1 Hu Jintao 1942 76 58
## 2 Xi Jinping 1953 65 47
Note that name must be unquoted. We would need two withColumn commands to achieve equivalent results.
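For comparison, the withColumn version of the same computation (a sketch):
dfa %>%
   withColumn("age2018", lit(2018) - column("yob")) %>%
   withColumn("age2000", lit(2000) - column("yob")) %>%
   collect()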
Mutate has a few quirks:
Certain column names, e.g. x, sometimes do not work and lead to obscure errors.
If using a single mutate to create more than one variable, you cannot use the previously created variables in the next expressions. You need a new mutate call.
Some functions, e.g. date_format, have issues inside mutate (see the date_format section below).
seems to be pretty much an alias for mutate
.
Constant columns can be added with the lit
function:
foo <- data.frame(a=1:2) %>%
   createDataFrame()
foo %>%
   withColumn("b", lit(77)) %>%
   collect()
## a b
## 1 1 77
## 2 2 77
will add a constant column b = 77 to the dataframe; withColumn will most likely figure out the correct type.
You can add missing values in the same fashion. However, if you just do withColumn("c", lit(NA)), you get a column of type null, which may cause trouble downstream (it cannot be saved into a parquet file, for instance). You should cast it to the correct type, for instance to “integer”:
foo %>%
   withColumn("b", cast(lit(NA), "integer")) %T>%
   printSchema() %>%
   collect()
## root
## |-- a: integer (nullable = true)
## |-- b: integer (nullable = true)
## a b
## 1 1 NA
## 2 2 NA
Indeed, as the schema indicates, b is now an integer.
There are two built-in functions for renaming. rename expects a list of arguments in the form newName = existingColumn (after the data frame argument). Note: this is a (quoted or unquoted) name = column pair; the right hand side must be a column, not just a column name. This is perhaps the easiest way to rename variables:
data <- createDataFrame(data.frame(a=1:2, b=c("Huang", "Goyal")))
data %>%
   rename(id = column("a"), "name" = column("b")) %>%
   collect()
## id name
## 1 1 Huang
## 2 2 Goyal
withColumnRenamed expects two arguments (after the data frame argument): the old column name and the new column name. Note: these are column names, not columns, and the order is the other way around:
data <- createDataFrame(data.frame(a=1:2, b=c("Huang", "Goyal")))
data %>%
   withColumnRenamed("a", "id") %>%
   withColumnRenamed("b", "name") %>%
   collect()
## id name
## 1 1 Huang
## 2 2 Goyal
However, certain short column names do not work (2.2.1) and exit with an uninformative error message:
data %>%
   rename("x" = column("a")) %>%
   collect()
## Error in h(simpleError(msg, call)): error in evaluating the argument 'x' in selecting a method for function 'collect': unable to find an inherited method for function 'rename' for signature '"Column"'
Name “xx” works, however:
data %>%
   rename("xx" = column("a")) %>%
   collect()
## xx b
## 1 1 Huang
## 2 2 Goyal
cast
cast(x, dataType)
is the general way of converting column x to a new data type. dataType must be one of the spark data types, not an R data type. In particular:
R data type | spark data type |
---|---|
character | string |
numeric | double |
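For instance, to convert the integer column a of the data dataframe above into a string column:
data %>%
   withColumn("a_str", cast(column("a"), "string")) %>%
   printSchema()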
A tailor-made way to make strings out of numbers is format_string(format, column)
. It appears to use standard C formatting specifications:
data %>%
   withColumn("c", format_string("%10d", column("a"))) %>%
   collect()
## a b c
## 1 1 Huang 1
## 2 2 Goyal 2
Note the space before the numbers in string format.
When feeding a string column to the %d format, spark fails with IllegalFormatConversionException: d != org.apache.spark.unsafe.types.UTF8String.
agg
agg
is the general aggregation function. It takes the data frame and aggregation arguments. For instance, let’s count the number of non-missing entries in a data frame:
df <- data.frame(a=c(4, 1, NA)) %>%
   createDataFrame()
collect(df)
## a
## 1 4
## 2 1
## 3 NA
df %>%
   withColumn("b", cast(isNotNull(column("a")), "integer")) %>%
   agg(n = sum(column("b")),
       mean = avg(column("a"))) %>%
   collect()
## n mean
## 1 2 2.5
This results in a dataframe with a single row containing the desired values (here n and mean). The aggregation is determined by the sum function, one of agg’s helper functions, which takes a numeric column (this is why we have to cast the logical column to an integer).
agg is very similar to dplyr’s summarize function: it aggregates data, but one must supply the aggregation rule.
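agg is typically combined with groupBy to aggregate within groups. A sketch using the dfa dataframe from above:
dfa %>%
   groupBy("family") %>%
   agg(n = count(column("name")),
       meanYob = avg(column("yob"))) %>%
   collect()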
There is a long list of agg’s aggregation functions (x is a column):
avg(x): average value. Missings are ignored.
count(x): count the number of objects (alias n).
first(x, na.rm=FALSE): return the first element in the group.
n(x): count the number of objects (alias count).
sum(x): add the values.
rbind: concatenating by rows
Adding another dataframe underneath a spark dataframe is done with rbind, exactly as in the case of base R:
names1 <- data.frame(name="sune", age=17) %>%
   createDataFrame()
names2 <- data.frame(name="jestin", age=33) %>%
   createDataFrame()
rbind(names1, names2) %>%
   collect()
## name age
## 1 sune 17
## 2 jestin 33
Both data frames must have the same columns in exactly the same order. If the order does not match, you get an error Names of input data frames are different
, even if the names are the same and the problem lies just in the order.
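If only the order differs, you can reorder one of the frames before binding, e.g. (a sketch):
rbind(names1, select(names2, columns(names1))) %>%
   collect()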
SparkR features two similar functions: merge
and join
. They do not behave in quite the same manner.
Merge syntax is rather similar to that of base::merge:
merge(x, y, by, suffixes)
x, y are the data frames to be merged.
by is a vector of key columns, by default the columns with the same name in both data frames. by columns can be of different type; for instance, integer and double columns still match.
suffixes is a character vector of 2 specifying different suffixes for x and y to make unique names. An empty suffix, e.g. suffixes=c("", "_y"), does not work: you get an error. (2.2.0)
If merging two data frames on a column of the same name, merge
creates new variables like foo_x
and foo_y
according to the respective origin data frame. For instance:
foo <- createDataFrame(data.frame(a=1:2, b=11:12, c=c("a", "b")))
bar <- createDataFrame(data.frame(a=1:2, b=21:22, d=c("x", "y")))
merge(foo, bar, by="a") %>% collect()
## a_x b c a_y b d
## 1 1 11 a 1 21 x
## 2 2 12 b 2 22 y
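You can control the suffixes explicitly, for instance (a sketch; remember that an empty suffix fails):
merge(foo, bar, by="a", suffixes=c("_left", "_right")) %>% collect()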
If, instead of by
, you specify both by.x
and by.y
, then it also renames the b
-s:
merge(foo, bar, by.x="a", by.y="a") %>% collect()
## a_x b_x c a_y b_y d
## 1 1 11 a 1 21 x
## 2 2 12 b 2 22 y
Join, in turn, does not rename but still keeps both version of the join column in the data:
join(foo, bar, foo$a == bar$a) %>% collect()
## a b c a b d
## 1 1 11 a 1 21 x
## 2 2 12 b 2 22 y
As stressed above, RDD is not just data but also information about how the data was created. merge
can use some of this lineage information. In particular, if the key column is created through lit()
constant, it refuses to merge. See the following examples:
foo <- data.frame(a=1:2, b=3:4) %>% createDataFrame()
foo %>% collect()
## a b
## 1 1 3
## 2 2 4
bar <- data.frame(c=5:6, a=c(0L,0L)) %>% createDataFrame()
bar %>% collect()
## c a
## 1 5 0
## 2 6 0
merge(foo, bar, all.x=TRUE) %>% collect()
## a_x b c a_y
## 1 1 3 NA NA
## 2 2 4 NA NA
Here we create two dataframes, foo
and bar
and merge these on column a
. As there are no common key values, the variables inherited from bar
are all missing. This is to be expected.
Now let’s modify this example with literal constants:
baz <- data.frame(c=5:6) %>% createDataFrame() %>% withColumn("a", lit(0))
baz %>% collect()
## c a
## 1 5 0
## 2 6 0
So the baz
dataframe looks the same as bar
above. However, now merge knows that the keys are just constants:
merge(foo, baz, all.x=TRUE) %>% collect()
fails with a Join condition is missing or trivial error (2.3.1). Remember: a dataframe is not just data but also the lineage information!
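In the spirit of the checkpointing section above, breaking the lineage before merging is one thing to try (only a sketch; whether it placates the trivial-join check may depend on the spark version):
setCheckpointDir("/tmp")
baz <- checkpoint(baz)
merge(foo, baz, all.x=TRUE) %>% collect()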
Spark’s lazy evaluation makes debugging hard: the code may fail before it even reaches the problematic line. For instance, when ordering before a merge that introduces ambiguous columns, you may see an error like:
org.apache.spark.sql.AnalysisException: Reference 'b' is ambiguous, could be: b#3, b#10.;
For instance:
foo <- createDataFrame(data.frame(a=1:2, b=11:12))
bar <- createDataFrame(data.frame(a=1:2, b=21:22))
foo %>%
   groupBy("a", "b") %>%
   count() %>%
   merge(bar, by="a") %>%
   rename(a = column("a_x")) %>%
   collect()
fails with the error Caused by: org.apache.spark.sql.AnalysisException: Reference 'b' is ambiguous, could be: b#3, b#10. The problem is the multiple instances of b introduced through the merge, but the failure seems to occur at groupBy("a", "b"). Indeed, this is the only place in the code where we use the variable b.
However, a slightly simpler plan—just removing the rename
part that seems unrelated to the problem—still works and demonstrates what happens at merge:
foo <- createDataFrame(data.frame(a=1:2, b=11:12))
bar <- createDataFrame(data.frame(a=1:2, b=21:22))
foo %>%
   groupBy("a", "b") %>%
   count() %>%
   merge(bar, by="a") %>%
   collect()
## a_x b count a_y b
## 1 1 11 1 1 21
## 2 2 12 1 2 22
By removing rename(a = column("a_x")) at the end of the previous example the code works; I guess this is related to how well the code can be translated into spark execution plans. See also arrange by a renamed column where something similar happens. (Spark 2.2.1, 2.4.4)
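One way to reduce the ambiguity is to rename the clashing column in one of the dataframes before merging (a sketch; b_bar is just an example name):
bar2 <- withColumnRenamed(bar, "b", "b_bar")
foo %>%
   groupBy("a", "b") %>%
   count() %>%
   merge(bar2, by="a") %>%
   collect()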
Many common tasks require ordering by group. SparkR has a multi-column ordering function (arrange) and multi-column grouping (groupBy). Unfortunately, these two do not play together. Assume we have temperature data
temp <- data.frame(month = c("Nov", "Nov", "Dec", "Dec"),
                   day = c(1,15,1,15),
                   temp = c(14,12,10,12)) %>%
   createDataFrame()
temp %>%
   collect()
## month day temp
## 1 Nov 1 14
## 2 Nov 15 12
## 3 Dec 1 10
## 4 Dec 15 12
and we want to find the hottest day each month. Unlike in dplyr
, you cannot just do
temp %>%
   groupBy("month") %>%
   arrange("temp") %>%
   agg(maxDay = day[1])
as grouped data cannot be arranged in spark.
A correct solution seems to be to use window functions. Windows in SQL parlance are blocks of rows on which one can perform certain operations. Windows can be defined in different ways; here we are interested in partitioning data by keys into windows. This can be done as windowPartitionBy("month"). This tells spark that the data must be “partitioned” by month (I guess the ‘partition’ here is not the same as an RDD partition), and we can operate separately on these partitions.
Now we want to order the partitions by temperature. This can be achieved by ws <- orderBy(windowPartitionBy("month"), "temp"). Note that we first define the window partition, and thereafter its ordering. These operations form our window specification ws. This is just a specification; we haven’t touched the data yet.
When done with the window specification, one can use window functions. These can be invoked by over(x, window) where x is a column, most likely computed via window functions, and window is the window spec we defined above. To solve the task above, we can add the row number (i.e. temperature rank) for each day within month. So we can do:
ws <- orderBy(windowPartitionBy("month"), "temp")
temp %>%
   mutate(n = over(row_number(), ws)) %>%
   filter(column("n") == 1) %>%
   collect()
## month day temp n
## 1 Dec 1 10 1
## 2 Nov 15 12 1
(Note the syntax column("n") to refer to the column “n”.)
This approach otherwise works great, but unfortunately it picks the lowest temperature day. I haven’t found a way to insert the desc operator into the orderBy sorting: it only accepts string column names, not column("temp"), nor desc("temp"). So we have to resort to a simple trick and reverse the sign of temp:
ws <- orderBy(windowPartitionBy("month"), "temp1")
temp %>%
   mutate(temp1 = -column("temp")) %>%
   mutate(n = over(row_number(), ws)) %>%
   filter(column("n") == 1) %>%
   collect()
## month day temp temp1 n
## 1 Dec 15 12 -12 1
## 2 Nov 1 14 -14 1
Obviously, in the final version we may drop the obsolete variables and order the data by month.
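For instance, continuing the example above (a sketch):
ws <- orderBy(windowPartitionBy("month"), "temp1")
temp %>%
   mutate(temp1 = -column("temp")) %>%
   mutate(n = over(row_number(), ws)) %>%
   filter(column("n") == 1) %>%
   drop(c("temp1", "n")) %>%
   arrange("month") %>%
   collect()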
arrange
- groupBy
Note: it is tempting to do go the following way instead (see):
temp %>%
   arrange(column("month"), desc(column("temp"))) %>%
   groupBy("month") %>%
   agg(maxDay = first("day")) %>%
   collect()
## month maxDay
## 1 Dec 15
## 2 Nov 1
This is simple, and seems to work, at least locally. Unfortunately there is no guarantee that groupBy
preserves the order (see SPARK-16207).
arrange
arrange
Spark dataframes can be ordered by arrange
function. It orders the data by “natural order”, i.e. alphabetically for character columns and in numeric order for numerics. The syntax is similar to that of dplyr: arrange(col, col, ...)
. However, there are two separate functions:
arrange("col1", "col2", ...)
and
arrange(col1, col2, ...)
In the former case, arrange expects a list of column names; in the latter case, a list of columns. The latter version also works with the order-reversal function desc, the former version does not. As an example, use the temperature data from above:
temp %>%
   arrange("month", "temp") %>%
   collect()
## month day temp
## 1 Dec 1 10
## 2 Dec 15 12
## 3 Nov 15 12
## 4 Nov 1 14
temp %>%
   arrange(column("month"), desc(column("temp"))) %>%
   collect()
## month day temp
## 1 Dec 15 12
## 2 Dec 1 10
## 3 Nov 1 14
## 4 Nov 15 12
One cannot mix the two approaches: arrange("x", column("y")) will not work. (2.2.1)
arrange by a renamed column
Apparently you can rename a column and still arrange according to the old name. For instance:
data.frame(a=3:1, b=rnorm(3)) %>%
createDataFrame() %>%
rename(c=column("a")) %>%
arrange(column("a")) %>%
collect()
## c b
## 1 1 -0.8356286
## 2 2 0.1836433
## 3 3 -0.6264538
The dataframe does not contain column a; it is ordered by column c, née a, instead. See also merging and lazy evaluation where something similar happens when renaming columns. (2.4.4)
Pivoting is a version of crosstable, a way to transform dataframe from long to wide format, and perform aggregation in the process.
The arguments for pivot
are:
pivot(x, colname, values=list())
It takes in a grouped dataframe x
and pivoting column colname
. The result will be grouped by distinct values of the groups vertically (this is what the grouped data is about), and by distinct values of the pivot column horizontally (this is what colname
does). The new column names are created from the pivoting column values, potentially combined with the name of the aggregation function. One may supply a pre-defined list of distinct pivoting values, it is claimed to be more efficient than to let spark figure these out itself. This vertical-horizontal grouping should be aggregated using the agg
function. The entries for non-existing combinations will be NA
in the aggregated table.
Pivot has its quirks:
Numeric pivoting columns end up as values like 100.0 instead of 100 in the column names. Convert the pivoting columns to characters. (2.4.4)
Let’s create a simple long-form dataframe of three columns: quarter
, id
, and result
:
dfp <- data.frame(quarter = 1 + rbinom(12, 3, 0.3),
                  id = (100 + rbinom(12, 4, 0.5)) %>%
                     as.character(),
                  result = runif(12)) %>%
   createDataFrame()
dfp %>%
   collect() %>%
   head()
## quarter id result
## 1 3 102 0.4820801
## 2 2 103 0.5995658
## 3 2 103 0.4935413
## 4 1 101 0.1862176
## 5 1 102 0.8273733
## 6 1 101 0.6684667
and pivot it into a summary table: each row is quarter
, each column is id
, and each cell in the table is sum of the corresponding result
s:
dfp %>%
   groupBy("quarter") %>%
   pivot("quarter") %>%
   agg(sum(column("result"))) %>%
   collect()
## quarter 1.0 2.0 3.0 4.0
## 1 1 1.682058 NA NA NA
## 2 4 NA NA NA 0.6470602
## 3 3 NA NA 0.4820801 NA
## 4 2 NA 3.951222 NA NA
Now we have a wide form data frame where for each quarter (row) we have sum of results for each id (columns). Note the many missing values: for instance, id 101 did not have any results in quarter 2.
As the next example, let’s find the largest and second-largest result for each quarter-id combination using the same data as above.
Our first task is to rank the results by group. We use window functions to do this:
ws <- orderBy(windowPartitionBy("quarter", "id"), "negResult")
As we want to find the largest values (i.e. rank in descending order), we have to create negResult
, the negative of the result
, as spark orderBy
can only do it in ascending order. Thereafter we create the corresponding rank variable, and retain only the two largest values. Finally, we create a new column idrank
that contains the id and rank in a form that is suitable for the final column name after pivoting (remember, pivot creates column names from values):
dfp %<>%
   mutate(negResult = -column("result")) %>%
   mutate(rank = over(row_number(), ws)) %>%   # compute the descending rank 'rank'
   filter(column("rank") <= 2) %>%             # retain only the two largest results
   mutate(idrank = concat(lit("id-"), column("id"),
                          lit("-"), column("rank")))
                        # note: have to use 'concat', not 'paste';
                        # character constants must be in 'lit'
dfp %>%
   collect()
## quarter id result negResult rank idrank
## 1 1 101 0.6684667 -0.6684667 1 id-101-1
## 2 1 101 0.1862176 -0.1862176 2 id-101-2
## 3 1 102 0.8273733 -0.8273733 1 id-102-1
## 4 2 100 0.7237109 -0.7237109 1 id-100-1
## 5 2 101 0.7942399 -0.7942399 1 id-101-1
## 6 2 102 0.4112744 -0.4112744 1 id-102-1
## 7 2 102 0.1079436 -0.1079436 2 id-102-2
## 8 2 103 0.8209463 -0.8209463 1 id-103-1
## 9 2 103 0.5995658 -0.5995658 2 id-103-2
## 10 3 102 0.4820801 -0.4820801 1 id-102-1
## 11 4 102 0.6470602 -0.6470602 1 id-102-1
Now we have everything prepared. We group the result by quarter (quarters will remain in rows) and pivot by idrank
, these will be in columns. Finally we aggregate by just picking the first value of result
. Aggregation mechanism is not very important here as all the quarter-id-rank groups should only contain a single value.
dfp %>%
   groupBy("quarter") %>%
   pivot("idrank") %>%
   agg(first(column("result"))) %>%
   collect()
## quarter id-100-1 id-101-1 id-101-2 id-102-1 id-102-2 id-103-1 id-103-2
## 1 1 NA 0.6684667 0.1862176 0.8273733 NA NA NA
## 2 4 NA NA NA 0.6470602 NA NA NA
## 3 3 NA NA NA 0.4820801 NA NA NA
## 4 2 0.7237109 0.7942399 NA 0.4112744 0.1079436 0.8209463 0.5995658
Note the number of missing values in the result. For instance, as id 101 does not have any record for quarter 2, the corresponding entries are missing.
See also reshaping to wide form.
Spark does not have a built-in reshaping function for transforming wide to long format, but explode pretty much does the job. It is slightly more circuitous though: first you have to create an array column of the desired individual columns, and thereafter you explode that column. Let’s create a wide-form data frame below:
dfm <- data.frame(id=1:2, ua=runif(2), ub=rnorm(2)) %>%
   createDataFrame()
dfm %>%
   collect()
## id ua ub
## 1 1 0.7829328 0.07456498
## 2 2 0.5530363 -1.98935170
The task is to transform this to the long form. First we can create a new array column by create_array
and thereafter explode
it into the desired shape:
dfm %>%
   mutate(tmp = create_array(column("ua"), column("ub"))) %>%
   mutate(value = explode(column("tmp"))) %>%
   collect()
## id ua ub tmp value
## 1 1 0.7829328 0.07456498 0.78293276, 0.07456498 0.78293276
## 2 1 0.7829328 0.07456498 0.78293276, 0.07456498 0.07456498
## 3 2 0.5530363 -1.98935170 0.5530363, -1.9893517 0.55303631
## 4 2 0.5530363 -1.98935170 0.5530363, -1.9893517 -1.98935170
The result will still contain ua
and ub
column, and the array column tmp
. We may want to delete those.
There is no direct approach to use a list of column names for creating the array column. Instead, we can use do.call
and the list of columns (not column names!) as it’s argument. The exact same result as above can be achieved through
cols <- c("ua", "ub")
withColumn(dfm, "tmp",
           do.call("create_array", lapply(cols, function(x) column(x)))) %>%
   mutate(value=explode(column("tmp"))) %>%
   drop(c(cols, "tmp")) %>%
   collect()
## id value
## 1 1 0.78293276
## 2 1 0.07456498
## 3 2 0.55303631
## 4 2 -1.98935170
Here we have deleted the original and temporary columns, so the result is more likely what you had in mind as the long-form data.
However, this solution does not show explicitly which original variable each row corresponds to. In this simple example it is obvious, these are just “a”, “b”, “a”, “b” in this order, but in a more complex case we may need much more guidance. In such a case one can use the posexplode function. It explodes the array column into two columns: pos corresponds to the order in the array, and col to the value:
dfm %>%
   mutate(tmp = create_array(column("ua"), column("ub"))) %>%
   select(c("id", posexplode(column("tmp")))) %>%
   collect()
## id pos col
## 1 1 0 0.78293276
## 2 1 1 0.07456498
## 3 2 0 0.55303631
## 4 2 1 -1.98935170
The code does the following: first it creates an array column tmp. Thereafter it posexplodes this into a pair of columns: pos 0 corresponds to the value “a” and pos 1 to the value “b”, and col holds the corresponding values. As posexplode creates two columns, you cannot use mutate, as that function only creates a single column at a time. A more complete example may look as follows:
cols <- c("ua", "ub")
withColumn(dfm, "tmp",
           do.call("create_array", lapply(cols, function(x) column(x)))) %>%
   select(c("id", posexplode(column("tmp")))) %>%
   mutate(type = ifelse(column("pos") == 0, "a", "b")) %>%
   drop("pos") %>%
   rename(value = column("col")) %>%
   collect()
## id value type
## 1 1 0.78293276 a
## 2 1 0.07456498 b
## 3 2 0.55303631 a
## 4 2 -1.98935170 b
This can be achieved with pivoting. Here is an example:
df <- data.frame(id=c(72,72,811,811),
                 type=c("a", "b", "a", "b"),
                 val=rnorm(4)) %>%
   createDataFrame()
df %>%
   collect()
## id type val
## 1 72 a 0.61982575
## 2 72 b -0.05612874
## 3 811 a -0.15579551
## 4 811 b -1.47075238
Transform it into the wide form:
df %>%
   groupBy("id") %>%
   pivot("type") %>%
   agg(first(column("val"))) %>%
   collect()
## id a b
## 1 72 0.6198257 -0.05612874
## 2 811 -0.1557955 -1.47075238
Note that the column names are the same as type
-s. See pivoting for more information.
The following examples use a tiny dataframe of names:
names <- data.frame(name=c("Barnier", "Beto")) %>%
   createDataFrame()
substr
Substrings can be created with substr. Its syntax is simple:
substr(x, start, end)
x is a column.
start, end are the (1-based) start and end positions in the string. Both start and end must be specified; you cannot leave one out to take everything. Inf is not allowed either.
Example:
names %>%
   withColumn("x", substr(column("name"), 3, 30)) %>%
   collect()
## name x
## 1 Barnier rnier
## 2 Beto to
This extracts characters 3 to 30 (i.e. from 3 till end) from these names.
Warning: in SparkR < 2.3.0, start position was to be given one past the first character to be extracted. So in the example above, one has to use substr(column("names"), 4, 20)
These are utility functions that test if a string column starts or ends with a given substring. These return a logical column.
startsWith(column, substring)
endsWith(column, substring)
Example:
names %>%
   withColumn("x", endsWith(column("name"), "to")) %>%
   collect()
## name x
## 1 Barnier FALSE
## 2 Beto TRUE
contains
contains(column, pattern)
tests simple (non-regexp) patterns in string columns:
names %>%
   withColumn("x", contains(column("name"), "ar")) %>%
   collect()
## name x
## 1 Barnier TRUE
## 2 Beto FALSE
Regexp patterns are treated literally (?) and do not work (2.3.1).
instr
instr(y, x)
finds the location of the first occurrence of substring (not regexp) x
in column y
. Positions inside of string are counted from 1, returns 0 if the substring not found:
names %>%
   mutate(xs = instr(column("name"), "ar"),
          xr = instr(column("name"), "a.")) %>%
   collect()
## name xs xr
## 1 Barnier 2 0
## 2 Beto 0 0
Regexp patterns are treated literally (?) and do not work (2.3.1).
concat
concat(x, ...)
concatenates column x with other columns. Numeric columns are automatically converted to string columns. If you want to insert a character constant, you have to wrap it in the lit(...) function to create a constant column:
names %>%
   mutate(yob = lit(1980L)) %>%
   mutate(mName = concat(lit("M. "), column("name"), lit(" "), column("yob"))) %>%
   collect()
## name yob mName
## 1 Barnier 1980 M. Barnier 1980
## 2 Beto 1980 M. Beto 1980
No space is inserted between the two concatenated columns.
concat can only concatenate columns, not other expressions, not even cast expressions that convert other columns to strings.
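If you want a separator between the pieces, concat_ws(sep, x, ...) inserts one for you. A small sketch:
names %>%
   mutate(mName = concat_ws(" ", lit("M."), column("name"))) %>%
   collect()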
regexp_replace
Syntax: regexp_replace(x, pattern, replacement), where x is the column to operate on. pattern is a regexp, and replacement can contain java regexp groups $1, $2 etc.
names %>%
   withColumn("x", regexp_replace(column("name"), "r(.*)", "R")) %>%
   collect()
## name x
## 1 Barnier BaR
## 2 Beto Beto
Here is another example using java replacement group $1:
names %>%
   withColumn("x", regexp_replace(column("name"), "r(.*)", "R$1")) %>%
   collect()
## name x
## 1 Barnier BaRnier
## 2 Beto Beto
There are two time formats of choice: dates and timestamps. Dates only represent a date (a day), while timestamps record a point of time in seconds. You probably want to use the former if the event spans the whole day (like a birthday) and the latter if the event is about a specific point of time (say the beginning of a phone call). Dates and timestamps can be converted to each other, and both contain timezone information by default. Timestamps are stored internally as microseconds since the UNIX epoch, including the timezone information, and when you collect() timestamp data, the result will be printed by R in the current timezone (as set in your operating system). A similar conversion occurs with dates: a date is converted to the date that your current timezone shows at 00:00:00 midnight in the date’s timezone (see below).
There are three types of timestamps:
When comparing and converting dates and timestamps, spark assumes the date is a timestamp that occurs 00:00:00 at the relevant time zone.
We demonstrate the date handling below using this data frame:
dateDF <- data.frame(string=c("9/21 1949 18:44:33", "5/3 2018 11:04:11")) %>%
   createDataFrame()
collect(dateDF)
## string
## 1 9/21 1949 18:44:33
## 2 5/3 2018 11:04:11
We start with the string representation (variable string) and convert it into various other formats. Unlike timestamps, strings are displayed unambiguously, independent of your timezone settings.
In CSV files the date data is usually in some kind of yyyy-mm-dd form which will be read as character string. This can be converted to timestamps with to_timestamp
:
dateDF2 <- dateDF %>%
   withColumn("date", to_timestamp(column("string"), "M/d yyyy HH:mm:ss"))
collect(dateDF2)
## string date
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33
## 2 5/3 2018 11:04:11 2018-05-03 11:04:11
As you can see, the result printed in your current time zone, correspond exactly to the string representation: to_timestamp
puts the date
column in your current time zone. Sometimes this is not what you want.
If the string that represents time is not given in your current time zone (TZ), you can add the time zone descriptor by concat
to the string. For instance, if the original time was given in Kabul time (UTC+4:30), you can add the TZ descriptor “+04:30” (note the leading “0”) while also adding TZ format symbol “XXX” (see more in Spark Datetime Pattern):
dateDF %>%
   withColumn("date",
              to_timestamp(concat(column("string"), lit("+04:30")),
                           "M/d yyyy HH:mm:ssXXX")) %>%
   collect()
## string date
## 1 9/21 1949 18:44:33 1949-09-21 06:14:33
## 2 5/3 2018 11:04:11 2018-05-02 23:34:11
Now the date column is different from the string representation (unless your OS is set to the Kabul TZ) and corresponds to your clock time at the moment when the clock in Kabul shows what is stored in string. In this example, the text is compiled in the America/Los_Angeles time zone. to_timestamp and time zone formatting have their quirks:
The fact that time output is always converted to your local time zone when printed may feel confusing and make the same code to produce different results in different time zones.
The timestamp columns can be easily compared with R POSIXct objects in a given TZ. For instance, we can extend the example above and check which case corresponds to Kabul time 18:44:33 September 21th, 1949:
dateDF %>%
   mutate(date = to_timestamp(concat(column("string"), lit("+04:30")),
                              "M/d yyyy HH:mm:ssXXX")) %>%
   mutate("6pm" = column("date") == ISOdate(1949,9,21, 18,44,33, tz="Asia/Kabul")) %>%
   collect()
## Warning in if (class(e2) == "Column") {: the condition has length > 1 and only the first element will be used
## string date 6pm
## 1 9/21 1949 18:44:33 1949-09-21 06:14:33 TRUE
## 2 5/3 2018 11:04:11 2018-05-02 23:34:11 FALSE
Note that in the first row date is considered equal to that point of time, although in the current timezone the clock shows a different time.
One can convert between time zones with to_utc_timestamp and from_utc_timestamp. The former can be used to convert the timestamp from the current TZ into UTC, and the latter for converting it from UTC to the desired TZ. For instance, one may also keep time in utc_timestamp:
dateDF %>%
   mutate(date = to_timestamp(column("string"), "M/d yyyy HH:mm:ss")) %>%
   mutate(dateUTC = to_utc_timestamp(column("date"), Sys.timezone())) %>%
   collect()
## string date dateUTC
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 1949-09-22 02:44:33
## 2 5/3 2018 11:04:11 2018-05-03 11:04:11 2018-05-03 18:04:11
This converts the string date into a UTC timestamp. The result is intuitive if date is in your current TZ and you convert it to UTC using this TZ: the result is just the clock in UTC when your clock shows date. to_utc_timestamp has its own issues:
From R point of view, the utc_timestamp is just another POSIXct object, exactly as timestamp.
When we want to convert a timestamp to a string, we can use date_format(column, format string) (note: we need Spark Datetime Pattern symbols). The string representation will use the current TZ as set by the operating system. For instance:
dateDF2 %>%
   withColumn("date2", date_format(column("date"), "yyyyMMdd")) %>%
   collect()
## string date date2
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 19490921
## 2 5/3 2018 11:04:11 2018-05-03 11:04:11 20180503
It appears date_format
does not allow expressions in column
if invoked with mutate
and gives a really unintuitive error:
dateDF2 %>%
   mutate(date2 = date_format(column("date") %>%
                                 from_utc_timestamp("Asia/Kabul"), "yyyyMMdd")) %>%
   collect()
## string date date2
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 19490921
## 2 5/3 2018 11:04:11 2018-05-03 11:04:11 20180503
However, this works when run through withColumn
:
dateDF2 %>%
   withColumn("date2", date_format(column("date") %>%
                                      from_utc_timestamp("Asia/Kabul"), "yyyyMMdd")) %>%
   collect()
## string date date2
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 19490921
## 2 5/3 2018 11:04:11 2018-05-03 11:04:11 20180503
(spark 2.4.0)
Alternatively, one can achieve a similar result with extractor functions year
, month
, dayofmonth
, hour
, minute
, second
. For instance:
dateDF2 %>%
   withColumn("date2", year(column("date"))*10000 +
                       month(column("date"))*100 +
                       dayofmonth(column("date"))) %>%
   collect()
## string date date2
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 19490921
## 2 5/3 2018 11:04:11 2018-05-03 11:04:11 20180503
Warning: you must put the column in the first position in the withColumn arithmetic expression. A literal number in the first position, such as 10000*year(column("date")), gives an error.
(spark 2.2.0)
These rules apply when converting R dataframes into spark with createDataFrame
.
R Date-s will be converted to date type. This only applies to the objects of true Date class, not to the dates with timezones in lubridate which are actually POSIXct objects (except if the timezone is “UTC” or similar).
R timestamps, i.e. POSIXct objects will be converted to timestamps. This also applies to dates with timezones as long as these are internally POSIXct objects.
df <- data.frame(date=as.Date("2018-10-19"),
                 dateTZ=lubridate::ymd("2018-10-19", tz="Asia/Kabul"))
sapply(df, class)
## $date
## [1] "Date"
##
## $dateTZ
## [1] "POSIXct" "POSIXt"
The first column, date is of class Date, the second is POSIXct.
df %>% createDataFrame() %>%
   printSchema()
## root
## |-- date: date (nullable = true)
## |-- dateTZ: timestamp (nullable = true)
As you can see, date is of class date, while dateTZ is timestamp. Dates with time zone set are not true dates in R.
When converting the spark time objects back to R with collect()
, one get’s POSIXct objects from timestamps and Date-s from dates. Both are normally printed for the current time zone (as set in your operating system).
Dates with timezones in spark are converted to Date assuming the represent the instant 00:00:00 at the corresponding day and time zone (i.e. the TZ for this particular date), and the day of that instant in the current timezone is converted to R Date object. So the result depends on your timezone settings!
to_utc_timestamp
and from_utc_timestamp
to_utc_timestamp(y, x)
converts column y into a utc_timestamp in time zone x.
y: column. Columns of type timestamp are supported. It can be interpreted as time in UTC.
x: string, time zone. A time zone the system understands. Unknown time zones are silently ignored. It must be provided as text (like US/Pacific); the numeric form (as -08:00) does not work.
For instance, if date
is given in America/Los_Angeles TZ, the conversion might look like
dateDF2 %>%
   mutate(dateUTC = to_utc_timestamp(column("date"), Sys.timezone())) %>%
   collect()
## string date dateUTC
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 1949-09-22 02:44:33
## 2 5/3 2018 11:04:11 2018-05-03 11:04:11 2018-05-03 18:04:11
The dateUTC
column shows time in UTC when the local time in America/Los_Angeles is date
.
The opposite conversion can be done with from_utc_timestamp(y, x)
which converts UTC time in y into timestamp in time zone x.
For example, if date
is in UTC then to convert this time into Shanghai time, we can do
dateDF2 %>%
   withColumn("shanghaitime", from_utc_timestamp(column("date"), "Asia/Shanghai")) %>%
   collect()
## string date shanghaitime
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 1949-09-22 02:44:33
## 2 5/3 2018 11:04:11 2018-05-03 11:04:11 2018-05-03 19:04:11
Indeed, Shanghai time is 8 hours ahead of UTC.
to_date
A timestamp includes time in seconds; a date only concerns dates. You can convert the former to the latter with to_date(x, format), where x is the column and format is the format string, only applicable if x is a character column. By default it uses ISO dates with format = "yyyy-MM-dd".
Example:
dateDF2 %>%
   withColumn("ymd", to_date(column("date"))) %>%
   collect()
## string date ymd
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 1949-09-21
## 2 5/3 2018 11:04:11 2018-05-03 11:04:11 2018-05-03
to_timestamp()
to_timestamp(x, format)
converts column x
to timestamp using Spark Datetime Pattern specifications.
An example script that performs all these tasks:
dateDF %>%
   mutate(date = to_timestamp(column("string"), "M/d yyyy HH:mm:ss")) %>%
   collect()
## string date
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33
## 2 5/3 2018 11:04:11 2018-05-03 11:04:11
dayofweek
dayofweek(x)
: day of week. Sunday=1, Saturday=7. Introduced in spark 2.3.0.
x
: column. Columns of type utc_timestamp are supported.Example:
dateDF2 %>%
   withColumn("dayOfWeek", dayofweek(column("date"))) %>%
   collect()
## string date dayOfWeek
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 4
## 2 5/3 2018 11:04:11 2018-05-03 11:04:11 5
hour
Extract hour in the current (as set in os) timezone.
hour(x)
: hour (integer), in 24h format.
x
: column. Columns of type utc_timestamp are supported.Example:
dateDF2 %>%
   withColumn("hour", hour(column("date"))) %>%
   collect()
## string date hour
## 1 9/21 1949 18:44:33 1949-09-21 18:44:33 18
## 2 5/3 2018 11:04:11 2018-05-03 11:04:11 11
Note: hour is extracted from the timestamp in the current time zone. If we are interested in hour in another TZ, we have to a) convert the timestamp to UTC timestamp with the current TZ, and b) convert it back to timestamp using the desired TZ. For instance, assume our string
date is given in Kabul time, and converted to timestamp with TZ information as above:
dateDF1 <- dateDF %>%
   withColumn("date",
              to_timestamp(concat(column("string"), lit("+04:30")),
                           "M/d yyyy HH:mm:ssXXX"))
dateDF1 %>%
   collect()
## string date
## 1 9/21 1949 18:44:33 1949-09-21 06:14:33
## 2 5/3 2018 11:04:11 2018-05-02 23:34:11
If we just use hour
function, this would result in hour in the current timezone:
dateDF1 %>%
   withColumn("hour", hour(column("date"))) %>%
   collect()
## string date hour
## 1 9/21 1949 18:44:33 1949-09-21 06:14:33 6
## 2 5/3 2018 11:04:11 2018-05-02 23:34:11 23
If we are interested in the hour in Kabul, we have to go through double conversion:
dateDF1 %>%
   withColumn("hour", column("date") %>%
                         to_utc_timestamp(Sys.timezone()) %>%
                         from_utc_timestamp("Asia/Kabul") %>%
                         hour()) %>%
   collect()
## string date hour
## 1 9/21 1949 18:44:33 1949-09-21 06:14:33 18
## 2 5/3 2018 11:04:11 2018-05-02 23:34:11 11
The result corresponds to the hour in our original string representation of the timestamp.
During merges (and possibly sorts) you may run out of memory with an error message like
java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
This is claimed to be related to grouping by key: spark tries to allocate an equal amount of memory for each key, and if the distribution is skewed, it attempts to allocate too much.
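Increasing the number of partitions before the heavy operation sometimes helps (a sketch; 200 is an arbitrary number):
df <- repartition(df, numPartitions = 200L)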
I cannot understand how is.nan works. Use isNull(col) or isNotNull(col) instead; they can be used for filtering, and isNull marks NA-s as TRUE.
When using UDF-s that return a data.frame, you may get an “Unsupported type for serialization” error. This may be related to the fact that the returned data.frame converts strings to factors, and SparkR cannot serialize factors. Use stringsAsFactors=FALSE.
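A sketch of what this looks like with dapply (df and its string column name are placeholders):
schema <- structType(structField("name", "string"))
dapply(df,
       function(part) {
          data.frame(name = toupper(part$name),
                     stringsAsFactors = FALSE)  # avoid the factor serialization problem
       },
       schema) %>%
   collect()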
You cannot easily overwrite a parquet file with new data: the old cache remains in memory and it causes errors.
read.parquet() expects a list of files as the first argument; read.df() expects a path from which to read all the files as the first argument.
(spark 2.2)
cannot select columns of grouped data
The chunks fed to dapply are roughly of the size #obs/#partitions. Several dapply processes are run in parallel.
If you get the error message error in parallel:::mcfork unable to create a pipe, then you may have run out of processes. Spark 2.1.2 and R 3.5.0 did not play nicely with dapply, leaving zombies until you ran out of the available processes. In this case fewer partitions may help. The issue seems to be gone when switching to spark 2.3.1.
One runs a separate dapply process for each partition? At least the number of dapply invocations seems to equal the number of partitions.
Have to check if unpersist
helps to delete checkpoints.