Applying Sequential Monte-Carlo to time series forecasting

Lately, I’ve been working on an implementation of Sequential Monte-Carlo (SMC), a.k.a. particle filters, for a probabilistic programming library I’m developing. This family of methods is useful for performing filtering for a wide class of dynamical systems. Filtering corresponds to estimating the current state of a dynamical system given a noisy view of its trajectory until now. An important class of models on which SMC can be used is that of Hidden Markov models: in those, we observe the sequence of states of a Markov chain (so the dynamics are themselves probabilistic) through some possibly noisy function (whence the hidden). Kalman filters fall under the same umbrella but are specialized to continuous state spaces with linear dynamics and gaussian noise.

In order to better test the library and convince myself that it is useful for something non-trivial, I’ve been looking for an interesting problem to solve using SMC. I’ve settled on forecasting the power output of a wind turbine using the dataset made available here. You can also have a look at the user-provided solutions on the same link for other (perhaps better – I didn’t compare) approaches.

This dataset contains a time series of:

- the active power produced by the turbine (the ActivePower column);
- the wind speed (the WindSpeed column);

as well as some other variables that we will disregard. The recording interval spans over two years and the sampling period is equal to \(10\) minutes. It is known that the wind turbine has a peak output power of \(1750\) kW.

The goal is to construct a model able to predict, in an on-line fashion, the total power production of the wind turbine over the next 15 days using all data gathered up to now. Note that forecasting is not the same task as filtering: the former consists in estimating future states while the latter, as explained above, seeks to estimate the current state. What we’ll do is construct a dynamical model of power production, solve it on the dataset using filtering, and then evaluate the dynamical model forward on the filtered estimate to perform forecasting.
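
In symbols: writing \(x_t\) for the hidden state and \(y_{1:t}\) for the observations gathered up to time \(t\), filtering targets the distribution \(p(x_t \mid y_{1:t})\), while forecasting over a horizon of \(k\) steps targets

\[ p(x_{t+k} \mid y_{1:t}) = \int p(x_{t+k} \mid x_t) \, p(x_t \mid y_{1:t}) \, dx_t, \]

which is exactly what evaluating the dynamical model forward on the filtered estimate approximates.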

This article walks linearly through the full implementation: take it as a literate programming experiment rather than a blog post, and feel free to skip the boring code. The full source is available on GitHub.

The plan is as follows:

- define some helpers;
- load the data and take a first look at it;
- digress on the relationship between wind speed and power output;
- prepare the data for filtering;
- construct the forecasting model and examine the results.

Helpers

Depending on our needs, we will represent time series either as values of type (float * 'a) Seq.t or (int * 'a) Seq.t (the first component of the sequence denoting time), or use dense representations. We define some convenient functions for manipulating these series.

let sf = Printf.sprintf

(* Naming typical durations *)
let minute = 60.

let hour = minute *. 60.

let day = hour *. 24.

let year = day *. 365.

let month = year /. 12.

(* Compute the (approximate) time derivative of the series *)
let derivative series =
  Seq.map2
    (fun (t, x) (t', x') -> (t, (x' -. x) /. (t' -. t)))
    series
    (Seq.tail_exn series)

(* Restrict a signal to a given time interval *)
let restrict low hi =
  assert (low < hi) ;
  Seq.filter (fun (t, _) -> t > low && t <= hi)

(* If [seq = (t1, x1), (t2, x2), (t3, x3) ...] then
   [cumulative seq] is equal to
   [(t1, x1), (t2, x1 + x2), (t3, x1 + x2 + x3), ...]. *)
let cumulative ts =
  Seq.unfold
    (fun (prev, s) ->
      match s () with
      | Seq.Nil -> None
      | Seq.Cons ((t, x), s') ->
          let tot = prev +. x in
          Some ((t, tot), (tot, s')))
    (0.0, ts)

(* [fold_unfold f state seq] folds [f] over [seq] and constructs
   lazily a sequence out of each outcome. *)
let rec fold_unfold f state seq =
  match seq () with
  | Seq.Nil -> Seq.nil
  | Seq.Cons (elt, tail) ->
      let (state, elt) = f state elt in
      fun () -> Seq.Cons (elt, fold_unfold f state tail)

(* Aggregate a [signal] piecewise by taking the dot product of [kernel]
   with consecutive chunks of [signal]. With a kernel of ones this
   computes per-chunk sums; trailing samples that do not fill a
   complete chunk are dropped. *)
let average (signal : Float.Array.t) (kernel : Float.Array.t) =
  let m = Float.Array.length signal in
  let n = Float.Array.length kernel in
  if n > m then
    invalid_arg "average: dimensions mismatch (kernel too big)" ;
  (* some samples might be lost but we don't care *)
  let s = m / n in
  Float.Array.init s (fun i ->
      let acc = ref 0.0 in
      let start = i * n in
      for j = 0 to n - 1 do
        acc :=
          !acc
          +. (Float.Array.get signal (start + j)
              *. Float.Array.get kernel j)
      done ;
      !acc)

(* Map the first and second components of a sequence of pairs *)
let map_fst f seq = Seq.map (fun (x, y) -> (f x, y)) seq

let map_snd f seq = Seq.map (fun (x, y) -> (x, f y)) seq
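
As a quick sanity check, here is how cumulative and restrict behave on a hypothetical toy series (this snippet is purely illustrative and not part of the original code):

(* A toy series sampled every minute. *)
let _demo =
  let series =
    List.to_seq [(0., 1.0); (minute, 2.0); (2. *. minute, 4.0)]
  in
  (* Running sums: [(0., 1.0); (60., 3.0); (120., 7.0)] *)
  let sums = cumulative series |> List.of_seq in
  (* Keep samples with 0 < t <= 60: [(60., 2.0)] *)
  let restricted = restrict 0. minute series |> List.of_seq in
  (sums, restricted)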

Loading the data

The data is given as a CSV file; we use the csv library to load it and project out the relevant fields: power and wind speed. Note that the dataset is missing a lot of rows: for now we fill those with zeroes, but that issue will have to be dealt with later.

(* Loading the CSV file *)
let rows =
  Containers.IO.with_in Sys.argv.(1) @@ fun ic ->
  let csv = Csv.of_channel ic in
  (* Correct the header of the CSV *)
  let header = Csv.next csv in
  Csv.Rows.set_header ~replace:true csv ("Date" :: List.tl header) ;
  Seq.of_list @@ Csv.Rows.input_all csv

(* We select output power and wind speed. Units are not made explicit
   in the dataset: we posit that wind speed is given in m/s and power
   in kilowatts. *)
let proj_attribute row attribute =
  match float_of_string (Csv.Row.find row attribute) with
  | x -> x
  | exception _ -> 0.0

let tseries =
  let absolute_time =
    Seq.fmap
      (fun row ->
        match Ptime.of_rfc3339 @@ Csv.Row.find row "Date" with
        | Error _ -> None
        | Ok (date, _, _) ->
            let seconds = Ptime.to_float_s date in
            let power = proj_attribute row "ActivePower" in
            let wind_speed = proj_attribute row "WindSpeed" in
            Some (seconds, (power, wind_speed)))
      rows
  in
  let (t0, _) = Seq.head_exn absolute_time in
  (* Shift to t0 *)
  Seq.map (fun (t, data) -> (t -. t0, data)) absolute_time

(* The data is supposed to be sampled at 10mn intervals;
   we check that it is the case *)
let () =
  assert (
    Seq.for_all2
      (fun (t, _) (t', _) -> t' -. t = 10. *. minute)
      tseries
      (Seq.tail_exn tseries))

let sampling_period = 10. *. minute

A first look at the data

A bird’s eye view

In order to design our model, we need to understand the process that generated the data. We plot the power and wind speed over the second month of recording, as well as the total cumulative power over the whole dataset. Here we use the prbnmcn-gnuplot library to produce the plots.

let power = Seq.map (fun (t, (power, _ws)) -> (t, power)) tseries

let wind_speed = Seq.map (fun (t, (_power, ws)) -> (t, ws)) tseries

let () =
  let open Plot in
  let to_days seconds = seconds /. day in
  let plot ~xaxis ~yaxis ~title ~legend data =
    plot2
      ~xaxis
      ~yaxis
      ~title
      [ Line.line_2d
          ~points:
            (data
            |> Seq.map (fun (s, p) -> Plot.r2 (to_days s) p)
            |> Plot.Data.of_seq)
          ~legend
          () ]
  in
  let wind_speed_plot =
    plot
      ~xaxis:"time (days)"
      ~yaxis:"wind speed (m/s)"
      ~title:"wind speed"
      ~legend:"wind speed (m/s)"
      (wind_speed |> restrict (1. *. month) (2. *. month))
  in
  let power_plot =
    plot
      ~xaxis:"time (days)"
      ~yaxis:"power (kW)"
      ~title:"power"
      ~legend:"power (kW)"
      (power |> restrict (1. *. month) (2. *. month))
  in
  let cumulative_power_plot =
    plot
      ~xaxis:"time (days)"
      ~yaxis:"power (kW)"
      ~title:"cumulative power (full range)"
      ~legend:"cumulative power (kW)"
      (cumulative power)
  in
  Plot.(
    run_matrix
      ~plots:
        [| [| Some wind_speed_plot |];
           [| Some power_plot |];
           [| Some cumulative_power_plot |]
        |]
      ~target:(png
                 ~pixel_size:(1024, 512 * 3)
                 ~png_file:"plots/raw_data.png"
                 ())
      exec)

[fig:raw_data] Raw data

Unsurprisingly, wind speed and power seem highly correlated. One observes that the wind speed series seems to be structured as a sequence of peaks with a daily periodicity. One also notes some long periods where wind speed is equal to 0: these correspond to missing rows.

More interestingly, the cumulative power looks piecewise affine with small fluctuations around its mean behaviour. The plot also displays a clear yearly seasonal inflexion.

Seasonality

Let’s look at the power and wind speed distributions over distinct ‘seasons’, where a ‘season’ is understood as a maximal contiguous interval on which the cumulative power looks affine (i.e. in what follows I performed the split manually).

(* in days *)
let season_A = ("season_A", (1, 180))

let season_B = ("season_B", (180, 250))

let season_C = ("season_C", (250, 520))

let season_D = ("season_D", (520, 620))

let season_E = ("season_E", (620, 820))

let seasons = [season_A; season_B; season_C; season_D; season_E]

let hist name series =
  let points =
    series |> Seq.map snd |> Seq.map Plot.r1 |> Plot.Data.of_seq in
  Plot.Histogram.hist ~points ~bins:50 ~legend:name ()

let season_hist name series =
  let open Plot in
  let plot (season_name, (a, b)) =
    let series = series |> restrict (float a *. day) (float b *. day) in
    hist season_name series
  in
  plot2
    ~xaxis:name
    ~yaxis:"freq"
    ~title:(sf "seasonal plot (%s)" name)
    (List.map plot seasons)
  |> run
       ~target:(png ~png_file:(sf "plots/%s_seasons.png" name) ())
       exec

let () =
  let filtered = Seq.filter (fun (_, p) -> p > 0.0) wind_speed in
  season_hist "wind-speed" filtered

let () =
  let filtered = Seq.filter (fun (_, p) -> p > 0.0) power in
  season_hist "power" filtered

The following plot shows the histograms of wind speed over all seasons. Note that the seasons do not contain the same number of samples and that these histograms are not normalized. We observe that season_B and season_D have a higher mean wind speed and a larger support.

Wind speed histogram, per season

The relationship between wind speed and power output is nonlinear, as will be made explicit in the next section. This manifests very concretely: the power output distribution is heavily skewed towards high values for season_B and season_D.

Power histogram, per season

To sum up: the power distribution is unimodal on each season, and cumulative power looks piecewise affine, which indicates that power (the first derivative of cumulative power), averaged over sufficiently big windows, is close to constant on each season.

An aside: power output as a function of wind speed

Let us take a small break from our objective and look at the relationship between wind speed and power output.

We will produce a model allowing us to predict the instantaneous power production as a function of wind speed at any given time. We won’t use this model in our forecasting model, but it is an interesting application of probabilistic programming to a simple non-linear regression (with a pretty tame nonlinearity). In practice, we could use this to reduce the original problem to that of forecasting wind speed over a window of time in the future given past observations; one could then use dedicated meteorological models to predict wind speed.

We know that the power output is capped to around 1750 kW.

(* Unit: kW *)
let max_power = 1750.

It is known that the power output of a wind turbine is proportional to the cube of the wind speed. (This page contains a nice explanation.) predicted_power is our model, with a being the parameter we’d like to fit to the data. (Note that this function is nonlinear because of the min.)

let predicted_power a ws = Float.min max_power (a *. (ws ** 3.))
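
For reference (this is textbook background, not something used below): the kinetic energy flux through the rotor area \(A\) at wind speed \(v\) is \(\frac{1}{2} \rho A v^3\), where \(\rho\) is the air density, and the turbine converts a fraction \(C_p\) of it into electrical power. The parameter a thus lumps together \(\frac{1}{2} \rho A C_p\) (in suitable units):

\[ P(v) \approx \min \left( P_{\mathrm{max}}, \tfrac{1}{2} \rho A C_p \, v^3 \right). \]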

We prepare an array containing the power and wind speed on a small window. The actual subset of samples we take to perform the inference should not matter, as long as it is big enough for accurate statistics and small enough not to impair inference speed.

(* - restrict the timeseries to a reasonable sub-interval
     (big enough for accurate statistics, not too big to not
     impair inference speed)
   - filter out elements where output power is negative *)
let power_and_wind_speed =
  tseries
  (* restrict to window of interest *)
  |> restrict (6. *. month) (6.5 *. month)
  (* project time out *)
  |> Seq.map (fun (_, data) -> data)
  (* filter out samples with p <= 0 *)
  |> Seq.filter (fun (p, _) -> p > 0.0)
  (* convert to array for faster iteration *)
  |> Array.of_seq

Let’s proceed to define a model to fit the scaling parameter a in predicted_power. The idea behind the model is pretty simple: we’d like to guess a parameter a such that for all pairs (p, ws) in power_and_wind_speed, predicted_power a ws is close to p.

We make this formal by scoring each prediction with a gaussian density centered on p: if the predicted value is spot on for a given pair (p, ws), it will correspond to the maximum of the gaussian probability density function; if it is slightly off, the score will be slightly lower. How fast the score decreases is specified by the standard deviation of the scoring gaussian, which we set heuristically to max_power/10. Each pair is scored in this way. The total score associated to a is the product of all scores over all elements of power_and_wind_speed (this quantity is computed implicitly by the inference algorithm).
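
In other words, assuming that Stats_dist.float 3.0 denotes a uniform prior over \([0;3]\) (as the comment in the code below suggests), the unnormalized posterior density explored by the sampler is

\[ \pi(a) \propto \mathbf{1}_{[0;3]}(a) \prod_{(p, ws)} \mathcal{N}(\mathtt{predicted\_power}(a, ws);\ p,\ \sigma), \qquad \sigma = \mathtt{max\_power} / 10. \]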

We’ll use Lightweight Metropolis-Hastings (LMH) inference to fit the model.

open Dagger.Lmh_inference

let model =
  let open Infix in
  (* We set a reasonably large uniform prior on [a] *)
  let* a = sample @@ Stats_dist.float 3.0 in
  let+ () =
    Array_ops.iter
      (fun (p, ws) ->
        let predicted = predicted_power a ws in
        (* We score the prediction using a gaussian pdf:
           the score will be maximal at the mean.
           The bigger the [std], the more lax we are. *)
        score @@
          Stats.Pdfs.gaussian ~mean:p ~std:(max_power /. 10.) predicted)
      power_and_wind_speed
  in
  a

Let’s proceed to inference. We drop the first \(10000\) samples for burn-in and take the \(10000\) subsequent ones to form the posterior on a.

let rng_state = Dagger.RNG.make_self_init ()

let samples = stream_samples model rng_state

let samples =
  samples
  |> Seq.drop 10_000 (* burn-in *)
  |> Seq.take 10_000
  |> List.of_seq

Let’s plot the posterior:

let () =
  let open Plot in
  run
    ~target:(png ~png_file:"plots/scaling_posterior.png" ())
    exec
    (plot2
       ~xaxis:"a"
       ~yaxis:"freq"
       ~title:"Posterior on scaling parameter"
       [ Histogram.hist
           ~points:(Data.of_list (List.map r1 samples))
           ~bins:50
           () ])

Posterior on model parameter

The posterior is snugly contained in the \([2.46;2.56]\) interval. For the purposes of this example, it is enough to take the mean of the posterior as our estimate of the scaling parameter.

let inferred_scaling =
  (* take the empirical mean *)
  List.fold_left ( +. ) 0.0 samples /. 10000.

let () = Format.printf "P(t) = %f x ws(t)^3@." inferred_scaling

The last line above displays something like: "P(t) = 2.513402 x ws(t)^3". Let us define our estimator for the power output as a function of wind speed:

let convert ws = predicted_power inferred_scaling ws

Let’s compare actual vs predicted power output on some interval.

let () =
  let series =
    tseries |> restrict (4. *. month) ((4. *. month) +. (3. *. day))
  in
  let actual = Seq.map (fun (t, (power, _)) -> Plot.r2 t power) series in
  let predicted =
    Seq.map
      (fun (t, (_, wind_speed)) -> Plot.r2 t (convert wind_speed))
      series
  in
  let open Plot in
  run
    ~target:(png ~png_file:"plots/wind_to_power.png" ())
    exec
    (plot2
       ~xaxis:"t"
       ~yaxis:"p"
       ~title:"Predicted vs actual power output"
       [ Line.line_2d ~points:(Data.of_seq actual) ~legend:"actual" ();
         Line.line_2d ~points:(Data.of_seq predicted) ~legend:"predicted" ()
       ])

Predicted vs actual power

Not too bad! We observe a nonlinearity that we didn’t take into account: when the wind speed goes under a certain threshold, the wind turbine just doesn’t produce any power. This is normally part of the specification of the turbine, which we don’t have access to. In any case, we won’t need or use any of that for our forecasting model.

Preparing the data

A first issue with the dataset is its many missing rows. We need to fill those out. Another issue is that the sampling period of \(10\) minutes is too fine-grained for our purposes. In particular, the data features a daily periodic behaviour that is not relevant to our objective. Aggregating data over a longer period has two advantages: it acts as a low-pass filter (removing useless detail) and allows us to cut down on computational resource consumption during filtering.
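
For instance, when aggregating over one-day windows (as we will do below), each window covers \(24 \times 60 / 10 = 144\) raw samples.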

Let’s implement a solution for the first issue. When loading the CSV, we filled the missing rows with zeroes. We replace consecutive zeroes with the value of a sliding average over a few days’ worth of data prior to the first zero.

(* This module implements sequence transformers implemented
   by folding over the input sequence a function taking
   a sliding window as argument. *)

module Windowed_fold = struct
  module Queue = CCFQueue

  type ('x, 'a) state = { samples : 'x Queue.t; acc : 'a }

  let make_transducer f state next_sample =
    let (produced, acc) = f state.samples next_sample state.acc in
    let (samples, _old_sample) = Queue.take_back_exn state.samples in
    let samples = Queue.cons next_sample samples in
    ({ samples; acc }, produced)

  let apply f initial seq =
    let transducer = make_transducer f in
    fold_unfold transducer initial seq

  (* When encountering a sequence of zeroes, replace it with the value
     of a sliding average. *)
  let fill_zeroes_with_average window_duration seq =
    let nsamples = int_of_float (window_duration /. (10. *. minute)) in
    let initial =
      { samples = Queue.of_list (List.init nsamples (fun _ -> 0.0));
        acc = `Keep
      }
    in
    apply
      (fun samples next_sample acc ->
        match (next_sample, acc) with
        | (0.0, `Fill avg) -> (avg, acc)
        | (_, `Fill _) -> (next_sample, `Keep)
        | (0.0, `Keep) ->
            let tot = Queue.fold ( +. ) 0.0 samples in
            let avg = tot /. float_of_int (Queue.size samples) in
            (avg, `Fill avg)
        | (x, `Keep) -> (x, `Keep))
      initial
      seq
end

Let’s solve the second issue and construct a dataset ready for filtering. We aggregate data on consecutive windows of some fixed duration. Let us call the successive indices of these windows ticks.

We fix the duration of one tick to be one day. We restrict our attention to a subinterval of 300 days for legibility of the plots.

module Data = struct
  let start = 1.

  let stop = 300.

  let tick_duration = day

  let ticks_per_day = day /. tick_duration

  (* Restrict the time series to the interval of interest *)
  let power = restrict (day *. start) (day *. stop) power

  (* Coarsen the signal on a per-day basis *)
  let coarsened_power =
    let tick_samples =
      int_of_float (tick_duration /. sampling_period) in
    (* Fill the missing gaps using a 5-day sliding window
       estimation of the average *)
    let gaps_filled =
      power |> Seq.map snd
      |> Windowed_fold.fill_zeroes_with_average (5. *. day)
      |> Float.Array.of_seq
    in
    (* All samples in the averaged interval have equal weight *)
    let window = Float.Array.make tick_samples 1.0 in
    average gaps_filled window

  let tick_to_day i = float_of_int i *. tick_duration /. day

  let monotonic = Seq.unfold (fun i -> Some (i, i + 1)) 1

  (* Coarsened power timeseries with time labelled in ticks *)
  let power_ticks =
    Seq.zip monotonic (Float.Array.to_seq coarsened_power)

  (* Coarsened power timeseries with time labelled in days *)
  let power_days =
    Seq.zip
      (Seq.map tick_to_day monotonic)
      (Float.Array.to_seq coarsened_power)
end

Forecasting power output

Let us now move to our forecasting model. As stated in the introduction, we decompose forecasting as a filtering phase postcomposed with an extrapolation step based on the state estimate. We use an ad-hoc particle filter to estimate the state of the process.

It’s worth building an intuitive idea of how filtering works. The algorithm maintains a weighted family of states: a weighted state is called a particle, and the weight of a particle corresponds to its conformance to the model. This family is sometimes called a population. At each time step, the population is pushed forward through the dynamical model (which is supposed known, even though it can feature uncertainty of the probabilistic kind). The resulting population is conditioned against the external observations: this is implemented by scoring, i.e. reweighting the particles according to how well they predict the observed data. From that conditioned population, a new population of prescribed cardinality is drawn (this is the resampling step, which the library performs for the user).
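
To make this concrete, here is a minimal self-contained sketch of one such step (purely illustrative: this is not the library’s actual API, and we use the simplest multinomial resampling scheme):

(* One SMC step, schematically. [step] pushes a particle through the
   dynamical model, [score obs p] measures how well particle [p]
   predicts the observation [obs]. The returned population has the
   same cardinality and uniform weights. *)
let smc_step ~step ~score ~obs particles =
  (* push the population forward through the dynamics *)
  let moved = Array.map step particles in
  (* condition against the observation: reweight each particle *)
  let weights = Array.map (score obs) moved in
  let total = Array.fold_left ( +. ) 0.0 weights in
  (* resample: draw a fresh population according to the weights *)
  let n = Array.length moved in
  Array.init n (fun _ ->
      let u = Random.float total in
      let rec pick i acc =
        let acc = acc +. weights.(i) in
        if u <= acc || i = n - 1 then moved.(i) else pick (i + 1) acc
      in
      pick 0 0.0)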

We first define a dynamical model and then carry on to defining the forecasting model.

A dynamical model

As written above, filtering relies on having a dynamical model of the process. We model power output as a point following Newton’s laws and subject to a random force. The state of the system at time \(t\) is described by three variables:

- the power \(p_t\);
- its first derivative \(\dot{p}_t\);
- its second derivative \(\ddot{p}_t\), which plays the role of the random force.

The dynamics are implemented by discretizing the laws of motion using a simple forward Euler step. It is also convenient to make the vector space structure of the state space explicit (we use it when computing averages over a weighted collection of states).
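
Concretely, taking one tick as the unit of time (so that the step size is absorbed into the units of the derivatives), the Euler update reads

\[ p_{t+1} = p_t + \dot{p}_t, \qquad \dot{p}_{t+1} = \dot{p}_t + \ddot{p}_t, \]

with \(\ddot{p}\) held constant when evaluating the dynamics forward over a span of ticks.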

module State : sig
  (* The type of states *)
  type t

  (* [make] creates a state. [power, power', power''] are
     the estimates for respectively power, its first and
     second derivatives.  *)
  val make : power:float -> power':float -> power'':float -> t

  val power : t -> float

  val power' : t -> float

  val power'' : t -> float

  val zero : t

  (* Evaluating dynamics forward *)

  val forward : t -> span:int -> float

  val forward_cumulative : t -> span:int -> float
end = struct
  type t = Float.Array.t

  let (power, power', power'', dim) = (0, 1, 2, 3)

  let make ~power ~power' ~power'' =
    Float.Array.map_from_array Fun.id [| power; power'; power'' |]

  let power a = Float.Array.get a power

  let power' a = Float.Array.get a power'

  let power'' a = Float.Array.get a power''

  let zero = Float.Array.make dim 0.0

  let forward state_t0 ~span =
    let pacc = ref (power state_t0) in
    let pacc' = ref (power' state_t0) in
    for _i = 1 to span do
      pacc := !pacc +. !pacc' ;
      pacc' := !pacc' +. power'' state_t0
    done ;
    !pacc

  let forward_cumulative (state_t0 : t) ~span =
    let pacc = ref (power state_t0) in
    let pacc' = ref (power' state_t0) in
    let cacc = ref 0.0 in
    for _i = 1 to span do
      pacc := !pacc +. !pacc' ;
      pacc' := !pacc' +. power'' state_t0 ;
      cacc := !cacc +. !pacc
    done ;
    !cacc
end

The model

We perform forecasting over a window of 15 days in the future. Recall that our end goal is the total (i.e. cumulative) power output over the next 15 days.

(* The maximal power that can be produced in one day. *)
let max_power_per_tick =
  let samples = Data.tick_duration /. sampling_period in
  samples *. max_power

let forecast_days = 15.

let forecast_span = int_of_float (forecast_days *. Data.ticks_per_day)

open Dagger.Smc_inference

The model takes as arguments:

- the current state of the particle (a value of type State.t);
- a record_particle callback;
- the sequence of observations data, labelled in ticks.

Each particle evolves its own view of the state. The role of record_particle is to save the population at each tick: each particle that survives resampling saves its state by calling it. (We could alternatively let each particle carry its trace and gather all traces at the end of the filtering process, but this would not be suitable for on-line forecasting. To be honest, I’m not totally satisfied with my current approach, but using callbacks is typical of concurrent programming hackery, which this is – suggestions for more structured options are welcome.)

In the case where the sequence of data is nonempty, the model proceeds in two consecutive steps:

- a prediction step, where the state is pushed forward through a forward Euler step of the dynamics;
- a conditioning step, where the particle is scored against the observation and the physical constraints.

This second phase is the most interesting. First off, we compute the predicted state at the end of the forecast span. We then check that both the predicted power for the current tick and the forecast power are consistent with the physical constraints imposed by the wind turbine. The score_noyield primitive allows us to reweight the particle. (The noyield suffix indicates that the function does not schedule the particle for resampling, while the score primitive does.)

We perform two more conditioning steps. The following line constrains the predicted power to be not too far off the observed one. The smaller the std, the tighter we force the process to follow the real trajectory. But if we constrain it too much, the trajectory might become incompatible with the constraints imposed by the dynamical model! So we need some slack.

score_noyield
@@ Stats.Pdfs.gaussian ~mean:observed_power ~std:15_000. power

The last conditioning has a similar role but for the forecast estimate: we expect that the power cannot drift off too far, too fast during the forecast span.

score_noyield (Stats.Pdfs.gaussian ~mean:power ~std:120_000. forecast)

In both of the last two lines, we tuned the standard deviations by hand, which is bad style. We could imagine letting the inference algorithm discover the correct stds by itself, by spawning an initial population with random standard deviations and letting successive resamplings select out good ones.

The call to yield schedules the particle for resampling. This acts as a synchronization barrier for all particles (due to the structure of the model, the control flow of each particle always goes through this call). After resampling, we record the population using record_particle.

Finally, we update the particle state by applying a random force to it.

The full code of the model follows:

let rec model (state : State.t) record_particle data =
  let open Infix in
  match data () with
  | Seq.Nil -> return ()
  | Seq.Cons ((index, observed_power), rest) ->
      (*
        predict using forward Euler step
       *)
      let power' = State.power' state +. State.power'' state in
      let power = State.power state +. State.power' state in
      (*
        observe
       *)
      let forecast = State.forward state ~span:forecast_span in
      let in_bounds x =
        if x <= max_power_per_tick && x >= 0.0 then 1.0 else 0.0
      in
      let* () =
        (* reject particles with OOB forecast *)
        score_noyield (in_bounds forecast)
      in
      let* () =
        (* reject particles with OOB power *)
        score_noyield (in_bounds power)
      in
      let* () =
        (* condition on the fact that observed power is polluted by
           gaussian noise with given std *)
        score_noyield
        @@ Stats.Pdfs.gaussian ~mean:observed_power ~std:15_000. power
      in
      let* () =
        (* condition on the fact that the forecast can't be too far off the
           current power estimate - the [std] here should be proportional
           to [forecast_span]; here we guesstimated it empirically *)
        score_noyield (Stats.Pdfs.gaussian ~mean:power ~std:120_000. forecast)
      in
      let* () = yield in
      (* Record the resampled population. Note that all particles surviving
         a resampling step have equal weight! *)
      record_particle ~state ~at_tick:index ;
      let* power'' = sample @@ Stats_dist.gaussian ~mean:0.0 ~std:500. in
      let state = State.make ~power ~power' ~power'' in
      model state record_particle rest

Results

Let us run inference. We use 1000 particles: using more might reduce variance, but it is not really needed for this exposition. Note that it is possible to dynamically fork more particles during inference; this was not needed either. The process takes less than 5 seconds on my laptop. We get filtered, a sequence containing the population of states at each tick.

let filtered =
  let nparticles = 1000 in
  let rng_state = Dagger.RNG.make_self_init () in
  let table = Hashtbl.create 300 in
  let record_particle ~state ~at_tick =
    Hashtbl.add table at_tick state in
  (* There's also an interruptible runner that can be used for on-line
     filtering *)
  let _pop =
    Non_interruptible.run
      nparticles
      (model State.zero record_particle Data.power_ticks)
      rng_state
  in
  Seq.map (fun (i, _) -> (i, Hashtbl.find_all table i)) Data.power_ticks

We need to extract some statistics out of this sequence of populations, typically the mean and quantiles. (We use prbnmcn-stats to compute those statistics.)

(* Compute the qth quantile out of a population. *)
let quantile q pop =
  let open Stats.Fin.Float in
  let mes = counts_of_empirical Stats.float_table pop in
  Stats.Fin.Float.quantile (module Basic_structures.Std.Float) mes q

(* Compute the mean out of a population. *)
let mean pop =
  let open Stats.Fin.Float in
  let mes =
    counts_of_empirical Stats.float_table pop |> normalize |> as_measure
  in
  mean mes

let filtered statistics =
  Seq.map
    (fun (t, pop) ->
      match pop with
      | [] -> (t, State.zero)
      | _ ->
          let pop = Array.of_list pop in
          let power = Array.map State.power pop |> statistics in
          let power' = Array.map State.power' pop |> statistics in
          let power'' = Array.map State.power'' pop |> statistics in
          (t, State.make ~power ~power' ~power''))
    filtered

Let’s compute and plot the outcome of filtering as well as the forecast trajectory. In black, we plot the data. In green, we plot the mean filtered trajectory as well as the 0.25 and 0.75 quantiles (being able to quantify uncertainty is one of the chief advantages of bayesian approaches!). In blue, we plot the forecast trajectory and its quantiles. (Feel free to jump over the boring stats and plotting code and go directly to Fig. 2.)

let filtered_mean = filtered mean

let filtered_median = filtered (quantile 0.5) (* the median is the 0.5 quantile *)

let filtered_quantile q = filtered (quantile q)

let forecast_by_integration s =
  Seq.map
    (fun (tick, state) ->
      let forecast_tick = tick + forecast_span in
      let t = Data.tick_to_day forecast_tick in
      let forecast_power = State.forward state ~span:forecast_span in
      (t, forecast_power))
    s

let black = Plot.Style.(default |> set_color Plot.Color.black)

let green = Plot.Style.(default |> set_color Plot.Color.green)

let blue = Plot.Style.(default |> set_color Plot.Color.blue)

let green_squares =
  Plot.Style.(
    default
    |> set_color Plot.Color.(rgb 0.0 0.8 0.0)
    |> set_point ~ptyp:Plot.Pointtype.square ~psize:0.4)

let blue_squares =
  Plot.Style.(
    default
    |> set_color Plot.Color.(rgb 0.0 0.0 0.8)
    |> set_point ~ptyp:Plot.Pointtype.square ~psize:0.4)

(* Relabel time from ticks to days *)
let to_days seq = map_fst Data.tick_to_day seq

let plots =
  [ ("data", Data.power_days, black);
    ("filt-mean", filtered_mean |> map_snd State.power |> to_days, green);
    ( "filt-0.25",
      filtered_quantile 0.25 |> map_snd State.power |> to_days,
      green_squares );
    ( "filt-0.75",
      filtered_quantile 0.75 |> map_snd State.power |> to_days,
      green_squares );
    ("forecast-int", forecast_by_integration filtered_mean, blue);
    ( "forecast-int-0.25",
      filtered_quantile 0.25 |> forecast_by_integration,
      blue_squares );
    ( "forecast-int-0.75",
      filtered_quantile 0.75 |> forecast_by_integration,
      blue_squares ) ]

let () =
  let plot (legend, seq, style) =
    Plot.Line.line_2d
      ~points:(Plot.Data.of_seq (seq |> Seq.map Plot.tup_r2))
      ~legend
      ~style
      ()
  in
  let prepare_plots timeseries =
    let xtics =
      let open Plot.Tics in
      default
      |> set_position ~start:0.0 ~incr:5.0
      |> set_rotate ~degrees:(-90.)
    in
    Plot.plot2 ~xaxis:"t" ~yaxis:"kW" ~xtics (List.map plot timeseries)
  in
  let plot = prepare_plots plots in
  Plot.(
    run
      ~target:(png ~pixel_size:(1920, 1080) ~png_file:"plots/forecast.png" ())
      exec
      plot)

[fig:result] Filtering and forecast result

Finally, let’s look at the cumulative power plot. We spare the tired reader the plotting code.

[fig:result-cumulative] Filtering and forecast result, cumulative

Clearly, it’s not perfect ;). It appears the approach has trouble dealing with the seasonal shifts; since these were not modelled at all, that’s not entirely surprising. We could integrate those manually into the model, sacrificing a bit of generality for more accuracy.

A proper statistical study would carry on and validate the model on some other part of the dataset, performing a rigorous evaluation of its predictive power; we won’t do that here.

Conclusion

I had some fun working on this dataset. I tried out many models before finding something that looked reasonable. I won’t vouch for the quality of the model itself compared to the heavy artillery of LSTM-based models on Kaggle, but I’m very satisfied with one fact: it was pretty easy to come up with modifications and iterate fast on the model. Inference performance is not bad, and I think there’s ample room for improvement, both by using better data structures to represent particles and especially by using OCaml multicore’s fibers instead of my CPS monad-based implementation. In fact, it should not be hard to parallelize the inference using a bulk-synchronous model.