Functional Regression with Spark

1. Functional Regression

Let the covariate X= (X(t), t \in [0,1]) be an at least twice continuously differentiable random function, defined w.l.o.g. on the interval [0,1], and let Y \in \mathbb{R} be the corresponding response. For simplicity we assume centered random variables, i.e. E(X)=0, E(Y)=0; in addition we require bounded second moments E(Y^2)< \infty and \int_0^1 E(X(t)^2) dt < \infty. We assume that there exists some \beta \in L^2[0,1] that satisfies

(1)   \begin{align*} Y= \int_0^1 \beta(t) X( t ) dt. \end{align*}

We define the cross-covariance function of X and Y by

(2)   \begin{align*} g(t)= E ( Y X(t) ). \end{align*}

Let \Gamma be the covariance operator of X as defined in this post, with ordered eigenvalues \lambda_1 \geq \lambda_2 \geq \dots and corresponding eigenfunctions \gamma_1(t),\gamma_2(t), \dots; then \beta satisfies (1) if and only if

(3)   \begin{align*} (\Gamma \beta)(t) = g(t). \end{align*}
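
Recall that the covariance operator of a centered X acts as (\Gamma \beta)(t) = \int_0^1 E(X(t)X(s)) \beta(s) ds. Equation (3) then follows from (1) by multiplying with X(t) and taking expectations:

    \[g(t) = E\Big( X(t) \int_0^1 \beta(s) X(s)\, ds \Big) = \int_0^1 E( X(t) X(s) ) \beta(s)\, ds = (\Gamma \beta)(t).\]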

Equation (3) has a unique solution if and only if \Gamma is invertible; if \lambda_i>0 for all i, the solution is given by

(4)   \begin{align*} \beta(t)= \int_0^1 \sum_{i=1}^\infty \lambda_i^{-1} \gamma_i(t) \gamma_i(s) g(s) ds. \end{align*}
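
Indeed, expanding \beta = \sum_i b_i \gamma_i and projecting (3) onto the eigenfunctions gives \lambda_i b_i = \int_0^1 g(s) \gamma_i(s) ds for every i; solving for the coefficients b_i yields (4).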

However, this solution is unstable, as can be seen from a simple example. Consider some \epsilon >0 and define g^{\epsilon}(t):= g(t) + \epsilon \gamma_j(t). Then, as j \rightarrow \infty,

(5)   \begin{align*} ||\int_0^1 \sum_{i=1}^\infty \lambda_i^{-1} g(s) \gamma_i(s) \gamma_i(t) ds - \int_0^1 \sum_{i=1}^\infty \lambda_i^{-1} g^{\epsilon}(s) \gamma_i(s) \gamma_i(t) ds || = \lambda_j^{-1} \epsilon \rightarrow \infty. \end{align*}

This means that small perturbations in g lead to large (even infinite) variations in \beta. There are various ways to tackle this problem; the key idea is to trade the uniqueness of the solution for its stability. This is achieved by reducing the dimensionality of the covariance operator. By doing so we no longer solve the same problem but a different one that is close to the original.
A popular approach is to smooth the regression function or to truncate the eigenexpansion after some level L, as sketched below. This truncation approach is the foundation of our Spark implementation.
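
To make this trade-off concrete, here is a minimal NumPy sketch, separate from the Spark implementation below, using a toy setup with hypothetical eigenvalues \lambda_i = i^{-2}: it compares the naive inversion with the truncated one when g is perturbed along a high-order eigenfunction.

# Toy illustration: naive inversion vs. truncation, everything expressed in the eigenbasis of Gamma
import numpy as np

M, L_trunc, eps = 200, 10, 1e-3                # basis size, truncation level, perturbation size
lam = 1.0 / np.arange(1, M + 1) ** 2           # hypothetical eigenvalues lambda_i = i^(-2)
beta_true = 1.0 / np.arange(1, M + 1) ** 3     # hypothetical coefficients of the true beta
g = lam * beta_true                            # coefficients of g = Gamma beta

g_eps = g.copy()
g_eps[149] += eps                              # perturb g along the 150th eigenfunction

# full inversion: the difference is eps / lambda_150 = eps * 150^2 and grows without bound in the index
print(np.linalg.norm(g / lam - g_eps / lam))
# truncated inversion: the perturbation beyond the first L components is simply ignored
print(np.linalg.norm(g[:L_trunc] / lam[:L_trunc] - g_eps[:L_trunc] / lam[:L_trunc]))

The truncated solution no longer recovers \beta exactly (only its first L components), but it is insensitive to such perturbations.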

1.1 Simulation

[Figure: the true regression function \beta(t) and the estimate obtained from the algorithm.]

For our simulation we will again assume a Brownian motion X(t)=W_t on [0,1] and \beta(t)= \sin(2 \pi t). We sample N=800 curves and approximate W_{t,i} as proposed in this post by X_{T,i} at T=5000 time points (t_1,\dots,t_T); accordingly, Y_{i,T}= \frac{1}{T} \sum_{j=1}^T \beta(t_j) X_{T,i}(t_j).
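
Before turning to Spark, a plain-NumPy version of this sampling scheme looks as follows (the variable names here are only illustrative; the Spark version is given in the next section).

import numpy as np

T, N = 5000, 800
t = np.arange(T) / T                                   # grid t_1, ..., t_T on [0, 1]
beta_true = np.sin(2 * np.pi * t)                      # regression function on the grid

# Brownian motion: cumulative sums of N(0, 1/T) increments, one curve per row
X = np.cumsum(np.random.normal(0.0, np.sqrt(1.0 / T), size=(N, T)), axis=1)

# Riemann-sum approximation of Y_i = int_0^1 beta(t) X_i(t) dt
Y = X @ beta_true / T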

1.2 Implementation in Spark

Our implementation is based on the FPCA algorithm presented in this post. We use the first L estimated eigenfunctions \hat{\gamma}_j and corresponding eigenvalues \hat{\lambda}_j, j=1,\dots,L, to construct an estimator for \beta(t) at the grid points t_k, k=1,\dots,T, based on a truncated version of (4):

    \[\hat{\beta}(t_k)= \frac{1}{T} \sum_{j=1}^T \sum_{i=1}^L \hat{\lambda}_i^{-1} \hat{\gamma}_i(t_k) \hat{\gamma}_i(s_j) \hat{g}(s_j).\]
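
In matrix notation this is the chain of multiplications performed by spark_FReg below. Write X \in \mathbb{R}^{N \times T} for the discretized curves, Y \in \mathbb{R}^{N} for the responses, \hat{\Gamma}_L \in \mathbb{R}^{T \times L} for the matrix whose columns are the estimated eigenfunctions on the grid, and s_1,\dots,s_L for the singular values returned by spark_FPCA. Then \hat{g}(s_j) \approx \frac{1}{N} (Y^\top X)_j and, with the scaling used in spark_FPCA, \hat{\lambda}_i \approx s_i^2/N; the factors of N cancel and the row vector of estimates becomes

    \[\hat{\beta} = \frac{1}{T}\, Y^\top X \, \hat{\Gamma}_L \, \mathrm{diag}(s_1^{-2},\dots,s_L^{-2}) \, \hat{\Gamma}_L^\top.\]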

# Imports: NumPy for the simulation, matplotlib for plotting, and Spark MLlib distributed matrices
import matplotlib.pyplot as plt
from numpy import *
from pyspark.mllib.linalg import *
from pyspark.mllib.linalg.distributed import *

def spark_FPCA( matrix , L):
    N = matrix.numRows()
    T = matrix.numCols()
    # scale every row by 1/sqrt(T) so that the squared singular values approximate the eigenvalues of (1/T) X^T X
    scale = CoordinateMatrix(sc.parallelize(range(N)).map(lambda i: MatrixEntry(i, i, 1./sqrt(T))), N, N).toBlockMatrix()
    # Compute the top L singular values and corresponding singular vectors.
    svd = scale.multiply(matrix).transpose().toIndexedRowMatrix().computeSVD(L)
    s = svd.s       # The singular values are stored in a local dense vector.
    Va = svd.V.toArray()
    Vl = list()
    for i in range(0, len(Va)):
        Vl.append(IndexedRow(i, Va[i]))
    V = IndexedRowMatrix(sc.parallelize(Vl))
    # local diagonal matrices holding the singular values and their inverses
    S = DenseMatrix(L, L, diag(s.toArray()).flatten().tolist())
    Si = DenseMatrix(L, L, diag(1./s.toArray()).flatten().tolist())
    scores = V.multiply(S)
    # eigenfunctions on the grid: columns of X^T V S^{-1}
    components = matrix.transpose().multiply(V.multiply(Si).toBlockMatrix())
    # reduced FPCA reconstruction of the curves
    FPCA = components.multiply(scores.toBlockMatrix().transpose())
    return (components, scores, FPCA, s)

def spark_FReg( Xmatrix, Ymatrix, L):
    fpca = spark_FPCA(Xmatrix, L)
    T = Xmatrix.numCols()
    components = fpca[0]
    # squared singular values; these play the role of the (rescaled) estimated eigenvalues
    s2 = fpca[3].toArray() ** 2
    # reconstruct beta: (1/T) * Y^T X * components * diag(1/s_i^2) * components^T
    Si = DenseMatrix(L, L, diag(1./(s2 * T)).flatten().tolist())
    beta_est = Ymatrix.transpose().multiply(Xmatrix).multiply(components).toIndexedRowMatrix().multiply(Si).toBlockMatrix().multiply(components.transpose())
    return beta_est

##EXAMPLE
T=5000
N=800

#define beta(t), the regression function, on the grid
time = 2*pi*true_divide(arange(0,T), T)
beta = sin(time)
#betamat=DenseMatrix( T,N, np.repeat(beta,N) )
betamat = DenseMatrix(T, 1, beta)

#plt.plot(time,beta)
#plt.show()

#We start with a sample of Brownian motions: cumulative sums of N(0, 1/T) increments
data = list()
for i in range(0, N):
    data.append(IndexedRow(i, array(cumsum(random.normal(0, sqrt(true_divide(1, T)), T)))))

#convert the data to spark
sc_data= sc.parallelize( data )
matrix = IndexedRowMatrix(sc_data).toBlockMatrix().cache()

scale=CoordinateMatrix(sc.range(N).map(lambda i: MatrixEntry(i, i, 1./T)), N, N).toBlockMatrix()
Ymatrix=scale.multiply(IndexedRowMatrix(sc_data).multiply(betamat).toBlockMatrix()).cache()

##Plot the Input data
#plt.plot( Ymatrix.toLocalMatrix().toArray().flatten() )
#plt.show()

#reconstruct beta
beta_est=spark_FReg(matrix, Ymatrix, 12).toLocalMatrix()

times=true_divide( arange(0,T),T )
plt.plot(times, beta_est.toArray().flatten())

plt.plot(times,beta )
plt.show()
