更新時(shí)間:2021年03月23日17時(shí)09分 來(lái)源:傳智教育 瀏覽次數(shù):
在Windows系統(tǒng)下開發(fā)Scala代碼,可以使用本地環(huán)境測(cè)試,因此我們首先需要在本地磁盤準(zhǔn)備文本數(shù)據(jù)文件,這里將HDFS中的/spark/person.txt文件下載到本地D:/spark/person.txt路徑下。從文件4-1可以看出,當(dāng)前數(shù)據(jù)文件共3列,我們可以非常容易的分析出,這三列分別是編號(hào)、姓名、年齡。但是計(jì)算機(jī)無(wú)法像人一樣直觀的感受字段的實(shí)際含義,因此我們需要通過(guò)反射機(jī)制來(lái)推斷包含特定類型對(duì)象的Schema信息。
接下來(lái)我們打開IDEA開發(fā)工具,創(chuàng)建名為“spark_chapter04”的Maven工程,講解實(shí)現(xiàn)反射機(jī)制推斷Schema的開發(fā)流程。
1.添加Spark SQL依賴
在pom.xml文件中添加Spark SQL依賴,代碼片段如下所示。
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency>
2.編寫代碼
實(shí)現(xiàn)反射機(jī)制推斷Schema需要定義一個(gè)case class樣例類,定義字段和屬性,樣例類的參數(shù)名稱會(huì)被利用反射機(jī)制作為列名,編寫代碼如文件1所示。
文件1 CaseClassSchema.scala
import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SparkSession} //定義樣例類 case class Person(id:Int,name:String,age:Int) object CaseClassSchema { def main(args: Array[String]): Unit = { //1.構(gòu)建SparkSession val spark : SparkSession = SparkSession.builder() .appName("CaseClassSchema ") .master("local[2]") .getOrCreate() //2.獲取SparkContext val sc : SparkContext =spark.sparkContext //設(shè)置日志打印級(jí)別 sc.setLogLevel("WARN") //3.讀取文件 val data: RDD[Array[String]] = sc.textFile("D://spark//person.txt").map(x=>x.split(" ")) //4.將RDD與樣例類關(guān)聯(lián) val personRdd: RDD[Person] = data.map(x=>Person(x(0).toInt,x(1),x(2).toInt)) //5.獲取DataFrame //手動(dòng)導(dǎo)入隱式轉(zhuǎn)換 import spark.implicits._ val personDF: DataFrame = personRdd.toDF //------------DSL風(fēng)格操作開始------------- //1.顯示DataFrame的數(shù)據(jù),默認(rèn)顯示20行 personDF.show() //2.顯示DataFrame的schema信息 personDF.printSchema() //3.統(tǒng)計(jì)DataFrame中年齡大于30的人數(shù) println(personDF.filter($"age">30).count()) //-----------DSL風(fēng)格操作結(jié)束------------- //-----------SQL風(fēng)格操作開始------------- //將DataFrame注冊(cè)成表 personDF.createOrReplaceTempView("t_person") spark.sql("select * from t_person").show() spark.sql("select * from t_person where name='zhangsan'").show() //-----------SQL風(fēng)格操作結(jié)束------------- //關(guān)閉資源操作 sc.stop() spark.stop() }
Scala的方法和函數(shù)介紹【大數(shù)據(jù)文章】
Sequence File是什么?簡(jiǎn)單介紹幾種文件儲(chǔ)存格式
北京校區(qū)