Fitting 100 Statistical Distributions at Scale: 1000x Memory Reduction with PySpark

How spark-bestfit 2.0 fits distributions across Spark, Ray, and local backends with a class-based API and 1000x memory reduction


Updated January 5, 2026: This post has been updated for spark-bestfit v2.0.0, which introduces a class-based API, multi-backend support (Spark, Ray, Local), and many new features. See Migration from v0.x if upgrading from an earlier version.

I had a dataset with 100 million rows and needed to figure out which statistical distribution fit it best. The obvious approach—collect everything to the driver and use scipy—immediately OOM’d. Even after sampling down to something reasonable, I was still looking at 800MB sitting in driver memory just to test one distribution. And I needed to test about 100 of them.

The math doesn’t work. If you’re fitting 100 distributions and each one needs the full dataset, you either serialize 800MB to executors 100 times or collect it all to the driver and pray. Neither option scales.

What I Built

spark-bestfit is now at version 2.0.0 with a complete redesign. The library fits ~90 scipy.stats distributions in parallel, with driver memory usage dropping from 800MB to about 1MB, roughly a 1000x reduction.

The v2.0 release adds:

  • Multi-backend architecture: Spark, Ray, or local execution
  • Class-based API: DistributionFitter and DiscreteDistributionFitter
  • 16 discrete distributions for count data
  • Bounded fitting, lazy metrics, multi-column fitting, Gaussian copula, bootstrap confidence intervals, distributed sampling, and model serialization

The histogram approach remains the core innovation, but now you have far more flexibility in how and where you run the fitting.

The Problem With Naive Approaches

There are two ways people usually try this, and both fail:

Collect to driver:

data = df.select("value").toPandas()["value"].to_numpy()  # 800MB pulled into driver memory
for dist in scipy_distributions:  # ~100 candidate scipy.stats distributions
    params = dist.fit(data)

Works fine until your dataset is large enough to matter. Then you OOM.

Broadcast to executors:

import scipy.stats

data_broadcast = spark.sparkContext.broadcast(data)  # 800MB shipped to every executor

def fit_dist(dist_name):
    return getattr(scipy.stats, dist_name).fit(data_broadcast.value)

This works but you’re broadcasting 800MB to every executor. Slow and wasteful.

Histograms Instead of Raw Data

Here’s the trick: you don’t actually need raw data to fit distributions. You need a histogram.

A 100-bin histogram is about 1KB. Broadcast that instead of 800MB and you can fit distributions in parallel without killing your cluster.
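
To make that size concrete, here is a quick back-of-the-envelope check, assuming 64-bit counts and bin edges:

import numpy as np

counts = np.zeros(100, dtype=np.int64)  # one count per bin
edges = np.linspace(0.0, 1.0, 101)      # 101 bin edges (float64)
print(counts.nbytes + edges.nbytes)     # 1608 bytes, i.e. on the order of 1KB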

The flow:

  1. Compute histogram using Spark’s distributed aggregation (no data leaves executors)
  2. Broadcast the tiny histogram (~1KB) once
  3. Fit distributions in parallel using Pandas UDFs
  4. Get back a DataFrame with goodness-of-fit metrics

Driver memory goes from 800MB to 1MB. Problem solved.
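
The library's internals aren't reproduced here, but a minimal sketch of the idea, fitting a scipy distribution from nothing more than bin counts and edges, could look like the following. The fit_from_histogram helper and the pseudo-sample trick are illustrative assumptions, not spark-bestfit's actual code:

import numpy as np
import scipy.stats as st

def fit_from_histogram(dist_name, counts, edges, pseudo_n=10_000):
    """Illustrative only: fit a scipy distribution from histogram summaries."""
    centers = (edges[:-1] + edges[1:]) / 2.0
    weights = counts / counts.sum()
    # Rebuild a small weighted pseudo-sample from bin centers; this stand-in,
    # not the raw 100M rows, is all the fitting code ever touches.
    pseudo = np.repeat(centers, np.round(weights * pseudo_n).astype(int))
    dist = getattr(st, dist_name)
    params = dist.fit(pseudo)
    ks_stat, _ = st.kstest(pseudo, dist_name, args=params)
    return params, ks_stat

# counts and edges would come from the distributed histogram step
params, ks = fit_from_histogram("gamma", counts, edges)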

The v2.0 Class-Based API

Version 2.0 replaces the functional fit_distributions() with a class-based design:

from spark_bestfit import DistributionFitter
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Fit continuous distributions
fitter = DistributionFitter(spark)
results = fitter.fit(df, column="value")

# Get best fit by K-S statistic (default)
best = results.best(n=1)[0]
print(f"Best: {best.distribution} (KS={best.ks_statistic:.4f})")

# Use the fitted distribution
samples = best.sample(size=10000)
pdf_values = best.pdf(x_array)
percentile_95 = best.ppf(0.95)

For count data, use the discrete fitter:

from spark_bestfit import DiscreteDistributionFitter

fitter = DiscreteDistributionFitter(spark)
results = fitter.fit(df, column="counts")

# AIC recommended for discrete model selection
best = results.best(n=1, metric="aic")[0]

The class-based approach enables lazy metric computation, chained operations, and better state management for complex workflows.
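
As a hedged illustration of that chained style, using the FitResults methods listed in the migration notes below (the exact signatures are assumptions):

# Assumed signatures, shown only to illustrate the chained workflow
good = results.filter(lambda r: r.ks_statistic < 0.05)  # keep plausible fits
report = results.quality_report()                       # summary across candidates
top3 = results.best(n=3, metric="aic")                  # ranked shortlist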

Multi-Backend Architecture

The library’s origin is Spark, but v2.0 adds pluggable backends:

from spark_bestfit import DistributionFitter, SparkBackend, RayBackend, LocalBackend

# SparkBackend (default) — production clusters, large datasets
fitter = DistributionFitter(spark)

# RayBackend — Ray clusters, ML pipelines, Kubernetes
fitter = DistributionFitter(backend=RayBackend())

# LocalBackend — development, testing, no cluster required
fitter = DistributionFitter(backend=LocalBackend())

All backends use identical scipy fitting, so fit quality is the same regardless of backend. The choice comes down to your infrastructure:

Backend        Use Case
SparkBackend   Production clusters, 100M+ rows
RayBackend     ML workflows, Kubernetes deployments
LocalBackend   Development, unit testing

How It Works

The histogram computation uses Spark’s Bucketizer:

import numpy as np
from pyspark.ml.feature import Bucketizer
from pyspark.sql import functions as F

num_bins = 100
data_min, data_max = df.agg(F.min("value"), F.max("value")).first()

splits = np.linspace(data_min, data_max, num_bins + 1)
bucketizer = Bucketizer(splits=splits.tolist(), inputCol="value", outputCol="bucket")

bucketed = bucketizer.transform(df)
histogram = bucketed.groupBy("bucket").count().orderBy("bucket").collect()

That runs as a distributed aggregation. The only thing that comes back to the driver is the histogram itself—a tiny array of bin counts.

The backend abstraction means the same fitting logic runs on Spark, Ray, or locally. The ExecutionBackend protocol defines methods for broadcasting data to workers and parallel execution.
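
The post doesn't spell out the protocol's exact surface, but a minimal sketch of what such an interface could look like is below; the method names and signatures are assumptions, not the library's documented API:

from typing import Any, Callable, Iterable, Protocol


class ExecutionBackend(Protocol):
    """Hypothetical shape of a pluggable backend."""

    def broadcast(self, value: Any) -> Any:
        """Ship a small object (e.g. the ~1KB histogram) to every worker once."""
        ...

    def run_parallel(self, func: Callable[[str], dict], tasks: Iterable[str]) -> list[dict]:
        """Apply func to each task (one distribution name per task) in parallel."""
        ...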

New Features in v2.0

Bounded Distribution Fitting

Fit distributions with natural constraints (percentages, ages, prices):

# Auto-detect bounds from data
results = fitter.fit(df, column="percentage", bounded=True)

# Explicit bounds
results = fitter.fit(
    df, column="price",
    bounded=True,
    lower_bound=0.0,
    upper_bound=1000.0,
)

# Samples automatically respect bounds
best = results.best(n=1)[0]
samples = best.sample(1000)  # All within [0, 1000]

Multi-Column Fitting

Fit multiple columns in a single operation:

results = fitter.fit(
    df,
    columns=["revenue", "cost", "margin"],
    per_column_bounds={"margin": (0, 1)}
)

Lazy Metrics

Defer expensive K-S and A-D computation for faster model selection:

results = fitter.fit(df, column="value", lazy_metrics=True)

# Fast selection by AIC
best = results.best(n=1, metric="aic")[0]

# KS computed on-demand only for top candidates
best_ks = results.best(n=1, metric="ks_statistic")[0]

Pre-filtering

Skip incompatible distributions based on data shape (20-50% faster):

results = fitter.fit(df, column="value", prefilter=True)

Bootstrap Confidence Intervals

Quantify parameter uncertainty:

results = fitter.fit(df, column="value", bootstrap_samples=100)
best = results.best(n=1)[0]
print(best.confidence_intervals)

Gaussian Copula

Generate correlated multi-column samples:

from spark_bestfit import GaussianCopula

copula = GaussianCopula.fit(results, df)
samples = copula.sample(n=10_000)

# Distributed sampling for massive scale
samples_df = copula.sample_spark(n=100_000_000)

Model Serialization

Save and load fitted distributions without Spark:

from spark_bestfit import DistributionFitResult

# Save to JSON
best.save("model.json")

# Load later - no Spark needed for inference
loaded = DistributionFitResult.load("model.json")
samples = loaded.sample(size=1000)

Visualization

Built-in plotting for distribution comparison:

# Requires: pip install spark-bestfit[plotting]
fitter.plot(best, df, "value", title="Best Fit Distribution")

Performance Impact

The difference is dramatic. On large datasets (100M+ rows):

Traditional approach: You’re either collecting hundreds of megabytes to the driver (OOM risk) or broadcasting that same data to executors repeatedly (slow).

Histogram approach: Driver holds ~1KB of histogram data. Broadcast happens once. The memory footprint drops by roughly 1000x compared to collecting raw data.

The v2.0 features add more optimization opportunities:

  • lazy_metrics=True skips expensive K-S/A-D computation until needed
  • prefilter=True eliminates incompatible distributions early
  • Distribution-aware partitioning spreads slow distributions across executors
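
On that last point, the post doesn't show the scheduler, but one way to spread slow fits across partitions might look like this. The SLOW set, the dist_names candidate list, and the interleaving are assumptions, not spark-bestfit's actual logic:

import itertools

# Hypothetical: interleave slow and fast distributions so no partition
# ends up with all of the expensive fits.
SLOW = {"nct", "tukeylambda", "genhyperbolic"}
slow = [d for d in dist_names if d in SLOW]
fast = [d for d in dist_names if d not in SLOW]
ordered = [d for pair in itertools.zip_longest(slow, fast) for d in pair if d is not None]
tasks = spark.sparkContext.parallelize(ordered, numSlices=16)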

When To Use This

If you have millions of rows and need to test many distributions, this makes sense. If your data fits in memory on a single machine, you can still use spark-bestfit with the LocalBackend—no Spark required.

The library is on PyPI and GitHub. Full documentation at spark-bestfit.readthedocs.io.

Migration from v0.x

If you’re upgrading from v0.3.x, here’s the API migration:

# v0.3.x (deprecated functional API)
from spark_bestfit import fit_distributions
results = fit_distributions(df, column="value", num_bins=100, metric="ks")

# v2.0.0 (class-based API)
from spark_bestfit import DistributionFitter
fitter = DistributionFitter(spark)
results = fitter.fit(df, column="value", num_bins=100)
best = results.best(n=1, metric="ks_statistic")[0]

Key changes:

  • fit_distributions() → DistributionFitter(spark).fit()
  • Results are now a FitResults container with .best(), .filter(), .quality_report() methods
  • Each result is a DistributionFitResult with .pdf(), .cdf(), .sample(), .save() methods
  • Q-Q plots moved from spark_bestfit.plotting.qq_plot() to fitter.plot()

See the Migration Guide for complete details.

The Core Idea

Move computation to data, not data to computation. Histograms are a compact summary that enable distributed fitting without shuffling gigabytes around your cluster.

The v2.0 architecture takes this further by abstracting the execution backend. Whether you’re on a Spark cluster, Ray, or your laptop, the same histogram-based approach minimizes data movement.

Nothing revolutionary here—just using distributed primitives correctly.