Python-PySpark Study Notes

Creating a SparkSession

from pyspark.sql import SparkSession

# method 1
spark = SparkSession.builder.appName("Python Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()

# method 2
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
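
For local experiments it can help to pin the master and inspect the session afterwards; a minimal sketch (the master setting and app name below are just example values, not from any particular project):

from pyspark.sql import SparkSession

# Run locally on all available cores; builder settings here are illustrative
spark = (SparkSession.builder
         .master("local[*]")
         .appName("pyspark-notes")
         .getOrCreate())

print(spark.version)                  # Spark version string
print(spark.sparkContext.uiWebUrl)    # Spark UI URL, if the UI is enabled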

Creating a DataFrame

# method 1: from a list of tuples plus column names
df = spark.createDataFrame([
    (1, 144.5, 5.9, 33, 'M'),
    (2, 167.2, 5.4, 45, 'M'),
    (3, 124.1, 5.2, 23, 'F'),
    (4, 144.5, 5.9, 33, 'M'),
    (5, 133.2, 5.7, 54, 'F'),
    (3, 124.1, 5.2, 23, 'F'),
    (5, 129.2, 5.3, 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])

# from a list of dicts
df = spark.createDataFrame([{'name': 'Alice', 'age': 1},
                            {'name': 'Polo', 'age': 1}])

# with an explicit schema (csvRDD is assumed to be an existing RDD whose rows match the schema)
from pyspark.sql.types import StructType, StructField, LongType, StringType
schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])
df = spark.createDataFrame(csvRDD, schema)

# from a pandas Series wrapped in a DataFrame
import pandas as pd
x = pd.Series([1, 2, 3])
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# read from files
df = spark.read.json("data/people.json")
airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')

# method 3: from an RDD of Row objects
from pyspark.sql import Row
sc = spark.sparkContext
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
df = spark.createDataFrame(people)

# method 4: from a pandas DataFrame
df = spark.createDataFrame(pd.DataFrame(pd.Series([1, 2, 3]), columns=["x"]))
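
The explicit-schema idea also applies to file reading; a sketch of spark.read.csv with a schema instead of inferSchema (the file path and column names here are made up):

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Passing a schema avoids the extra pass over the data that inferSchema needs
airport_schema = StructType([
    StructField("code", StringType(), True),
    StructField("name", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
])
airports = spark.read.csv("data/airports.tsv", header=True, schema=airport_schema, sep="\t")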

Schema and DataFrame information

# Print the schema
df.printSchema()

# Summary statistics for one column (or every column): non-null count, mean, stddev, min, max
df.describe(["age"]).show()
df.describe().show()

# Column types, as in pandas
df.dtypes

# Column names
df.columns

# Contingency table (cross-tabulation)
df.crosstab('Age', 'Gender').show()

+----------+---+---+
|Age_Gender| F| M|
+----------+---+---+
| 42| 0| 1|
| 33| 0| 2|
| 45| 0| 1|
| 54| 1| 0|
| 23| 2| 0|
+----------+---+---+
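
describe() only reports non-null counts; to count the nulls per column as well, one common pattern (a sketch on the same df) is:

import pyspark.sql.functions as F

# For each column, count the rows where the value is null
df.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c + "_nulls") for c in df.columns
]).show()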

Distinct: finding unique values

print(df.select('id').distinct().rdd.map(lambda r: r[0]).collect())
df.select('id').distinct().show()
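
Related: duplicates can also be removed at the row level, over all columns or a chosen subset; a short sketch on the same df:

# distinct() works on whole rows; dropDuplicates() can be limited to chosen columns
df.distinct().show()               # removes fully duplicated rows
df.dropDuplicates(['id']).show()   # keeps one row per id
print(df.dropDuplicates(['id']).count())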

Column operations

# Select columns
df[['id', 'weight']].show()
df.select('Age').show()
df.select('weight', 'gender').show()
df.select(['weight', 'gender']).show()
# Columns can also be referenced with col()
from pyspark.sql.functions import col
df.select(col("weight"), col("gender")).show()

# Change a column's type
df.withColumn("weight", df["weight"].cast("Integer")).show()

# Update values
df.withColumn("weight", df["weight"]*100).show()

# Create a new column
df.withColumn("weight2", df["weight"] * -1).show()

# Add a constant column with lit()
import pyspark.sql.functions as F
df = df.withColumn('mark', F.lit(1))

# Rename a column
df.withColumnRenamed("gender", "sex").show()

# Sort by columns
df.sort("weight", "gender").show()
df.sort(df.weight.asc(), df.gender.desc()).show()
df.sort('weight', ascending=False).show()

# Filtering
df.select(df['weight'] > 140).show()           # returns only a boolean (weight > 140) column, not the matching rows
df[df['weight'] > 140].show()                  # returns the full matching rows
df.filter(df.weight.between(150, 170)).show()  # also returns the full matching rows

# color_df below is assumed to be another DataFrame with 'color' and 'length' columns
color_df.filter("color='green'").show()
color_df.filter("color like 'b%'").show()
color_df.filter(color_df.length > 4).filter(color_df[0] != 'white').show()
color_df.where("color like '%yellow%'").show()

# SQL queries: register the DataFrame as a temp view first, then run SQL against it
color_df.createOrReplaceTempView("color_df")
spark.sql("select count(1) from color_df").show()

# Drop columns
color_df.drop('length').show()
df.drop('height', 'age').show()
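
These operations compose, and nothing is computed until an action such as show() is called. A small sketch chaining a few of them (the unit conversion factor is purely illustrative):

import pyspark.sql.functions as F

df_small = (df
            .filter(F.col("weight") > 130)
            .withColumn("weight_kg", F.col("weight") * 0.4536)  # illustrative conversion
            .select("id", "gender", "weight_kg")
            .orderBy(F.col("weight_kg").desc()))
df_small.show()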

GroupBy and aggregation

from pyspark.sql.functions import col, lit, mean
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
simpleData = [("James", "Sales", "NY", 90000, 34, 10000),
    ("Michael", "Sales", "NY", 86000, 56, 20000),
    ("Robert", "Sales", "CA", 81000, 30, 23000),
    ("Maria", "Finance", "CA", 90000, 24, 23000),
    ("Raman", "Finance", "CA", 99000, 40, 24000),
    ("Scott", "Finance", "NY", 83000, 36, 19000),
    ("Jen", "Finance", "NY", 79000, 53, 15000),
    ("Jeff", "Marketing", "CA", 80000, 25, 18000),
    ("Kumar", "Marketing", "NY", 91000, 50, 21000)
]

schema = ["employee_name", "department", "state", "salary", "age", "bonus"]
df = spark.createDataFrame(data=simpleData, schema=schema)
df.printSchema()
# truncate=True (the default) shows only the first 20 characters of each column
df.show(truncate=False)
# Show the first three rows
df.show(n=3)

df.groupBy("department").sum("salary").show()
df.groupBy("department", "state").sum("salary", "bonus").show()

df.groupBy('department').agg(mean('salary').alias("mean salary")).show()
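
agg() can also compute several aggregations at once, with aliases, and the result can be filtered afterwards (a HAVING-style condition); a sketch on the same simpleData frame, with an arbitrary threshold:

import pyspark.sql.functions as F

(df.groupBy("department")
   .agg(F.sum("salary").alias("sum_salary"),
        F.avg("salary").alias("avg_salary"),
        F.max("bonus").alias("max_bonus"),
        F.count(F.lit(1)).alias("n"))
   .where(F.col("sum_salary") > 200000)   # threshold chosen arbitrarily
   .show(truncate=False))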

SQL

df.createOrReplaceTempView("table")
df_res1 = spark.sql("SELECT * FROM table")
# Note: a column name containing a dot must be wrapped in backticks: `A.p1`
df_res2 = spark.sql("SELECT `A.p1` FROM table")
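
# More involved SQL also runs against the view; a sketch assuming the registered
# table has the dotted A.p1 column used above (backticks again required)
df_res3 = spark.sql("SELECT `A.p1`, COUNT(*) AS cnt FROM table GROUP BY `A.p1` ORDER BY cnt DESC")
df_res3.show()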

spark.stop()

Conditional columns with when/otherwise

from pyspark.sql.functions import when
from pyspark.sql.functions import lit
df.withColumn("grade",when((df.age < 30), lit("young")).when((df.age >= 30) & (df.age <= 50), lit("middle")).otherwise(lit("old"))).show()

# The same code without lit() gives the same result; string literals are wrapped automatically
df.withColumn("grade",when((df.age < 30), "young").when((df.age >= 30) & (df.age <= 50), "middle").otherwise("old")).show()

+---+------+------+---+------+------+
| id|weight|height|age|gender| grade|
+---+------+------+---+------+------+
| 1| 144.5| 5.9| 33| M|middle|
| 2| 167.2| 5.4| 45| M|middle|
| 3| 124.1| 5.2| 23| F| young|
| 4| 144.5| 5.9| 33| M|middle|
| 5| 133.2| 5.7| 54| F| old|
| 3| 124.1| 5.2| 23| F| young|
| 5| 129.2| 5.3| 42| M|middle|
+---+------+------+---+------+------+
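
The same grading logic can be written as a SQL CASE expression via expr(); a sketch equivalent to the when()/otherwise() chain above:

from pyspark.sql.functions import expr

# CASE WHEN as a SQL string, producing the same grade column
df.withColumn(
    "grade",
    expr("CASE WHEN age < 30 THEN 'young' WHEN age <= 50 THEN 'middle' ELSE 'old' END")
).show()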

Joining DataFrames with SQL

spark = SparkSession.builder.appName("Python Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()
from pyspark import SparkContext, SparkConf, SQLContext
sc = spark.sparkContext
sqlContext = SQLContext(sc)  # legacy API; spark.sql() can be used directly instead
# df2 is assumed to be another DataFrame that also has an id column
df.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
sdf = sqlContext.sql("SELECT * FROM df1 JOIN df2 ON df1.id = df2.id")
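
The same join can be written with the DataFrame API, without registering temp views (df2 is still assumed to have an id column):

# Equivalent DataFrame-API join; "inner" is the default join type
sdf_api = df.join(df2, on="id", how="inner")
sdf_api.show()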

UDF

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

# Grouped-map pandas UDF: each group is passed in as a pandas DataFrame
# (this GROUPED_MAP style is deprecated in Spark 3.x; see the applyInPandas sketch below)
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+
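
Since Spark 3.0 the GROUPED_MAP pandas_udf form is deprecated in favour of applyInPandas; the same computation would look roughly like this:

# Spark 3.x style: a plain function plus an output schema string
def subtract_mean_fn(pdf):
    # pdf is a pandas.DataFrame holding one id group
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby("id").applyInPandas(subtract_mean_fn, schema="id long, v double").show()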

Open questions

P1

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("Python Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()
sc = spark.sparkContext
cols = ["A.p1", "B.p1"]
df = spark.createDataFrame([[1, 2], [4, 89], [12, 60]], schema=cols)
df.show()

# 1. Works (the built-in sum folds the Column objects with +)
df1 = df.withColumn('sum1', sum([df[col] for col in ["`A.p1`", "`B.p1`"]]))
df1.show()
df2 = df.withColumn('sum1', sum([df[col] for col in ["A.p1", "B.p1"]]))
df2.show()

# 2. Doesn't work
# df2 = df.withColumn('sum1', F.sum([df[col] for col in ["`A.p1`", "`B.p1`"]]))
# df2.show()

# 3. Doesn't work
# df3 = df.withColumn('sum1', sum(df.select(["`A.p1`", "`B.p1`"])))
# df3.show()

spark.stop()
  • I didn't understand why the column names had to be written as ["`A.p1`", "`B.p1`"]. Resolved: inside df.select the brackets are not needed either, e.g. df.select('A.p1', 'B.p1').
  • I still don't understand why #2 and #3 fail. (Presumably F.sum is an aggregate function that expects a single column rather than a Python list, and df.select(...) returns a DataFrame, which the built-in sum cannot iterate over, but this remains to be confirmed.)
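
For reference, a version that avoids the built-in sum() trick entirely folds the columns with operator.add; a sketch against the same df, relying on df[...] resolving the dotted names as in variant #1 (it would need to run before spark.stop()):

from functools import reduce
from operator import add

# Explicitly fold Column addition over the selected columns
df_sum = df.withColumn('sum1', reduce(add, [df[c] for c in df.columns]))
df_sum.show()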