Rationale
Maintaining partitioned Hive tables is a frequent practice in a business. Properly structured tables are conducive to achieving robust performance through speeding up query execution (see Costa, Costa, and Santos 2019). Frequent use cases pertain to creating tables with hierarchical partition structure. In context of a data that is refreshed daily, the frequently utilised partition structure reflects years, months and dates.
Creating partitioned table
In HiveQL we would create the table with the following structure using the syntax below. In order to keep the development tidy, I’m creating a separate database on Hive which I will use for the purpose of creating tables for this article.
-- Initially test database is created to keep the development tidy
CREATE DATABASE blog COMMENT 'Blog article samples, can be deleted.';
-- Example table is created
CREATE TABLE blog.sample_partitioned_table (
value_column_a FLOAT COMMENT 'Column will hold 4-byte number',
value_column_b DOUBLE COMMENT '8-byte double precision',
value_column_c CHAR(1) COMMENT 'Fixed length varchar')
COMMENT 'Sample partitioned table stored as text file'
PARTITIONED BY (
part_year SMALLINT COMMENT 'Data load year, partition',
part_month TINYINT COMMENT 'Data load month, partition',
part_day TINYINT COMMENT 'Data load day, partition')
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
The code snippet above achieves the following:
- Table
sample_partitioned_table
is created within newly created databaseblog
. - Three value columns are defined of
FLOAT
,DOUBLE
andCHAR(1)
types. Hive offers fairly rich set of data types and it’s worth to study the official documentation in order to ensure that selection of types is optimal considering the data we want to feed into the table. If we don’t have this clarity the wise solution may be to use more common types likesINT
. - The
blog.sample_partitioned_table
is stored as a text file with lines separated by tabs and rows separated with end line. - The table defines theree columns used to partition the data,
tinyint
type is suitable to hold values from -127 to 127 so it can be used to store day and month values,smallint
type holds values from -32,768 to 32,768 so it’s suitable for storing annual data.
For more substantial tables with frequent usage further consideration should be given to the Hive file formats as well as wider storage strategy aspects.
Inserting data from R
Inserting data using packages like glue
in R is trivial, and enables us to deliver highly readable code that will be easy to maintain.
Sample data
In an actual production setting, we would expect that our run will generate a data consistent with the table structure that should be saved as one of partitions. A common scenario could reflect summary events data generated for specific day, in business that structure would be frequently used to develop views on periodical business activity. For the purpose of example, I’m generating some sample data in R.
suppressPackageStartupMessages(library(tidyverse))
suppressPackageStartupMessages(library(lubridate))
# Generating two months of data
dates <- seq.Date(
from = as.Date("01-01-2010", format = "%d-%m-%Y"),
to = as.Date("28-02-2010", format = "%d-%m-%Y"),
by = "day"
)
# Each data will contain two rows of values corresponding to the column types
# that were previously defined in Hive
sample_data <- map_dfr(.x = dates, ~ tibble(val_a = runif(2),
val_b = runif(2),
val_c = sample(letters, 2),
update_year = year(.x),
update_month = month(.x),
update_day = day(.x)))
The created data looks as follows:
head(sample_data)
## # A tibble: 6 × 6
## val_a val_b val_c update_year update_month update_day
## <dbl> <dbl> <chr> <dbl> <dbl> <int>
## 1 0.504 0.0372 g 2010 1 1
## 2 0.275 0.701 t 2010 1 1
## 3 0.205 0.621 a 2010 1 2
## 4 0.130 0.282 j 2010 1 2
## 5 0.154 0.503 b 2010 1 3
## 6 0.906 0.404 i 2010 1 3
Following the successful creation of the dummy data we are in position to easily leverage the desired data structure. Using the sparklyr
package I’m creating a local connection.
suppressPackageStartupMessages(library(sparklyr))
sc <- spark_connect(master = "local")
For the purpose of the article I’ve also executed the provided-above HiveQL via Spark to ensure accessibility to data structures that would be structurally equivalent, ensuring smooth execution of the example code. Naturally, in a production setting, we would seldom look to create new Hive schema from an R script layer. Similarly, core tables storing results would be usually established outside regular production processes.
## <DBISparkResult>
## SQL DROP TABLE IF EXISTS blog.sample_partitioned_table
## ROWS Fetched: 0 [complete]
## Changed: 0
## <DBISparkResult>
## SQL DROP DATABASE IF EXISTS blog
## ROWS Fetched: 0 [complete]
## Changed: 0
res_DBI_data <- DBI::dbSendQuery(sc, "CREATE DATABASE blog COMMENT 'Blog article samples, can be deleted.'")
res_DBI_tble <- DBI::dbSendQuery(sc, "CREATE TABLE blog.sample_partitioned_table (
value_column_a FLOAT COMMENT 'Column will hold 4-byte number',
value_column_b DOUBLE COMMENT '8-byte double precision',
value_column_c CHAR(1) COMMENT 'Fixed length varchar')
COMMENT 'Sample partitioned table stored as text file'
PARTITIONED BY (
part_year SMALLINT COMMENT 'Data load year, partition',
part_month TINYINT COMMENT 'Data load month, partition',
part_day TINYINT COMMENT 'Data load day, partition')
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE")
Insert mechanism
Where the DBI package comes extremely handy is in inserting data into partitions. In context of our sample data we will want to populate every single partition with the respective modelling results. Courtesy of map_dfc
function the “results” are available in one table but the proposed method can be easily modified and applied across other object structures, like lists. As a first step we will be looking to copy the existing sample data into Spark.
tbl_sprk <- copy_to(sc, sample_data, "spark_sample_data")
In Spark our RDD is visible as spark_sample_data
we will be looking to use that table in order to insert our partition elements into permanent storage.
suppressPackageStartupMessages(library(DBI))
suppressPackageStartupMessages(library(glue))
res_pmap <- pmap(
.l = select(sample_data, update_year, update_month, update_day),
.f = ~ DBI::dbSendQuery(sc, glue("INSERT INTO TABLE blog.sample_partitioned_table
PARTITION (part_year={..1},
part_month={..2},
part_day={..3})
SELECT val_a, val_b, val_c
FROM spark_sample_data
WHERE update_year={..1} AND
update_month={..2} AND
update_day={..3}")))
Let’s unpack the code below. Our key goals are:
* Our aim is to populate partitions in our permanent Hive table blog.sample_partitioned_table
, hence the statement INSERT INTO TABLE blog.sample_partitioned_table
* We are working with some modelling/analytical data that currently sits in our Spark session as spark_sample_data
and we want for the relevant results in the data to land in the prescribed partitions on Spark
What happens in the process is as follows:
1. We are generating a list of vectors with partitions identifiers to iterate over. As I’ve created this sample data in the current session in memory I can just refer to those items using select
I would do that in the following manner select(sample_data, update_year, update_month, update_day)
2. I’m interested in iterating over each column simultaneously and pmap
function is excellent for that. Using ~
notation offered in pmap
I will be looking to refer to first object as ..1
to the second as ..2
and so on.
3. Glue package is used to insert strings with partitions identifier into the respective partition names.
4. SELECT
runs on spark RDD and also uses partition identifiers to get only subset of the data we are interested in and insert that subset into the desired partition.
Results
Following the operation above we can now explore the populated storage table. Sparklyr’s sdf_num_partitions
can be used to get a number of existing partitions. Tibble’s glimpse
can be used against the Spark data to get the preview of the created table.
tbl_perm <- tbl(sc, "blog.sample_partitioned_table")
sdf_num_partitions(tbl_perm)
## [1] 118
glimpse(tbl_perm)
## Rows: ??
## Columns: 6
## Database: spark_connection
## $ value_column_a <dbl> 0.86109686, 0.65282738, 0.86109686, 0.65282738, 0.78114…
## $ value_column_b <dbl> 0.22237399, 0.17974974, 0.22237399, 0.17974974, 0.83282…
## $ value_column_c <chr> "n", "m", "n", "m", "p", "g", "p", "g", "s", "j", "s", …
## $ part_year <int> 2010, 2010, 2010, 2010, 2010, 2010, 2010, 2010, 2010, 2…
## $ part_month <int> 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1…
## $ part_day <int> 9, 9, 9, 9, 20, 20, 20, 20, 16, 16, 16, 16, 20, 20, 20,…
Summary
Convenient and flexible functions facilitating string manipulations available in R make metaprogramming1 in R easy. Generating and manipulating Hive statements as strings may not be the most efficient strategy in the light of the API’s offered via sparklyr
or dbplyr
. Neverthless is possible to spot instances where R code makes those coding challenges partiuclary easy to solution and also to maintain.
References
Metaprogramming is a programming paradigm that treats other programming programs as data. In business, a BI setting metaprogramming is frequently used to generate efficiencies in routine data handling tasks, such as automating generation of SQL statements for importing data.↩︎