File Loading and Conversion

Loading and Saving Files in Spark

  • See https://github.com/databricks/spark-csv for loading csv file as Dataframe. This defines a new data format "com.databricks.spark.csv" so that it is usable from sqlContext. This format is comparable to parquet or any other custom format.

Usage:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") // Use first line of all files as header
    .option("inferSchema", "true") // Automatically infer data types
    .load("cars.csv")

val selectedData = df.select("year", "model")
selectedData.write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
    .save("newcars.csv.gz")

To invoke the shell:
$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
  • Reading/Writing Parquet format files involve creating dataframes and then write :

    // Encoders for most common types are automatically provided by importing spark.implicits._
    import spark.implicits._
    
    // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
    // you can use custom classes that implement the Product interface
    case class Person(name: String, age: Long)
    
    val peopleDF = spark.read.json("examples/src/main/resources/people.json").as[Person]
    
    // DataFrames can be saved as Parquet files, maintaining the schema information
    peopleDF.write.parquet("people.parquet")
    
    // Read in the parquet file created above
    // Parquet files are self-describing so the schema is preserved
    // The result of loading a Parquet file is also a DataFrame
    val parquetFileDF = spark.read.parquet("people.parquet")
    
    ===================================================================================
    
    // Encoders are created for case classes
    val caseClassDS = Seq(Person("Andy", 32)).toDS()
    caseClassDS.show()
    
    
    // Encoders for most common types are automatically provided by importing spark.implicits._
    val primitiveDS = Seq(1, 2, 3).toDS()
    primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)    
    
    Saving RDD as single file instead of bunch of partitions
    
    ===================================================================================
    

Conversion using Python

df = spark.read.parquet("infile.parquet")
df.write.csv("outfile.csv")

// Relevant API documentation:

// pyspark.sql.DataFrameReader.parquet
// pyspark.sql.DataFrameWriter.csv

For Fixed width:

str = '1234567890'
w = [0,2,5,7,10]
cols =  [ str[ w[i-1] : w[i] ] for i in range(1,len(w)) ]
//  ['12', '345', '67', '890']

Using pandas:

import pandas as pd

path = 'filename.txt'

#using pandas with a column specification   
col_specification =[(0, 20), (21, 30), (31, 50), (51, 100)]
data = pd.read_fwf(path, colspecs=col_specification)

Spark Dataframes

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._


val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()

// ============================================================================

val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()


// ==========================================================================

import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name

results.map(attributes => "Name: " + attributes(0)).show()

// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|

Conversion Using Apache Drill

$ cd /opt/drill/bin
$ sqlline -u jdbc:drill:zk=local
Create the Parquet file:

-- Set default table format to parquet
ALTER SESSION SET `store.format`='parquet';

-- Create a parquet table containing all data from the CSV table
CREATE TABLE dfs.tmp.`/stats/airport_data/` AS
SELECT
CAST(SUBSTR(columns[0],1,4) AS INT)  `YEAR`,
CAST(SUBSTR(columns[0],5,2) AS INT) `MONTH`,
columns[1] as `AIRLINE`,
columns[2] as `IATA_CODE`,
columns[3] as `AIRLINE_2`,
columns[4] as `IATA_CODE_2`,
columns[5] as `GEO_SUMMARY`,
columns[6] as `GEO_REGION`,
columns[7] as `ACTIVITY_CODE`,
columns[8] as `PRICE_CODE`,
columns[9] as `TERMINAL`,
columns[10] as `BOARDING_AREA`,
CAST(columns[11] AS DOUBLE) as `PASSENGER_COUNT`
FROM dfs.`/opendata/Passenger/SFO_Passenger_Data/*.csv`;
Try selecting data from the new Parquet file:

-- Select data from parquet table
SELECT *
FROM dfs.tmp.`/stats/airport_data/*`
You can change the dfs.tmp location by going to http://localhost:8047/storage/dfs (source: CSV and Parquet).

Hints

  • org.apache.spark.storage.StorageLevel is one of: DISK_ONLY, MEMORY_AND_DISK, etc.
  • rdd2.join(rdd1, key).take(10)
  • rdd2.map(splitLines).groupBy(key)
  • dataframe can come from SqlContext (using SQL) or SparkSession or SparkContext etc.
  • a === b : is more like a function rather than a condition
  • There is basic sparkSQL and HiveQL. The ./bin/spark-sql is CLI.
  • Scala Seq is like List in Java.
  • DataFrames are based on RDDs. RDDs are immutable structures and do not allow updating elements on-site.
  • registerTempTable()
  • When working with a HiveContext, DataFrames can also be saved as persistent tables using the saveAsTable command. Unlike the registerTempTable command, saveAsTable will materialize the contents of the dataframe and create a pointer to the data in the HiveMetastore. (data can exist anywhere out of metastore)

