Kernel Regression using PySpark

In a previous article I presented an implementation of a kernel density estimation using PySpark. It is thus not difficult to modify the algorithm to estimate a kernel regression. Suppose that there exists some function \(m\); examples of such functions are, for instance, temperature curves which record the temperature over the course of a day. In practice, such functions will often not be observed directly; instead one has to deal with discrete, noisy observations contaminated with some error. The purpose of kernel regression is then to estimate the underlying true function.
In the following I will only consider a simple, standard error model: for \(T\) design points \(X_1, \dots, X_T\) there are noisy observations \(Y_1, \dots, Y_T\) such that

(1)   \[Y_i = m(X_i) + \varepsilon_i, \qquad i = 1, \dots, T,\]

for i.i.d. zero-mean error terms \(\varepsilon_i\) with finite variance \(\sigma^2\).
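As a concrete instance, the simulation further below uses the true function \(m(t) = \sin\big(t/(0.05\pi)\big)\) on \([0,1]\) and i.i.d. Gaussian errors \(\varepsilon_i \sim \mathcal{N}(0, 0.1^2)\).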
To estimate \(m\) I stick to the Nadaraya-Watson estimator given by
![Rendered by QuickLaTeX.com \[\hat{m}(x) = \frac{\sum_{i=1}^T Y_i K_h(x-X_i) }{\sum_{i=1}^T K_h(x-X_i) }\]](https://www.thebigdatablog.com/wp-content/ql-cache/quicklatex.com-6803f42b1fad36ea641461f87475ad13_l3.png)
where \(K_h\) is a kernel as described here; in the implementation again an Epanechnikov kernel \(K_h(u) = \frac{3}{4h}\big(1-(u/h)^2\big)\,\mathbf{1}_{\{|u|\le h\}}\) is used. The reader might recognize that the denominator \(\sum_{i=1}^T K_h(x-X_i)\) is just the kernel density estimate (scaled by \(T\)) already implemented here, while the numerator \(\sum_{i=1}^T Y_i K_h(x-X_i)\) is just a weighted version of it. The Spark implementation should therefore be very similar, leading to the algorithm below, after a short local sketch of the estimator.
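To make the formula concrete, here is a minimal plain-NumPy sketch of the Nadaraya-Watson estimator with the Epanechnikov kernel; it is only an illustration, and the names x_grid, X, Y and the bandwidth h are placeholders rather than part of the Spark code:
import numpy as np
def nw_epanechnikov(x_grid, X, Y, h):
    #K[i, j] = K_h(x_grid[j] - X[i]) with the Epanechnikov kernel
    u = (x_grid[None, :] - X[:, None]) / h
    K = np.maximum(0.0, 0.75 / h * (1.0 - u ** 2))
    #Nadaraya-Watson estimate: weighted average of the Y_i at every grid point
    return (Y[:, None] * K).sum(axis=0) / K.sum(axis=0)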
### A Spark function to derive a non-parametric kernel regression
##the script assumes a SparkContext named sc, as provided by the pyspark shell
import matplotlib.pyplot as plt
from numpy import *
##1.0 Simulated Data
T = 1500
time = sort(random.uniform(0, 1, T)) ##Sorting is not required, I did it to have less trouble with the plots
true = sin(true_divide(time, 0.05 * pi))
X = true + random.normal(0, 0.1, T)
data = sc.parallelize(list(zip(time, X))) ##list() so that parallelize gets a sequence, not an iterator
plt.plot(time, true, 'r') ##true curve as a red line, as in the result plot below
plt.plot(time, X, 'bo')
plt.show()
##2.0 The Function
#2.1 Kernel Function
def spark_regression(data, Nout, bw):
    #Epanechnikov kernel K_h(u) = 3/(4h)*(1 - (u/h)^2) for |u| <= h, and 0 otherwise
    def epan_kernel(x, y, b):
        u = true_divide(x - y, b)
        return max(0, true_divide(1, b) * true_divide(3, 4) * (1 - u ** 2))
    #derive the minimum and maximum of the design points, used for the interpolation grid
    mini = data.map(lambda x: x[0]).min()
    maxi = data.map(lambda x: x[0]).max()
    #create an interpolation grid (in fact this time it's random)
    random_grid = sc.parallelize(random.uniform(mini, maxi, Nout))
    #compute K_h(x - X_i) for every (observation, grid point) pair, keyed by grid point
    density = data.cartesian(random_grid).map(lambda x: (float(x[1]), epan_kernel(x[0][0], x[1], bw)))
    kernl = data.cartesian(random_grid).map(lambda x: (float(x[1]), x[0][1] * epan_kernel(x[0][0], x[1], bw)))
    #sum denominator and numerator per grid point, drop grid points with an empty kernel window
    #and join by key (safer than zip, which relies on identical partitioning and ordering)
    den = density.reduceByKey(lambda y, x: y + x).filter(lambda x: x[1] > 0)
    num = kernl.reduceByKey(lambda y, x: y + x)
    mx = den.join(num)
    return mx.map(lambda x: (x[0], true_divide(x[1][1], x[1][0])))
##3.0 Results
fitted = spark_regression(data, 128, 0.05).collect()
fit = array(fitted).transpose()
plt.plot(time, true, color='r')
plt.plot(fit[0], fit[1], 'bo')
plt.show()
In the resulting plot, a well-known problem of the estimation method concerning boundary points is clearly visible.
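As a quick sanity check one could also compare the fitted values with the true curve on the random grid, e.g. via the mean squared error; a minimal sketch, where true_on_grid and mse are just placeholder names reusing the fit array from above:
#evaluate the true regression function m(t) = sin(t/(0.05*pi)) on the grid points
true_on_grid = sin(true_divide(fit[0], 0.05 * pi))
#mean squared error of the kernel regression fit
mse = mean((fit[1] - true_on_grid) ** 2)
print(mse)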