PySpark Pipeline Framework: Configuration-Driven Pipelines for the Python Ecosystem

How pyspark-pipeline-framework brings configuration-driven architecture, lifecycle hooks, and resilience patterns to PySpark

· 5 min read
python pyspark data engineering open source configuration streaming

v1.0.0 (February 7, 2026): Configuration-driven batch and streaming pipelines, lifecycle hooks, resilience patterns, secrets management. Python 3.10+, PySpark 3.5+.

I wrote about the Scala version a few months ago. The problem it solves hasn’t changed—SparkSession boilerplate, scattered configs, observability bolted on after the fact. But most Spark teams I’ve worked with run PySpark, not Scala. A Scala-only library doesn’t help them.

This isn’t a port. It’s a ground-up reimplementation using Python idioms—dataclasses, Protocols, type hints, importlib. The architecture maps to the same concepts, but the code is native Python.

What I Built

pyspark-pipeline-framework (PyPI) lets you define PySpark pipelines in HOCON config files. The framework handles SparkSession lifecycle, component wiring, error handling, and observability. You write your data transformations; everything else is configuration.

Config parsing uses dataconf—HOCON into dataclasses with full type safety.

Running Pipelines

For local development, the ppf-run CLI handles runner setup, logging, and exit codes:

1
2
3
ppf-run pipeline.conf
ppf-run pipeline.conf --dry-run          # validate without executing
ppf-run pipeline.conf --log-level DEBUG  # verbose output

For cluster deployments, use spark-submit with a thin entry point:

1
2
3
4
5
6
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --py-files pyspark_pipeline_framework-1.0.0-py3-none-any.whl,my_etl-1.0.0-py3-none-any.whl \
  --files conf/production.conf \
  main.py --config production.conf

Where main.py is minimal:

1
2
3
4
5
6
from pyspark_pipeline_framework.runner import SimplePipelineRunner

runner = SimplePipelineRunner.from_file("production.conf")
result = runner.run()
if not result.success:
    raise SystemExit(1)

The framework doesn’t replace spark-submit—it sits on top of it. ppf-run is just a convenience wrapper that does the same thing.

Configuration with dataconf

I covered the “why config-driven” rationale in the Scala post. The Python side uses the same HOCON format but with snake_case fields parsed into dataclasses via dataconf.

{
  name: "customer-etl"
  version: "1.0.0"

  spark {
    app_name: "Customer ETL"
    master: "local[*]"
  }

  components: [
    {
      name: "read_raw"
      component_type: source
      class_path: "pyspark_pipeline_framework.examples.batch.ReadTable"
      config {
        table_name: "raw.customers"
        output_view: "raw_customers"
      }
    },
    {
      name: "transform"
      component_type: transformation
      class_path: "pyspark_pipeline_framework.examples.batch.SqlTransform"
      depends_on: ["read_raw"]
      config {
        sql: "SELECT id, UPPER(name) AS name FROM raw_customers"
        output_view: "cleaned"
      }
    },
    {
      name: "write"
      component_type: sink
      class_path: "pyspark_pipeline_framework.examples.batch.WriteTable"
      depends_on: ["transform"]
      config {
        input_view: "cleaned"
        output_table: "curated.customers"
      }
    }
  ]
}

Fields are snake_case throughout—class_path, component_type, depends_on. dataconf validates types at parse time, so a missing required field or wrong type fails fast with a clear message. I’m a contributor to dataconf, so the integration is tight.

Dynamic Component Loading

The Scala version uses reflection to instantiate components from class names. Same architectural goal here—decouple the framework from application code—but Python’s solution is simpler.

The framework uses importlib to resolve class_path strings and calls a from_config() classmethod on each component:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from pyspark_pipeline_framework.runtime.dataflow.base import DataFlow


class MyTransform(DataFlow):
    def __init__(self, output_view: str) -> None:
        super().__init__()
        self._output_view = output_view

    @property
    def name(self) -> str:
        return "MyTransform"

    @classmethod
    def from_config(cls, config: dict) -> "MyTransform":
        return cls(**config)

    def run(self) -> None:
        df = self.spark.sql("SELECT id, UPPER(name) AS name FROM raw")
        df.createOrReplaceTempView(self._output_view)

The framework injects a SparkSession via set_spark_session() before calling run(). No reflection magic—just standard Python imports.

Lifecycle Hooks

Hooks receive callbacks at pipeline and component lifecycle events. Five built-in implementations cover the common cases:

  • LoggingHooks — logs lifecycle events
  • MetricsHooks — collects timing and retry counts
  • DataQualityHooks — runs quality checks between stages
  • AuditHooks — emits audit events for compliance
  • CheckpointHooks — saves state for resume-on-failure

Compose them with CompositeHooks:

1
2
3
4
5
6
7
8
from pyspark_pipeline_framework.runner import (
    CompositeHooks, LoggingHooks, MetricsHooks,
    SimplePipelineRunner,
)

hooks = CompositeHooks(LoggingHooks(), MetricsHooks())
runner = SimplePipelineRunner(config, hooks=hooks)
result = runner.run()

