Functional Principal Component Analysis with Spark

    1.) Functional Principal Component Analysis

    Let X be a centered smooth random function in L^2([0,1]) with finite second moment \int_{[0,1]} E\left[X(u)^2 \right]du < \infty. Without loss of generality we work on [0,1] instead of an arbitrary compact interval and only consider centered functions; if the curves are not centered, they can be centered by subtracting E(X).

    The underlying dependence structure can be characterized by the covariance function \sigma(t,v)\stackrel{\operatorname{def}}{=}E\left[X(t)X(v)\right] and
    the corresponding covariance operator \Gamma

    (1)   \begin{equation*} (\Gamma \vartheta)(t)=\int_{[0,1]}\sigma(t,v)\vartheta(v)dv. \end{equation*}
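
    On an equidistant grid the integral in (1) turns into a Riemann sum, so the operator acts as an ordinary matrix-vector product. A minimal numpy sketch of this discretization (the kernel min(t,v) and the test function \vartheta are placeholder choices for illustration only):

    import numpy as np

    T = 500
    grid = np.arange(1, T + 1) / T            # equidistant grid on (0, 1]
    sigma = np.minimum.outer(grid, grid)      # example kernel: sigma(t, v) = min(t, v)
    theta = np.sin(np.pi * grid)              # some test function theta

    # (Gamma theta)(t_j) = int sigma(t_j, v) theta(v) dv  ~  (1/T) * sum_k sigma(t_j, v_k) theta(v_k)
    Gamma_theta = sigma @ theta / T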

    Let \lambda_1\geq \lambda_2\geq \dots denote the ordered eigenvalues of the covariance operator \Gamma and let \gamma_1,\gamma_2,\dots be a corresponding system of orthonormal eigenfunctions (functional principal components) such that \sigma(t,v)= \sum_{r=1}^\infty \lambda_r \gamma_r(t) \gamma_r(v). The Karhunen-Loeve decomposition states that the random function X can be represented in the form

    (2)   \begin{equation*}  X(t)=\sum_{r=1}^{\infty} \delta_{r} \gamma_r(t) \qquad \end{equation*}

    where the loadings \delta_{r} are random variables defined as \delta_{r}\stackrel{\operatorname{def}}{=} \int_{[0,1]} X(t) \gamma_r(t) dt that satisfy E \left(\delta_{r}^2\right)=\lambda_r and E \left(\delta_{r}\delta_{s}\right)=0 for r\neq s. The \gamma_r are called functional principal components and the \delta_{r} the corresponding principal scores.

    1.1) Example: Brownian motion

    A nice analytical example where we can actually calculate the principal components is Brownian motion. Let X(t)=W_t be a Brownian motion on t \in [0,1]; its probability density function is given by f_{W_t} (x)= \frac{1}{\sqrt{2 \pi t} }e^{-x^2/(2t)}. Accordingly E(W_t)=0 and Var(W_t)= E(W_t^2)-E^2(W_t)=E(W_t^2)=t. The covariance function for t<s is then given by

        \[\sigma(t,s)= E( W_t W_s)=E\left(W_t (W_s -W_t)\right) +E(W_t^2) = E(W_t- W_0) E(W_s -W_t) +E(W_t^2)=t\]

    since for 0 \leq s_1<t_1\leq t_2<s_2, W_{t_1}-W_{s_1} and W_{s_2}-W_{t_2}  are independent random variables. Thus  \sigma(t,s)= min(t,s) and we have to solve the following eigenvalue problem

        \[ \int_{[0,1]}min(t, s) \vartheta_r(s) ds = \lambda_r \vartheta_r (t)\]

    wich can be rewritten as

        \[ \int_{[0,t]} s\, \vartheta_r(s) ds + t \int_{[t,1]} \vartheta_r(s) ds = \lambda_r \vartheta_r (t).\]

    Differentiating once with respect to t leads to \int_{[t,1]} \vartheta_r(s) ds = \lambda_r \vartheta_r'(t); evaluating the original equation at t=0 and the differentiated one at t=1 gives the two boundary conditions \vartheta_r(0)=0 and \vartheta'_r(1)=0. Differentiating again leads to - \vartheta_r(t) =\lambda_r \vartheta''_r(t). In fact this is the standard example from basic math courses introducing Sturm-Liouville problems, and a solution is given by

        \[\lambda_r=  \left(\frac{1}{(r-\frac{1}{2} )\pi} \right)^2 , \qquad \vartheta_r(t)=\sqrt{2} \sin\left((r-\frac{1}{2} )\pi t \right).\]
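
    As a quick check, this pair indeed solves the differential equation and satisfies both boundary conditions, since \cos\left((r-\frac{1}{2})\pi\right)=0 for every integer r:

        \[\lambda_r \vartheta_r''(t)= -\sqrt{2}\,\lambda_r \left((r-\frac{1}{2})\pi\right)^2 \sin\left((r-\frac{1}{2} )\pi t \right) = -\vartheta_r(t), \qquad \vartheta_r(0)=0, \qquad \vartheta_r'(1)=\sqrt{2}\,(r-\frac{1}{2})\pi \cos\left((r-\frac{1}{2})\pi\right)=0.\]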

    Using that E \left(\delta_{r}^2\right)=\lambda_r= \left(\frac{1}{(r-\frac{1}{2} )\pi} \right)^2, so that \delta_r = \frac{Z_r}{(r-\frac{1}{2})\pi} with Z_r \sim N(0,1) i.i.d., the Karhunen-Loeve decomposition is given by

        \[W_t=\sqrt{2} \sum_{r=1}^\infty \frac{Z_r}{(r-\frac{1}{2} )\pi}   \sin\left((r-\frac{1}{2} )\pi t \right).\]
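
    As an aside (not needed for the Spark implementation below), the following small numpy sketch draws one path from the expansion truncated at R terms and one path from cumulative Gaussian increments; both constructions produce paths with the same distributional behaviour, here just compared visually. The grid size T and truncation level R are arbitrary illustration choices.

    import numpy as np
    import matplotlib.pyplot as plt

    T, R = 1000, 200                      # grid size and truncation level (arbitrary choices)
    t = np.arange(1, T + 1) / T           # equidistant grid on (0, 1]

    # truncated Karhunen-Loeve expansion: W_t ~ sqrt(2) * sum_r Z_r sin((r-1/2) pi t) / ((r-1/2) pi)
    Z = np.random.normal(size=R)
    freqs = (np.arange(1, R + 1) - 0.5) * np.pi
    W_kl = np.sqrt(2) * (np.sin(np.outer(t, freqs)) / freqs) @ Z

    # reference construction: cumulative sums of N(0, 1/T) increments (as used later in Section 2.1)
    W_cumsum = np.cumsum(np.random.normal(0, np.sqrt(1. / T), T))

    plt.plot(t, W_kl, label="truncated KL expansion")
    plt.plot(t, W_cumsum, label="cumulative increments")
    plt.legend()
    plt.show()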

    2.) A Spark implementation

    In practice a sample of N curves is observed; moreover, Spark cannot handle functions directly but only discrete data. To model this, let X_1,\dots, X_N \in L^2([0,1]) be an i.i.d. sample of smooth curves with continuous covariance function, where each curve in the sample is observed on an equidistant grid (t_1, \dots, t_T). The empirical approximation of the covariance function is then given by the sample covariance matrix

        \[\hat{\sigma}(t_j,t_k) = \frac{1}{N} \sum_{i=1}^N X_i(t_j) X_i(t_k).\]
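
    In matrix form, if the centered sample is stored as an N \times T array X (one row per curve), \hat{\sigma} is simply X^T X / N. A minimal numpy sketch, with a random placeholder array standing in for the data:

    import numpy as np

    N, T = 400, 1000
    X = np.random.normal(size=(N, T))   # placeholder for the centered, discretized sample curves

    # T x T sample covariance matrix, entry (j, k) = (1/N) sum_i X_i(t_j) X_i(t_k)
    sigma_hat = X.T @ X / N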

    To derive estimators of the functional principal components one usually relies on an eigenvalue decomposition of \{\hat{\sigma}(t_j,t_k)\}_{j,k=1,\dots,T}. For the Spark implementation, let us instead focus on an alternative way to estimate the Karhunen-Loeve decomposition based on the duality relation between row and column space. Let M be the dual matrix with entries

    (3)   \begin{equation*} M_{ij}= \frac{1}{T} \sum_{k=1}^T X_i(t_k) X_j(t_k). \end{equation*}

    The eigenvectors p_r=(p_{1r}, \ldots, p_{Nr}) and eigenvalues l_r of the matrix M are connected to the empirical \hat{\gamma}_r, \hat{\lambda}_r and \hat{ \delta }_{r}: replacing the expectation in \sigma(t,v)\stackrel{\operatorname{def}}{=}E\left[X(t)X(v)\right] with its empirical counterpart and the integral in (1) by a Riemann sum yields \hat{\gamma}_r(t)= \frac{1}{ \sqrt{l_r} } \sum_{i=1}^N p_{ir} X_i(t), \hat{\lambda}_r=\frac{1}{N} l_r and \hat{ \delta }_{ir}=\sqrt{l_r}p_{ir}.
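
    Before moving to Spark, the duality relation can be checked with a few lines of plain numpy (an illustration with a random placeholder sample, not the Spark code below): the N \times N dual eigenproblem reproduces the eigenvalues of the much larger T \times T covariance eigenproblem, and the estimators above follow directly from its eigenpairs.

    import numpy as np

    N, T, R = 50, 400, 3
    X = np.random.normal(size=(N, T))         # placeholder for the centered, discretized sample curves

    # dual matrix (3): M_ij = (1/T) sum_k X_i(t_k) X_j(t_k), an N x N eigenproblem instead of T x T
    M = X @ X.T / T
    l, p = np.linalg.eigh(M)                  # eigh returns eigenvalues in ascending order
    l, p = l[::-1][:R], p[:, ::-1][:, :R]     # keep the R largest eigenpairs

    gamma_hat = X.T @ p / np.sqrt(l)          # \hat gamma_r(t_j) = (1/sqrt(l_r)) sum_i p_ir X_i(t_j)
    lambda_hat = l / N                        # \hat lambda_r = l_r / N
    delta_hat = p * np.sqrt(l)                # \hat delta_ir = sqrt(l_r) p_ir

    # sanity check: the same eigenvalues come out of the (much larger) T x T covariance eigenproblem
    lam_direct = np.linalg.eigvalsh(X.T @ X / (N * T))[::-1][:R]
    print(np.allclose(lambda_hat, lam_direct))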

    2.1) Test the Algorithm using the Brownian Motion

    The figure shows the first two true principal components of a Brownian motion and the estimates from the presented algorithm.

    Let Z_k \sim N(0,1) be i.i.d.; we simulate a standard Brownian motion at the points (0, \frac{1}{T}, \frac{2}{T},\dots,\frac{T-1}{T} ,1), with m \leq T, by

        \[X_T \left( \frac{m}{T} \right) = \frac{1}{ \sqrt{T} } \sum_{k=1}^{m} Z_k.\]

    Using Donsker’s theorem one can then show that, as T \rightarrow \infty, X_T converges in distribution to a standard Brownian motion W. To test the algorithm we sample N=400 curves X_{T,i} at T=10000 equidistant time points.

    2.2) Implementation

    As a first example, and to check that the algorithm works as intended, we sample a set of N curves from the space of Brownian motions. These curves are computed at the master and then transferred to the Spark cluster, where the Spark-FPCA is performed. The functional principal components are then transferred back to the master, where we compare our estimates with the true functional principal components derived in the previous sections.

    # We start with a sample of Brownian motions
    import matplotlib.pyplot as plt
    from numpy import sqrt, diag, array, cumsum, true_divide, arange, sin, pi, random
    from pyspark.mllib.linalg import DenseMatrix
    from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry, IndexedRow, IndexedRowMatrix
    # assumes an existing SparkContext `sc` (e.g. when run in the pyspark shell)
    
    def spark_FPCA( matrix , L):
        # matrix: N x T BlockMatrix of discretized (centered) curves, L: number of components
        N = matrix.numRows()
        T = matrix.numCols()
        # N x N diagonal matrix with entries 1/sqrt(T), so that the SVD below acts on X/sqrt(T)
        scale = CoordinateMatrix(sc.parallelize(range(N)).map(lambda i: MatrixEntry(i, i, 1./sqrt(T) )), N, N).toBlockMatrix()
        # Compute the top L singular values and corresponding singular vectors of (X/sqrt(T))^T.
        svd = scale.multiply(matrix).transpose().toIndexedRowMatrix().computeSVD(L)
        s = svd.s       # The singular values are stored in a local dense vector.
        # distribute the right singular vectors V (N x L) as an IndexedRowMatrix
        Va = svd.V.toArray()
        Vl = [IndexedRow(i, Va[i]) for i in range(len(Va))]
        V = IndexedRowMatrix( sc.parallelize(Vl) )
        S = DenseMatrix(L, L, diag(s.toArray()).flatten().tolist())       # diag(s)
        Si = DenseMatrix(L, L, diag(1./s.toArray()).flatten().tolist())   # diag(1/s)
        scores = V.multiply(S)                                            # principal scores (N x L)
        # discretized eigenfunctions (T x L): X^T V diag(1/s)
        components = matrix.transpose().multiply( V.multiply(Si).toBlockMatrix() )
        # reduced rank-L reconstruction of X^T
        FPCA = components.multiply( scores.toBlockMatrix().transpose() )
        return (components, scores, FPCA, s)
    
    def reuse_FPCA( matrix , components):
        # project a new N x T sample onto previously estimated components (T x L BlockMatrix)
        N = matrix.numRows()
        T = matrix.numCols()
        # N x N diagonal matrix with entries 1/T: Riemann-sum approximation of the score integral
        scaler = CoordinateMatrix(sc.range(N).map(lambda i: MatrixEntry(i, i, 1./T)), N, N).toBlockMatrix()
        scores = scaler.multiply(matrix).multiply( components )   # new principal scores (N x L)
        FPCA = components.multiply( scores.transpose() )          # rank-L reconstruction of X^T
        return (components, scores.toIndexedRowMatrix(), FPCA)
    
    
    
    
    ##EXAMPLE USING A BROWNIAN MOTION
    
    T=10000
    N=400
    L=2
    
    # N Brownian motion paths, each observed at T equidistant points (cumulative N(0, 1/T) increments)
    data = list();
    for i in range(0, N):
        data.append( IndexedRow(i, array( cumsum(random.normal(0,sqrt(true_divide(1,T)),T)) ) ) )
    
    #convert the data to spark
    sc_data= sc.parallelize( data )
    matrix = IndexedRowMatrix(sc_data).toBlockMatrix().cache()
    
    result=spark_FPCA(matrix, L)
    
    FPCA=result[2].toLocalMatrix()
    comp=result[0].toLocalMatrix()
    scores_old=result[1].rows.collect()
    
    
    # Component plots: the true eigenfunctions are sqrt(2)*sin((r-1/2)*pi*t)
    times=true_divide( arange(0,T),T )
    true_comp=list();
    for i in range(1, L+1):
        true_comp.append( sqrt(2)*sin( (i-0.5)*pi*times) )
    
    for i in range(0,len(true_comp)):
        plt.plot(times, true_comp[i])
    
    # the estimated components match the true ones up to sign
    # (eigenfunctions are only identified up to a sign flip)
    for i in range(1,len(comp.toArray()[1,:])+1):
        plt.plot(times, comp.toArray()[:,(i-1)])
    
    plt.show()
    
    
    ## Construct a new sample of Brownian motions to reuse the estimated principal components
    data = list();
    for i in range(0, N):
        data.append( IndexedRow(i, array( cumsum(random.normal(0,sqrt(true_divide(1,T)),T)) ) ) )
    
    plt.plot(times, array( cumsum(random.normal(0,sqrt(true_divide(1,T)),T)) ) ) 
    plt.show()
    
    #convert the data to spark
    sc_data= sc.parallelize( data )
    matrix2 = IndexedRowMatrix(sc_data).toBlockMatrix().cache()
    result2=reuse_FPCA(matrix2, result[0])
    
    ##New Scores
    result2[1].rows.collect()
    
