Note: This notebook is hosted on Amazon EC2 with Python 2.7, Spark 2.2 and Jupyter Notebook. The easiest way to replicate it is to use a Docker image such as jupyter/all-spark-notebook. After installing Docker, simply run the following command:
docker run -it --rm -p 8888:8888 --name spark jupyter/all-spark-notebook
Question: Attached file auto_mpg_original.csv contains a set of data on automobile characteristics and fuel consumption. File auto_mpg_description.csv contains the description of the data. Import data into Spark. Randomly select 10-20% of your data for testing and use remaining data for training. Find all null values in all numerical columns. Replace nulls, if any, with average values for respective columns using Spark Data Frame API.
The first step of the analysis is to set up the Spark context and load all the necessary libraries.
# Load spark
import findspark
findspark.init("/home/tim/spark")
# Import libraries
import os
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import Row
import pyspark.mllib
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import display, HTML
# Start spark context
conf = SparkConf().setAppName("hw7").setMaster("local[*]")
sc = SparkContext(conf=conf)
from pyspark.ml.feature import Imputer
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.tree import DecisionTree
from pyspark.sql.functions import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import lit
sqlContext = SQLContext(sc)
Next we can load the data and split each line of the CSV file on the comma delimiter.
# Read data
auto = sc.textFile("/home/tim/e63-coursework/hw7/data/auto_mpg_original-1.csv",
use_unicode=False)
# Split data
autordd = auto.map(lambda line: line.split(","))
With that we can add the column names and split the dataset into numerical and categorical variables.
# Create dataframe
df = autordd.map(lambda line: Row(mpg = line[0], cylinders = line[1],
displacement=line[2], horsepower=line[3],
weight=line[4], acceleration=line[5],
my=line[6], origin=line[7],
name=line[8])).toDF()
# Define numeric variables
df_num = df.select(df.mpg.cast('float'), df.displacement.cast('float'),
df.horsepower.cast('float'), df.weight.cast('float'),
df.acceleration.cast('float'), df.cylinders.cast('float'),
df.my.cast('float'), df.origin.cast('float'))
# Define categorical variables
df_cat = df.select(df.name.cast('string'))
Next we can have a look at the summary statistics of the numerical variables.
# Calculate summary
df_num_summary = df_num.describe()
# Show numeric variabels I
df_num_summary['summary', 'mpg', 'displacement', 'horsepower', 'weight'].show()
# Show numeric variable II
df_num_summary['summary', 'acceleration', 'cylinders', 'my', 'origin'].show()
The above count statistics show that there are NAs present. This appears to be the case for mpg and horsepower.
With Spark 2.2 there is a new Imputer function available. With its help, the null values can be replaced with the mean of the respective column.
# Impute NA's with the average (new function on Spark 2.2)
imputer = Imputer(inputCols=df_num.columns, outputCols=["{}".format(c)
for c in df_num.columns])
df_num = imputer.setStrategy("mean").fit(df_num).transform(df_num)
Having imputed the numerical variables, the next step is to join the numerical and categorical dataframes back into one.
# Add ID variable
df_num = df_num.withColumn("row_id", monotonically_increasing_id())
df_cat = df_cat.withColumn("row_id", monotonically_increasing_id())
# Join dataframe
df2 = df_num.join(df_cat, "row_id").drop("row_id")
# Show data
df2.show(5)
Note: In order to use the categorical variable name, we would apply 1-of-k binary encoding (a sketch follows below). This is not necessary for the analysis in problem 2.
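As an illustration of what 1-of-k encoding of name could look like with the Spark ML transformers, here is a minimal sketch. The output column names name_idx and name_vec are assumptions made for this example, and the step is not used in the analysis below; note that name has many distinct values, so the resulting binary vector would be very wide.
# Hedged sketch: 1-of-k encoding of the categorical column 'name' (illustration only)
from pyspark.ml.feature import StringIndexer, OneHotEncoder
# Map each distinct name to a numeric index (output column name_idx is illustrative)
indexer = StringIndexer(inputCol="name", outputCol="name_idx")
df_encoded = indexer.fit(df2).transform(df2)
# Expand the index into a 1-of-k binary vector (output column name_vec is illustrative)
encoder = OneHotEncoder(inputCol="name_idx", outputCol="name_vec", dropLast=False)
df_encoded = encoder.transform(df_encoded)
df_encoded.select("name", "name_idx", "name_vec").show(5, truncate=False)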
For the following analysis in problem 2, only the variables mpg and horsepower are used, so we select just those columns.
# Select variables
df3 = df2.select(df2.mpg.cast('float'), df2.horsepower.cast('float'))
# Show data
df3.show(5)
In order to programmatically tell the ML algorithm which columns are features and which is the target variable, we convert the data to labeled points.
# Create labeled points
df_labeled = df3.rdd.map(lambda r : LabeledPoint(r[0], r[1:]))
df_labeled.take(5)
In order to control for overfitting we're creating datasets for training (80%) and testing (20%) the models.
# Test and learning set.
df_train, df_test = df_labeled.randomSplit([.8, .2], seed=123)
# Cache data
df_train = df_train.cache()
df_test = df_test.cache()
# Get values
print "Training data size: %d" % df_train.count()
print "Test data size: %d" % df_test.count()
print "Total data size: %d " % df3.count()
print "Train + Test size : %d" % (df_train.count() + df_test.count())
There are 315 observations in the training set and 91 in the test set. Together they equal the number of observations in the overall dataset.
Question: Look initially at two variables in the data set from the previous problem: the horsepower and the mpg (miles per gallon). Treat mpg as a feature and horsepower as the target variable (label). Use MLlib linear regression to identify the model for the relationship. Use the test data to illustrate accuracy of the linear regression model and its ability to predict the relationship. Calculate two standard measures of model accuracy. Create a diagram using any technique of convenience to presents the model (straight line), and the original test data. Please label your axes and use different colors for original data and predicted data.
We first define the accuracy functions. We measure the mean squared error (MSE), the mean absolute error (MAE) and the root mean squared log error (RMSLE).
# MSE
def squared_error(actual, pred):
return (pred - actual)**2
# MAE
def abs_error(actual, pred):
return np.abs(pred - actual)
# RMSLE
def squared_log_error(pred, actual):
return (np.log(pred + 1) - np.log(actual + 1))**2
Next we fit a linear regression model with stochastic gradient descent.
# Calculate model
fit_lin = LinearRegressionWithSGD.train(df_train, 1000, .0001, intercept=False)
print("Linear Model Info:" + str (fit_lin))
With the above model, we next compute the predicted values on the test set. Based on those we can calculate the accuracy measures.
# Calculate true vs. prediction
tbl_pred = df_test.map(lambda x: (x.label, fit_lin.predict(x.features)))
# Calculate two accuracy measures
mse = tbl_pred.map(lambda (t, p): squared_error(t, p)).mean()
mae = tbl_pred.map(lambda (t, p): abs_error(t, p)).mean()
rmsle = np.sqrt(tbl_pred.map(lambda(t,p):squared_log_error(t,p)).mean())
# Print output
print ("Linear model: Mean squared error: %2.4f" % mse)
print ("Linear model: Mean absolute error: %2.4f" % mae)
print ("Linear model: Root mean squared log error: %2.4f" % rmsle)
It appears that we're producing a fair amount of error. However, we have to be careful with the interpretation, as we didn't standardize the features. Furthermore, in order to make the accuracy measures comparable, we would have to calculate a baseline model (a minimal sketch follows below). Beyond that, another method of assessing the goodness of fit is to visualize the model in a scatterplot.
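As a minimal sketch of such a baseline, we can always predict the training-set mean of mpg and reuse the error functions defined above; the variable names are illustrative.
# Hedged sketch: naive baseline that always predicts the training-set mean of mpg
mean_mpg = df_train.map(lambda p: p.label).mean()
tbl_base = df_test.map(lambda p: (p.label, mean_mpg))
mse_base = tbl_base.map(lambda (t, p): squared_error(t, p)).mean()
mae_base = tbl_base.map(lambda (t, p): abs_error(t, p)).mean()
print ("Baseline (train mean): Mean squared error: %2.4f" % mse_base)
print ("Baseline (train mean): Mean absolute error: %2.4f" % mae_base)
If the regression model does not clearly beat this baseline, the fit adds little value.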
Having calculated the accuracy measures, we now prepare the visualization by collecting the actual and predicted values for the training and test sets into Pandas dataframes.
pd_train_pred = df_train.map(lambda p: (float(p.label), float(fit_lin.predict(p.features)),
float(p.features[0]))).toDF().toPandas()
pd_test_pred = df_test.map(lambda p: (float(p.label), float(fit_lin.predict(p.features)),
float(p.features[0]))).toDF().toPandas()
pd_train_pred.columns = ['mpg', 'mpg_pred', 'horsepower']
pd_test_pred.columns = ['mpg', 'mpg_pred', 'horsepower']
To see the goodness of fit, we plot the model on top of a scatterplot of the test data.
plt.figure(1, figsize=(12,8))
plt.scatter(pd_test_pred['horsepower'], pd_test_pred['mpg'],
c='darkblue', alpha=0.5, edgecolor="")
plt.plot(pd_test_pred['horsepower'], pd_test_pred['mpg_pred'],
c='darkred', alpha=0.8)
plt.xlabel('horsepower')
plt.ylabel('mpg')
plt.title('Miles per gallon - Actual vs. predictions (Test data)')
plt.show()
As can be seen above, the model is quite a bad fit for the data. This is because the relationship appears to be non-linear (roughly polynomial). If we'd like to stick with the regression method, a good next step would be to fit a model with a quadratic term and an intercept (a sketch follows below). Alternatively, we could fit a non-linear model such as a regression tree, which is what we're going to do in problem 4.
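As a minimal sketch of the quadratic idea mentioned above (not part of the graded analysis), the horsepower feature could be rescaled and squared before training; the step size, iteration count and split below are illustrative assumptions.
# Hedged sketch: quadratic model with an intercept; horsepower is rescaled so that
# SGD with a small step size behaves reasonably (all parameter values are assumptions)
hp_max = df3.rdd.map(lambda r: r[1]).max()
df_quad = df3.rdd.map(lambda r: LabeledPoint(r[0], [r[1] / hp_max, (r[1] / hp_max) ** 2]))
train_q, test_q = df_quad.randomSplit([.8, .2], seed=123)
fit_quad = LinearRegressionWithSGD.train(train_q, 1000, .01, intercept=True)
pred_quad = test_q.map(lambda x: (x.label, fit_quad.predict(x.features)))
print ("Quadratic model: Mean squared error: %2.4f"
       % pred_quad.map(lambda (t, p): squared_error(t, p)).mean())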
Because we only took 20% of the data for testing, we also visualize all available data against the model. The conclusion, however, is the same as above.
plt.figure(1, figsize=(12,8))
plt.scatter(pd_test_pred['horsepower'], pd_test_pred['mpg'],
c='darkblue', alpha=0.5, edgecolor="")
plt.scatter(pd_train_pred['horsepower'], pd_train_pred['mpg'],
c='darkgreen', alpha=0.5, edgecolor="")
plt.plot(pd_test_pred['horsepower'], pd_test_pred['mpg_pred'],
c='darkred', alpha=0.8)
plt.xlabel('horsepower')
plt.ylabel('mpg')
plt.title('Miles per gallon - Actual vs. predictions (all available data)')
plt.show()
Question: Consider attached file Bike-Sharing-Dataset.zip. This is the bike set discussed in class. Do not use all columns of the data set. Retain the following variables: season, yr, mnth, hr, holiday, weekday, workingday, weathersit, temp, atemp, hum, windspeed, cnt. Discard others. Regard cnt as the target variable and all other variables as features. Please note that some of those are categorical variables. Identify categorical variables and use 1-of-k binary encoding for those variables. If there are any null values in numerical columns, replace those with average values for those columns using Spark DataFrame API. Train your model using LinearRegressionSGD method. Use test data (15% of all) to assess the quality of prediction for cnt variable. Calculate at least two performance metrics of your model.
The following helper functions are used in this problem: get_mapping builds a mapping from the distinct values of a column to indices, extract_features applies 1-of-k binary encoding to the first eight (categorical) columns and appends the four numerical columns, and extract_label returns the target value cnt.
def get_mapping(rdd, idx):
    return rdd.map(lambda fields: fields[idx]).distinct() \
              .zipWithIndex().collectAsMap()
def extract_features(record):
cat_vec = np.zeros(cat_len)
i = 0
step = 0
for field in record[0:8] :
m = mappings[i]
idx = m[field]
cat_vec[idx + step] = 1
i = i + 1
step = step + len(m)
num_vec = np.array([float(field) for field in record[8:12]])
return np.concatenate((cat_vec, num_vec))
def extract_label(record):
return float(record[-1])
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("p3_df_count") \
                    .getOrCreate()
We first load the data from the CSV file and drop the unwanted variables.
# Load data
df_bike = spark.read.csv("file:///home/tim/e63-coursework/hw7/data/hour.csv",
nullValue='NA', header=True)
# Select data and arrange (categories first)
df_bike = df_bike.select(df_bike.season.cast('float'), df_bike.yr.cast('float'),
df_bike.mnth.cast('float'), df_bike.hr.cast('float'),
df_bike.holiday.cast('float'), df_bike.weekday.cast('float'),
df_bike.workingday.cast('float'), df_bike.weathersit.cast('float'),
df_bike.temp.cast('float'), df_bike.atemp.cast('float'),
df_bike.hum.cast('float'), df_bike.windspeed.cast('float'),
df_bike.cnt.cast('float'))
Next we impute missing values with the column averages, reusing the Imputer introduced in problem 1.
# Impute NA's with the average (new function on Spark 2.2)
imputer = Imputer(inputCols=df_bike.columns, outputCols=["{}".format(c)
for c in df_bike.columns])
df_bike = imputer.setStrategy("mean").fit(df_bike).transform(df_bike)
Next we can have a first look at the data.
df_bike.show(5)
Next we set the types of the variables.
# Select data and arrange (categories first)
df_bike = df_bike.select(df_bike.season.cast('int'), df_bike.yr.cast('int'),
df_bike.mnth.cast('int'), df_bike.hr.cast('int'),
df_bike.holiday.cast('int'), df_bike.weekday.cast('int'),
df_bike.workingday.cast('int'), df_bike.weathersit.cast('int'),
df_bike.temp.cast('float'), df_bike.atemp.cast('float'),
df_bike.hum.cast('float'), df_bike.windspeed.cast('float'),
df_bike.cnt.cast('float'))
df_bike.show(5)
We are left with 12 feature variables: the first eight are categorical, while the last four are numerical. The target variable is cnt.
Next we build the mappings from the categorical values to indices.
rdd_bike = df_bike.rdd
mappings = [get_mapping(rdd_bike, i) for i in range(0,8)]
print (mappings)
cat_len = np.sum(map(len, mappings))
print(rdd_bike.first()[8:12])
num_len = len(rdd_bike.first()[8:12])
total_len = num_len + cat_len
print "Feature vector length for categorical features: %d" % cat_len
print "Feature vector length for numerical features: %d" % num_len
print "Total feature vector length: %d" % total_len
There are 57 categorical features (dummies) and 4 numerical features in the data.
Next we define the labeled points.
rdd_bike.take(5)
rdd_bike2 = rdd_bike.map(lambda r: LabeledPoint(extract_label(r),
extract_features(r)))
first_point = rdd_bike2.first()
print ("Label: " + str(first_point.label))
print ("Linear Model feature vector:\n" + str(first_point.features))
print ("Linear Model feature vector length: " + str(len(first_point.features)))
We split the data into 85% training and 15% test data.
df_train, df_test = rdd_bike2.randomSplit([.85, .15], seed=123)
df_train = df_train.cache()
df_test = df_test.cache()
Next we train the regression model with all the features on the training data.
fit_lin = LinearRegressionWithSGD.train(df_train, 500, .0001)
print(fit_lin)
With the above model we can predict the values from the test dataset.
df_pred = df_test.map(lambda p: (p.label, fit_lin.predict(p.features)))
print ("Linear Model predictions: " + str(df_pred.take(10)))
Next we can calculate the accuracy of our model.
mse2 = df_pred.map(lambda (t, p): squared_error(t, p)).mean()
mae2 = df_pred.map(lambda (t, p): abs_error(t, p)).mean()
rmsle2 = np.sqrt(df_pred.map(lambda (t, p): squared_log_error(t, p)).mean())
print ("Linear model: - Mean squared error: %2.4f" % mse2)
print ("Linear model: - Mean absolute error: %2.4f" % mae2)
print ("Linear model: - Root mean squared log error: %2.4f" % rmsle2)
The model has an MSE of 242.64 and an MAE of 12.39, while the RMSLE is 0.69.
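To put these numbers into perspective, a minimal sketch of a naive baseline that always predicts the training-set mean of cnt could look as follows; the variable names are illustrative.
# Hedged sketch: naive baseline for the bike counts (always predict the training mean)
mean_cnt = df_train.map(lambda p: p.label).mean()
base_pred = df_test.map(lambda p: (p.label, mean_cnt))
mae_base = base_pred.map(lambda (t, p): abs_error(t, p)).mean()
print ("Baseline (train mean): Mean absolute error: %2.4f" % mae_base)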
Question: Use a Decision Tree model to predict mpg values in auto_mpg_original.txt data. Assess accuracy of your prediction using at least two performance metrics.
Solving the problem below requires the following functions.
def extract_features(record):
cat_vec = np.zeros(cat_len)
i = 0
step = 0
for field in record[0:4] :
m = mappings[i]
idx = m[field]
cat_vec[idx + step] = 1
i = i + 1
step = step + len(m)
num_vec = np.array([float(field) for field in record[4:8]])
return np.concatenate((cat_vec, num_vec))
def extract_features_dt(record):
return np.array(map(float, record[0:8]))
First we prepare the dataframe. We use the same dataframe as in problems 1 and 2, which means we can pick up from dataframe df2. An advantage of the regression tree is that we can simply hash the car name for our analysis.
# Prepare data
df_car = df2.select(df2.cylinders.cast('int'), df2.my.cast('int'),
abs(hash(df2.name)).alias('name').cast('int'),
df2.origin.cast('int'), df2.acceleration, df2.displacement,
df2.horsepower, df2.weight, df2.mpg)
rdd_car = df_car.rdd
Next we can create the mapping.
# Create Mapping
mappings = [get_mapping(rdd_car, i) for i in range(0,4)]
print (mappings)
Next we can output the features used for the prediction.
print "Length of categorical features: %d" % np.sum(map(len, mappings))
print "Length of numerical features: %d" % len(rdd_car.first()[4:9])
print "Total feature vector length: %d" % (np.sum(map(len, mappings)) + len(rdd_car.first()[4:9]))
Next we can label the dataset.
df_car2 = rdd_car.map(lambda r: LabeledPoint(extract_label(r),
extract_features_dt(r)))
df_car2.take(5)
first_point = df_car2.first()
print ("Label: " + str(first_point.label))
print ("Linear Model feature vector:\n" + str(first_point.features))
print ("Linear Model feature vector length: " + str(len(first_point.features)))
In order to control for overfitting we're splitting the dataset. We're using the same split as in problem 1, i.e., 80% training and 20% test set.
# Split data
df_train, df_test = df_car2.randomSplit([.8, .2], seed=123)
df_train = df_train.cache()
df_test = df_test.cache()
Next we can build the regression tree with the DecisionTree method. It has a default tree depth of 5. Since all features in the labeled points are already numeric, we pass an empty categoricalFeaturesInfo map (a sketch with the parameters written out follows after the call).
fit_tree = DecisionTree.trainRegressor(df_train,{})
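For reference, here is a minimal sketch of the same call with the main tuning parameters written out explicitly; maxDepth=5 and maxBins=32 are simply MLlib's defaults, and impurity is fixed to 'variance' for regression. Experimenting with larger values would be an assumption, not something the assignment requires.
# Hedged sketch: the same regressor with the default tuning parameters made explicit
fit_tree_tuned = DecisionTree.trainRegressor(df_train, categoricalFeaturesInfo={},
                                             impurity='variance', maxDepth=5, maxBins=32)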
With the above model we can predict the values in the test set.
preds = fit_tree.predict(df_test.map(lambda p: p.features))
actual = df_test.map(lambda p: p.label)
tbl_pred = actual.zip(preds)
Next we can output the tree characteristics.
print ("Decision tree predictions: " + str(tbl_pred.take(5)))
print ("Decision tree depth: " + str(fit_tree.depth()))
print ("Decision tree number of nodes: " + str(fit_tree.numNodes()))
testErr = tbl_pred.filter(lambda (v, p): v != p).count() / float(df_test.count())
print('Test error = ' + str(testErr))
The tree has 63 nodes and a tree depth of 5.
Next we can have a look at the accuracy of the model.
mse = tbl_pred.map(lambda (t, p): squared_error(t, p)).mean()
mae = tbl_pred.map(lambda (t, p): abs_error(t, p)).mean()
rmsle = np.sqrt(tbl_pred.map(lambda (t, p): squared_log_error(t, p)).mean())
print "Decision tree: Mean squared error: %2.4f" % mse
print "Decision tree: Mean absolute error: %2.4f" % mae
print "Decision tree: Root mean squared log error: %2.4f" % rmsle
The above tree achieves an MSE of 24, an MAE of 3.3 and an RMSLE of 0.2. Comparing that with the simple linear regression model from problem 2 (MSE: 242.64, MAE: 12.39 and RMSLE: 0.69), the regression tree clearly outperforms the regression model. This was to be expected, as we're using many more features. Furthermore, as already mentioned in problem 2, some of the data shows a non-linear relationship with the target variable.
Last but not least we can have a look at the actual tree.
print(fit_tree.toDebugString())
As can be seen above, the tree uses a multitude of features as splitting points.
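To make this more concrete, a minimal sketch using the DataFrame-based pyspark.ml API (an assumption; the MLlib model used above does not expose feature importances) could rank the columns by how much they contribute to the splits. The variable names below are illustrative and this step is not part of the assignment solution.
# Hedged sketch: feature importances via the pyspark.ml API (illustration only)
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
feat_cols = ['cylinders', 'my', 'name', 'origin', 'acceleration',
             'displacement', 'horsepower', 'weight']
# Assemble the feature columns of df_car into a single vector column
assembler = VectorAssembler(inputCols=feat_cols, outputCol="features")
df_car_ml = assembler.transform(df_car)
# Fit a DataFrame-based tree of the same depth and inspect its feature importances
dt = DecisionTreeRegressor(labelCol="mpg", featuresCol="features", maxDepth=5)
model_ml = dt.fit(df_car_ml)
print (zip(feat_cols, model_ml.featureImportances.toArray()))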