Auto Loader syntax is the practical layer of Auto Loader functionality. You need to know the exact options, configuration parameters, and the difference between using Auto Loader in Python streaming code versus SQL-based Lakeflow Spark Declarative Pipelines. The syntax also changes depending on whether you're doing a one-time batch load with COPY INTO or continuous streaming ingestion.
The core entry point for Auto Loader in PySpark is spark.readStream.format("cloudFiles"). From there you chain options for the file format, schema location, schema hints, and schema evolution behavior. Each option controls a specific behavior, and omitting cloudFiles.schemaLocation when you rely on schema inference will cause the stream to fail at startup.
Understanding when to use COPY INTO versus Auto Loader is also critical. COPY INTO works for one-time or scheduled batch loads, while Auto Loader is built for continuous, incremental streaming ingestion. They have different syntax and serve different patterns, but candidates often mix them up.
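To make the contrast concrete: COPY INTO is issued as a SQL statement, so from Python it is typically run through spark.sql. A minimal sketch of building that statement (the table and bucket names are made up for illustration):

```python
def copy_into_sql(table: str, source_path: str, file_format: str) -> str:
    """Build a COPY INTO statement for a one-time or batch load.

    COPY INTO is idempotent: files already loaded into the target table
    are skipped on re-runs, which makes it safe for scheduled batch jobs.
    """
    return (
        f"COPY INTO {table}\n"
        f"FROM '{source_path}'\n"
        f"FILEFORMAT = {file_format.upper()}"
    )

# In a Databricks notebook you would run it with:
# spark.sql(copy_into_sql("raw_customers", "s3://my-bucket/customer-data/", "json"))
print(copy_into_sql("raw_customers", "s3://my-bucket/customer-data/", "json"))
```

Note the shape: a plain SQL command executed once per run, versus the readStream/writeStream pair below that stays up and ingests files as they arrive.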
df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/Volumes/my_catalog/my_schema/checkpoint") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.schemaHints", "customer_id INT, amount DOUBLE") \
    .load("s3://my-bucket/customer-data/")

df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/Volumes/my_catalog/my_schema/write_checkpoint") \
    .outputMode("append") \
    .table("raw_customers")
This pipeline reads JSON files from S3, infers column types, stores the schema in a Volumes location, and writes to the raw_customers table in append mode.
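Schema hints like the one above are a comma-separated list of DDL-style name TYPE pairs. A pure-Python sketch (no Spark required; the accepted type list is trimmed for illustration) of validating a hint string before the stream starts, to catch typos early:

```python
# Subset of Spark SQL type names, for illustration only; the real set is larger.
_KNOWN_TYPES = {"INT", "BIGINT", "DOUBLE", "FLOAT", "STRING", "BOOLEAN", "DATE", "TIMESTAMP"}

def validate_schema_hints(hints: str) -> list:
    """Parse a 'col TYPE, col TYPE' hint string, raising on malformed entries."""
    parsed = []
    for entry in hints.split(","):
        parts = entry.strip().split()
        if len(parts) != 2:
            raise ValueError(f"Malformed hint: {entry!r}")
        name, dtype = parts
        if dtype.upper() not in _KNOWN_TYPES:
            raise ValueError(f"Unknown type {dtype!r} in hint {entry!r}")
        parsed.append((name, dtype.upper()))
    return parsed

print(validate_schema_hints("customer_id INT, amount DOUBLE"))
# [('customer_id', 'INT'), ('amount', 'DOUBLE')]
```

This is a guardrail sketch, not part of the Auto Loader API: Spark only sees the final hint string passed to cloudFiles.schemaHints.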
df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .option("cloudFiles.schemaLocation", "/Volumes/my_catalog/my_schema/schema_location") \
    .option("cloudFiles.schemaEvolutionMode", "rescue") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .load("s3://data-lake/events/")

# In rescue mode the _rescued_data column is added to the schema automatically,
# so it flows through to the target table; inspect it downstream to catch
# records that did not match the expected schema.
df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/Volumes/my_catalog/my_schema/write_ck") \
    .outputMode("append") \
    .table("events_bronze")
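The _rescued_data column arrives as a JSON string holding the fields that did not match the schema, alongside metadata such as the source file path. A pure-Python sketch of the kind of drift check you might run downstream (the sample payload and the underscore-prefix convention for metadata keys are illustrative assumptions, not guaranteed pipeline output):

```python
import json

def rescued_field_names(rescued_json):
    """Return the data field names captured in a _rescued_data payload.

    A null payload means the row matched the schema. Keys starting with
    an underscore (e.g. the source file path) are treated as metadata
    and filtered out -- an assumption made for this sketch.
    """
    if rescued_json is None:
        return set()
    payload = json.loads(rescued_json)
    return {k for k in payload if not k.startswith("_")}

# Illustrative payload resembling what rescue mode can produce:
sample = '{"new_column": "42", "_file_path": "s3://data-lake/events/part-0001.json"}'
print(rescued_field_names(sample))  # {'new_column'}
```

Alerting when this set is non-empty for a growing share of rows is a cheap way to notice upstream schema drift before it silently accumulates in the bronze table.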