Apache Spark Data Frames : Part 1

In this post we will discuss how can we use Data Frame API in Apache Spark to process data. Lot of users of Apache Spark who come from python or R language data science , always use to complaint about Spark RDDs difficult to process. As a Spark Developer I also used to find it difficult to use rdd. But after adding Data Frames support spark has become more easy to use tool. developers can easily query and manipulate data. Let us first understand the basics of Spark Data Frames

     Spark Data Frame is a distributed collection of data organized into columns. It provide functions to easily select specific columns, filter data, group by and aggregate data. People who are from SQL background can simply write a sql query and pass to SQLContext, which will do everything for us. people who are from R or Python background they will be aware of data frames. people who are from other background , can imagine data frame as a table.

Creating a data frame from a file in HDFS :  In this section we will see how can we create Data Frame from a text file in HDFS.

    1. Creating a spark context and sqlcontext.
       
       // Create a spark conf set app name and master
       val conf = new SparkConf().setAppName("Data Frame Test").setMaster("local")

 //Use spark conf and create SparkContext
val sc : SparkContext = new SparkContext(conf)

//Create SQLContext
val sqlc : SQLContext = new SQLContext(conf)

 //This import statement helps in any kind of implicit conversion required for Data //frames. One example is converting RDD to data frame using rdd.toDF() method
// other is converting column name to column types while passing column name to any data //frame function.
import sqlc.implicits._

    2. Next step is to load a file from HDFS.

val dataRDD = sc.textFile("hdfs://user/harjeet/data.csv")

    3. Get schema of file from header.

     //Get Header from RDD  
     val header = dataRDD.first()
           
    //Get all lines from RDD other than header
val data = dataRDD.filter(x => x!=header)

val columns = StructType([ StructField(col , StringType, true) for col in header.split("//,")])

4. Converting RDD to Data Frame.
   
    //   For This step, we should be aware about number of columns in data set and following line //should change accordingly
val rowRDD = data.map(x => x.split("//,").map( p => Row ( p(0) , p(1) , p(2) ) )

  // Creating data Frame

  val df = sqlc.createDataFrame(rowRDD,columns)

These steps will create a data frame , which can be used to filter, query or aggregate the data. we can print the schema of dataframe also using following code.

  df.printSchema()

This is how we can create data frames from rdd in apache spark. In part 2 we will see what kind of
oerpations we can do on data frames.

Comments

Post a Comment

Popular posts from this blog

Hive UDF Example

Custom UDF in Apache Spark

Enterprise Kafka and Spark : Kerberos based Integration