教育行業(yè)A股IPO第一股(股票代碼 003032)

全國咨詢/投訴熱線:400-618-4000

如何基于RDD方式完成DataFrame的代碼構(gòu)建?

更新時間:2023年07月28日16時16分 來源:傳智教育 瀏覽次數(shù):

DataFrame對象可以從RDD轉(zhuǎn)換而來,都是分布式數(shù)據(jù)集 其實(shí)就是轉(zhuǎn)換一下內(nèi)部存儲的結(jié)構(gòu),轉(zhuǎn)換為二維表結(jié)構(gòu)。

將RDD轉(zhuǎn)換為DataFrame方式1:

調(diào)用spark

# 首先構(gòu)建一個RDD rdd[(name, age), ()]
rdd = sc.textFile("../data/sql/people.txt").\
  map(lambda x: x.split(',')).\
  map(lambda x: [x[0], int(x[1])])               # 需要做類型轉(zhuǎn)換, 因?yàn)轭愋蛷腞DD中探測
# 構(gòu)建DF方式1
df = spark.createDataFrame(rdd, schema = ['name', 'age'])

通過SparkSession對象的createDataFrame方法來將RDD轉(zhuǎn)換為DataFrame,這里只傳入列名稱,類型從RDD中進(jìn)行推斷,是否允許為空默認(rèn)為允許(True)。

# coding:utf8
# 演示DataFrame創(chuàng)建的三種方式
from pyspark.sql import SparkSession
if __name__ == '__main__':
    spark = SparkSession.builder.\
       appName("create df").\
master("local[*]").\
getOrCreate()

sc = spark.sparkContext
# 首先構(gòu)建一個RDD rdd[(name, age), ()]
rdd = sc.textFile("../data/sql/people.txt").\
map(lambda x: x.split(',')).\
map(lambda x: [x[0], int(x[1])]) # 需要做類型轉(zhuǎn)換, 因?yàn)轭愋蛷腞DD中探測
# 構(gòu)建DF方式1
df = spark.createDataFrame(rdd, schema = ['name', 'age'])
# 打印表結(jié)構(gòu)
df.printSchema()
# 打印20行數(shù)據(jù)
df.show()
df.createTempView("ttt")
spark.sql("select * from ttt where age< 30").show()
將RDD轉(zhuǎn)換為DataFrame方式2:

通過StructType對象來定義DataFrame的“表結(jié)構(gòu)”轉(zhuǎn)換RDD

# 創(chuàng)建DF , 首先創(chuàng)建RDD 將RDD轉(zhuǎn)DF
rdd = sc.textFile("../data/sql/stu_score.txt").\
  map(lambda x:x.split(',')).\
  map(lambda x:(int(x[0]), x[1], int(x[2])))

# StructType 類
# 這個類 可以定義整個DataFrame中的Schema
schema = StructType().\
  add("id", IntegerType(), nullable=False).\
  add("name", StringType(), nullable=True).\
  add("score", IntegerType(), nullable=False)
# 一個add方法 定義一個列的信息, 如果有3個列, 就寫三個add, 每一個add代表一個StructField
# add方法: 參數(shù)1: 列名稱, 參數(shù)2: 列類型, 參數(shù)3: 是否允許為空
df = spark.createDataFrame(rdd, schema)
# coding:utf8
# 需求: 基于StructType的方式構(gòu)建DataFrame 同樣是RDD轉(zhuǎn)DF
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
if __name__ == '__main__':
  spark = SparkSession.builder.\
    appName("create_df"). \
    config("spark.sql.shuffle.partitions", "4"). \
    getOrCreate()
  # SparkSession對象也可以獲取 SparkContext
  sc = spark.sparkContext
  # 創(chuàng)建DF , 首先創(chuàng)建RDD 將RDD轉(zhuǎn)DF
  rdd = sc.textFile("../data/sql/stu_score.txt").\
    map(lambda x:x.split(',')).\
    map(lambda x:(int(x[0]), x[1], int(x[2])))
  # StructType 類
  # 這個類 可以定義整個DataFrame中的Schema
  schema = StructType().\
    add("id", IntegerType(), nullable=False).\
    add("name", StringType(), nullable=True).\
    add("score", IntegerType(), nullable=False)
  # 一個add方法 定義一個列的信息, 如果有3個列, 就寫三個add
  # add方法: 參數(shù)1: 列名稱, 參數(shù)2: 列類型, 參數(shù)3: 是否允許為空
  df = spark.createDataFrame(rdd, schema)
  df.printSchema()
  df.show()

將RDD轉(zhuǎn)換為DataFrame方式3:

使用RDD的toDF方法轉(zhuǎn)換RDD

# StructType 類
# 這個類 可以定義整個DataFrame中的Schema
schema = StructType().\
  add("id", IntegerType(), nullable=False).\
  add("name", StringType(), nullable=True).\
  add("score", IntegerType(), nullable=False)
# 一個add方法 定義一個列的信息, 如果有3個列, 就寫三個add
# add方法: 參數(shù)1: 列名稱, 參數(shù)2: 列類型, 參數(shù)3: 是否允許為空

# 方式1: 只傳列名, 類型靠推斷, 是否允許為空是true
df = rdd.toDF(['id', 'subject', 'score'])
df.printSchema()
df.show()

# 方式2: 傳入完整的Schema描述對象StructType
df = rdd.toDF(schema)
df.printSchema()
df.show()
# coding:utf8
# 需求: 使用toDF方法將RDD轉(zhuǎn)換為DF
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
if __name__ == '__main__':
    spark = SparkSession.builder.\
      appName("create_df"). \
      config("spark.sql.shuffle.partitions", "4"). \
      getOrCreate()
    # SparkSession對象也可以獲取 SparkContext
    sc = spark.sparkContext
    # 創(chuàng)建DF , 首先創(chuàng)建RDD 將RDD轉(zhuǎn)DF
    rdd = sc.textFile("../data/sql/stu_score.txt").\
      map(lambda x:x.split(',')).\
      map(lambda x:(int(x[0]), x[1], int(x[2])))
    # StructType 類
    # 這個類 可以定義整個DataFrame中的Schema
    schema = StructType().\
       add("id", IntegerType(), nullable=False).\
       add("name", StringType(), nullable=True).\
       add("score", IntegerType(), nullable=False)
    # 一個add方法 定義一個列的信息, 如果有3個列, 就寫三個add
    # add方法: 參數(shù)1: 列名稱, 參數(shù)2: 列類型, 參數(shù)3: 是否允許為空
    # 方式1: 只傳列名, 類型靠推斷, 是否允許為空是true
    df = rdd.toDF(['id', 'subject', 'score'])
    df.printSchema()
    df.show()
    # 方式2: 傳入完整的Schema描述對象StructType
    df = rdd.toDF(schema)
    df.printSchema()
    df.show()

0 分享到:
和我們在線交談!