RDD vs DataFrame Vs Dataset

  • Spark SQL is a Spark module for structured data processing. You can leverage this using SQL or the Dataset API.

  • Spark SQL can also be used to read data from an existing Hive installation. The cli is ./bin/spark-sql ; Provides JDBC/ODBC interface. You can retrieve the data as Dataframe/Dataset in Scala/Java/Python/R.

  • There is rdd, dataset, dataframe APIs A DataFrame is a Dataset organized into named columns.

  • To deal with RDDs you need SparkContext, For using SQL, you need SQLContext, for streaming StreamingContext and so on.

  • SparkSession is essentially combination of SQLContext, HiveContext and future StreamingContext.

  • HiveContext provided better SQL parser and window functions than SqlContext. But now there is no difference.

  • RDD is still supported as lower level API, but most applications would use DataSet/ Dataframe going forward.

  • Dataset provides highest compile time type safety and best optimization possibilities for space and time)

  • rdd provides map, filter, saveAsObjectFile (Java serialization), etc ::
    rdd.filter(_.age > 21) .map(_.last) .saveAsObjectFile("under21.bin")

  • dataframe API lets you use expression as SQL string:

    df.filter("age > 21");
    Expression builder style:
    df.filter(df.col("age").gt(21));
    
  • Create dataset from a list of variables:

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val sampleData: Seq[ScalaPerson] = ScalaData.sampleData()
    val dataset = sqlContext.createDataset(sampleData)
    
  • In the Scala API, DataFrame is simply a type alias of Dataset[Row] While, in Java API, users need to use Dataset<Row> to represent a DataFrame.

  • In spark-shell, you have predefined sc (spark context) and spark (spark session):

    import org.apache.spark.sql.SparkSession         // Latest preferred interface.
    
    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .enableHiveSupport()                                     // This is needed if you want underlying HiveContext
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .config("spark.some.config.option", "some-value")
      .getOrCreate()
    
    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._
    
  • For more of converting between dataset and dataframe, See: http://www.agildata.com/apache-spark-2-0-api-improvements-rdd-dataframe-dataset-sql/

Inspecting Dataframe

  • df.show()
  • df.printSchema()
  • df.groupBy("age").count().show()

Creation Of Dataframe

Description                       Code
-----------------------------------------------------------------------------------

From CSV file                     val df = sparkSession.read.option("header","true")
                                           .csv("src/main/resources/sales.csv")

                                  sqlContext.read.format("com.databricks.spark.csv").option("header", "true").
                                  option("inferSchema", "true").load("file.csv")    // Old Style

From CSV without header           val schema = StructType(List of StructField(fieldName, StringType, true)))
                                  val myDf = sqlContext.createDataFrame(myRDD, schema)

                                  ==============================================================================
                                  // read a text file into a DataFrame a.k.a. Dataset[Row]
                                  var df: Dataset[Row] = spark.read.text("people.txt")
                                  ==============================================================================

                                  // use map() to convert to a Dataset of a specific class
                                  var ds: Dataset[Person] = spark.read.text("people.txt")
                                                                 .map(row => parsePerson(row))
                                  def parsePerson(row: Row) : Person = ??? // fill in parsing logic here

                                  ==============================================================================

                                  case class Person(name: String, age: Long)

                                  // Create an RDD of Person objects from a text file, convert it to a Dataframe
                                  val peopleDF = spark.sparkContext
                                     .textFile("examples/src/main/resources/people.txt")
                                     .map(_.split(","))
                                     .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
                                     .toDF()
                                  // Register the DataFrame as a temporary view
                                  peopleDF.createOrReplaceTempView("people")
                                  ==============================================================================


From Parquet file                 df = sqlContext.parquetFile("...")

From Json file                    df = spark.read.json("examples/src/main/resources/people.json")
                                  Note: spark is predefined SparkSession object. (preferable)

                                  val df = sqlContext.jsonFile("people.json")

                                  val peopleDS = spark.read.json(path).as[Person]    // Strict Type Safety


From Table                        val dfFromTbl = sqlContext.table("flightsPermTbl")

Saving Dataframe as ORC/Parquet/Text file or table

Note:  The table and destination files are stored in ./spark-warehouse/<tableName> directory.

Destination       Code

Table             updatedDF.write.format("orc").mode(SaveMode.Overwrite).saveAsTable("flightsPermTbl")

Text File         myRdd.saveAsTextFile("/tmp/wordcount")   // This works only for RDD ???
                  df.rdd.saveAsTextFile("/tmp/mytxtfile")   // df.rdd converts df to RDD.
                  df.write.text("/path/to/output")          // This requires df to have single column only.

Parquet File      df.saveAsParquetFile("/tmp/myparquetFile")
                  peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

CSV               df.write.format("csv").option("header", "true").save("myCsvFile.csv")   // Saved in current dir as ./myCsvFile.csv/ !!!
                  // If you want more supported options, ....
                  // You need:  spark-shell --packages com.databricks:spark-csv_2.10:1.4.0 (for scala 2.10 version)
                  df.write.repartition(1).format("com.databricks.spark.csv").option("header", "true").save("file.csv")

To automate multiple csv output file to single output file, use this scala program :

df.write.
    repartition(1).
    format("com.databricks.spark.csv").
    option("header", "true").
    save("myfile.csv")

def saveDfToCsv(df: DataFrame, tsvOutput: String,
                sep: String = ",", header: Boolean = false): Unit = {
    val tmpParquetDir = "Posts.tmp.parquet"
    df.repartition(1).write.
        format("com.databricks.spark.csv").
        option("header", header.toString).
        option("delimiter", sep).
        save(tmpParquetDir)
    val dir = new File(tmpParquetDir)
    val tmpTsvFile = tmpParquetDir + File.separatorChar + "part-00000"
    (new File(tmpTsvFile)).renameTo(new File(tsvOutput))
    dir.listFiles.foreach( f => f.delete )
    dir.delete
}

Dataframe column renaming and type fixing

df = (df.withColumn('pickup_datetime', df.pickup_datetime.cast('timestamp'))    // Converting column type from string to timestamp.
        .withColumnRenamed('_id', 'id')   // renaming of column.

Persisting Dataframe

  • You can persist the dataframe as temporary table within the session as follows:

    // Register the DataFrame as a SQL temporary view
    df.createOrReplaceTempView("people")
    
    // Generation of additional/derived dataframes is much easier now ...
    val sqlDF = spark.sql("SELECT * FROM people")
    sqlDF.show()