Time series (learning) with Apache Spark

// --------------------------------------------------------------------------------------------------------------------
// <copyright file="MultivariateTimeSeries.scala" company="Bayes Server">
// Copyright (C) Bayes Server. All rights reserved.
// </copyright>
// --------------------------------------------------------------------------------------------------------------------

package com.bayesserver.spark.examples.parameterlearning

import org.apache.spark.{SparkContext, SparkConf}
import com.bayesserver._
import com.bayesserver.learning.parameters.{ParameterLearningOptions, ParameterLearning}
import com.bayesserver.inference.Evidence
import com.bayesserver.spark.core.{MemoryNameValues, BroadcastNameValues, IteratorEvidenceReader, BayesSparkDistributer}

/**
 * Example that learns the parameters of a time series model.
 */
object MultivariateTimeSeries {

  /**
   * @param sc The SparkContext (see the Apache Spark user guide for help on this).
   * @param licenseKey An optional Bayes Server license key.
   */
  def apply(sc: SparkContext, licenseKey: Option[String] = None) = {

    licenseKey.foreach(key => License.validate(key))

    // Hard-code some test data. Normally you would read data from your cluster
    // (see createRDDFromText below for a sketch).
    val data = createRDD(sc).cache()

    // A network could be loaded from a file or stream (see loadNetwork below).
    // We create it manually here to keep the example self-contained.
    val network = createNetwork

    val parameterLearningOptions = new ParameterLearningOptions

    // Bayes Server supports multi-threaded learning, which we turn off
    // here because Spark manages the parallelism instead.
    parameterLearningOptions.setMaximumConcurrency(1)

    // parameterLearningOptions.setMaximumIterations(...) // useful to limit the number of learning iterations

    // BroadcastNameValues uses a Spark broadcast variable to pass name/value
    // pairs from the driver to the workers.
    val driverToWorker = new BroadcastNameValues(sc)

    // Learn the parameters in a distributed fashion. Spark partitions the data,
    // and each worker reads its partition through a TimeSeriesEvidenceReader.
    val output = ParameterLearning.learnDistributed(network, parameterLearningOptions,
      new BayesSparkDistributer[Seq[(Double, Double)]](
        data,
        driverToWorker,
        (ctx, iterator) => new TimeSeriesEvidenceReader(ctx.getNetwork, iterator),
        licenseKey
      ))

    // We could now save the network to a file or stream with network.save(...),
    // and the file could then be opened in the Bayes Server User Interface.

    println("Time series parameter learning complete")
    println("Case count = " + output.getCaseCount)
    println("Log-likelihood = " + output.getLogLikelihood)
    println("Converged = " + output.getConverged)
    println("Iterations = " + output.getIterationCount)

  }

  /**
   * Some test data. Normally you would load the data from the cluster.
   *
   * We have hard-coded it here to keep the example self-contained.
   * @param sc The SparkContext.
   * @return An RDD in which each element is one time series, and each
   *         (Double, Double) tuple holds the values of X and Y at one time step.
   */
  def createRDD(sc: SparkContext) = {

    sc.parallelize(Seq(
      Seq((1.0, 2.3), (2.3, 4.5), (6.2, 7.2), (4.2, 6.6)),
      Seq((3.3, -1.2), (3.2, 4.4), (-3.3, -2.3), (4.15, 1.2), (8.8, 2.2), (4.1, 9.9)),
      Seq((1.0, 2.0), (3.3, 4.1)),
      Seq((5.0, 21.3), (4.3, 6.6), (-2.1, 4.5)),
      Seq((4.35, -3.25), (13.44, 12.4), (-1.3, 3.33), (4.2, 2.15), (12.8, 4.25)),
      Seq((1.46, 2.22), (1.37, 3.15), (2.2, 2.25))
    ))
  }
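
  /**
   * A sketch (an assumption, not part of the original example) of how series
   * data might instead be read from the cluster: here we assume a hypothetical
   * text format with one series per line, time steps separated by ';' and the
   * X and Y values within a step separated by ','.
   * @param sc The SparkContext.
   * @param path A hypothetical path to data on the cluster.
   * @return An RDD of time series in the same shape as createRDD.
   */
  def createRDDFromText(sc: SparkContext, path: String) = {

    sc.textFile(path).map(line =>
      line.split(';').toSeq.map { step =>
        val fields = step.split(',')
        (fields(0).toDouble, fields(1).toDouble)
      })
  }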

  /**
   * Create a network in code. An existing network could also be read from a file or stream using Network.load.
   * @return A Bayes Server network.
   */
  def createNetwork = {

    val network = new Network

    // A single temporal (time series) node holding both continuous variables.
    val series = new Node()
    series.setName("Series")
    series.setTemporalType(TemporalType.TEMPORAL)

    val x = new Variable("X", VariableValueType.CONTINUOUS)
    series.getVariables.add(x)
    val y = new Variable("Y", VariableValueType.CONTINUOUS)
    series.getVariables.add(y)
    network.getNodes.add(series)

    // Temporal links from the node to itself with orders (lags) 1, 2 and 3,
    // so each time step depends on the previous three.
    network.getLinks.add(new Link(series, series, 1))
    network.getLinks.add(new Link(series, series, 2))
    network.getLinks.add(new Link(series, series, 3))

    network
  }
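
  /**
   * A brief sketch (not part of the original example) of loading an existing
   * network instead of building it in code, as mentioned above. The path is
   * hypothetical.
   */
  def loadNetwork(path: String) = {

    val network = new Network
    network.load(path)
    network
  }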

  /**
   * Implements the Bayes Server EvidenceReader interface, for reading our data.
   * @param network The network.
   * @param iterator The iterator, which will be generated by RDD.mapPartitions.
   */
  class TimeSeriesEvidenceReader(val network: Network, val iterator: Iterator[Seq[(Double, Double)]])
    extends IteratorEvidenceReader[Seq[(Double, Double)]] {

    val x = network.getVariables.get("X")
    val y = network.getVariables.get("Y")

    override def setEvidence(item: Seq[(Double, Double)], evidence: Evidence): Unit = {

      // Each tuple in the series provides the values of X and Y at one time index.
      for (time <- item.indices) {

        evidence.set(x, item(time)._1, time)
        evidence.set(y, item(time)._2, time)
      }
    }
  }


}
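
For reference, here is a minimal sketch of how the example might be run. It is an assumption about the surrounding harness, not part of the original example: in a real deployment the master URL and app name usually come from spark-submit rather than being set in code.

import org.apache.spark.{SparkConf, SparkContext}
import com.bayesserver.spark.examples.parameterlearning.MultivariateTimeSeries

object RunExample {

  def main(args: Array[String]): Unit = {

    // local[*] is for local testing only (a hypothetical configuration).
    val conf = new SparkConf().setAppName("MultivariateTimeSeries").setMaster("local[*]")
    val sc = new SparkContext(conf)

    try {
      MultivariateTimeSeries(sc) // pass Some(licenseKey) if you have one
    }
    finally {
      sc.stop()
    }
  }
}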