File Loading and Conversion
Usage:
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "true") // Automatically infer data types
.load("cars.csv")
val selectedData = df.select("year", "model")
selectedData.write
.format("com.databricks.spark.csv")
.option("header", "true")
.option("codec", "org.apache.hadoop.io.compress.GzipCodec")
.save("newcars.csv.gz")
To invoke the shell:
$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
Reading/writing Parquet files involves creating a DataFrame and then writing it out:
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)
val peopleDF = spark.read.json("examples/src/main/resources/people.json").as[Person]
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")
===================================================================================
// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)
Saving an RDD/DataFrame as a single file instead of a bunch of partition files
===================================================================================
df = spark.read.parquet("infile.parquet")
df.write.csv("outfile.csv")
# Relevant API documentation:
# pyspark.sql.DataFrameReader.parquet
# pyspark.sql.DataFrameWriter.csv
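The write above still produces one file per partition. A Scala sketch of collapsing the output to a single part file via coalesce(1) (assuming the spark-shell's predefined spark session; the file names are placeholders, and this only makes sense for data that fits on one node):
val parquetDF = spark.read.parquet("infile.parquet")
parquetDF.coalesce(1)                      // one partition => one part-*.csv file
  .write.option("header", "true")
  .csv("outfile_single.csv")               // still written as a directory, but containing a single data file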
For fixed-width files:
line = '1234567890'
w = [0, 2, 5, 7, 10]
cols = [line[w[i-1]:w[i]] for i in range(1, len(w))]
# ['12', '345', '67', '890']
Using pandas:
import pandas as pd
path = 'filename.txt'
# using pandas with a column specification
col_specification = [(0, 20), (21, 30), (31, 50), (51, 100)]
data = pd.read_fwf(path, colspecs=col_specification)
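If the fixed-width file should instead be read with Spark, here is a minimal Scala sketch using substring on a single-column text DataFrame (the file name, column names and widths are assumptions mirroring the w list above):
import org.apache.spark.sql.functions.{col, substring}
// Each line arrives as one "value" column; slice it by (1-based position, length).
val rawDF = spark.read.text("fixed_width.txt")
val fixedDF = rawDF.select(
  substring(col("value"), 1, 2).as("c1"),   // chars 1-2
  substring(col("value"), 3, 3).as("c2"),   // chars 3-5
  substring(col("value"), 6, 2).as("c3"),   // chars 6-7
  substring(col("value"), 8, 3).as("c4"))   // chars 8-10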
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
val df = spark.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// ============================================================================
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// ==========================================================================
import org.apache.spark.sql.types._
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
To convert CSV to Parquet using Apache Drill, start the Drill shell:
$ cd /opt/drill/bin
$ sqlline -u jdbc:drill:zk=local
Create the Parquet file:
-- Set default table format to parquet
ALTER SESSION SET `store.format`='parquet';
-- Create a parquet table containing all data from the CSV table
CREATE TABLE dfs.tmp.`/stats/airport_data/` AS
SELECT
CAST(SUBSTR(columns[0],1,4) AS INT) `YEAR`,
CAST(SUBSTR(columns[0],5,2) AS INT) `MONTH`,
columns[1] as `AIRLINE`,
columns[2] as `IATA_CODE`,
columns[3] as `AIRLINE_2`,
columns[4] as `IATA_CODE_2`,
columns[5] as `GEO_SUMMARY`,
columns[6] as `GEO_REGION`,
columns[7] as `ACTIVITY_CODE`,
columns[8] as `PRICE_CODE`,
columns[9] as `TERMINAL`,
columns[10] as `BOARDING_AREA`,
CAST(columns[11] AS DOUBLE) as `PASSENGER_COUNT`
FROM dfs.`/opendata/Passenger/SFO_Passenger_Data/*.csv`;
Try selecting data from the new Parquet file:
-- Select data from parquet table
SELECT *
FROM dfs.tmp.`/stats/airport_data/*`;
You can change the dfs.tmp location by going to http://localhost:8047/storage/dfs (source: CSV and Parquet).
Spark SQL is a Spark module for structured data processing. You can leverage this using SQL or the Dataset API.
Spark SQL can also be used to read data from an existing Hive installation. The CLI is ./bin/spark-sql; a JDBC/ODBC interface is also provided. You can retrieve the data as a DataFrame/Dataset in Scala/Java/Python/R.
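A minimal sketch of both styles against an existing Hive table (the table and column names are assumptions; the session must be built with enableHiveSupport(), as shown further below):
import spark.implicits._                    // for the $"col" syntax
val viaSql = spark.sql("SELECT origin, dest FROM some_db.flights WHERE year = 2016")
val viaApi = spark.table("some_db.flights").filter($"year" === 2016).select("origin", "dest")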
There are RDD, Dataset and DataFrame APIs. A DataFrame is a Dataset organized into named columns.
To deal with RDDs you need a SparkContext; for SQL you need a SQLContext; for streaming, a StreamingContext; and so on.
SparkSession is essentially a combination of SQLContext, HiveContext and, in the future, StreamingContext.
HiveContext provided a better SQL parser and window functions than SQLContext, but with SparkSession there is no longer any difference.
RDD is still supported as the lower-level API, but most applications should use Dataset/DataFrame going forward.
Dataset provides the strongest compile-time type safety and the best optimization opportunities for space and time.
The RDD API provides map, filter, saveAsObjectFile (Java serialization), etc.:
rdd.filter(_.age > 21)
   .map(_.last)
   .saveAsObjectFile("under21.bin")
The DataFrame API lets you express the filter as a SQL string:
df.filter("age > 21");
Expression builder style:
df.filter(df.col("age").gt(21));
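With the typed Dataset API the same filter becomes a compile-time-checked lambda (a sketch, assuming the Person case class defined elsewhere in these notes and import spark.implicits._):
val ds = df.as[Person]            // DataFrame -> Dataset[Person]
ds.filter(_.age > 21)             // field names and types are checked at compile time
  .map(_.name)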
Create a Dataset from a local collection of objects:
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val sampleData: Seq[ScalaPerson] = ScalaData.sampleData()
val dataset = sqlContext.createDataset(sampleData)
In the Scala API, DataFrame is simply a type alias for Dataset[Row], while in the Java API users need to use Dataset<Row> to represent a DataFrame.
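A short sketch of moving between the two representations (assuming the Person case class from earlier and import spark.implicits._; the file name is a placeholder):
val typedDS = spark.read.json("people.json").as[Person]   // DataFrame -> Dataset[Person]
val untypedDF = typedDS.toDF()                            // Dataset[Person] -> DataFrame (i.e. Dataset[Row])
val roundTrip = untypedDF.as[Person]                      // and back again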
In spark-shell, you have predefined sc (spark context) and spark (spark session):
import org.apache.spark.sql.SparkSession // Latest preferred interface.
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.enableHiveSupport() // This is needed if you want underlying HiveContext
.config("spark.sql.warehouse.dir", warehouseLocation)
.config("spark.some.config.option", "some-value")
.getOrCreate()
// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
For more on converting between Dataset and DataFrame, see: http://www.agildata.com/apache-spark-2-0-api-improvements-rdd-dataframe-dataset-sql/
Description Code
-----------------------------------------------------------------------------------
From CSV file val df = sparkSession.read.option("header","true")
.csv("src/main/resources/sales.csv")
sqlContext.read.format("com.databricks.spark.csv").option("header", "true").
option("inferSchema", "true").load("file.csv") // Old Style
From CSV without header val schema = StructType(fieldNames.map(name => StructField(name, StringType, nullable = true))) // fieldNames: Seq[String]
val myDf = sqlContext.createDataFrame(myRDD, schema)
==============================================================================
// read a text file into a DataFrame a.k.a. Dataset[Row]
var df: Dataset[Row] = spark.read.text("people.txt")
==============================================================================
// use map() to convert to a Dataset of a specific class
var ds: Dataset[Person] = spark.read.text("people.txt")
.map(row => parsePerson(row))
def parsePerson(row: Row) : Person = ??? // fill in parsing logic here
==============================================================================
case class Person(name: String, age: Long)
// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")
==============================================================================
From Parquet file df = sqlContext.parquetFile("...") // Deprecated; prefer spark.read.parquet(...)
From Json file df = spark.read.json("examples/src/main/resources/people.json")
Note: spark is predefined SparkSession object. (preferable)
val df = sqlContext.jsonFile("people.json")
val peopleDS = spark.read.json(path).as[Person] // Strict Type Safety
From Table val dfFromTbl = sqlContext.table("flightsPermTbl")
Note: The table and destination files are stored in ./spark-warehouse/<tableName> directory.
Destination Code
Table updatedDF.write.format("orc").mode(SaveMode.Overwrite).saveAsTable("flightsPermTbl")
Text File myRdd.saveAsTextFile("/tmp/wordcount") // Works only on RDDs.
df.rdd.saveAsTextFile("/tmp/mytxtfile") // df.rdd converts the DataFrame to an RDD of Rows.
df.write.text("/path/to/output") // Requires df to have a single string column; see the concat_ws sketch below.
Parquet File df.saveAsParquetFile("/tmp/myparquetFile") // Deprecated; prefer df.write.parquet(...)
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
CSV df.write.format("csv").option("header", "true").save("myCsvFile.csv") // Saved in current dir as ./myCsvFile.csv/ !!!
// If you want more supported options, ....
// You need: spark-shell --packages com.databricks:spark-csv_2.10:1.4.0 (for scala 2.10 version)
df.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("file.csv") // repartition before write, not on the writer
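As noted in the Text File row above, df.write.text needs a single string column; here is a sketch of collapsing all columns into one using concat_ws from org.apache.spark.sql.functions (the separator and output path are placeholders):
import org.apache.spark.sql.functions.{col, concat_ws}
df.select(concat_ws(",", df.columns.map(col): _*).as("value"))
  .write.text("/tmp/mytxtfile_all_columns")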
To automate merging the multiple CSV part files into a single output file, use this Scala helper:
df.repartition(1).write.
  format("com.databricks.spark.csv").
  option("header", "true").
  save("myfile.csv")
import java.io.File
import org.apache.spark.sql.DataFrame

def saveDfToCsv(df: DataFrame, tsvOutput: String,
                sep: String = ",", header: Boolean = false): Unit = {
val tmpParquetDir = "Posts.tmp.parquet"
df.repartition(1).write.
format("com.databricks.spark.csv").
option("header", header.toString).
option("delimiter", sep).
save(tmpParquetDir)
val dir = new File(tmpParquetDir)
val tmpTsvFile = tmpParquetDir + File.separatorChar + "part-00000"
(new File(tmpTsvFile)).renameTo(new File(tsvOutput))
dir.listFiles.foreach( f => f.delete )
dir.delete
}
df = (df.withColumn('pickup_datetime', df.pickup_datetime.cast('timestamp'))  # cast the column from string to timestamp
        .withColumnRenamed('_id', 'id'))  # rename a column
You can persist the DataFrame as a temporary view within the session as follows:
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
// Generation of additional/derived dataframes is much easier now ...
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()