# Generate our own CSV data
# This way we don't have to access the file system yet.
stringCSVRDD = sc.parallelize([
    (123, 'Katie', 19, 'brown'),
    (234, 'Michael', 22, 'green'),
    (345, 'Simone', 23, 'blue')
])
# The schema is encoded in a string; using StructType we define the schema
# with various pyspark.sql.types
# schemaString = "id name age eyeColor"
from pyspark.sql.types import StructType, StructField, LongType, StringType

schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])
# Apply the schema to the RDD and create a DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)
# Create a temporary view using the DataFrame
swimmers.createOrReplaceTempView("swimmers")
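With the view registered, a quick sanity check is worthwhile; a minimal sketch (assuming the cells above have run) that prints the applied schema and previews the rows:

# Print the schema we applied to the RDD
swimmers.printSchema()

# Preview the data
swimmers.show()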
SparkSession
Notice that we're no longer using sqlContext.read... but instead spark.read.... This is because, as part of Spark 2.0, HiveContext, SQLContext, StreamingContext, and SparkContext were merged together into the SparkSession, spark.
# Stop the existing SparkContext and create a fresh one
sc.stop()

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext('local', 'sparkApp')
print(sc.version)

# Wrap the SparkContext in a SparkSession
spark = SparkSession(sc)
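In a standalone script it is more common to construct the session through the builder API, which reuses an existing session if one is already running; a minimal sketch (the master URL and app name here are just examples):

from pyspark.sql import SparkSession

# Build (or reuse) a session; the underlying SparkContext
# is then available as spark.sparkContext
spark = SparkSession.builder \
    .master('local') \
    .appName('sparkApp') \
    .getOrCreate()
sc = spark.sparkContext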
Querying with SQL
With DataFrames, you can start writing your queries using Spark SQL - a SQL dialect that is compatible with the Hive Query Language (or HiveQL).
# Get all rows, then get a count of the rows, in SQL
spark.sql("select * from swimmers").show()
spark.sql("select count(1) from swimmers").show()

# Query id and age for swimmers with age = 22 in SQL
spark.sql("select id, age from swimmers where age = 22").show()

# Query name and eye color for swimmers whose eye color begins with 'b'
spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()
# Query id and age for swimmers with age = 22 via the DataFrame API
swimmers.select("id", "age").filter("age = 22").show()
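The same filters can also be written with column expressions instead of SQL strings; a minimal sketch using pyspark.sql.functions.col:

from pyspark.sql.functions import col

# Equivalent age query using column expressions
swimmers.select(col("id"), col("age")).filter(col("age") == 22).show()

# Eye-color query via the DataFrame API
swimmers.select("name", "eyeColor").filter(col("eyeColor").like("b%")).show()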
Query Sum of Flight Delays by City and Origin Code (for Washington State)
spark.sql("select a.City, f.origin, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.State = 'WA' group by a.City, f.origin order by sum(f.delay) desc").show()
Query Sum of Flight Delays by State (for the US)
spark.sql("select a.State, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.Country = 'USA' group by a.State ").show()
Note that you can make use of the %sql magic within the notebook cells of a Databricks notebook:
%sql
-- Query Sum of Flight Delays by State (for the US)
select a.State, sum(f.delay) as Delays
from FlightPerformance f
join airports a
    on a.IATA = f.origin
where a.Country = 'USA'
group by a.State