1. Functional Regression
Let the covariate be an at least twice continuously differentiable random function defined wlog. on an interval and the corresponding the response. For simplicity we assume centered random variables, i.e. beside we require bounded second moments , . We assume that there exists some that satisfies
(1)
We define cross-covariance functions of X and Y given by
(2)
Let be the covariance operator of as defined in this post with ordered eigenvalues and corresponding eigenfunctions , then statisfies (1) if and only if
(3)
(3) has a solution if and only if is invertible, for the unique solution is then given by
(4)
However this solution is unstable as to be seen by a simple example. Consider some , and define . Then as
(5)
This means that small pertubations in leads to a large variation (even infinite) in . There are various ways to tackle this problem, the key idea is to trade the uniqness for the stabilty of the a solution. This is archieved by reducing the dimensionality of the covariance operator. By doing so we do not solve the same problem but another one that is somewhat close to the original problem.
A popular approach is to smooth the regression or to truncate the expansion after some . This truncation approach will be the foundation of our spark implementation.
1.1 Simulation
For our simulation we will again assume a brownian motion on and . For the simulation we sample curves and approximate as proposed in this post with at timepoints , accordingly .
1.2 Implementation in Spark
Our implementation is based on the FPCA Algorithm presented in this post. We use the estimated first eigenfunctions and corresponding eigenvalues to construct an estimator based on 5 for for with
#We start with a sample of brownian motions import matplotlib.pyplot as plt from numpy import * from pyspark.mllib.linalg import * from pyspark.mllib.linalg.distributed import * from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry def spark_FPCA( matrix , L): N = matrix.numRows() T = matrix.numCols() scale =CoordinateMatrix(sc.parallelize(range(N)).map(lambda i: MatrixEntry(i, i, 1./sqrt(T) )), N, N).toBlockMatrix() # Compute the top L singular values and corresponding singular vectors. svd = scale.multiply(matrix).transpose().toIndexedRowMatrix().computeSVD(L) s = svd.s # The singular values are stored in a local dense vector. Va=svd.V.toArray() Vl = list(); for i in range(0, len(Va)): Vl.append( IndexedRow(i, Va[i] )) V=IndexedRowMatrix( sc.parallelize(Vl) ) S=DenseMatrix( L,L, diag(s).flatten().flatten().tolist() ) Si=DenseMatrix( L,L, diag(1./(s)).flatten().flatten().tolist() ) scores=V.multiply(S) components= matrix.transpose().multiply( V.multiply(Si).toBlockMatrix() ) #reduced FPCA decomposition FPCA= components.multiply(scores.toBlockMatrix().transpose() ) return (components, scores, FPCA, s) def spark_FReg( Xmatrix ,Ymatrix , L): fpca=spark_FPCA( Xmatrix , L) T = matrix.numCols() components=fpca[0] s=fpca[3]*fpca[3] #reconstruct beta Si=DenseMatrix( L,L, diag(1./(s*T)).flatten().flatten().tolist() ) beta_est=Ymatrix.transpose().multiply( Xmatrix ).multiply(components).toIndexedRowMatrix().multiply(Si).toBlockMatrix().multiply(components.transpose()) return (beta_est) ##EXAMPLE T=5000 N=800 #define beta(t), the score function time=2*pi*true_divide(arange(0,T),T ) beta= sin( time) #betamat=DenseMatrix( T,N, np.repeat(beta,N) ) betamat=DenseMatrix( T,1, beta ) scaleMat=DenseMatrix( T,T, diag(1/T).flatten().flatten().tolist() ) #plt.plot(time,beta) #plt.show() data = list(); for i in range(0, N): data.append( IndexedRow(i, array( cumsum(random.normal(0,sqrt(true_divide(1,T)),T)) ) ) ) #convert the data to spark sc_data= sc.parallelize( data ) matrix = IndexedRowMatrix(sc_data).toBlockMatrix().cache() scale=CoordinateMatrix(sc.range(N).map(lambda i: MatrixEntry(i, i, 1./T)), N, N).toBlockMatrix() Ymatrix=scale.multiply(IndexedRowMatrix(sc_data).multiply(betamat).toBlockMatrix()).cache() ##Plot the Input data #plt.plot( Ymatrix.toLocalMatrix().toArray().flatten() ) #plt.show() #reconstruct beta beta_est=spark_FReg(matrix, Ymatrix, 12).toLocalMatrix() times=true_divide( arange(0,T),T ) plt.plot(times, beta_est.toArray().flatten()) plt.plot(times,beta ) plt.show()