Events are structured JSON with run_id, component_name, duration_ms, and status—same shape as the Scala version, so your dashboards work across both.

Three-Layer Architecture

Same structure as the Scala framework, same rationale:

  • core/ — zero Spark dependency. Config models, hooks protocol, secrets, quality checks.
  • runtime/ — SparkSession lifecycle, DataFlow base class, streaming.
  • runner/ — CLI orchestration, SimplePipelineRunner, checkpoint/resume.

The practical payoff: unit tests for config parsing, hook composition, and secret resolution run without Spark. Fast feedback loops.

Resilience Patterns

Retry with exponential backoff and circuit breakers are both configurable per-component in HOCON:

components: [
  {
    name: "flaky_source"
    component_type: source
    class_path: "my.module.FlakySource"
    retry {
      max_attempts: 3
      initial_delay_seconds: 1.0
      max_delay_seconds: 30.0
      backoff_multiplier: 2.0
    }
    circuit_breaker {
      failure_threshold: 5
      timeout_seconds: 60.0
    }
  }
]

For long-running pipelines, CheckpointHooks with LocalCheckpointStore saves state after each successful component. On restart, load_checkpoint_for_resume picks up where you left off:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
from pyspark_pipeline_framework.runner import (
    LocalCheckpointStore, CheckpointHooks, CompositeHooks,
    compute_pipeline_fingerprint, load_checkpoint_for_resume,
)
from pathlib import Path

store = LocalCheckpointStore(Path("/tmp/checkpoints"))
fingerprint = compute_pipeline_fingerprint(config)
checkpoint_hooks = CheckpointHooks(store, run_id="run-001", pipeline_fingerprint=fingerprint)

hooks = CompositeHooks(LoggingHooks(), checkpoint_hooks)
runner = SimplePipelineRunner(config, hooks=hooks)

# Resume after failure
completed = load_checkpoint_for_resume(store, "run-001", config)
result = runner.run(completed_components=completed)

Streaming

StreamingPipeline provides a base class with source, sink, optional transform, and trigger config:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from pyspark_pipeline_framework.runtime.streaming.base import (
    StreamingPipeline, StreamingSource, StreamingSink,
    TriggerConfig, TriggerType,
)
from pyspark_pipeline_framework.runtime.streaming.sources import KafkaStreamingSource
from pyspark_pipeline_framework.runtime.streaming.sinks import DeltaStreamingSink


class EventIngestion(StreamingPipeline):
    def __init__(self) -> None:
        super().__init__()
        self._source = KafkaStreamingSource(
            bootstrap_servers="broker:9092", topics="events",
        )
        self._sink = DeltaStreamingSink(
            path="/data/delta/events",
            checkpoint_location="/checkpoints/events",
        )

    @property
    def name(self) -> str:
        return "EventIngestion"

    @property
    def source(self) -> StreamingSource:
        return self._source

    @property
    def sink(self) -> StreamingSink:
        return self._sink

    @property
    def trigger(self) -> TriggerConfig:
        return TriggerConfig(TriggerType.PROCESSING_TIME, "30 seconds")

    def transform(self, df):
        return df.selectExpr("CAST(value AS STRING) AS raw_json")

Built-in sources: Kafka, File, Delta, Iceberg, Rate. Built-in sinks: Kafka, Delta, Console, Iceberg, File. Trigger types include processing time, once, available-now, and continuous. Graceful shutdown is handled by the framework.

Secrets Management

Three providers out of the box: environment variables, AWS Secrets Manager, and HashiCorp Vault.

1
2
3
4
5
6
7
8
9
from pyspark_pipeline_framework.core.secrets import (
    EnvSecretsProvider, SecretsResolver, SecretsCache, SecretsReference,
)

resolver = SecretsResolver()
resolver.register(EnvSecretsProvider())

cache = SecretsCache(resolver, ttl_seconds=300)
result = cache.resolve(SecretsReference(provider="env", key="DB_PASSWORD"))

SecretsCache wraps the resolver with a TTL so you’re not hitting Vault or AWS on every component invocation.

Getting Started

1
pip install pyspark-pipeline-framework
1
2
3
4
from pyspark_pipeline_framework.runner import SimplePipelineRunner

runner = SimplePipelineRunner.from_file("pipeline.conf")
result = runner.run()

Full documentation on ReadTheDocs.

Coming From the Scala Version

AspectScalaPython
Config libraryPureConfigdataconf
Type safetyCompile-timeRuntime (mypy + dataclass)
Component patternTrait + companion objectDataFlow ABC + from_config()
Dynamic loadingScala reflectionimportlib
MetricsMicrometerMeterRegistry protocol
Build toolSBTHatchling + pip
Config fieldsinstance-type, instance-nameclass_path, name
Secrets syntax${secret:env://VAR}SecretsReference(provider="env", key="VAR")
CLIMain class via spark-submitppf-run

The concepts transfer directly. Config files need minor field renaming (camelCase to snake_case). Components need reimplementation but follow the same pattern—config in, data transformation out. If you’ve used the Scala version, you already understand the architecture.


GitHub · PyPI · ReadTheDocs