
Avro

Avro is a row-based storage format for Hadoop that is widely used as a data serialization system. Avro stores the schema in JSON format, which makes it easy for any program to read and interpret, while the data itself is stored in a compact, efficient binary format.
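To make the split between the JSON schema and the binary data concrete, here is a minimal sketch that reads only the header of an Avro object container file, following the Avro specification: four magic bytes (`Obj` plus format version `1`), then a metadata map whose `avro.schema` entry holds the writer schema as JSON. The function names are hypothetical, the sketch does not decode data blocks, and for real workloads a library such as `fastavro`, or Spark itself, is the better choice.

```python
import json
from typing import BinaryIO, Dict

AVRO_MAGIC = b"Obj\x01"  # 'O', 'b', 'j', then format version 1


def read_long(stream: BinaryIO) -> int:
    """Decode one Avro long (zig-zag encoded, variable-length)."""
    shift = 0
    raw = 0
    while True:
        chunk = stream.read(1)
        if not chunk:
            raise EOFError("unexpected end of Avro header")
        byte = chunk[0]
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of this varint
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1)  # undo zig-zag encoding


def read_bytes(stream: BinaryIO) -> bytes:
    """Read a length-prefixed byte string (also used for Avro strings)."""
    length = read_long(stream)
    data = stream.read(length)
    if len(data) != length:
        raise EOFError("truncated Avro header")
    return data


def read_header_metadata(stream: BinaryIO) -> Dict[str, bytes]:
    """Check the magic bytes, then read the file metadata map."""
    if stream.read(4) != AVRO_MAGIC:
        raise ValueError("not an Avro object container file")
    metadata: Dict[str, bytes] = {}
    while True:
        count = read_long(stream)  # the map is written as blocks of entries
        if count == 0:  # a zero-count block ends the map
            break
        if count < 0:  # negative count: a byte size for the block follows
            count = -count
            read_long(stream)  # block size in bytes, not needed here
        for _ in range(count):
            key = read_bytes(stream).decode("utf-8")
            metadata[key] = read_bytes(stream)
    return metadata


def read_writer_schema(stream: BinaryIO) -> dict:
    """Return the JSON writer schema stored in the file header."""
    return json.loads(read_header_metadata(stream)["avro.schema"].decode("utf-8"))
```

For example, `with open("test.avro", "rb") as f: print(read_writer_schema(f))` would print the schema as a Python dict without touching the binary records that follow the header.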

This gem lets you read data from, or write data to, Avro files.

Source

Reads data from Avro files at the specified path.

Source Parameters

| Parameter | Description | Required | Default |
|---|---|---|---|
| Location | File path where the Avro files are located | True | None |
| Recursive File Lookup | Recursively loads files from nested directories and disables partition inference. If the data source explicitly specifies a `partitionSpec` while `recursiveFileLookup` is `true`, an exception is thrown. | False | False |
| Path Global Filter | Optional glob pattern to include only files whose paths match the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`. It does not change the behavior of partition discovery. | False | None |
| Modified Before | Optional timestamp to include only files with modification times before the specified time. The timestamp must have the form `YYYY-MM-DDTHH:mm:ss` (e.g. `2020-06-01T13:00:00`). | False | None |
| Modified After | Optional timestamp to include only files with modification times after the specified time. The timestamp must have the form `YYYY-MM-DDTHH:mm:ss` (e.g. `2020-06-01T13:00:00`). | False | None |
| Avro Schema | Optional user-provided schema in JSON format. When reading Avro, this can be set to an evolved schema that is compatible with, but different from, the actual Avro schema; the deserialization schema then follows the evolved schema. For example, if the evolved schema contains one additional column with a default value, the result read in Spark also contains that column. | False | None |
| ignoreExtension | Controls whether files without the `.avro` extension are loaded on read. When enabled, the extension is ignored and all files (with and without the `.avro` extension) are loaded. This option is deprecated and will be removed in a future release; use the [general data source option](https://spark.apache.org/docs/latest/sql-data-sources-generic-options.html#path-global-filter) `pathGlobFilter` to filter file names instead. | False | True |
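The following is a minimal sketch of how the Source parameters could be turned into reader options, assuming each parameter maps to the Spark option of the same name (`recursiveFileLookup`, `pathGlobFilter`, `modifiedBefore`, `modifiedAfter`, `avroSchema`, `ignoreExtension`). The helper names are hypothetical; the sketch also shows how to produce and validate the `YYYY-MM-DDTHH:mm:ss` timestamp form.

```python
from datetime import datetime
from typing import Dict, Optional, Union

# modifiedBefore / modifiedAfter must look like YYYY-MM-DDTHH:mm:ss
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S"


def _format_timestamp(value: Union[str, datetime]) -> str:
    """Render a datetime, or validate a string, in the required timestamp form."""
    if isinstance(value, datetime):
        return value.strftime(TIMESTAMP_FORMAT)
    datetime.strptime(value, TIMESTAMP_FORMAT)  # raises ValueError on a bad format
    return value


def build_avro_read_options(
    recursive_file_lookup: bool = False,
    path_glob_filter: Optional[str] = None,
    modified_before: Optional[Union[str, datetime]] = None,
    modified_after: Optional[Union[str, datetime]] = None,
    avro_schema: Optional[str] = None,
    ignore_extension: Optional[bool] = None,
) -> Dict[str, str]:
    """Map Source parameters to Spark option keys (hypothetical helper, assumed mapping)."""
    options = {"recursiveFileLookup": str(recursive_file_lookup).lower()}
    if path_glob_filter is not None:
        options["pathGlobFilter"] = path_glob_filter
    if modified_before is not None:
        options["modifiedBefore"] = _format_timestamp(modified_before)
    if modified_after is not None:
        options["modifiedAfter"] = _format_timestamp(modified_after)
    if avro_schema is not None:
        options["avroSchema"] = avro_schema
    if ignore_extension is not None:  # deprecated in Spark; prefer pathGlobFilter
        options["ignoreExtension"] = str(ignore_extension).lower()
    return options
```

The resulting dict could then be passed to Spark with `spark.read.format("avro").options(**opts).load(path)`. Unset parameters are simply left out, so Spark's own defaults apply.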

Example

Avro schema used

Generated Code

```python
from pyspark.sql import DataFrame, SparkSession

def read_avro(spark: SparkSession) -> DataFrame:
    # Read Avro files using a user-supplied schema; ignoreExtension is deprecated in Spark
    return spark.read \
        .format("avro") \
        .option("ignoreExtension", True) \
        .option(
            "avroSchema",
            "{\"type\":\"record\",\"name\":\"Person\",\"fields\":[{\"name\":\"firstname\",\"type\":\"string\"},{\"name\":\"middlename\",\"type\":\"string\"},{\"name\":\"lastname\",\"type\":\"string\"},{\"name\":\"dob_year\",\"type\":\"int\"},{\"name\":\"dob_month\",\"type\":\"int\"},{\"name\":\"gender\",\"type\":\"string\"},{\"name\":\"salary\",\"type\":\"int\"}]}"
        ) \
        .load("dbfs:/FileStore/Users/abhinav/avro/test.avro")
```
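Hand-escaping the `avroSchema` string is error-prone. A minimal sketch, assuming the `Person` fields from the generated code, builds the same compact JSON with `json.dumps` and also produces an evolved schema with one extra column. The `country` column and its default value are hypothetical, chosen only to illustrate the schema-evolution behavior described under Avro Schema.

```python
import json

# Field names and types copied from the generated read code
PERSON_FIELDS = [
    ("firstname", "string"), ("middlename", "string"), ("lastname", "string"),
    ("dob_year", "int"), ("dob_month", "int"), ("gender", "string"), ("salary", "int"),
]


def person_schema(extra_fields=()) -> str:
    """Serialize the Person record schema as compact JSON, optionally with extra fields."""
    fields = [{"name": name, "type": avro_type} for name, avro_type in PERSON_FIELDS]
    fields.extend(extra_fields)
    record = {"type": "record", "name": "Person", "fields": fields}
    return json.dumps(record, separators=(",", ":"))  # no whitespace, like the original literal


PERSON_SCHEMA = person_schema()

# Hypothetical evolved schema: the new column needs a default so that
# records written without it can still be read
EVOLVED_SCHEMA = person_schema([{"name": "country", "type": "string", "default": "unknown"}])
```

Passing `EVOLVED_SCHEMA` as the `avroSchema` option would, per the parameter description above, yield a DataFrame that also has a `country` column, filled with the default for records that were written without it.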


Target

Writes data as Avro files at the specified path.

Target Parameters

| Parameter | Description | Required | Default |
|---|---|---|---|
| Location | File path where the Avro files will be written | True | None |
| Avro Schema | Optional user-provided schema in JSON format. When writing Avro, this can be set if the expected output Avro schema doesn't match the schema converted by Spark. For example, the expected schema of one column may be of `enum` type instead of the `string` type in the default converted schema. | False | None |
| Record Name | Top-level record name in the write result, which is required by the Avro spec. | False | `topLevelRecord` |
| Record Namespace | Record namespace in the write result. | False | `""` |
| Compression | Compression codec used in write. Currently supported codecs are `uncompressed`, `snappy`, `deflate`, `bzip2`, `xz` and `zstandard`. If the option is not set, the `spark.sql.avro.compression.codec` configuration is used. | False | `snappy` |
| Write Mode | Save mode for the DataFrame: `append`, `overwrite`, `ignore`, or `error` (`errorifexists`) | True | `error` |
| Partition Columns | List of columns to partition the Avro files by | False | None |
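A minimal sketch of how the Target parameters could be applied to a Spark `DataFrameWriter`, assuming they map to the writer's `mode()`, `partitionBy()`, and the Avro options `compression`, `recordName`, `recordNamespace`, and `avroSchema`. The helper `configure_avro_writer` is hypothetical; it validates the codec and save mode up front and only sets options the user supplied, so unset values fall back to Spark's defaults.

```python
from typing import Optional, Sequence

# Codecs listed under the Compression parameter
SUPPORTED_CODECS = {"uncompressed", "snappy", "deflate", "bzip2", "xz", "zstandard"}
# Save modes accepted by DataFrameWriter.mode()
SAVE_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def configure_avro_writer(
    writer,
    mode: str = "error",
    partition_columns: Sequence[str] = (),
    compression: Optional[str] = None,
    record_name: Optional[str] = None,
    record_namespace: Optional[str] = None,
    avro_schema: Optional[str] = None,
):
    """Apply Target parameters to a DataFrameWriter-like object (hypothetical helper)."""
    if mode not in SAVE_MODES:
        raise ValueError(f"unsupported write mode: {mode!r}")
    if compression is not None and compression not in SUPPORTED_CODECS:
        raise ValueError(f"unsupported Avro codec: {compression!r}")
    writer = writer.format("avro").mode(mode)
    # Set only the options the user supplied so Spark's defaults still apply otherwise
    for key, value in (
        ("compression", compression),
        ("recordName", record_name),
        ("recordNamespace", record_namespace),
        ("avroSchema", avro_schema),
    ):
        if value is not None:
            writer = writer.option(key, value)
    if partition_columns:
        writer = writer.partitionBy(*partition_columns)
    return writer
```

For example, `configure_avro_writer(in0.write, mode="overwrite", partition_columns=["dob_year", "dob_month"]).save("dbfs:/data/test_output.avro")` would configure the same write that the generated code performs.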

Example

Generated Code

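```python
from pyspark.sql import DataFrame, SparkSession

def write_avro(spark: SparkSession, in0: DataFrame) -> None:
    # Overwrite the target, partitioning the output by birth year and month
    in0.write \
        .format("avro") \
        .mode("overwrite") \
        .partitionBy("dob_year", "dob_month") \
        .save("dbfs:/data/test_output.avro")
```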
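With `partitionBy`, Spark treats the save path as a directory and writes Hive-style `column=value` subdirectories beneath it; the partition column values live in the directory names rather than inside the Avro files. A simplified sketch of which directory a row would land in is shown below. The helper name is hypothetical, and unlike Spark it does not escape special characters in values; it does use Spark's `__HIVE_DEFAULT_PARTITION__` placeholder for nulls.

```python
from typing import Mapping, Optional, Sequence

# Placeholder directory value Spark uses for null partition values
NULL_PARTITION = "__HIVE_DEFAULT_PARTITION__"


def partition_directory(
    base_path: str,
    row: Mapping[str, Optional[object]],
    partition_columns: Sequence[str],
) -> str:
    """Return the col=value directory a row is written to (simplified: no escaping)."""
    segments = [base_path.rstrip("/")]
    for col in partition_columns:
        value = row[col]
        segments.append(f"{col}={NULL_PARTITION if value is None else value}")
    return "/".join(segments)
```

For the generated write code, a row with `dob_year=1991` and `dob_month=4` would be stored under `dbfs:/data/test_output.avro/dob_year=1991/dob_month=4`.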

To learn more about tuning Avro-related properties in the Spark configuration, click here.