
Delta

Reads data from delta files present at a path and writes delta files to a path based on configuration.

Source

Reads data from delta files present at a path.

Source Parameters

| Parameter | Description | Required |
|-----------|-------------|----------|
| Location | File path where delta files are present | True |
| Read Timestamp | Time travel to a specific timestamp | False |
| Read Version | Time travel to a specific version of the table | False |
note

For time travel on delta tables:

  1. Only one of timestamp and version can be used at a time for time travel.
  2. The timestamp should be between the first commit timestamp and the latest commit timestamp in the table.
  3. The version needs to be an integer, with a value between the minimum and maximum version of the table.

By default, the most recent version of each row is fetched if no time travel option is used.

info

To read more about delta time travel and its use cases, click here.
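To see which versions and timestamps are actually available for time travel, one option is to inspect the table history. Below is a minimal sketch, assuming the same orders path used in the examples that follow and the standard DeltaTable.history() API; the function name listDeltaHistory is illustrative, not Prophecy-generated code.

from pyspark.sql import SparkSession, DataFrame
from delta.tables import DeltaTable

def listDeltaHistory(spark: SparkSession) -> DataFrame:
    # Each row of the history DataFrame describes one commit; the version and
    # timestamp columns bound the valid range for versionAsOf / timestampAsOf.
    return DeltaTable\
        .forPath(spark, "dbfs:/FileStore/data_engg/delta_demo/silver/orders")\
        .history()\
        .select("version", "timestamp", "operation")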

Source Example

Delta source example

Generated Code

Without time travel

def ReadDelta(spark: SparkSession) -> DataFrame:
    return spark.read.format("delta").load("dbfs:/FileStore/data_engg/delta_demo/silver/orders")

Timestamp based time travel

def ReadDelta(spark: SparkSession) -> DataFrame:
    return spark.read.format("delta").option("timestampAsOf", "2022-05-05")\
        .load("dbfs:/FileStore/data_engg/delta_demo/silver/orders")

Version based time travel

def readDelta(spark: SparkSession) -> DataFrame:
    return spark.read.format("delta").option("versionAsOf", "0")\
        .load("dbfs:/FileStore/data_engg/delta_demo/silver/orders")


Target

Writes data in delta format, as parquet files, based on the configuration.

Target Parameters

| Parameter | Description | Required |
|-----------|-------------|----------|
| Location | File path where delta files need to be written | True |
| Write mode | Write mode for the dataframe | True |
| Optimise write | If true, optimizes Spark partition sizes based on the actual data | False |
| Overwrite table schema | If true, overwrites the schema of the delta table with the schema of the dataframe | False |
| Merge schema | If true, any columns that are present in the DataFrame but not in the target table are automatically added to the end of the schema as part of the write transaction | False |
| Partition Columns | List of columns to partition the delta table by | False |
| Overwrite partition predicate | If specified, selectively overwrites only the data that satisfies the given where clause expression | False |

Write modes

Below are the different write modes that the Prophecy-provided delta format supports.

| Write Mode | Description |
|------------|-------------|
| overwrite | If data already exists, existing data is expected to be overwritten by the contents of the DataFrame. |
| append | If data already exists, contents of the DataFrame are expected to be appended to existing data. |
| ignore | If data already exists, the save operation is expected not to save the contents of the DataFrame and not to change the existing data. This is similar to CREATE TABLE IF NOT EXISTS in SQL. |
| error | If data already exists, an exception is expected to be thrown. |
| merge | Insert, delete and update data using the delta merge command. |
| scd2 merge | A delta merge operation that stores and manages both current and historical data over time. |

Among these write modes, overwrite, append, ignore and error work the same way as for parquet file writes. Merge and SCD2 merge are explained with examples in the following sections.
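For instance, a plain append write only differs from the overwrite example below in the mode passed to the writer. This is an illustrative sketch, not Prophecy-generated output:

def appendDelta(spark: SparkSession, in0: DataFrame):
    # "append" adds the incoming rows to any existing data; swapping in
    # "ignore" or "error" here gives the other simple write modes.
    in0.write\
        .format("delta")\
        .mode("append")\
        .save("dbfs:/FileStore/data_engg/delta_demo/silver/orders")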

Target Example

Delta Target Example

Generated Code

def writeDelta(spark: SparkSession, in0: DataFrame):
    return in0.write\
        .format("delta")\
        .option("optimizeWrite", True)\
        .option("mergeSchema", True)\
        .option("replaceWhere", "order_dt > '2022-01-01'")\
        .option("overwriteSchema", True)\
        .mode("overwrite")\
        .partitionBy("order_dt")\
        .save("dbfs:/FileStore/data_engg/delta_demo/silver/orders")

Delta MERGE

Upsert data with Delta

You can upsert data from a source DataFrame into a target Delta table by using the MERGE operation. Delta Lake supports inserts, updates and deletes in MERGE.

This operation is also commonly known as upserting (update/insert) or SCD1 merge.
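Conceptually, this is the same operation as a SQL MERGE INTO statement. Below is a minimal sketch, assuming the target Delta table already exists at the path used in the example further down; the view name source_updates and the function name are illustrative.

def upsertWithSql(spark: SparkSession, in0: DataFrame):
    # Register the incoming DataFrame as a temporary view and upsert it
    # into the existing Delta table with a SQL MERGE statement.
    in0.createOrReplaceTempView("source_updates")
    spark.sql("""
        MERGE INTO delta.`dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd1` AS target
        USING source_updates AS source
        ON target.customer_id = source.customer_id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)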

Parameters

| Parameter | Description | Required |
|-----------|-------------|----------|
| Source alias | Alias to use for the source dataframe | True |
| Target alias | Alias to use for the existing delta table | True |
| Merge Condition | Condition to merge data from the source dataframe into the target table, used to decide whether the update, delete, or insert actions apply | True |
| When Matched Update Action | Action to choose whether the update is performed or skipped | False |
| When Matched Update Condition | Optional condition for updating a row. If specified, it must evaluate to true for the row to be updated. | False |
| When Matched Update Expressions | Optional expressions for setting the values of columns that need to be updated | False |
| When Matched Delete Action | Action to choose whether the delete is performed or skipped | False |
| When Matched Delete Condition | Optional condition for deleting a row. If specified, it must evaluate to true for the row to be deleted. | False |
| When Not Matched Action | Action to choose whether inserts are performed or skipped | False |
| When Not Matched Condition | Optional condition for inserting a row. If specified, it must evaluate to true for the row to be inserted. | False |
| When Not Matched Expressions | Optional expressions for setting the values of columns that need to be inserted | False |
note

  1. At least one action out of update, delete or insert needs to be set.
  2. Delete removes the data from the latest version of the Delta table but does not remove it from physical storage until the old versions are explicitly vacuumed. See vacuum for details.
  3. A merge operation can fail if multiple rows of the source dataset match and the merge attempts to update the same rows of the target Delta table. A Deduplicate gem can be placed before the target if duplicate rows are expected at the source.

When possible, provide predicates on the partition columns of a partitioned Delta table, as such predicates can significantly speed up the operation.
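The generated example below uses whenMatchedUpdateAll and whenNotMatchedInsertAll. As a rough sketch of how the optional conditions and expressions from the table above map onto the DeltaMergeBuilder API (the updated_dt, customer_city and is_deleted columns are illustrative assumptions, not part of the example data):

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col
from delta.tables import DeltaTable

def mergeWithConditions(spark: SparkSession, in0: DataFrame):
    (
        DeltaTable
        .forPath(spark, "dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd1")
        .alias("target")
        .merge(in0.alias("source"), col("source.customer_id") == col("target.customer_id"))
        # When Matched Update Condition and When Matched Update Expressions
        .whenMatchedUpdate(
            condition="source.updated_dt > target.updated_dt",
            set={"customer_city": "source.customer_city"},
        )
        # When Matched Delete Condition
        .whenMatchedDelete(condition="source.is_deleted = true")
        # Rows that do not satisfy the merge condition are inserted as-is
        .whenNotMatchedInsertAll()
        .execute()
    )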

Example

Let's assume our initial customers table is as below:

Initial customer table

And we have the below updates coming into customers table

Customer table updates

Our output and configurations for SCD1 merge will look like below:

Generated Code

def writeDeltaMerge(spark: SparkSession, in0: DataFrame):
    from delta.tables import DeltaTable, DeltaMergeBuilder
    from pyspark.sql.functions import col

    if DeltaTable.isDeltaTable(spark, "dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd1"):
        DeltaTable\
            .forPath(spark, "dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd1")\
            .alias("target")\
            .merge(in0.alias("source"), (col("source.customer_id") == col("target.customer_id")))\
            .whenMatchedUpdateAll()\
            .whenNotMatchedInsertAll()\
            .execute()
    else:
        in0.write\
            .format("delta")\
            .mode("overwrite")\
            .save("dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd1")

SCD2

Parameters

| Parameter | Description | Required |
|-----------|-------------|----------|
| Key columns | List of key columns which remain constant | True |
| Historic columns | List of columns that change over time and for which history needs to be maintained | True |
| From time column | Time from which a particular row became valid | True |
| To time column | Time till which a particular row was valid | True |
| Min/old-value flag | Column placeholder to store the flag as true for the first entry of a particular key | True |
| Max/latest flag | Column placeholder to store the flag as true for the last entry of a particular key | True |
| Flag values | Option to choose the min/max flag to be true/false or 0/1 | True |

Example

Using the same customer tables as in our merge example above, output and configurations for SCD2 merge will look like below:

Generated Code

def writeDeltaSCD2(spark: SparkSession, in0: DataFrame):
    from delta.tables import DeltaTable, DeltaMergeBuilder
    from pyspark.sql.functions import lit, concat

    if DeltaTable.isDeltaTable(spark, "dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd2"):
        existingTable = DeltaTable.forPath(
            spark, "dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd2"
        )
        updatesDF = in0.withColumn("minFlag", lit("true")).withColumn(
            "maxFlag", lit("true")
        )
        existingDF = existingTable.toDF()
        updateColumns = updatesDF.columns
        stagedUpdatesDF = (
            updatesDF.join(existingDF, ["customer_id"])
            .where(
                (existingDF["maxFlag"] == lit("true"))
                & (
                    (
                        (existingDF["customer_zip_code"] != updatesDF["customer_zip_code"])
                        | (existingDF["customer_city"] != updatesDF["customer_city"])
                    )
                    | (existingDF["customer_state"] != updatesDF["customer_state"])
                )
            )
            .select(*[updatesDF[val] for val in updateColumns])
            .withColumn("minFlag", lit("false"))
            .withColumn("mergeKey", lit(None))
            .union(updatesDF.withColumn("mergeKey", concat("customer_id")))
        )
        existingTable.alias("existingTable").merge(
            stagedUpdatesDF.alias("staged_updates"),
            concat(existingDF["customer_id"]) == stagedUpdatesDF["mergeKey"],
        ).whenMatchedUpdate(
            condition=(
                (existingDF["maxFlag"] == lit("true"))
                & (
                    (
                        (existingDF["customer_zip_code"] != stagedUpdatesDF["customer_zip_code"])
                        | (existingDF["customer_city"] != stagedUpdatesDF["customer_city"])
                    )
                    | (existingDF["customer_state"] != stagedUpdatesDF["customer_state"])
                )
            ),
            set={"maxFlag": "false", "end_date": "staged_updates.updated_dt"},
        ).whenNotMatchedInsertAll().execute()
    else:
        in0.write\
            .format("delta")\
            .mode("overwrite")\
            .save("dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd2")


SCD3

Using the same customer tables as in our merge example above, output and configurations for SCD3 merge will look like below.
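As a rough sketch of the idea behind SCD3 (this is not the Prophecy-generated code): instead of adding a new row per change, the previous value of a tracked column is kept in an extra column on the same row. The previous_customer_city column, the customers_scd3 path and the function name below are illustrative assumptions.

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import lit
from delta.tables import DeltaTable

def writeDeltaSCD3(spark: SparkSession, in0: DataFrame):
    path = "dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd3"
    # Stage the incoming rows with an empty "previous value" column so that
    # inserts and updates share the same schema as the target table.
    staged = in0.withColumn("previous_customer_city", lit(None).cast("string"))
    if DeltaTable.isDeltaTable(spark, path):
        (
            DeltaTable.forPath(spark, path)
            .alias("target")
            .merge(staged.alias("source"), "source.customer_id = target.customer_id")
            # On a change, remember the old value and overwrite it with the new one
            .whenMatchedUpdate(
                condition="source.customer_city <> target.customer_city",
                set={
                    "previous_customer_city": "target.customer_city",
                    "customer_city": "source.customer_city",
                },
            )
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        staged.write\
            .format("delta")\
            .mode("overwrite")\
            .save(path)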


info

To check out our blog post on making the data lakehouse easier using Delta with Prophecy, click here.