Delta Lake on AWS EMR — #1

@data-foundation-guy
5 min read · Apr 20, 2021


Delta Lake Overview — Delete, Update and Merge on S3

Delete, Update and Merge are operations that are commonly performed on databases. Object stores typically do not support update semantics and are limited to append (create or replace) semantics. Object stores like S3 are not indexed, so deleting data at a granular (row) level is hard: it requires reading all the data, scanning it to find the record, deleting it, and then replacing the whole object in S3. This is expensive and does not scale.

Delta Lake converts all operations into append operations, so every operation results in the generation of new objects in S3. Note that Delta Lake stores all data objects in S3 in the Parquet columnar format.

A consistent snapshot of the data is built at read time by combining the transaction log with the Parquet data files. You can read more at https://delta.io

Below I will showcase each feature of Delta Lake using a sample dataset. Note that I used EMR Notebooks to build this example; you can find the complete EMR notebook at the bottom of this post. All examples below are in PySpark. Delta Lake also has Scala and SQL support.

Finally, if you want to use Delta Lake, it is preferable to use Databricks, which has native support for it. That said, there are scenarios where you cannot use Databricks and yet want to utilize the amazing delete, update and merge capabilities of Delta Lake on object stores like S3. This post is targeted at those who want to use Delta Lake on S3 and EMR.

Note that EMR comes with native support for Apache Hudi, which offers functionality similar to Delta Lake's. I took this path of exploration primarily because I liked Delta Lake's delete, update and merge semantics better. The wide support from the Spark community (including Databricks) also makes Delta Lake's future more secure in terms of feature evolution, bug fixes and troubleshooting support.

Let’s get started!

Setting up EMR for Delta Lake

EMR has a bootstrap action (https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-bootstrap.html) that allows you to install external libraries on every node of the cluster. In this case, I have uploaded the latest Delta Lake jar into an S3 bucket. The bootstrap script below copies the jar to Spark’s install path on each EMR node.

#!/bin/bash
sudo aws s3 cp s3://mys3bucket-delta-lake-poc/emr-bootstrap-scripts/jars/delta-core_2.12-0.8.0.jar /usr/lib/spark/jars/

Setup SparkSession for Delta Lake Transactions
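The notebook cell for this step is not reproduced here, so below is a minimal sketch of a SparkSession configured for Delta Lake, assuming the delta-core jar installed by the bootstrap action is importable from PySpark. The retention duration check is disabled only so that the vacuum(0) demonstration later in this post works; leave it enabled in production.

from pyspark.sql import SparkSession

# Minimal sketch: wire the Delta Lake SQL extension and catalog into the session.
spark = (
    SparkSession.builder
    .appName("delta-lake-on-emr-demo")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Disabled only for this demo so that vacuum(0) is allowed later in the post.
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
    .getOrCreate()
)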

Read CSV Dataset and Store as Delta Table

I am using Kaggle’s IBM HR Analytics Employee Attrition & Performance dataset for this demonstration.
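The original cell is not shown here; a sketch of reading the CSV and writing it out as a Delta table could look like the following. The S3 paths are hypothetical placeholders — substitute your own bucket and prefixes.

# Hypothetical locations -- replace with your own bucket/prefixes.
csv_path = "s3://mys3bucket-delta-lake-poc/raw/ibm-hr-attrition.csv"
delta_path = "s3://mys3bucket-delta-lake-poc/delta/employee_attrition"

# Read the Kaggle CSV with headers and inferred types.
df = (spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(csv_path))

# Write it out in Delta format on S3.
(df.write
   .format("delta")
   .mode("overwrite")
   .save(delta_path))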

Read Delta Table from S3 and Merge Upserts into it

Above, we see that the Delta table has 24 records before we apply upserts.
Six records — 5 updates and 1 new record (EmployeeNumber: 1530) — are ingested for application to the Delta table
LastModifiedDateISO8601 confirms 5 records were updated, 1 was inserted, and the total row count is now 25
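The merge cell itself is not reproduced here; a minimal sketch using the DeltaTable merge API, assuming upserts_df is a hypothetical DataFrame holding the 6 incoming records and delta_path is the table location used earlier, could look like this:

from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, delta_path)

# Upsert: update rows whose EmployeeNumber matches, insert the new one.
(deltaTable.alias("target")
    .merge(upserts_df.alias("source"),
           "target.EmployeeNumber = source.EmployeeNumber")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())

# Re-read the table to confirm the row count moved from 24 to 25.
spark.read.format("delta").load(delta_path).count()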

Apply Deletes to Delta Table

2 records are ingested for deletion from the Delta table
The total row count is 23, and EmployeeNumber: 1515 and EmployeeNumber: 1504 are now deleted from the table
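Again, a sketch rather than the original cell: one way to express this with the DeltaTable API is to merge against the feed of keys to delete (deletes_df here is a hypothetical DataFrame holding the two EmployeeNumber values); an equivalent predicate-based delete is shown as an alternative.

# Option 1: merge against a delete feed and drop every matching row.
(deltaTable.alias("target")
    .merge(deletes_df.alias("source"),
           "target.EmployeeNumber = source.EmployeeNumber")
    .whenMatchedDelete()
    .execute())

# Option 2 (equivalent here): delete directly with a predicate.
# deltaTable.delete("EmployeeNumber IN (1515, 1504)")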

Optimize Delta table with compaction

As previously mentioned, Delta Lake operates by creating new objects for all create, update and delete operations. This generates a lot of small files in S3, and over time the I/O overhead of reading many small files degrades read performance.

To alleviate this, Delta Lake provides an OPTIMIZE function that compacts small files into larger chunks, which helps read performance. As of the writing of this post, the OPTIMIZE function is not available in the open source version of Delta Lake — but there is a workaround which provides similar results. Below, we compact the Delta table into 5 Parquet files using Spark's repartition functionality.
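A sketch of that compaction step, following the recipe in the Delta Lake documentation: read the table, repartition to the target file count, and overwrite the table with dataChange set to false so downstream consumers see no logical change.

num_files = 5

# Compact the table by rewriting it into a small number of larger files.
(spark.read
    .format("delta")
    .load(delta_path)
    .repartition(num_files)
    .write
    .option("dataChange", "false")   # files are rewritten; the data itself is unchanged
    .format("delta")
    .mode("overwrite")
    .save(delta_path))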

To verify compaction, generate the manifest for your Delta table as shown below.

deltaTable.generate("symlink_format_manifest")

This will generate a manifest folder in the S3 location of your Delta table. List that folder (using the AWS CLI) to identify the manifest file, then download and inspect it. It will list the names of the 5 Parquet files that make up your Delta table.

Time Travel on Delta tables to see older versions of the table

The history shows 4 versions of the table, with version 3 being the latest

We have already confirmed in previous steps that EmployeeNumber: 1515 and EmployeeNumber: 1504 have been deleted in the latest version of the table. Despite that, they show up below, since we are reading version 1 of the table.
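The notebook cells for this step are not reproduced here; a minimal sketch of the two operations described above — inspecting the table history and reading an older version — could look like this:

# Inspect the table's version history.
deltaTable.history().select("version", "timestamp", "operation").show(truncate=False)

# Time travel: read version 1, where the two deleted employees still exist.
v1_df = (spark.read
         .format("delta")
         .option("versionAsOf", 1)
         .load(delta_path))

v1_df.filter("EmployeeNumber IN (1515, 1504)").show()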

More to read here about this feature — https://docs.delta.io/latest/delta-batch.html#-deltatimetravel

Vacuum Delta table to purge deleted records & older versions

To make deletes permanent (a.k.a. hard deletes) and to purge older versions of data that are no longer referenced, Delta Lake provides the vacuum functionality. More to read here — https://docs.delta.io/latest/delta-utility.html#remove-files-no-longer-referenced-by-a-delta-table

In general usage, the retention period defaults to 7 days, and all versions are retained for at least 7 days. With default configs, running vacuum will ONLY delete versions older than 7 days.

To demonstrate this functionality, I disabled the retention duration check earlier in this post (while creating the SparkSession), which allows me to vacuum all versions up to 0 hours older than the current version.

deltaTable.vacuum(0)

Accessing a previous version after running the above command will return an error.
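A small sketch of what that looks like, assuming version 1's data files have now been physically removed from S3:

# Time travel to a vacuumed version now fails because its underlying
# data files no longer exist in S3.
try:
    spark.read.format("delta").option("versionAsOf", 1).load(delta_path).count()
except Exception as e:
    print("Reading version 1 after vacuum failed:", type(e).__name__)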

Summary & Tips

Delta Lake achieves ACID properties by always generating new files in S3 for all update/delete/merge operations. The snapshot is built at read time by reconciling all the files using the transaction log.
EMR with Spark 3.0+ is recommended for use with Delta Lake. Delta Lake libraries do not come pre-installed on EMR but can easily be installed during EMR bootstrap.
Delta Lake writes to S3-backed persistence come with some caveats — the main one being that all writes must originate from the same Spark master. Concurrent writes from different Spark masters/clusters to the same Delta Lake table are not guaranteed to produce consistent results and can lead to data loss.
In PART 2 of this series, I will cover Delta Lake integrations with AWS Glue and AWS Athena with examples and caveats.

For the complete EMR Notebook for this demonstration, see the gist below —


Written by @data-foundation-guy

All things data, analytics and machine learning
