
Nutzung fortgeschrittener Vorhersagetechniken mit StatsForecast: Eine Fallstudie
- Von Torben Windler
Share post:
EINLEITUNG
Die Herausforderung
Problemstellung
Unser Kunde stellte uns einen Datensatz zur Verfügung, der sich über mehrere Jahre erstreckte und vergangene Ereignisse für verschiedene Kunden und Bauteile enthielt. Die Ereignisse traten selten auf, sodass der Datensatz sehr unregelmäßig war. Unser Ziel war es, vorauszusagen, wann diese Ereignisse in der Zukunft auftreten werden, um eine bessere Ressourcenzuweisung und -planung zu ermöglichen. Obwohl die Zukunft mathematisch unabhängig von der Vergangenheit ist, konnten wir mit den Vorhersagen aus diesem Datensatz eine vernünftige Vorhersage erzeugen.Beschreibung der Daten
Beispiel:
Auf der rechten Seite sieht man ein Beispiel dafür, wie die Daten aussehen, wenn sie verarbeitet und für die Prognose vorbereitet sind.
‚ds‘ ist ein wöchentlicher Zeitstempel (wir wollen wöchentliche Prognosen), ‚unique_id‘ steht für die Gruppe (verkettet aus Bauteil und Kunde) und ‚y‘ bezeichnet die Anzahl der Ereignisse in dieser bestimmten Woche.
| ds | unique_id | y |
| --- | --- | --- |
| 2023-07-24 | part_a, customer_a | 1 |
| 2022-02-28 | part_b, customer_b | 2 |
| 2024-04-22 | part_b, customer_c | 1 |
| 2024-03-18 | part_b, customer_d | 1 |
| 2024-03-25 | part_b, customer_d | 0 |
| … | … | … |
| 2017-04-24 | part_y, customer_a | 0 |
| 2017-05-01 | part_y, customer_a | 1 |
| 2017-05-08 | part_y, customer_a | 0 |
| 2017-05-15 | part_y, customer_a | 1 |
| 2021-04-26 | part_b, customer_z | 2 |
Lösungsansatz
Um diese Herausforderung zu bewältigen, habe ich mehrere Ansätze untersucht, bevor ich mich für das StatsForecast-Paket entschied. Im Folgenden sind einige der von mir untersuchten Techniken aufgeführt:
Bei dieser Technik werden Zeitreihendaten in Trend-, Saison- und Restkomponenten zerlegt. Sie war zwar nützlich für das Verständnis von Mustern, konnte aber nicht gut mit Unterbrechungen umgehen.
Das optimierte Modell von Croston ist eine fortschrittliche Prognosemethode für unregelmäßige Nachfragedaten, die exponentielle Glättung zur Erfassung von Trends und saisonalen Schwankungen mit separaten Schätzungen für das Auftreten und die Größe von Nicht-Null-Nachfrage kombiniert. Dieser Ansatz hilft, Über- und Unterprognosen auszugleichen und liefert genauere Vorhersagen für sporadische Nachfragemuster.
Der Intermittent Multiple Aggregation Prediction Algorithm (IMAPA) ist ein Algorithmus, der zukünftige Werte unregelmäßiger Zeitreihen vorhersagt, indem er die Zeitreihenwerte in regelmäßigen Abständen aggregiert und dann ein beliebiges Prognosemodell, wie z. B. eine optimierte einfache exponentielle Glättung (SES), verwendet, um diese aggregierten Werte vorherzusagen. IMAPA ist robust gegenüber fehlenden Daten, rechnerisch effizient und einfach zu implementieren, so dass es für verschiedene Aufgaben im Bereich der sporadischen Zeitreihenprognose geeignet ist.
Das ADIDA-Modell verwendet Simple Exponential Smoothing (SES) auf zeitlich aggregierten Daten, um die unregelmäßige Nachfrage zu prognostizieren, wobei die Nachfrage in Bereiche mit einer mittleren Intervallgröße aggregiert wird. Die Vorhersagen werden dann auf die ursprünglichen Zeiträume disaggregiert.
Das TSB-Modell ist eine fortschrittliche Methode, die in der Bestandsverwaltung und der Nachfrageprognose für Produkte mit intermittierender Nachfrage eingesetzt wird und als Erweiterung des Modells von Croston vorgeschlagen wurde. Es aktualisiert die Nachfragewahrscheinlichkeit in jeder Periode auch dann, wenn keine Nachfrage auftritt, wodurch es sich besser für das Management des Veralterungsrisikos bei Daten mit vielen Nullen eignet.
DAS STATSFORECAST PAKET
import polars as pl
from statsforecast import StatsForecast
from statsforecast.models import ADIDA, CrostonClassic, CrostonOptimized, CrostonSBA, IMAPA, TSB
# Create the models (can be extended with hyperparameter tuning)
models = [
ADIDA(),
CrostonClassic(),
CrostonOptimized(),
CrostonSBA(),
IMAPA(),
TSB(alpha_d=0.2, alpha_p=0.2)
]
# Store the names of the models to use them later in polars colum selection
model_names = [m.alias for m in models]
# We want weekly forecasts
sf = StatsForecast(models=models, freq='1w', n_jobs=-1, verbose=True)
sf.fit(train)
FORECAST_HORIZON = 52 # weeks
forecasts_df = sf.predict(h=FORECAST_HORIZON)
# join the forecasts with the target values ('y') from the test data for evaluation
forecasts_df = (
forecasts_df
.join(test, on=["unique_id", "ds"], how="left")
.fill_null(0)
)
Dieser kurze Codeschnipsel war die Grundlage dafür, dass ich schnell mehrere Modelle für den Datensatz erstellen und ausführen konnte. Tatsächlich haben wir anschließend noch die Hyperparameter jedes Modells mit bewährten Methoden angepasst.
Handhabung gleichmäßiger Verteilungen
Die von diesen Modellen erstellten Prognosen waren ursprünglich diskrete Gleichverteilungen über den Prognosehorizont. Um die Anforderung zu erfüllen, bestimmte geschätzte Zeitpunkte für vorhergesagte Ereignisse zu haben, habe ich folgenden Ansatz gewählt:Kumulierte Wahrscheinlichkeiten Modulo 1
Um dieses Problem zu lösen, habe ich alle Wahrscheinlichkeiten über die Zeit kumuliert und den kumulativen Wert modulo 1 berechnet. So konnte ich die ursprünglichen Wahrscheinlichkeitswerte mit diesen neuen Werten vergleichen. Wenn die ursprüngliche Wahrscheinlichkeit größer oder gleich dem Modulo-Wert ist, deutet dies auf ein Ereignis zu diesem Zeitpunkt hin.Implementierung-Schritte
1. Wahrscheinlichkeiten kumulieren: Addition der Wahrscheinlichkeiten für jeden Prognosezeitraum.Codebeispiel
forecasts_df = (
forecasts_df
# Calculate the value with modulo 1, to obtain some kind of 'virtual' probability
.with_columns(pl.col(model_names).cum_sum().mod(1).over("unique_id").name.suffix("_cum"))
# If the modulo values are smaller than the original, this indicates
# an event with the demand from the original column rounded to the next integer.
# Otherwise we do not have an event and set the value to None.
.with_columns([
pl.when(pl.col(m) >= pl.col(f"{m}_cum"))
.then(pl.col(m).ceil())
.otherwise(None)
for m in model_names]
)
# Here we do not need the cumulated values anymore
.drop(pl.col("^*_cum$"))
)
Gültigkeit des Ansatzes
Dieser Ansatz wandelt eine Gleichverteilung effektiv in diskrete Ereignisprognosen auf der Grundlage kumulativer Wahrscheinlichkeiten um. Durch den Vergleich jeder prognostizierten Wahrscheinlichkeit mit dem entsprechenden kumulativen Wert modulo 1 können wir bestimmte Zeitpunkte identifizieren, an denen Ereignisse wahrscheinlich eintreten werden.
Bewertung der Leistung
Kumulierter Vorhersagefehler (CFE)
Implementierung
def cfe(df: pl.DataFrame, model_names: list[str]) -> pl.DataFrame:
"""Calculate the Cumulative Forecast Error (CFE) for multiple models in a Polars DataFrame.
This function calculates the Cumulative Forecast Error (CFE) for each forecast model in a DataFrame. CFE is defined as the cumulative sum of the difference between actual values (`y`) and forecast values (``). The result includes the minimum, maximum, and last CFE value for each unique identifier.
The function takes a Polars DataFrame with the following structure:
- `unique_id`: A unique identifier for each data point
- `y`: The actual target value
- `model_names` (list of strings): Column names representing different model predictions
Parameters:
df (pl.DataFrame): Input DataFrame containing the data. It must include a column named `y` representing the actual values, and columns named `` for each model in the `model_names` list.
model_names (list[str]): A list of strings representing the names of the forecast models.
Returns:
pl.DataFrame: Output DataFrame with CFE results. It contains columns for `unique_id`, `model`, and statistics such as `cfe_min` (minimum CFE), `cfe_max` (maximum CFE), and `cfe_last` (last CFE) for each unique identifier and model.
"""
df = (
df
# Calculate Cumulative Forecast Error for each model per unique_id
.with_columns((pl.col(model_names) - pl.col("y")).cum_sum().over("unique_id"))
# Unpivot the DataFrame to have a single column for CFE values and corresponding model names
.unpivot(model_names, index="unique_id", variable_name="model", value_name="cfe")
# Group by unique_id and model, then aggregate to get min, max, and last CFE value
.group_by("unique_id", "model")
.agg(
pl.min("cfe").alias("cfe_min"),
pl.max("cfe").alias("cfe_max"),
pl.last("cfe").abs().alias("cfe_last")
)
)
return df
cfe_df = forecasts_df.pipe(cfe, model_names=model_names)
Stock-keeping -oriented Prediction Error Costs(SPEC)
Die Stock-keeping-oriented Prediction Error Costs (SPEC) misst die Vorhersagegenauigkeit durch den Vergleich von tatsächlichen Ereignissen und Vorhersage in Form von virtuell entstandenen Kosten über den Vorhersagehorizont. Wenn die Prognosen Ereignisse vor dem Eintreten des Zielereignisses vorhersagen, entstehen Bestandserhaltungskosten. Im umgekehrten Fall erhalten wir Opportunitätskosten. Die Beziehung zwischen beiden Fehlern wird mit $\alpha \in [0, 1]$ gewichtet. Höhere Alphas gewichten die Opportunitätskosten stärker, während niedrigere Alphas den Lagerhaltungskosten mehr Gewicht verleihen. In diesem Szenario sind beide von Bedeutung, daher setzen wir $\alpha = 0,5$.
Darüber hinaus haben wir die Implementierung der Autoren leicht verändert und eine eigene Metrik verwendet, die viel schneller arbeitet und uns nicht nur die Summe der Lagerhaltungskosten und der Opportunitätskosten, sondern auch die beiden Einzelkosten liefert:
Implementierung
def spec(df: pl.DataFrame, model_names: list[str], alpha: float = 0.5):
"""Stock-keeping-oriented Prediction Error Costs (SPEC)
Read more in the :ref:`https://arxiv.org/abs/2004.10537`.
The function takes a Polars DataFrame with the following structure:
- `unique_id`: A unique identifier for each data point
- `y`: The actual target value
- `model_names` (list of strings): Column names representing different model predictions
Parameters:
df (pl.DataFrame): Input DataFrame containing the data. It must include a column named `y` representing the actual values, and columns named `` for each model in the `model_names` list.
model_names (list[str]): A list of strings representing the names of the forecast models.
alpha (float): Provides the weight of the opportunity costs. The weight of stock-keeping costs is taken as (1 - alpha), hence alpha should be in the interval [0, 1].
Returns
-------
pl.DataFrame: Output DataFrame with SPEC results. It contains columns for `unique_id`, `model`, and the SPEC error for each unique identifier and model. The SPEC error is also provided as opportunity costs, stock-keeping costs, and the sum of both.
"""
df = (
df
.sort("unique_id", "ds")
# Calculate cum sum for every prediction and the target
.with_columns(pl.col(["y"] + model_names).cum_sum().over("unique_id").name.suffix("_cum_sum"))
# Compute differences in both directions from the predictions cumsum and the target cumsum
.with_columns(
*[(-pl.col(f"{m}_cum_sum") + pl.col("y_cum_sum")).alias(f"{m}_diff_y_f") for m in model_names],
*[(pl.col(f"{m}_cum_sum") - pl.col("y_cum_sum")).alias(f"{m}_diff_f_y") for m in model_names]
)
# Multiply the first difference with alpha (opportunity costs)
# and the second difference with (1 - alpha) (stock-keeping costs)
.with_columns(
*[(pl.col(f"{m}_diff_y_f") * alpha).clip(lower_bound=0).alias(f"{m}_o") for m in model_names],
*[(pl.col(f"{m}_diff_f_y") * (1 - alpha)).clip(lower_bound=0).alias(f"{m}_s") for m in model_names]
)
# Add the opportunity and stock-keeping costs
.with_columns([(pl.sum_horizontal(f"{m}_o", f"{m}_s")).alias(f"{m}_total") for m in model_names])
# Group by unique_id and model, then aggregate to get average SPEC values
.group_by("unique_id")
.agg(*[pl.col(f"{m}_total", f"{m}_o", f"{m}_s").mean() for m in model_names])
# Unpivot the DataFrame to have separate columns for SPEC values and corresponding model names
.unpivot([x for xs in [[f"{m}_total", f"{m}_o", f"{m}_s"] for m in model_names] for x in xs], index="unique_id", variable_name="model", value_name="spec")
# extract the different spec value indicators (total, o, s) to a separate column
.with_columns(pl.col("model").str.split("_").list.to_struct(fields=["model", "error"]))
.unnest("model")
# pivot by the different splits (total, o, s) in the error column
.pivot("error", index=["unique_id", "model"], values="spec")
# rename accordingly
.rename({"total": "spec_total", "o": "spec_o", "s": "spec_s"})
)
return df
spec_df = forecasts_df.pipe(spec, model_names=model_names)
CFE und SPEC zusammen
- Trendanalyse: CFE hilft bei der Identifizierung von Trends in den Fehlern im Laufe der Zeit, so dass Sie erkennen können, ob das Modell konstant zu niedrige oder zu hohe Prognosen abgibt.
- Magnitude und Richtung: Die Minimal- und Maximalwerte von CFE können Aufschluss über die Gesamtleistung und die Richtung der Fehler geben.
- Zeitbasierte Abweichung: SPEC berechnet Vorhersagefehler auf der Grundlage ihrer Auswirkungen auf künftige Lagerbestände und bestraft Fehler, die zu hohen Lagerhaltungskosten oder Fehlbeständen führen könnten.
- Gewichtete Kosten: Es ermutigt Unternehmen, sich auf die Minimierung von Über- und Unterbeständen zu konzentrieren, was zu einer verbesserten betrieblichen Effizienz führt.
FAZIT
ANMERKUNG
Wie ihr sehen könnt, verwende ich Polars anstelle von Pandas, Numpy, etc. für alle Berechnungen. Bei m2hycon verwenden wir Polars seit Anfang 2023, wo wir das Potenzial und die Auswirkungen gegenüber anderen Bibliotheken wie Pandas, Dask, Ray und sogar Apache Spark gesehen haben. Ein Blog-Beitrag darüber, wie wir den Wechsel vollzogen haben und wie er sich auf unsere Produktivität und Ergebnisse auswirkt, folgt in Kürze!

Torben Windler
Lead AI Engineer