更新時(shí)間:2023年07月28日16時(shí)16分 來(lái)源:傳智教育 瀏覽次數(shù):
DataFrame對(duì)象可以從RDD轉(zhuǎn)換而來(lái),都是分布式數(shù)據(jù)集 其實(shí)就是轉(zhuǎn)換一下內(nèi)部存儲(chǔ)的結(jié)構(gòu),轉(zhuǎn)換為二維表結(jié)構(gòu)。
將RDD轉(zhuǎn)換為DataFrame方式1:
調(diào)用spark
# 首先構(gòu)建一個(gè)RDD rdd[(name, age), ()]
rdd = sc.textFile("../data/sql/people.txt").\
map(lambda x: x.split(',')).\
map(lambda x: [x[0], int(x[1])]) # 需要做類型轉(zhuǎn)換, 因?yàn)轭愋蛷腞DD中探測(cè)
# 構(gòu)建DF方式1
df = spark.createDataFrame(rdd, schema = ['name', 'age'])
通過(guò)SparkSession對(duì)象的createDataFrame方法來(lái)將RDD轉(zhuǎn)換為DataFrame,這里只傳入列名稱,類型從RDD中進(jìn)行推斷,是否允許為空默認(rèn)為允許(True)。
# coding:utf8
# 演示DataFrame創(chuàng)建的三種方式
from pyspark.sql import SparkSession
if __name__ == '__main__':
spark = SparkSession.builder.\
appName("create df").\
master("local[*]").\
getOrCreate()
sc = spark.sparkContext
# 首先構(gòu)建一個(gè)RDD rdd[(name, age), ()]
rdd = sc.textFile("../data/sql/people.txt").\
map(lambda x: x.split(',')).\
map(lambda x: [x[0], int(x[1])]) # 需要做類型轉(zhuǎn)換, 因?yàn)轭愋蛷腞DD中探測(cè)
# 構(gòu)建DF方式1
df = spark.createDataFrame(rdd, schema = ['name', 'age'])
# 打印表結(jié)構(gòu)
df.printSchema()
# 打印20行數(shù)據(jù)
df.show()
df.createTempView("ttt")
spark.sql("select * from ttt where age< 30").show()
將RDD轉(zhuǎn)換為DataFrame方式2:
通過(guò)StructType對(duì)象來(lái)定義DataFrame的“表結(jié)構(gòu)”轉(zhuǎn)換RDD
# 創(chuàng)建DF , 首先創(chuàng)建RDD 將RDD轉(zhuǎn)DF
rdd = sc.textFile("../data/sql/stu_score.txt").\
map(lambda x:x.split(',')).\
map(lambda x:(int(x[0]), x[1], int(x[2])))
# StructType 類
# 這個(gè)類 可以定義整個(gè)DataFrame中的Schema
schema = StructType().\
add("id", IntegerType(), nullable=False).\
add("name", StringType(), nullable=True).\
add("score", IntegerType(), nullable=False)
# 一個(gè)add方法 定義一個(gè)列的信息, 如果有3個(gè)列, 就寫(xiě)三個(gè)add, 每一個(gè)add代表一個(gè)StructField
# add方法: 參數(shù)1: 列名稱, 參數(shù)2: 列類型, 參數(shù)3: 是否允許為空
df = spark.createDataFrame(rdd, schema)
# coding:utf8
# 需求: 基于StructType的方式構(gòu)建DataFrame 同樣是RDD轉(zhuǎn)DF
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
if __name__ == '__main__':
spark = SparkSession.builder.\
appName("create_df"). \
config("spark.sql.shuffle.partitions", "4"). \
getOrCreate()
# SparkSession對(duì)象也可以獲取 SparkContext
sc = spark.sparkContext
# 創(chuàng)建DF , 首先創(chuàng)建RDD 將RDD轉(zhuǎn)DF
rdd = sc.textFile("../data/sql/stu_score.txt").\
map(lambda x:x.split(',')).\
map(lambda x:(int(x[0]), x[1], int(x[2])))
# StructType 類
# 這個(gè)類 可以定義整個(gè)DataFrame中的Schema
schema = StructType().\
add("id", IntegerType(), nullable=False).\
add("name", StringType(), nullable=True).\
add("score", IntegerType(), nullable=False)
# 一個(gè)add方法 定義一個(gè)列的信息, 如果有3個(gè)列, 就寫(xiě)三個(gè)add
# add方法: 參數(shù)1: 列名稱, 參數(shù)2: 列類型, 參數(shù)3: 是否允許為空
df = spark.createDataFrame(rdd, schema)
df.printSchema()
df.show()
將RDD轉(zhuǎn)換為DataFrame方式3:
使用RDD的toDF方法轉(zhuǎn)換RDD
# StructType 類
# 這個(gè)類 可以定義整個(gè)DataFrame中的Schema
schema = StructType().\
add("id", IntegerType(), nullable=False).\
add("name", StringType(), nullable=True).\
add("score", IntegerType(), nullable=False)
# 一個(gè)add方法 定義一個(gè)列的信息, 如果有3個(gè)列, 就寫(xiě)三個(gè)add
# add方法: 參數(shù)1: 列名稱, 參數(shù)2: 列類型, 參數(shù)3: 是否允許為空
# 方式1: 只傳列名, 類型靠推斷, 是否允許為空是true
df = rdd.toDF(['id', 'subject', 'score'])
df.printSchema()
df.show()
# 方式2: 傳入完整的Schema描述對(duì)象StructType
df = rdd.toDF(schema)
df.printSchema()
df.show()
# coding:utf8
# 需求: 使用toDF方法將RDD轉(zhuǎn)換為DF
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
if __name__ == '__main__':
spark = SparkSession.builder.\
appName("create_df"). \
config("spark.sql.shuffle.partitions", "4"). \
getOrCreate()
# SparkSession對(duì)象也可以獲取 SparkContext
sc = spark.sparkContext
# 創(chuàng)建DF , 首先創(chuàng)建RDD 將RDD轉(zhuǎn)DF
rdd = sc.textFile("../data/sql/stu_score.txt").\
map(lambda x:x.split(',')).\
map(lambda x:(int(x[0]), x[1], int(x[2])))
# StructType 類
# 這個(gè)類 可以定義整個(gè)DataFrame中的Schema
schema = StructType().\
add("id", IntegerType(), nullable=False).\
add("name", StringType(), nullable=True).\
add("score", IntegerType(), nullable=False)
# 一個(gè)add方法 定義一個(gè)列的信息, 如果有3個(gè)列, 就寫(xiě)三個(gè)add
# add方法: 參數(shù)1: 列名稱, 參數(shù)2: 列類型, 參數(shù)3: 是否允許為空
# 方式1: 只傳列名, 類型靠推斷, 是否允許為空是true
df = rdd.toDF(['id', 'subject', 'score'])
df.printSchema()
df.show()
# 方式2: 傳入完整的Schema描述對(duì)象StructType
df = rdd.toDF(schema)
df.printSchema()
df.show()