v1.0.0 (February 7, 2026): Configuration-driven batch and streaming pipelines, lifecycle hooks, resilience patterns, secrets management. Python 3.10+, PySpark 3.5+.
I wrote about the Scala version a few months ago. The problem it solves hasn’t changed—SparkSession boilerplate, scattered configs, observability bolted on after the fact. But most Spark teams I’ve worked with run PySpark, not Scala. A Scala-only library doesn’t help them.
This isn’t a port. It’s a ground-up reimplementation using Python idioms—dataclasses, Protocols, type hints, importlib. The architecture maps to the same concepts, but the code is native Python.
What I Built
pyspark-pipeline-framework (PyPI) lets you define PySpark pipelines in HOCON config files. The framework handles SparkSession lifecycle, component wiring, error handling, and observability. You write your data transformations; everything else is configuration.
Config parsing uses dataconf—HOCON into dataclasses with full type safety.
Running Pipelines
For local development, the ppf-run CLI handles runner setup, logging, and exit codes:
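The invocation below is a sketch; the exact argument shape is an assumption, so check the CLI's help output for the real interface.

```bash
# Assumed invocation: config file passed as a positional argument
ppf-run customer_etl.conf
```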
For cluster deployments, use spark-submit with a thin entry point:
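Something along these lines, assuming your components and config are shipped alongside the job (packaging details elided):

```bash
# Ship your components and config with --py-files / --files as appropriate
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  main.py customer_etl.conf
```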
Where main.py is minimal:
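A sketch of what that entry point might look like; the import path and runner API shown here are assumptions based on the names used elsewhere in this post.

```python
# main.py -- minimal entry point (sketch; import path and runner API are assumptions)
import sys

from pyspark_pipeline_framework.runner import SimplePipelineRunner  # assumed module path

if __name__ == "__main__":
    config_path = sys.argv[1]          # e.g. customer_etl.conf
    runner = SimplePipelineRunner()    # assumed constructor
    success = runner.run(config_path)  # assumed method: parse config, wire components, run
    sys.exit(0 if success else 1)
```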
The framework doesn’t replace spark-submit—it sits on top of it. ppf-run is just a convenience wrapper that does the same thing.
Configuration with dataconf
I covered the “why config-driven” rationale in the Scala post. The Python side uses the same HOCON format but with snake_case fields parsed into dataclasses via dataconf.
```hocon
{
  name: "customer-etl"
  version: "1.0.0"
  spark {
    app_name: "Customer ETL"
    master: "local[*]"
  }
  components: [
    {
      name: "read_raw"
      component_type: source
      class_path: "pyspark_pipeline_framework.examples.batch.ReadTable"
      config {
        table_name: "raw.customers"
        output_view: "raw_customers"
      }
    },
    {
      name: "transform"
      component_type: transformation
      class_path: "pyspark_pipeline_framework.examples.batch.SqlTransform"
      depends_on: ["read_raw"]
      config {
        sql: "SELECT id, UPPER(name) AS name FROM raw_customers"
        output_view: "cleaned"
      }
    },
    {
      name: "write"
      component_type: sink
      class_path: "pyspark_pipeline_framework.examples.batch.WriteTable"
      depends_on: ["transform"]
      config {
        input_view: "cleaned"
        output_table: "curated.customers"
      }
    }
  ]
}
```
Fields are snake_case throughout—class_path, component_type, depends_on. dataconf validates types at parse time, so a missing required field or wrong type fails fast with a clear message. I’m a contributor to dataconf, so the integration is tight.
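To make the parse-time validation concrete, here's a minimal dataconf example, independent of the framework; the dataclass is made up for the demo and the loader call reflects my reading of dataconf's API.

```python
from dataclasses import dataclass

import dataconf


@dataclass
class SparkConfig:
    app_name: str
    master: str


conf = """
app_name: "Customer ETL"
master: "local[*]"
"""

# Fails fast with a clear error if a field is missing or has the wrong type
spark_cfg = dataconf.string(conf, SparkConfig)
print(spark_cfg.app_name)  # Customer ETL
```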
Dynamic Component Loading
The Scala version uses reflection to instantiate components from class names. Same architectural goal here—decouple the framework from application code—but Python’s solution is simpler.
The framework uses importlib to resolve class_path strings and calls a from_config() classmethod on each component:
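Roughly, the loading step looks like this; an illustrative sketch of the mechanism, not the framework's actual code.

```python
import importlib


def load_component(class_path: str, config: dict):
    """Resolve a dotted class_path string and build the component from its config block."""
    module_name, _, class_name = class_path.rpartition(".")
    component_cls = getattr(importlib.import_module(module_name), class_name)
    return component_cls.from_config(config)  # each component implements from_config()
```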
The framework injects a SparkSession via set_spark_session() before calling run(). No reflection magic—just standard Python imports.
Lifecycle Hooks
Hooks receive callbacks at pipeline and component lifecycle events. Five built-in implementations cover the common cases:
- LoggingHooks — logs lifecycle events
- MetricsHooks — collects timing and retry counts
- DataQualityHooks — runs quality checks between stages
- AuditHooks — emits audit events for compliance
- CheckpointHooks — saves state for resume-on-failure
Compose them with CompositeHooks:
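A sketch; the module path and constructor signatures are assumptions, only the class names come from the list above.

```python
from pyspark_pipeline_framework.core.hooks import (  # assumed module path
    CompositeHooks,
    LoggingHooks,
    MetricsHooks,
)

# Fan every lifecycle event out to each hook in turn (list argument is an assumption)
hooks = CompositeHooks([LoggingHooks(), MetricsHooks()])
```

Pass the composite to the runner and every pipeline and component event reaches each hook.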
Events are structured JSON with run_id, component_name, duration_ms, and status—same shape as the Scala version, so your dashboards work across both.
Three-Layer Architecture
Same structure as the Scala framework, same rationale:
- core/ — zero Spark dependency. Config models, hooks protocol, secrets, quality checks.
- runtime/ — SparkSession lifecycle, DataFlow base class, streaming.
- runner/ — CLI orchestration, SimplePipelineRunner, checkpoint/resume.
The practical payoff: unit tests for config parsing, hook composition, and secret resolution run without Spark. Fast feedback loops.
Resilience Patterns
Retry with exponential backoff and circuit breakers are both configurable per-component in HOCON:
```hocon
components: [
  {
    name: "flaky_source"
    component_type: source
    class_path: "my.module.FlakySource"
    retry {
      max_attempts: 3
      initial_delay_seconds: 1.0
      max_delay_seconds: 30.0
      backoff_multiplier: 2.0
    }
    circuit_breaker {
      failure_threshold: 5
      timeout_seconds: 60.0
    }
  }
]
```
For long-running pipelines, CheckpointHooks with LocalCheckpointStore saves state after each successful component. On restart, load_checkpoint_for_resume picks up where you left off:
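A sketch of the wiring; only the three names come from the framework, while the module paths, constructor arguments, and function signature are assumptions.

```python
from pyspark_pipeline_framework.runner import load_checkpoint_for_resume     # assumed path
from pyspark_pipeline_framework.core.checkpoint import LocalCheckpointStore  # assumed path
from pyspark_pipeline_framework.core.hooks import CheckpointHooks            # assumed path

store = LocalCheckpointStore("/tmp/ppf-checkpoints")  # assumed constructor argument
hooks = CheckpointHooks(store)                        # saves state after each successful component

# On restart: figure out which components already completed and skip them
resume_state = load_checkpoint_for_resume(store, run_id="customer-etl")  # assumed signature
```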
Streaming
StreamingPipeline provides a base class with source, sink, optional transform, and trigger config.
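For context, this is the raw Structured Streaming boilerplate that StreamingPipeline wraps; plain PySpark, not the framework's API.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rate-to-console").getOrCreate()

# Rate source -> console sink with a processing-time trigger: the pieces
# StreamingPipeline lets you declare in config instead.
stream = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", 5)
    .load()
)

query = (
    stream.writeStream.format("console")
    .trigger(processingTime="10 seconds")
    .start()
)
query.awaitTermination()
```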
Built-in sources: Kafka, File, Delta, Iceberg, Rate. Built-in sinks: Kafka, Delta, Console, Iceberg, File. Trigger types include processing time, once, available-now, and continuous. Graceful shutdown is handled by the framework.
Secrets Management
Three providers out of the box: environment variables, AWS Secrets Manager, and HashiCorp Vault.
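A sketch of how a secret gets referenced; SecretsReference's shape comes from the comparison table below, while the module path is an assumption.

```python
from pyspark_pipeline_framework.core.secrets import SecretsReference  # assumed module path

# Resolved at runtime by the configured provider: env, AWS Secrets Manager, or Vault
db_password = SecretsReference(provider="env", key="DB_PASSWORD")
```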
SecretsCache wraps the resolver with a TTL so you’re not hitting Vault or AWS on every component invocation.
Getting Started
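Install from PyPI; the ppf-run invocation below is an assumption about the CLI's argument shape.

```bash
pip install pyspark-pipeline-framework

# Run a pipeline config locally (argument shape is an assumption; see the CLI help)
ppf-run customer_etl.conf
```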
Full documentation on ReadTheDocs.
Coming From the Scala Version
| Aspect | Scala | Python |
|---|---|---|
| Config library | PureConfig | dataconf |
| Type safety | Compile-time | Runtime (mypy + dataclass) |
| Component pattern | Trait + companion object | DataFlow ABC + from_config() |
| Dynamic loading | Scala reflection | importlib |
| Metrics | Micrometer | MeterRegistry protocol |
| Build tool | SBT | Hatchling + pip |
| Config fields | instance-type, instance-name | class_path, name |
| Secrets syntax | ${secret:env://VAR} | SecretsReference(provider="env", key="VAR") |
| CLI | Main class via spark-submit | ppf-run |
The concepts transfer directly. Config files need minor field renaming (for example, instance-type becomes class_path). Components need reimplementation but follow the same pattern—config in, data transformation out. If you’ve used the Scala version, you already understand the architecture.
GitHub · PyPI · ReadTheDocs