Fitting 100 Statistical Distributions at Scale: 1000x Memory Reduction with PySpark

How spark-bestfit 3.0 fits distributions across Spark, Ray, and local backends with survival analysis, mixture models, and multivariate support


v3.0.0 (January 11, 2026): Survival analysis, Gaussian Mixture Models, multivariate Normal, adaptive sampling. See What’s New in v3.0.

v2.0.0 (January 5, 2026): Class-based API, multi-backend support (Spark, Ray, Local). See Migration from v0.x.

I had a dataset with 100 million rows and needed to figure out which statistical distribution fit it best. The obvious approach—collect everything to the driver and use scipy—immediately OOM’d. Even after sampling down to something reasonable, I was still looking at 800MB sitting in driver memory just to test one distribution. And I needed to test about 100 of them.

The math doesn’t work. If you’re fitting 100 distributions and each one needs the full dataset, you either serialize 800MB to executors 100 times or collect it all to the driver and pray. Neither option scales.

What I Built

spark-bestfit is now at version 3.0.0 with major new capabilities. The library fits ~90 scipy.stats distributions in parallel, with driver memory usage dropping from 800MB to about 1MB—a 1000x reduction.

The v3.0 release adds:

  • Survival analysis: Right-censored data support for time-to-event modeling
  • Gaussian Mixture Models: Multi-modal distribution fitting
  • Multivariate Normal: MLE estimation for correlated variables
  • Adaptive sampling: Automatic strategy selection based on data skewness

Previous v2.0 features:

  • Multi-backend architecture: Spark, Ray, or local execution
  • Class-based API: DistributionFitter and DiscreteDistributionFitter
  • 16 discrete distributions for count data
  • Bounded fitting, lazy metrics, multi-column fitting, Gaussian copula, bootstrap confidence intervals, distributed sampling, and model serialization

The histogram approach remains the core innovation, but now you have far more flexibility in how and where you run the fitting.

The Problem With Naive Approaches

There are two ways people usually try this, and both fail:

Collect to driver:

data = [row["value"] for row in df.select("value").collect()]  # ~800MB in driver memory
for dist in scipy_distributions:  # e.g. a list of scipy.stats distribution objects
    params = dist.fit(data)

Works fine until your dataset is large enough to matter. Then you OOM.

Broadcast to executors:

import scipy.stats

data_broadcast = spark.sparkContext.broadcast(data)  # 800MB broadcast
def fit_dist(dist_name):
    return getattr(scipy.stats, dist_name).fit(data_broadcast.value)

This works but you’re broadcasting 800MB to every executor. Slow and wasteful.

Histograms Instead of Raw Data

Here’s the trick: you don’t actually need raw data to fit distributions. You need a histogram.

A 100-bin histogram is about 1KB. Broadcast that instead of 800MB and you can fit distributions in parallel without killing your cluster.

The flow:

  1. Compute histogram using Spark’s distributed aggregation (no data leaves executors)
  2. Broadcast the tiny histogram (~1KB) once
  3. Fit distributions in parallel using Pandas UDFs
  4. Get back a DataFrame with goodness-of-fit metrics

Driver memory goes from 800MB to 1MB. Problem solved.
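To see why a histogram is enough, here is a minimal standalone sketch (not spark-bestfit's internal code) that fits a scipy distribution from bin counts alone: resample bin midpoints in proportion to their counts and run scipy's MLE fit on the pseudo-sample. The fit_from_histogram helper is hypothetical and only illustrates the idea:

import numpy as np
import scipy.stats as st

def fit_from_histogram(counts, bin_edges, dist, n_resample=10_000, seed=0):
    """Hypothetical helper: fit a scipy distribution from a histogram instead
    of raw data, by resampling bin midpoints weighted by their counts."""
    midpoints = (bin_edges[:-1] + bin_edges[1:]) / 2
    probs = counts / counts.sum()
    rng = np.random.default_rng(seed)
    pseudo_sample = rng.choice(midpoints, size=n_resample, p=probs)
    params = dist.fit(pseudo_sample)
    ks_stat, _ = st.kstest(pseudo_sample, dist.cdf, args=params)
    return params, ks_stat

# A few hundred bytes of histogram stand in for the full dataset
counts = np.array([5, 40, 120, 200, 150, 60, 20, 5], dtype=float)
edges = np.linspace(0.0, 8.0, num=len(counts) + 1)
params, ks = fit_from_histogram(counts, edges, st.gamma)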

The v2.0 Class-Based API

Version 2.0 replaces the functional fit_distributions() with a class-based design:

from spark_bestfit import DistributionFitter
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Fit continuous distributions
fitter = DistributionFitter(spark)
results = fitter.fit(df, column="value")

# Get best fit by K-S statistic (default)
best = results.best(n=1)[0]
print(f"Best: {best.distribution} (KS={best.ks_statistic:.4f})")

# Use the fitted distribution
samples = best.sample(size=10000)
pdf_values = best.pdf(x_array)
percentile_95 = best.ppf(0.95)

For count data, use the discrete fitter:

from spark_bestfit import DiscreteDistributionFitter

fitter = DiscreteDistributionFitter(spark)
results = fitter.fit(df, column="counts")

# AIC recommended for discrete model selection
best = results.best(n=1, metric="aic")[0]

The class-based approach enables lazy metric computation, chained operations, and better state management for complex workflows.

Multi-Backend Architecture

The library started out Spark-only, but v2.0 adds pluggable backends:

from spark_bestfit import DistributionFitter, SparkBackend, RayBackend, LocalBackend

# SparkBackend (default) — production clusters, large datasets
fitter = DistributionFitter(spark)

# RayBackend — Ray clusters, ML pipelines, Kubernetes
fitter = DistributionFitter(backend=RayBackend())

# LocalBackend — development, testing, no cluster required
fitter = DistributionFitter(backend=LocalBackend())

All backends use identical scipy fitting, so fit quality is the same regardless of backend. The choice comes down to your infrastructure:

Backend | Use Case
--------|---------
SparkBackend | Production clusters, 100M+ rows
RayBackend | ML workflows, Kubernetes deployments
LocalBackend | Development, unit testing

How It Works

The histogram computation uses Spark’s Bucketizer:

import numpy as np
from pyspark.ml.feature import Bucketizer
from pyspark.sql import functions as F

# Only two scalars (min, max) come back to the driver to define the bin edges
data_min, data_max = df.agg(F.min("value"), F.max("value")).first()
num_bins = 100
splits = np.linspace(data_min, data_max, num_bins + 1)
bucketizer = Bucketizer(splits=splits.tolist(), inputCol="value", outputCol="bucket")

bucketed = bucketizer.transform(df)
histogram = bucketed.groupBy("bucket").count().orderBy("bucket").collect()

That runs as a distributed aggregation. The only thing that comes back to the driver is the histogram itself—a tiny array of bin counts.

The backend abstraction means the same fitting logic runs on Spark, Ray, or locally. The ExecutionBackend protocol defines methods for broadcasting data to workers and parallel execution.
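For intuition, here is roughly what such a protocol can look like in Python. The method names and signatures below are assumptions for illustration, not the library's actual ExecutionBackend definition:

from typing import Any, Callable, Iterable, Protocol

class ExecutionBackend(Protocol):
    """Illustrative only -- method names and signatures here are assumptions,
    not spark-bestfit's actual ExecutionBackend definition."""

    def broadcast(self, value: Any) -> Any:
        """Ship a small payload (the ~1KB histogram) to every worker once."""
        ...

    def run_parallel(self, fn: Callable[[str], dict], items: Iterable[str]) -> list:
        """Apply fn to each candidate distribution name in parallel."""
        ...

class ToyLocalBackend:
    """Minimal local stand-in: no real broadcast, a plain serial map."""

    def broadcast(self, value: Any) -> Any:
        return value

    def run_parallel(self, fn: Callable[[str], dict], items: Iterable[str]) -> list:
        return [fn(name) for name in items]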

New Features in v2.0

Bounded Distribution Fitting

Fit distributions with natural constraints (percentages, ages, prices):

# Auto-detect bounds from data
results = fitter.fit(df, column="percentage", bounded=True)

# Explicit bounds
results = fitter.fit(
    df, column="price",
    bounded=True,
    lower_bound=0.0,
    upper_bound=1000.0,
)

# Samples automatically respect bounds
best = results.best(n=1)[0]
samples = best.sample(1000)  # All within [0, 1000]

Multi-Column Fitting

Fit multiple columns in a single operation:

results = fitter.fit(
    df,
    columns=["revenue", "cost", "margin"],
    per_column_bounds={"margin": (0, 1)}
)

Lazy Metrics

Defer expensive K-S and A-D computation for faster model selection:

results = fitter.fit(df, column="value", lazy_metrics=True)

# Fast selection by AIC
best = results.best(n=1, metric="aic")[0]

# KS computed on-demand only for top candidates
best_ks = results.best(n=1, metric="ks_statistic")[0]

Pre-filtering

Skip incompatible distributions based on data shape (20-50% faster):

results = fitter.fit(df, column="value", prefilter=True)
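The library's exact rules aren't spelled out here, but the idea behind pre-filtering is simple: a distribution whose support can't contain the observed data range isn't worth fitting. A toy sketch of that check (not spark-bestfit's implementation), using scipy's support endpoints:

import scipy.stats as st

def compatible(dist, data_min, data_max):
    """Toy pre-filter (not spark-bestfit's rules): keep a distribution only if
    its support endpoints (dist.a, dist.b in scipy) can cover the observed range."""
    return dist.a <= data_min and data_max <= dist.b

candidates = [st.norm, st.gamma, st.beta, st.expon]
data_min, data_max = -3.2, 14.7   # data with negative values

kept = [d for d in candidates if compatible(d, data_min, data_max)]
# gamma and expon (support [0, inf)) and beta (support [0, 1]) are skipped; norm survives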

Bootstrap Confidence Intervals

Quantify parameter uncertainty:

results = fitter.fit(df, column="value", bootstrap_samples=100)
best = results.best(n=1)[0]
print(best.confidence_intervals)

Gaussian Copula

Generate correlated multi-column samples:

from spark_bestfit import GaussianCopula

copula = GaussianCopula.fit(results, df)
samples = copula.sample(n=10_000)

# Distributed sampling for massive scale
samples_df = copula.sample_spark(n=100_000_000)

Model Serialization

Save and load fitted distributions without Spark:

from spark_bestfit import DistributionFitResult

# Save to JSON
best.save("model.json")

# Load later - no Spark needed for inference
loaded = DistributionFitResult.load("model.json")
samples = loaded.sample(size=1000)

Visualization

Built-in plotting for distribution comparison:

# Requires: pip install spark-bestfit[plotting]
fitter.plot(best, df, "value", title="Best Fit Distribution")

What’s New in v3.0

Version 3.0.0 adds survival analysis, mixture models, and multivariate fitting.

Right-Censored Data (Survival Analysis)

Time-to-event data with censoring—customers who haven’t churned, patients still alive at study end. Fits parametric survival distributions (Weibull, LogNormal, Exponential) with native censoring support.

from spark_bestfit import SurvivalFitter

fitter = SurvivalFitter(spark)
results = fitter.fit(
    df,
    duration_col="days_to_event",
    event_col="event_observed"  # 1 = event, 0 = censored
)

best = results.best(n=1)[0]
survival_probs = best.survival_function(time_points)
hazard = best.hazard(time_points)

Use cases: customer lifetime value, equipment failure prediction, clinical trial analysis.

Gaussian Mixture Models

Multi-modal data fitting with automatic subpopulation identification:

from spark_bestfit import GaussianMixtureFitter

fitter = GaussianMixtureFitter(spark)
results = fitter.fit(df, column="value", n_components=[2, 3, 4])

best = results.best(n=1, metric="bic")[0]
# Returns component means, variances, and weights

Multivariate Normal

MLE estimation for correlated variables:

from spark_bestfit import MultivariateFitter

fitter = MultivariateFitter(spark)
results = fitter.fit(df, columns=["revenue", "cost", "headcount"])

mvn = results.multivariate_normal
# mvn.mean, mvn.cov, mvn.sample(n)

Adaptive Sampling

Automatic sampling strategy selection based on skewness, kurtosis, and tail behavior. Chooses between uniform, stratified, or importance sampling for improved fit quality on heavy-tailed distributions.

results = fitter.fit(df, column="value", adaptive_sampling=True)
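As a rough illustration of what shape-based strategy selection can look like (the thresholds and strategy names below are made up, not the library's):

import numpy as np
from scipy.stats import skew, kurtosis

def choose_strategy(sample):
    """Toy strategy selector -- thresholds are illustrative, not spark-bestfit's."""
    s, k = skew(sample), kurtosis(sample)   # kurtosis() returns excess kurtosis
    if abs(s) < 0.5 and abs(k) < 1.0:
        return "uniform"        # roughly symmetric, light tails
    if abs(s) < 2.0:
        return "stratified"     # moderate skew: sample each quantile band
    return "importance"         # heavy tail: oversample the tail region

rng = np.random.default_rng(42)
print(choose_strategy(rng.normal(size=10_000)))      # likely "uniform"
print(choose_strategy(rng.lognormal(1, 1, 10_000)))  # likely "importance"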

Example Notebooks

Notebook | Use Case
---------|---------
A/B Testing | Conversion rate distributions
Insurance Claims | Heavy-tailed loss modeling
Monte Carlo | Distribution inputs for financial models
Sensor Data | IoT time-series fitting
Customer LTV | Survival analysis for CLV
Demand Forecasting | Multi-modal demand patterns

Performance Impact

The difference is dramatic. On large datasets (100M+ rows):

Traditional approach: You’re either collecting hundreds of megabytes to the driver (OOM risk) or broadcasting that same data to executors repeatedly (slow).

Histogram approach: Driver holds ~1KB of histogram data. Broadcast happens once. The memory footprint drops by roughly 1000x compared to collecting raw data.
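The back-of-envelope arithmetic, assuming 8-byte doubles and a 100-bin histogram:

rows = 100_000_000
raw_bytes = rows * 8                               # ~800 MB of doubles collected or broadcast
num_bins = 100
hist_bytes = num_bins * 8 + (num_bins + 1) * 8     # bin counts plus bin edges: ~1.6 KB

print(f"raw data: {raw_bytes / 1e6:.0f} MB")
print(f"histogram payload: {hist_bytes / 1e3:.1f} KB")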

The v2.0 features add more optimization opportunities:

  • lazy_metrics=True skips expensive K-S/A-D computation until needed
  • prefilter=True eliminates incompatible distributions early
  • Distribution-aware partitioning spreads slow distributions across executors (a toy sketch follows)
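A toy sketch of that last idea (not the library's code): estimate a relative cost per distribution and deal the slowest ones out round-robin so no single partition gets them all:

def round_robin_partitions(dist_costs, n_partitions):
    """Toy illustration of distribution-aware partitioning: deal the slowest
    candidates out first so no single partition gets all of them."""
    ordered = sorted(dist_costs, key=dist_costs.get, reverse=True)
    partitions = [[] for _ in range(n_partitions)]
    for i, name in enumerate(ordered):
        partitions[i % n_partitions].append(name)
    return partitions

# Hypothetical relative fit costs (seconds); levy_stable and nct are notoriously slow
costs = {"levy_stable": 30.0, "nct": 12.0, "gamma": 0.5, "beta": 0.6, "norm": 0.1, "expon": 0.1}
print(round_robin_partitions(costs, n_partitions=2))
# [['levy_stable', 'beta', 'norm'], ['nct', 'gamma', 'expon']]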

When To Use This

If you have millions of rows and need to test many distributions, this makes sense. If your data fits in memory on a single machine, you can still use spark-bestfit with the LocalBackend—no Spark required.

The library is on PyPI and GitHub. Full documentation at spark-bestfit.readthedocs.io.

Migration from v0.x

If you’re upgrading from v0.3.x, here’s the API migration:

# v0.3.x (deprecated functional API)
from spark_bestfit import fit_distributions
results = fit_distributions(df, column="value", num_bins=100, metric="ks")

# v2.0.0 (class-based API)
from spark_bestfit import DistributionFitter
fitter = DistributionFitter(spark)
results = fitter.fit(df, column="value", num_bins=100)
best = results.best(n=1, metric="ks_statistic")[0]

Key changes:

  • fit_distributions() → DistributionFitter(spark).fit()
  • Results are now a FitResults container with .best(), .filter(), .quality_report() methods
  • Each result is a DistributionFitResult with .pdf(), .cdf(), .sample(), .save() methods
  • Q-Q plots moved from spark_bestfit.plotting.qq_plot() to fitter.plot()

See the Migration Guide for complete details.

The Core Idea

Move computation to data, not data to computation. Histograms are a compact summary that enable distributed fitting without shuffling gigabytes around your cluster.

The v2.0 architecture takes this further by abstracting the execution backend. Whether you’re on a Spark cluster, Ray, or your laptop, the same histogram-based approach minimizes data movement.

Nothing revolutionary here—just using distributed primitives correctly.