1. Functional Regression
Let the covariate
be an at least twice continuously differentiable random function defined wlog. on an interval
and
the corresponding the response. For simplicity we assume centered random variables, i.e.
beside we require bounded second moments
,
. We assume that there exists some
that satisfies
(1) ![]()
We define cross-covariance functions of X and Y given by
(2) ![]()
Let
be the covariance operator of
as defined in this post with ordered eigenvalues
and corresponding eigenfunctions
, then
statisfies (1) if and only if
(3) ![]()
(3) has a solution if and only if
is invertible, for
the unique solution is then given by
(4) 
However this solution is unstable as to be seen by a simple example. Consider some
, and define
. Then as ![]()
(5) 
This means that small pertubations in
leads to a large variation (even infinite) in
. There are various ways to tackle this problem, the key idea is to trade the uniqness for the stabilty of the a solution. This is archieved by reducing the dimensionality of the covariance operator. By doing so we do not solve the same problem but another one that is somewhat close to the original problem.
A popular approach is to smooth the regression or to truncate the expansion after some
. This truncation approach will be the foundation of our spark implementation.
1.1 Simulation

For our simulation we will again assume a brownian motion
on
and
. For the simulation we sample
curves and approximate
as proposed in this post with
at
timepoints
, accordingly
.
1.2 Implementation in Spark
Our implementation is based on the FPCA Algorithm presented in this post. We use the estimated first
eigenfunctions
and corresponding eigenvalues
to construct an estimator based on 5 for
for
with
![Rendered by QuickLaTeX.com \[\hat{\beta}(t_k)= \frac{1}{T} \sum_{j=1}^T \sum_{i=1}^L \hat{\lambda}_i^{-1} \hat{\gamma}_i(t_k) \hat{\gamma}_i(s_j) g(s_j).\]](https://www.thebigdatablog.com/wp-content/ql-cache/quicklatex.com-e7c97f649744cd93931e928fadf6475e_l3.png)
#We start with a sample of brownian motions
import matplotlib.pyplot as plt
from numpy import *
from pyspark.mllib.linalg import *
from pyspark.mllib.linalg.distributed import *
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
def spark_FPCA( matrix , L):
N = matrix.numRows()
T = matrix.numCols()
scale =CoordinateMatrix(sc.parallelize(range(N)).map(lambda i: MatrixEntry(i, i, 1./sqrt(T) )), N, N).toBlockMatrix()
# Compute the top L singular values and corresponding singular vectors.
svd = scale.multiply(matrix).transpose().toIndexedRowMatrix().computeSVD(L)
s = svd.s # The singular values are stored in a local dense vector.
Va=svd.V.toArray()
Vl = list();
for i in range(0, len(Va)):
Vl.append( IndexedRow(i, Va[i] ))
V=IndexedRowMatrix( sc.parallelize(Vl) )
S=DenseMatrix( L,L, diag(s).flatten().flatten().tolist() )
Si=DenseMatrix( L,L, diag(1./(s)).flatten().flatten().tolist() )
scores=V.multiply(S)
components= matrix.transpose().multiply( V.multiply(Si).toBlockMatrix() )
#reduced FPCA decomposition
FPCA= components.multiply(scores.toBlockMatrix().transpose() )
return (components, scores, FPCA, s)
def spark_FReg( Xmatrix ,Ymatrix , L):
fpca=spark_FPCA( Xmatrix , L)
T = matrix.numCols()
components=fpca[0]
s=fpca[3]*fpca[3]
#reconstruct beta
Si=DenseMatrix( L,L, diag(1./(s*T)).flatten().flatten().tolist() )
beta_est=Ymatrix.transpose().multiply( Xmatrix ).multiply(components).toIndexedRowMatrix().multiply(Si).toBlockMatrix().multiply(components.transpose())
return (beta_est)
##EXAMPLE
T=5000
N=800
#define beta(t), the score function
time=2*pi*true_divide(arange(0,T),T )
beta= sin( time)
#betamat=DenseMatrix( T,N, np.repeat(beta,N) )
betamat=DenseMatrix( T,1, beta )
scaleMat=DenseMatrix( T,T, diag(1/T).flatten().flatten().tolist() )
#plt.plot(time,beta)
#plt.show()
data = list();
for i in range(0, N):
data.append( IndexedRow(i, array( cumsum(random.normal(0,sqrt(true_divide(1,T)),T)) ) ) )
#convert the data to spark
sc_data= sc.parallelize( data )
matrix = IndexedRowMatrix(sc_data).toBlockMatrix().cache()
scale=CoordinateMatrix(sc.range(N).map(lambda i: MatrixEntry(i, i, 1./T)), N, N).toBlockMatrix()
Ymatrix=scale.multiply(IndexedRowMatrix(sc_data).multiply(betamat).toBlockMatrix()).cache()
##Plot the Input data
#plt.plot( Ymatrix.toLocalMatrix().toArray().flatten() )
#plt.show()
#reconstruct beta
beta_est=spark_FReg(matrix, Ymatrix, 12).toLocalMatrix()
times=true_divide( arange(0,T),T )
plt.plot(times, beta_est.toArray().flatten())
plt.plot(times,beta )
plt.show()