Spark笔记-玩转SparkSQL

DataFrames

Generate

创建一个RDD对象stringRDD,然后通过spark.read.json将stringRDD转换为DataFrame。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
stringJSONRDD = sc.parallelize((""" 
{ "id": "123",
"name": "Katie",
"age": 19,
"eyeColor": "brown"
}""",
"""{
"id": "234",
"name": "Michael",
"age": 22,
"eyeColor": "green"
}""",
"""{
"id": "345",
"name": "Simone",
"age": 23,
"eyeColor": "blue"
}""")
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# create DataFrame
swimmersJSON = spark.read.json(stringJSONRDD)

# Create temporary table
swimmersJSON.createOrReplaceTempView("swimmersJSON")

# DataFrame API
swimmersJSON.show()

"""
+---+--------+---+-------+
|age|eyeColor| id| name|
+---+--------+---+-------+
| 19| brown|123| Katie|
| 22| green|234|Michael|
| 23| blue|345| Simone|
+---+--------+---+-------+
"""

# SQL Query
spark.sql("select * from swimmersJSON where age=19").collect()

"""
[Row(age=19, eyeColor='brown', id='123', name='Katie')]
"""

# Print the schema
swimmersJSON.printSchema()

"""
root
|-- age: long (nullable = true)
|-- eyeColor: string (nullable = true)
|-- id: string (nullable = true)
|-- name: string (nullable = true)
"""

Programmatically Specifying the Schema

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from pyspark.sql.types import *

# Generate our own CSV data
# This way we don't have to access the file system yet.
stringCSVRDD = sc.parallelize([(123, 'Katie', 19, 'brown'), (234, 'Michael', 22, 'green'), (345, 'Simone', 23, 'blue')])

# The schema is encoded in a string, using StructType we define the schema using various pyspark.sql.types
# schemaString = "id name age eyeColor"
schema = StructType([
StructField("id", LongType(), True),
StructField("name", StringType(), True),
StructField("age", LongType(), True),
StructField("eyeColor", StringType(), True)
])

# Apply the schema to the RDD and Create DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)

# Creates a temporary view using the DataFrame
swimmers.createOrReplaceTempView("swimmers")

SparkSession

Notice that we’re no longer using sqlContext.read... but instead spark.read.... This is because as part of Spark 2.0, HiveContext, SQLContext, StreamingContext, SparkContext have been merged together into the Spark Session spark.

  • Entry point for reading data
  • Working with metadata
  • Configuration
  • Cluster resource management

For more information, please refer to How to use SparkSession in Apache Spark 2.0 (http://bit.ly/2br0Fr1).

1
2
3
4
5
6
7
8
9
10
import sys
sys.path.append("D:\\Programs\\Spark\\spark-2.4.5-bin-hadoop2.7\\python")

sc.stop()
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext('local', 'sparkApp')
print(sc.version)

spark = SparkSession(sc)

Querying with SQL

With DataFrames, you can start writing your queries using Spark SQL - a SQL dialect that is compatible with the Hive Query Language (or HiveQL).

1
2
3
4
5
6
7
8
9
10
# Query id and age for swimmers with age = 22 via DataFrame API,and Get count of rows in SQL
spark.sql("select * from swimmers").show()
spark.sql("select count(1) from swimmers").show()

spark.sql("select id, age from swimmers where age = 22").show()
spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()


# Query id and age for swimmers with age = 22 via DataFrame API
swimmers.select("id", "age").filter("age = 22").show()

Querying with the DataFrame API

1
2
3
4
5
swimmers.show()
display(swimmers)
swimmers.count()

swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").show()

DataFrame Queries

let’s first build the DataFrames from the source datasets.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Set File Paths
flightPerfFilePath = "./flight-data/departuredelays.csv"
airportsFilePath = "./flight-data/airport-codes-na.txt"

# Obtain Airports dataset
airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')
airports.createOrReplaceTempView("airports")

# Obtain Departure Delays dataset
flightPerf = spark.read.csv(flightPerfFilePath, header='true')
flightPerf.createOrReplaceTempView("FlightPerformance")

# Cache the Departure Delays dataset
flightPerf.cache()

Output:

1
# DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]

Query Sum of Flight Delays by City and Origin Code (for Washington State)

1
spark.sql("select a.City, f.origin, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.State = 'WA' group by a.City, f.origin order by sum(f.delay) desc").show()

Query Sum of Flight Delays by State (for the US)

1
spark.sql("select a.State, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.Country = 'USA' group by a.State ").show()

Note, you can make use of %sql within the notebook cells of a Databricks notebook

1
2
3
4
5
6
7
8
%sql
-- Query Sum of Flight Delays by State (for the US)
select a.State, sum(f.delay) as Delays
from FlightPerformance f
join airports a
on a.IATA = f.origin
where a.Country = 'USA'
group by a.State

For more information, please refer to:

------ 本文结束------
Donate comment here.

欢迎关注我的其它发布渠道