Delta

Reads data from Delta tables saved in the data catalog and writes data into Delta tables in the data catalog.

note

Please choose delta as the provider on the properties page.

Source

Source Parameters

| Parameter | Description | Required |
|-----------|-------------|----------|
| Database name | Name of the database | True |
| Table name | Name of the table | True |
| Provider | Provider needs to be selected as delta | True |
| Filter Predicate | Where clause to filter the table | False |
| Read Timestamp | Time travel to a specific timestamp | False |
| Read Version | Time travel to a specific version of the table | False |
note

For time travel on Delta tables:

  1. Only one of timestamp and version can be used at a time for time travel.
  2. The timestamp must fall between the first commit timestamp and the latest commit timestamp of the table.
  3. The version must be an integer between the minimum and maximum versions of the table.

If no time travel option is used, the most recent version of each row is fetched by default.

info

To read more about Delta time travel and its use cases, click here.

Source Example

Generated Code

Without filter predicate

def Source(spark: SparkSession) -> DataFrame:
    return spark.read.table("test_db.test_table")

With filter predicate

def Source(spark: SparkSession) -> DataFrame:
    return spark.sql("SELECT * FROM test_db.test_table WHERE col > 10")
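
With time travel

A time travel read is not shown above; the following is a minimal sketch in the same pattern as the generated code, using Delta's SQL time travel syntax (VERSION AS OF for a version, TIMESTAMP AS OF for a timestamp). The version number here is a placeholder.

def Source(spark: SparkSession) -> DataFrame:
    # Read the table as it was at version 1; use
    # TIMESTAMP AS OF '2024-01-01T00:00:00' to travel to a timestamp instead.
    return spark.sql("SELECT * FROM test_db.test_table VERSION AS OF 1")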

Target

Target Parameters

| Parameter | Description | Required |
|-----------|-------------|----------|
| Database name | Name of the database | True |
| Table name | Name of the table | True |
| Custom file path | Use a custom file path to store the underlying files | False |
| Provider | Provider needs to be selected as delta | True |
| Write Mode | How to handle existing data in the table (default is overwrite) | True |
| Use insert into | Flag to use the insertInto method to write instead of save in Spark | False |
| Optimise write | If true, optimizes Spark partition sizes based on the actual data | False |
| Overwrite table schema | If true, overwrites the schema of the Delta table as per the DataFrame | False |
| Merge schema | If true, any columns present in the DataFrame but not in the target table are automatically added to the end of the schema as part of the write transaction | False |
| Partition Columns | List of columns to partition the Delta table by | False |
| Overwrite partition predicate | If specified, selectively overwrites only the data that satisfies the given where clause expression | False |
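
As a rough illustration, the Partition Columns and Overwrite partition predicate options map naturally onto Spark's partitionBy and Delta's replaceWhere option. A minimal sketch of an overwrite restricted to one partition, assuming an illustrative date partition column (not the exact generated code):

def Target(spark: SparkSession, in0: DataFrame):
    # Overwrite only the rows matching the partition predicate,
    # leaving the rest of the table untouched.
    in0.write\
        .format("delta")\
        .mode("overwrite")\
        .partitionBy("date")\
        .option("replaceWhere", "date >= '2024-01-01'")\
        .saveAsTable("test_db.test_table")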

Below are the different write modes supported by the Prophecy-provided Hive catalog.

| Write Mode | Description |
|------------|-------------|
| overwrite | If data already exists, existing data is expected to be overwritten by the contents of the DataFrame. |
| append | If data already exists, the contents of the DataFrame are expected to be appended to the existing data. |
| ignore | If data already exists, the save operation is expected not to save the contents of the DataFrame and not to change the existing data. This is similar to CREATE TABLE IF NOT EXISTS in SQL. |
| error | If data already exists, an exception is expected to be thrown. |
| merge | Insert, delete, and update data using the Delta merge command. |
| scd2 merge | A Delta merge operation that stores and manages both current and historical data over time. |
note

Among these write modes, overwrite, append, ignore, and error work the same way as they do for Parquet file writes.

To read more about using the merge write mode, click here.

To read more about using the scd2 merge write mode, click here.
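
The merge write mode relies on the Delta merge command under the hood. The following is a minimal sketch of an upsert with the Delta Lake Python API, assuming an illustrative id key column and an existing target table (this is not the exact generated code):

from delta.tables import DeltaTable

def Target(spark: SparkSession, in0: DataFrame):
    target = DeltaTable.forName(spark, "test_db.test_table")
    # Update rows whose key already exists in the table, insert the rest.
    target.alias("t")\
        .merge(in0.alias("s"), "t.id = s.id")\
        .whenMatchedUpdateAll()\
        .whenNotMatchedInsertAll()\
        .execute()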

Target Example

Generated Code

def Target(spark: SparkSession, in0: DataFrame):
    in0.write\
        .format("delta")\
        .mode("overwrite")\
        .saveAsTable("test_db.test_table")
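
With Merge schema enabled and the append write mode, the write could plausibly look like the sketch below; mergeSchema is Delta's standard option for adding new DataFrame columns to the table schema (this is an assumption about the generated code, not a verbatim copy):

def Target(spark: SparkSession, in0: DataFrame):
    # Append new rows, adding any columns present in the DataFrame
    # but missing from the table to the end of the table schema.
    in0.write\
        .format("delta")\
        .mode("append")\
        .option("mergeSchema", "true")\
        .saveAsTable("test_db.test_table")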