Kernel Regression using Pyspark

1. Kernel Regression using Pyspark

The red curve shows the true function m(x) while the green dots show the estimated curve evaluated using an random grid. The blue points are the simulated Y_i. A well known problem of the estimation method concerning boundary points is clearly visible.

In a previous article I presented an implementation of a kernel denisty estimation using pyspark. It is thus not difficult to modify the algorithm to estimate a kernel regression. Suppose that there exits some function m(x), an example for such functions are for instance temperature curves which measure the temperature during a day. In practice, such functions m(x) will often not be directly observed, but one will have to deal with discrete, noisy observations contaminated with some error. The purpose of kernel regression is then to estimate the underlying true function.

In the following I will only consider a simple, standard error model: For T design points
X_1,\dots,X_T\in [0,1] there are noisy observations Y_{l} such that

(1)   \begin{eqnarray*} Y_{l}=m(X_l)+\epsilon_{l},\quad l=1,\dots,T, \end{eqnarray*}

for i.i.d. zero mean error terms \epsilon_{l} with finite variance \sigma^2>0 and \mathbb{E}(\epsilon_{l}^4)<\infty.

To estimate m(x) I stick to the Nadaraya-Watson-Estimator given by

    \[\hat{m}(x) = \frac{\sum_{i=1}^T Y_i K_h(x-X_i) }{\sum_{i=1}^T K_h(x-X_i) }\]

where K_h(u)=h^{-1}K(u/h) is a kernel as described here, in the implementation again an Epanechnikov kernel K(u)=max(0,\frac{3}{4}(1-u^2)) is used. The reader might recognize that \sum_{i=1}^T K_h(x-X_i) is just the density (scaled by T) already implemented here, while \sum_{i=1}^T Y_i K_h(x-X_i) is just a weighted version of it. Implementation should therefore be very similar leading to the following algortihm:

###A Spark-Function to derive a non-parametric kernel regression
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.feature import StandardScaler
import matplotlib.pyplot as plt
from numpy import *

##1.0 Simulated Data
T=1500

time=sort( random.uniform(0,1,T) )   ##Sorting is not required, i did it to have less trouble with the plots
true = sin( true_divide( time, 0.05*pi)  )
X= true + random.normal(0,0.1,T)

data= sc.parallelize(  zip(time,X)  )

plt.plot(time,true, 'bo')
plt.plot(time,X, 'bo')
plt.show()

##2.0 The Function
#2.1 Kernel Function

def spark_regression(data, Nout, bw):
    def epan_kernel(x,y,b):
        u=true_divide( (x-y), b)
        return max(0, true_divide( 1, b)*true_divide(3,4)*(1-u**2))     

    #derive the minia and maxi used for interpolation
    mini=data.map(lambda x: x[0]).takeOrdered(1, lambda x: x )
    maxi=data.map(lambda x: x[0]).takeOrdered(1, lambda x: -1*x )
    #create an interpolation grid (in fact this time it's random)
    random_grid = sc.parallelize(  random.uniform(maxi,mini,Nout)  )
    #compute K(x-xi) Matrix
    density=data.cartesian(random_grid).map(lambda x:( float(x[1]),epan_kernel(array(x[0][0]),array(x[1]),bw) ) )
    kernl=data.cartesian(random_grid).map(lambda x:( float(x[1]),x[0][1]*epan_kernel(array(x[0][0]),array(x[1]),bw) ) )

    mx= kernl.filter(lambda x: x&amp;gt;0).reduceByKey( lambda y, x:  y+x ).zip( density.filter(lambda x: x&amp;gt;0).reduceByKey( lambda y, x:  y+x ) )  ##added optional filter() does anyone know if this improves performance?
    return mx.map(lambda x: (x[0][0],true_divide(x[0][1],x[1][1]))  )

##3.0 Results

fitted= spark_regression(data, 128, 0.05).collect()
fit=array(fitted).transpose()

plt.plot(time,true, color='r')
plt.plot(fit[0], fit[1], 'bo')

plt.show()

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.