Hi guys,

This is a new post about data munging, and particularly about how to deal with data quality issues. Yes, you know it: it represents 70%-80% of a data scientist’s work, and it is probably the dirtiest part of it. People told you that data scientist was the sexiest job on earth. But as an introduction to your new job, you have to deal with the garbage. Where is the sex in that? I understand you are a little bit disappointed, disillusioned… Well! The sooner, the better. Be aware that later, when you become a reputed data scientist, people will know it and they’ll come to you with their problems so that you can find solutions for them. Is that what a data scientist is? I think it is, but this is my own perception.

Ok, so let’s start the cleaning. Oh dear! You’ve got a very big set of data! Does that mean a lot of garbage? Obviously! The good news is that you are a data scientist, and you don’t need to repeat the same operation by hand on millions or billions of records, or redo the same thing for every new dataset. Actually, it could be interesting to put some automation into that low-level task. It is what computers are made for, after all. We’ll see how to do that with Spark. Why Spark? Because it is scalable, resilient, fast evolving and everything… Yes, there is something of a love story between Spark and me.

Data munging has two main objectives:

  • Dealing with data quality
  • Feature selection and transformation (feature engineering).

Prerequisites:

  1. For the purpose of the demonstration I use the Zillow dataset from Kaggle. It is quite a dirty one!
  2. Of course you’ll need to install Spark 2.2.0.
  3. Because you’ll want to reuse the tools you have built, you won’t use the REPL, and probably no notebook (Jupyter, Zeppelin) either. I use the IntelliJ IDEA IDE, but Eclipse is good as well.
  4. You should know the basics of Spark and have some skills in Scala.

I won’t go through the installation procedure: this is definitely not data science, and there are a hell of a lot of very good tutorials available.

1. Connect your IDE to Spark

You could:

  • add the jars contained in the jars directory of your Spark installation to the libraries of your Scala project
  • create a Maven project
  • create an sbt project.

I use the last option, because I find it the simplest. This is what the build.sbt file should look like:

name := "zillow"
version := "1.0"
// must match the Scala version Spark was built with
scalaVersion := "2.11.8"
scalacOptions ++= Seq("-deprecation")
resolvers += Resolver.sonatypeRepo("releases")
val sparkVersion = "2.2.0"
libraryDependencies += "junit" % "junit" % "4.10" % "test"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)

It is important to notice that scalaVersion must match the Scala version Spark was compiled with. Spark 2.2.0 is built against Scala 2.11, hence scalaVersion 2.11.8 here (sbt fetches that Scala version for the project by itself). We’ll use the core of Spark for sure, but we also need sql for handling the data and mllib for analysing it.

Once the Spark libraries are included in your project, you have to create a Spark session in the Scala object you have created in your project:
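If you are not sure which Scala your program really runs on, you can ask the standard library. This is only a small sketch: the object and method names (ScalaVersionCheck, binaryVersion, matches) are mine, not part of Spark or sbt, and the only library call is scala.util.Properties.versionNumberString.

```scala
object ScalaVersionCheck {
  // "2.11.8" -> "2.11": the binary version is made of the first two components.
  def binaryVersion(full: String): String =
    full.split('.').take(2).mkString(".")

  // True when the running Scala library belongs to the expected binary line.
  // By default the version of the Scala library on the classpath is used.
  def matches(expected: String,
              running: String = scala.util.Properties.versionNumberString): Boolean =
    binaryVersion(running) == expected
}
```

Calling ScalaVersionCheck.matches("2.11") at the start of your main is a cheap way to catch a mismatch before Spark fails with a more obscure error.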

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object Zillow {
  // local[*] runs Spark locally with one worker thread per available core
  val spark = SparkSession.builder.appName("zillow").config("spark.master", "local[*]").getOrCreate()
  spark.sparkContext.setLogLevel("WARN")
  import spark.implicits._
  ...
}

Once the libraries have been imported, we create a Spark session. The “local[*]” master lets Spark use all the CPU cores available on the local machine. Because I don’t want my console to be polluted by Spark’s verbosity, I set the log level to “WARN”, but it’s up to you. Finally, it is important to put “import spark.implicits._” after the creation of the Spark session, because the implicits are members of that session instance. They enable implicit conversions, for example from Scala collections to DataFrames.

This is the Spark 2.0 way of doing things; in older tutorials you’ll find the previous way:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._

object Zillow {
  val conf = new SparkConf().setAppName("zillow").setMaster("local[*]")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  // before 2.0 the implicits live on the SQLContext; there is no `spark` session here
  import sqlContext.implicits._
  ...
}

 

2. Get data and deal with missing data

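// `path` is the directory holding the Kaggle csv files (defined elsewhere)
val train = spark.read.option("header", "true").option("sep", ",").csv(path + "/train_2016.csv")
// rename the key on one side so that the join condition is unambiguous
val prop = spark.read.option("header", "true").option("sep", ",").csv(path + "/properties_2016.csv")
  .withColumnRenamed("parcelid", "parcelid2")
var mat = prop.join(train, train("parcelid") === prop("parcelid2")).drop("parcelid2").persist()
mat.createOrReplaceTempView("houses")
val colNames = mat.columns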

The two csv files are loaded and joined on “parcelid”. You should notice that “mat” has been persisted, because we’re going to use these data several times and we want to keep them in memory rather than re-reading and re-joining the files each time. That is one way to make Spark fast. Furthermore, createOrReplaceTempView("houses") registers the DataFrame as a SQL-like table that we can query with SQL requests. In my hands, it is far friendlier than dealing with RDDs, but you might love playing with RDDs; again, it’s up to you.

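// `threshold` is the tolerated fraction of missing values (between 0 and 1)
val maxNan = threshold * mat.count()
// one count per column; the result is the set of columns with too many nulls
colNames.filter(_ != "parcelid")
  .map(x => (x, mat.filter(mat(x).isNull).count()))
  .filter(x => x._2 > maxNan)
  .map(x => x._1)
  .toSet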

First we define the number of missing values above which a feature has to be discarded, then we apply the filter in a single pipeline. Scala is a functional programming language, so the result of one function is the input of the next one, and so on. This style is the second reason why Spark, written in Scala, is so fast: chained transformations on a dataset are evaluated lazily and pipelined. “toSet” is just a trick to get unique values from the array of results.
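To see the logic without starting Spark, here is the same pipeline on a tiny in-memory table. It is a sketch under my own assumptions: a row is a Seq of Option values where None plays the role of a null cell, and the names SparseColumns and tooSparse are made up for the example.

```scala
object SparseColumns {
  // One row holds one value per column; None stands for a missing cell.
  type Row = Seq[Option[String]]

  // Names of the columns whose number of missing cells exceeds
  // threshold * (number of rows), as in the Spark pipeline above.
  def tooSparse(colNames: Seq[String], rows: Seq[Row], threshold: Double): Set[String] = {
    val maxNan = threshold * rows.size
    colNames.zipWithIndex
      .map { case (name, i) => (name, rows.count(r => r(i).isEmpty)) }
      .filter { case (_, missing) => missing > maxNan }
      .map(_._1)
      .toSet
  }
}
```

With four rows and a threshold of 0.5, a column is reported as soon as three of its cells are missing.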

 

3. Basic dimensionality reduction and correlated features

import org.apache.spark.ml.stat.ChiSquareTest
// discrete (categorical) features; a Seq, so that they can be accessed by position later on
val discrete = Seq("airconditioningtypeid","architecturalstyletypeid","buildingqualitytypeid","buildingclasstypeid","decktypeid","fips","fireplaceflag","hashottuborspa","heatingorsystemtypeid","pooltypeid10","pooltypeid2","pooltypeid7","propertycountylandusecode","propertylandusetypeid","propertyzoningdesc","rawcensustractandblock","censustractandblock","regionidcounty","regionidcity","regionidzip","regionidneighborhood","storytypeid","typeconstructiontypeid","taxdelinquencyflag")
// `continue` holds the names of the remaining, continuous features (defined elsewhere)
val query = continue.map(x => s"CAST($x AS DOUBLE)").mkString(",")
val sqlDF = spark.sql("SELECT " + query + ",CAST(logerror AS DOUBLE) FROM houses")
// correlation of every pair of continuous features (i < j)
val mcorr = for (i <- continue.indices; j <- i + 1 until continue.length)
  yield (continue(i), continue(j), sqlDF.stat.corr(continue(i), continue(j)))
// correlation of each feature with the target
val tcorr = continue.map(feat => (feat, sqlDF.stat.corr(feat, "logerror")))
// in each strongly correlated pair (absolute value, so that anti-correlation counts too),
// the feature less correlated with the target goes into the set of features to discard
mcorr.filter(x => math.abs(x._3) > threshold)
  .map(x => if (math.abs(tcorr.filter(y => y._1 == x._1)(0)._2) < math.abs(tcorr.filter(y => y._1 == x._2)(0)._2)) x._1 else x._2)
  .toSet

First we have to define manually which features are discrete factors, assuming that the others are continuous values. Then we have to cast everything to double in order to calculate the correlation between each pair of features on the one hand, and between each feature and the target on the other hand. When two features are correlated to a level greater than the threshold, the one most correlated with the target is kept. Shall we throw the discarded features in the bin? No we won’t, because correlated features are useful to fill the gaps left by missing values. That is part of munging, but it’s beyond the scope of this post.
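If the selection rule is still a bit abstract, here it is on plain Scala collections. Take it as a sketch and not as the code I run on Zillow: pearson is a textbook Pearson correlation written by hand, and CorrelationPruning and toDiscard are names I made up for the example. I compare absolute values, on the assumption that a strong negative correlation is as redundant as a positive one.

```scala
object CorrelationPruning {
  // Textbook Pearson correlation between two samples of the same size.
  def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
    require(xs.size == ys.size && xs.nonEmpty, "samples must be non-empty and aligned")
    val mx = xs.sum / xs.size
    val my = ys.sum / ys.size
    val cov = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum
    val sx = math.sqrt(xs.map(x => (x - mx) * (x - mx)).sum)
    val sy = math.sqrt(ys.map(y => (y - my) * (y - my)).sum)
    cov / (sx * sy)
  }

  // For every pair of features correlated above the threshold, the one that is
  // less correlated with the target lands in the set of features to discard.
  def toDiscard(features: Map[String, Seq[Double]],
                target: Seq[Double],
                threshold: Double): Set[String] = {
    val names = features.keys.toVector.sorted
    val withTarget = names.map(f => f -> math.abs(pearson(features(f), target))).toMap
    val pairs = for (i <- names.indices; j <- i + 1 until names.size) yield (names(i), names(j))
    pairs
      .filter { case (a, b) => math.abs(pearson(features(a), features(b))) > threshold }
      .map { case (a, b) => if (withTarget(a) < withTarget(b)) a else b }
      .toSet
  }
}
```

With three toy features where “a” and “b” are nearly identical and “a” follows the target more closely, only “b” ends up in the discard set.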

Ok now let’s do the same with discrete features.

// positional access is needed below, so work on a Seq
val disc = discrete.toSeq
val query = disc.map(x => s"CAST($x AS STRING)").mkString(",")
var tmp = spark.sql("SELECT CAST(logerror AS DOUBLE)," + query + " FROM houses").persist() //.sample(false, 0.01)
val mcorr = for (i <- disc.indices; j <- i + 1 until disc.length)
  yield (disc(i), disc(j), getChi2(tmp, disc(i), disc(j)))
// tcorr must hold the association of each discrete feature with logerror (computed with the R script further down)
mcorr.filter(x => x._3 > threshold).map(x => if (tcorr.filter(y => y._1 == x._1)(0)._2 < tcorr.filter(y => y._1 == x._2)(0)._2) x._1 else x._2).toSet

Is that all? Yes, but getChi2 is a function of my own that turns the factors into numerics, after removing missing values, so that they can be processed by Spark’s chi-squared test of independence. The chi-squared statistic is then rescaled to give Cramér’s V, which lies between 0 and 1 whatever the sample size and is therefore more suitable for this problem.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.stat.ChiSquareTest

def getChi2(df: DataFrame, col1: String, col2: String): Double = {
  // keep only the rows where both factors are present
  val tmp1 = df.select(col1, col2).na.drop().cache()
  val sizeCol1 = tmp1.select(col1).distinct().count()
  val sizeCol2 = tmp1.select(col2).distinct().count()
  // skip factors with too many levels or fewer than two
  // (math.max / math.min, because max and min alone resolve to the sql functions)
  if ((math.max(sizeCol1, sizeCol2) > 10000) || (math.min(sizeCol1, sizeCol2) < 2)) -1.0
  else {
    // encode each factor as numeric indices
    val indexer = Array(new StringIndexer().setInputCol(col1).setOutputCol(col1 + "_n"), new StringIndexer().setInputCol(col2).setOutputCol(col2 + "_n"))
    val tmp2 = new Pipeline().setStages(indexer).fit(tmp1).transform(tmp1)
    val tmp3 = tmp2.map(x => (x.getDouble(2), Vectors.dense(x.getDouble(3)))).toDF("label", "feature").cache()
    val res = ChiSquareTest.test(tmp3, "feature", "label")
    // Cramér's V = sqrt(chi2 / (n * (min(number of levels) - 1))), as in the R script further down
    val s = res.select("statistics").rdd.map { case Row(v: Vector) => v }.first()(0)
    val dofKV = math.min(sizeCol1, sizeCol2) - 1
    val denKV = tmp1.count() * dofKV
    math.sqrt(s / denKV)
  }
}
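For those who want to check the arithmetic by hand, here is Cramér’s V computed from scratch on a small list of category pairs. This is a plain Scala sketch under my own assumptions, not what Spark does internally: CramersV, chiSquare and cramersV are names I chose, and the chi-squared statistic is the classical sum of (observed - expected)² / expected over the contingency table.

```scala
object CramersV {
  // Pearson chi-squared statistic of the contingency table built from the pairs.
  def chiSquare(pairs: Seq[(String, String)]): Double = {
    val n = pairs.size.toDouble
    val rowTotals = pairs.groupBy(_._1).map { case (k, v) => k -> v.size.toDouble }
    val colTotals = pairs.groupBy(_._2).map { case (k, v) => k -> v.size.toDouble }
    val observed = pairs.groupBy(identity).map { case (k, v) => k -> v.size.toDouble }
    // toSeq keeps one term per cell, even when two cells contribute the same value
    val cells = for (r <- rowTotals.keys.toSeq; c <- colTotals.keys.toSeq) yield {
      val expected = rowTotals(r) * colTotals(c) / n
      val obs = observed.getOrElse((r, c), 0.0)
      (obs - expected) * (obs - expected) / expected
    }
    cells.sum
  }

  // V = sqrt(chi2 / (n * (k - 1))), k being the smaller number of levels.
  def cramersV(pairs: Seq[(String, String)]): Double = {
    val k = math.min(pairs.map(_._1).distinct.size, pairs.map(_._2).distinct.size)
    require(k >= 2, "each variable needs at least two levels")
    math.sqrt(chiSquare(pairs) / (pairs.size * (k - 1)))
  }
}
```

Two factors that always move together give V = 1, and two independent factors give V = 0, which makes the threshold easy to interpret.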

Do you think it is a tricky solution? I do think so. But remember that Spark is for big data and distributed data processing. You could make things easier with R, but R has some limits when dealing with high-throughput flows and very big datasets. Anyway, you can also use R from Spark (or Spark from R). Here is the solution with an R script called from Spark. But remember that the script itself won’t be parallelized: Spark starts one copy of it per partition, and the R code inside each copy runs on its own.

val query = discrete.map(x => s"CAST($x AS STRING)").mkString(",")
var tmp = spark.sql("SELECT CAST(logerror AS DOUBLE)," + query + " FROM houses").persist()
tmp = tmp.na.fill("Nan")
// one line per partition: "<number of columns>,<all the values of the partition>"
// (an empty partition sends nothing instead of failing on reduce)
val stdin = tmp.rdd.map(r => r.toSeq.mkString(","))
  .mapPartitions(x => if (x.isEmpty) Iterator.empty else Iterator(x.mkString(",")))
  .map(x => s"${discrete.size + 1}," + x)
// if running on a cluster the script needs to be distributed to each worker node
val eta = stdin.pipe(path + "/etapipe.R").first().split(",").map(x => x.toDouble)
val tcorr = tmp.columns.tail.zip(eta)
mcorr.filter(x => x._3 > threshold).map(x => if (tcorr.filter(y => y._1 == x._1)(0)._2 < tcorr.filter(y => y._1 == x._2)(0)._2) x._1 else x._2).toSet
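The line sent to the script deserves a closer look, because the script receives plain text and has to rebuild the table itself. The sketch below shows the format on a toy partition; PipeFormat, encode and decode are hypothetical names of mine, and decode only illustrates in Scala what the receiving side has to do. It assumes that no value contains a comma.

```scala
object PipeFormat {
  // Flatten one partition into the single line sent to the script:
  // "<number of columns>,<values of row 1>,<values of row 2>,..."
  // An empty partition produces no line at all.
  def encode(rows: Seq[Seq[String]], nCols: Int): Option[String] =
    if (rows.isEmpty) None
    else Some((nCols.toString +: rows.flatten).mkString(","))

  // The job of the receiving side: read the column count, then regroup the values.
  def decode(line: String): Seq[Seq[String]] = {
    val fields = line.split(",", -1).toVector
    val nCols = fields.head.toInt
    fields.tail.grouped(nCols).map(_.toSeq).toVector
  }
}
```

Two rows of three columns travel as one line starting with 3, followed by the six values.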

This is the R script:

library(readr)
library(lsr)
Rmatrix <- read_csv("~/Documents/kaggle/Zillow/Rmatrix.csv/Rmatrix.csv", col_types = cols(propertylandusetypeid = col_character(), regionidcity = col_character(), regionidcounty = col_character(), regionidzip = col_character()))
# Cramér's V between two factors
cv.test = function(x, y) {
  CV = sqrt(chisq.test(x, y, correct=FALSE)$statistic / (length(x) * (min(length(unique(x)), length(unique(y))) - 1)))
  print.noquote("Cramér V / Phi:")
  return(as.numeric(CV))
}
# eta squared of logerror against each of the last columns
for (i in tail(colnames(Rmatrix)))
  print(etaSquared(aov(lm(data=Rmatrix, as.formula(paste("logerror~", i))))))
#get eta2
r <- lapply(tail(colnames(Rmatrix)), function(i) etaSquared(aov(lm(data=Rmatrix, as.formula(paste("logerror~", i))))))
# Cramér's V for each pair among columns 2 to 7, on complete rows only
for (i in 2:6) {
  for (j in (i+1):7) {
    tmp <- subset(Rmatrix, TRUE, select = c(colnames(Rmatrix)[i], colnames(Rmatrix)[j]))
    tmp1 <- tmp[rowSums(is.na(tmp)) == 0, ]
    # [[ ]] extracts plain vectors, so that length(x) is the number of rows
    print(cv.test(tmp1[[1]], tmp1[[2]]))
  }
}
# same check for a single pair
tmp <- subset(Rmatrix, TRUE, select = c(colnames(Rmatrix)[2], colnames(Rmatrix)[3]))
tmp <- tmp[rowSums(is.na(tmp)) == 0, ]
cv.test(tmp[[1]], tmp[[2]])
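A word on what the script asks lsr for. For a single factor, eta squared is the share of the variance of the target explained by the levels of that factor: the between-group sum of squares divided by the total sum of squares. Here is that one-way definition as a plain Scala sketch; EtaSquared and etaSquared are names of mine, and this is not a port of the lsr package.

```scala
object EtaSquared {
  // One-way eta squared: SS_between / SS_total.
  def etaSquared(levels: Seq[String], values: Seq[Double]): Double = {
    require(levels.size == values.size && values.nonEmpty, "inputs must be non-empty and aligned")
    val grandMean = values.sum / values.size
    val ssTotal = values.map(v => (v - grandMean) * (v - grandMean)).sum
    // each group contributes size * (group mean - grand mean)^2
    val ssBetween = levels.zip(values).groupBy(_._1).values.toSeq.map { group =>
      val ys = group.map(_._2)
      val mean = ys.sum / ys.size
      ys.size * (mean - grandMean) * (mean - grandMean)
    }.sum
    ssBetween / ssTotal
  }
}
```

A factor that fully determines the target gives 1, and a factor whose groups all share the same mean gives 0.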

Not so simple after all! The idea is that if you need some exotic statistic that is not available in Spark, you can call an R script. Keep in mind that Spark is evolving fast. As an example, I use the ml (machine learning) library, which is the new DataFrame-based version of the older mllib. Spark is young (2014) and some libraries are already mutating. If you don’t feel at ease in a changing environment, forget about Spark; but you’re a data scientist and you’re not afraid of change. Right?

Well, I hope I could help a little. In the next post we’ll see how to do fine feature selection with random forests and Spark.

Take care.
