Spark RDD、DataFrame原理及操作詳解
RDD是什么?
RDD (resilientdistributed dataset),指的(de)是一個只(zhi)讀的(de),可分區的(de)分布(bu)式數據(ju)集(ji),這個數據(ju)集(ji)的(de)全部(bu)或部(bu)分可以緩存在內存中,在多次計算間重用。
RDD內部可(ke)以有許多分區(partitions),每(mei)個分區又擁有大(da)量(liang)的記(ji)錄(lu)(records)。
五(wu)個特征(zheng):
dependencies:建立RDD的依賴(lai)關(guan)系(xi),主要(yao)rdd之間是寬窄依賴(lai)的關(guan)系(xi),具有(you)窄依賴(lai)關(guan)系(xi)的rdd可以在同(tong)一個stage中進行計算。
partition:一(yi)個(ge)(ge)rdd會有若干個(ge)(ge)分(fen)區,分(fen)區的(de)大小(xiao)決定了(le)對這個(ge)(ge)rdd計算(suan)的(de)粒(li)度,每個(ge)(ge)rdd的(de)分(fen)區的(de)計算(suan)都在一(yi)個(ge)(ge)單獨的(de)任(ren)務中進行。
preferedlocations:按照“移(yi)動(dong)數據(ju)不如移(yi)動(dong)計算”原則,在(zai)spark進行任(ren)務(wu)調(diao)度的時候,優(you)先將任(ren)務(wu)分配到(dao)數據(ju)塊存儲的位置
compute:spark中的(de)(de)計算(suan)都(dou)是以分區為基本單(dan)位的(de)(de),compute函數只是對(dui)迭代器進行復合,并不(bu)保存單(dan)次計算(suan)的(de)(de)結果。
partitioner:只存(cun)在(zai)于(K,V)類(lei)型(xing)的(de)rdd中(zhong),非(K,V)類(lei)型(xing)的(de)partitioner的(de)值就(jiu)是None。
rdd的(de)(de)(de)算(suan)子action會觸發(fa)真(zhen)正(zheng)的(de)(de)(de)作業提交(jiao)(jiao),而transformation算(suan)子是(shi)不會立即觸發(fa)作業提交(jiao)(jiao)的(de)(de)(de)。
在Spark中,所(suo)有RDD的(de)(de)(de)(de)轉換都是(shi)是(shi)惰性求(qiu)值的(de)(de)(de)(de)。RDD的(de)(de)(de)(de)轉換操作(zuo)transformation會(hui)生成(cheng)新的(de)(de)(de)(de)RDD,新的(de)(de)(de)(de)RDD的(de)(de)(de)(de)數據依賴于原來(lai)的(de)(de)(de)(de)RDD的(de)(de)(de)(de)數據,每個(ge)(ge)RDD又(you)包(bao)含多個(ge)(ge)分(fen)區。那么(me)一段(duan)程序實(shi)際(ji)上就構造(zao)了(le)一個(ge)(ge)由相互依賴的(de)(de)(de)(de)多個(ge)(ge)RDD組成(cheng)的(de)(de)(de)(de)有向無環圖(DAG)。并通過(guo)在RDD上執(zhi)行action動作(zuo)將這個(ge)(ge)有向無環圖作(zuo)為(wei)一個(ge)(ge)Job提交給Spark執(zhi)行
在DAG中又(you)進行(xing)stage的劃(hua)分(fen)(fen),劃(hua)分(fen)(fen)的依據是(shi)依賴算子是(shi)否是(shi)shuffle(如reduceByKey,Join等)的,每(mei)個stage又(you)可以(yi)劃(hua)分(fen)(fen)成若(ruo)干task。接下來的事情就(jiu)是(shi)driver發(fa)送task到executor,executor自(zi)己的線程池去執(zhi)行(xing)這些task,完成之后(hou)將結(jie)果返回給driver。action算子是(shi)劃(hua)分(fen)(fen)不同(tong)job的依據。
Spark對于有向無環圖Job進(jin)行調度(du),確定(ding)(ding)階段(Stage),分區(Partition),流水線(Pipeline),任務(wu)(Task)和(he)緩(huan)存(Cache),進(jin)行優化,并(bing)在(zai)Spark集群上(shang)運(yun)行Job。RDD之間的依(yi)賴分為(wei)寬依(yi)賴(依(yi)賴多個(ge)分區)和(he)窄依(yi)賴(只依(yi)賴一個(ge)分區),在(zai)確定(ding)(ding)階段時,需(xu)要根(gen)據寬依(yi)賴shuffle劃(hua)分階段。根(gen)據分區劃(hua)分任務(wu)。
Spark支持(chi)故障恢復的方式也(ye)不(bu)同,提(ti)供(gong)兩種方式,Linage,通過數(shu)據的血緣(yuan)關系,再執行一遍前面的處理,Checkpoint,將數(shu)據集存儲到持(chi)久存儲中(zhong)。 Spark為迭代式數(shu)據處理提(ti)供(gong)更好的支持(chi)。每次迭代的數(shu)據可以保存在內(nei)存中(zhong),而(er)不(bu)是寫入(ru)文件
這里注意兩個(ge)算(suan)子coalesce()和repartition()
coalesce
def coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]
該(gai)函數(shu)用于將RDD進(jin)行重分(fen)區(qu),使(shi)用HashPartitioner。
第一個(ge)參數(shu)(shu)為(wei)重分區(qu)的數(shu)(shu)目(mu),第二個(ge)為(wei)是(shi)否進行(xing)shuffle,默(mo)認為(wei)false。
repartition
def repartition(numPartitions: Int): RDD[T]
該函(han)數(shu)其實就是(shi)coalesce函(han)數(shu)第二個(ge)參數(shu)為true的實現。
使用(yong)注(zhu)意(yi)
他們兩個(ge)(ge)(ge)都是RDD的(de)分區進行重新(xin)劃分,repartition只(zhi)是coalesce接口中shuffle為true的(de)簡(jian)易實現,(假設RDD有N個(ge)(ge)(ge)分區,需(xu)要重新(xin)劃分成M個(ge)(ge)(ge)分區)
1)N < M。一般情(qing)況(kuang)下N個分(fen)(fen)區有數據分(fen)(fen)布(bu)不均勻(yun)的狀況(kuang),利用HashPartitioner函(han)數將(jiang)(jiang)數據重新分(fen)(fen)區為M個,這時需要將(jiang)(jiang)shuffle設置為true。
2)如果N > M并且N和M相差不(bu)多,(假如N是(shi)1000,M是(shi)100)那么(me)就可(ke)以(yi)將N個(ge)分(fen)區中(zhong)的(de)若干個(ge)分(fen)區合(he)并成一個(ge)新的(de)分(fen)區,最終合(he)并為(wei)M個(ge)分(fen)區,這時可(ke)以(yi)將shuff設置為(wei)false,在(zai)shuffl為(wei)false的(de)情況下,如果M>N時,coalesce為(wei)無(wu)效的(de),不(bu)進行shuffle過(guo)程,父(fu)RDD和子RDD之間是(shi)窄依賴關系(xi)。
3)如果N > M并且兩者相差懸殊,這時(shi)如果將shuffle設(she)置為(wei)false,父子RDD是窄依賴關(guan)系,他們同處在一個stage中,就可(ke)能(neng)造成Spark程序(xu)的并行度不(bu)夠,從而影響性能(neng),如果在M為(wei)1的時(shi)候,為(wei)了使(shi)coalesce之前的操(cao)作(zuo)有更好的并行度,可(ke)以講shuffle設(she)置為(wei)true。
總之:如果shuff為false時,如果傳入的參數大于現有的分區數目,RDD的分區數不變,也就是說不經過shuffle,是無法將RDDde分區數變多的
參考:
更多RDD算子內容推(tui)薦(jian)參考(kao)
Spark常用函數講解之鍵值RDD轉換
Spark常用函數講解之Action操作
窄依賴(lai)和寬(kuan)依賴(lai)
shuffle 是劃分 DAG 中 stage 的標識,同時影響 Spark 執行速度的關鍵步驟.
RDD 的 Transformation 函數中,又分為窄依賴(narrow dependency)和寬依賴(wide dependency)的操作.窄依賴跟寬依賴的區別是是否發生 shuffle(洗牌) 操作.寬依賴會發生 shuffle 操作. 窄依賴是子 RDD的各個分片(partition)不依賴于其他分片,能夠獨立計算得到結果,寬依賴指子 RDD 的各個分片會依賴于父RDD 的多個分片,所以會造成父 RDD 的各個分片在集群中重新分片。
如(ru)下圖(tu)所示:map就是一種(zhong)窄依賴,而(er)join則會導致(zhi)寬依賴

如上面的(de)map,filter,union屬(shu)于第一類(lei)窄依賴,而join with inputs co-partitioned(對輸入進(jin)行協同劃分(fen)(fen)的(de)join操作(zuo),也就是說先按照key分(fen)(fen)組然(ran)后shuffle write的(de)時候(hou)一個父分(fen)(fen)區對應一個子分(fen)(fen)區)則(ze)為第二類(lei)窄依賴
groupByKey和對輸入未協同劃分的join操作就是寬依賴,這是shuffle類操作。
細(xi)說:
首先,窄(zhai)依賴允(yun)許在單(dan)個集群節(jie)點上流水線式執行,這個節(jie)點可以計算(suan)所(suo)有父級(ji)分區。例如,可以逐個元素(su)地依次執行filter操作和map操作。相反,寬(kuan)依賴需要(yao)所(suo)有的(de)父RDD數(shu)(shu)據(ju)(ju)可用并且數(shu)(shu)據(ju)(ju)已經通過類MapReduce的(de)操作shuffle完成。
其次,在(zai)窄依賴(lai)中,節點失敗(bai)后的(de)恢復(fu)更(geng)加高效。因(yin)為只有丟失的(de)父(fu)級分區(qu)需要(yao)重(zhong)新計算,并(bing)且(qie)這些(xie)(xie)丟失的(de)父(fu)級分區(qu)可以并(bing)行地在(zai)不同節點上(shang)重(zhong)新計算。與此相反,在(zai)寬依賴(lai)的(de)繼承關系(xi)中,單個(ge)失敗(bai)的(de)節點可能(neng)導(dao)致一個(ge)RDD的(de)所有先祖RDD中的(de)一些(xie)(xie)分區(qu)丟失,導(dao)致計算的(de)重(zhong)新執行。
// Map: "cat" -> c, cat val rdd1 = rdd.Map(x => (x.charAt(0), x)) // groupby same key and count val rdd2 = rdd1.groupBy(x => x._1). Map(x => (x._1, x._2.toList.length))
第一個 Map 操作將 RDD 里的各個元素進行映射, RDD 的各個數據元素之間不存在依賴,可以在集群的各個內存中獨立計算,也就是并行化,第二個 groupby 之后的 Map 操作,為了計算相同 key 下的元素個數,需要把相同 key 的元素聚集到同一個 partition 下,所以造成了數據在內存中的重新分布,即 shuffle 操作.shuffle 操作是 spark 中最耗時的操作,應盡量避免不必要的 shuffle.
根據是否發生(sheng) shuffle 操(cao)作能(neng)夠將其分成(cheng)如下的(de) stage 類(lei)型

(join 需要針對同一個 key 合并,所以需要 shuffle)
運行到每個 stage 的邊界時,數據在父 stage 中按照 Task 寫到磁盤上,而在子 stage 中通過網絡從上一個 Task 中去讀取數據。這些操作會導致很嚴重的網絡傳輸以及磁盤的I/O,所以 stage 的邊界是非常占資源的,在編寫 Spark 程序的時候需要盡量避免的 。父 stage 中 partition 個數與子 stage 的 partition 個數可能不同,所以那些產生 stage 邊界的 Transformation 常常需要接受一個 numPartition 的參數來覺得子 stage 中的數據將被切分為多少個 partition。 PS:shuffle 操作的時候可以用 combiner 壓縮數據,減少 IO 的消耗
參考:

一(yi):DataFrame創建
SparkSQL可以(yi)以(yi)其(qi)他RDD對象、parquet文(wen)件(jian)、json文(wen)件(jian)、hive表,以(yi)及通過JDBC連接到(dao)其(qi)他關系型數據庫(ku)作為(wei)數據源來生成DataFrame對象。
1)jdbc
【讀(du)】
postgresUrl="jdbc:postgresql://127.0.0.1:5432/testdb"
dimDF = sqlContext.read.format('jdbc').options(url=postgresUrl,dbtable=tableName,user="root",password="root")
.load()
dimDF.registerTempTable(tmpTableName)
【寫】
self.postgresURL = str(self.postgresIP) + ":" + str(self.postgresPort) + "/" + str(self.postgresDB)
self.postgresqlDatasource = {
"url" : "jdbc:postgresql://" + self.postgresURL,
"user" : self.postgresUser,
"password" : self.postgresPwd
}
resultDF.coalesce(int(partitionNum)).write.jdbc(url=postgresqlDatasource["url"], table=reportTable, mode='append', properties=postgresqlDatasource)
2)parquet
【讀(du)】
telematicFilePath = "/user/spark/test/telematic.parquet/key=" + handleRecordDateStr
if( common.fileExist(telematicFilePath, self.sc) ):
df = self.sqlContext.read.schema(TELEMATIC_PARQUET_SCHEMA).parquet(telematicFilePath).coalesce(int(self.partitionNum))
# schema for /user/spark/test/telematic.parquet TELEMATIC_PARQUET_SCHEMA = SQLType.StructType([
SQLType.StructField('dm_transct_date_hr_key', SQLType.LongType(), True), SQLType.StructField('dm_vehicle_dim_key', SQLType.IntegerType(), True), SQLType.StructField('dm_driver_dim_key', SQLType.IntegerType(), True), SQLType.StructField('dm_company_dim_key', SQLType.IntegerType(), True), SQLType.StructField('deviceId', SQLType.StringType(), True), SQLType.StructField('companyId', SQLType.StringType(), True)])
【寫】
df.write.parquet(parquetPath, mode="overwrite")
3)json
df = sqlContext.read.json(path)
4)list列(lie)表
dataList = resultDF.collect()
resultDF = self.sqlContext.createDataFrame(dataList)
5)Rdd
if rddSchema is None: df = sqlContext.createDataFrame(rdd) else: df = sqlContext.createDataFrame(rdd, rddSchema)
rdd = sc.parallelize(resultList)
df = self.sqlContext.createDataFrame(rdd)
二:Transform操作
三:Action操作
1、 collect() ,返回一個數組,包括dataframe集合所有的行
df = sqlContext.createDataFrame(parquetRecordList, PARQUET_FILE_SCHEMA)
for key in df.rdd.map(lambda x: x["key"]).distinct().collect():
filePath = "/user/spark/test.parquet/key=20171110"
df.filter("key="+str(key)).drop("key").write.parquet(filePath, mode="append")
2、 collectAsList() 返回值是一個java類型的數組,返回dataframe集合所有的行
3、 count() 返回一個number類型的,返回dataframe集合的行數
4、 toJson
5、 first() 返回第一行 ,類型是row類型
6、 head() 返回第一行 ,類型是row類型
7、 head(n:Int)返回n行 ,類型是row 類型
8、 show()返回dataframe集合的值 默認是20行,返回類型是unit
9、 show(n:Int)返回n行,,返回值類型是unit
10、table(n:Int) 返回n行 ,類型是row 類型
dataframe的基本操作
1、 cache()同步數據的內存
data = self.sqlContext.sql(queryStr).toJSON().cache().collect()
2、 columns 返回一個string類型的數組,返回值是所有列的名字
3、 dtypes返回一個string類型的二維數組,返回值是所有列的名字以及類型
4、 explan()打印執行計劃 物理的
5、 toJSON 轉換為json格式數據
6、 isLocal 返回值是Boolean類型,如果允許模式是local返回true 否則返回false
7、 persist(newlevel:StorageLevel) 返回一個dataframe.this.type 輸入存儲模型類型
稍后詳解
8、 printSchema() 打印出字段名稱和類型 按照樹狀結構來打印
9、 registerTempTable(tablename:String) 返回Unit ,將df的對象只放在一張表里面,這個表隨著對象的刪除而刪除了
10、 schema 返回structType 類型,將字段名稱和類型按照結構體類型返回
11、 toDF()返回一個新的dataframe類型的
12、 toDF(colnames:String*)將參數中的幾個字段返回一個新的dataframe類型的,
13、 unpersist() 返回dataframe.this.type 類型,去除模式中的數據
14、 unpersist(blocking:Boolean)返回dataframe.this.type類型 true 和unpersist是一樣的作用false 是去除RDD
集成查詢:
1、 agg(expers:column*) 返回dataframe類型 ,按每個device分組查最小時間
df = sqlContext.createDataFrame(tensRdd) resultDF = df.groupBy("device_id").agg({RegularDataEtlConstants.TIME: 'min'}) resultDF.repartition(self._partitionNum).foreachPartition(lambda iterator: self.__saveToHBase(iterator))
startTime = df.filter((df.startTime != "") & (df.startTime >= minStartTimeCurrent)).agg({"startTime": "min"}).collect()[0][0]
4、 apply(colName: String) 返回column類型,捕獲輸入進去列的對象
5、 as(alias: String) 返回一個新的dataframe類型,就是原來的一個別名
6、 col(colName: String) 返回column類型,捕獲輸入進去列的對象
7、 cube(col1: String, cols: String*) 返回一個GroupedData類型,根據某些字段來匯總
8、 distinct 去重 返回一個dataframe類型
9、 drop(col: Column) 刪除某列 返回dataframe類型
columnList = ['key', 'type', 'timestamp', 'data'] df = sqlContext.createDataFrame(dataList[index], columnList) for key in df.rdd.map(lambda x: x["key"]).distinct().collect(): parquetPath = parquetList[index] + "/key=" + str(key) df.filter("key="+str(key)).drop("key").write.parquet(parquetPath, mode="append", partitionBy="type")
10、 dropDuplicates(colNames: Array[String]) 刪除相同的列 返回一個dataframe
11、 except(other: DataFrame) 返回一個dataframe,返回在當前集合存在的在其他集合不存在的
12、 explode[A, B](inputColumn: String, outputColumn: String)行轉列
根據c3字(zi)段中(zhong)的(de)空格將字(zi)段內(nei)容進行(xing)分割,分割的(de)內(nei)容存儲在新的(de)字(zi)段c3_中(zhong) jdbcDF.explode( "c3" , "c3_" ){time: String => time.split( " " )}

13、 filter(conditionExpr: String): 刷選部分數據,返回dataframe類型
df.filter("age>10").show(); df.filter(df("age")>10).show(); df.where(df("age")>10).show();
14、 groupBy(col1: String, cols: String*) 分(fen)組
dfgroupBy("age").avg().show();
15、 intersect(other: DataFrame) 返回一個dataframe,在2個dataframe都存在的元素
16、 join(right: DataFrame, joinExprs: Column, joinType: String)
一個是關聯的dataframe,第二個關聯的條件,第三個關聯的類型:inner, outer, left_outer, right_outer, leftsemi
df.join(ds,df("name")===ds("name") and df("age")===ds("age"),"outer").show();
17、 limit(n: Int) 返回dataframe類型 去n 條數據出來
18、 na: DataFrameNaFunctions ,可以調用dataframenafunctions的功能區做過濾 df.na.drop().show(); 刪除為空的行
19、 orderBy(sortExprs: Column*) 做alise排序
20、 select(cols:string*) dataframe 做字段的刷選 df.select($"colA", $"colB" + 1)
21、 selectExpr(exprs: String*) 做字段的刷選 df.selectExpr("name","name as names","upper(name)","age+1").show();
22、 sort(sortExprs: Column*) 排序 df.sort(df("age").desc).show(); 默認是asc
23、 unionAll(other:Dataframe) 合并
df = df.unionAll(dfTemp).coalesce(int(self.partitionNum))
24、 withColumnRenamed(existingName: String, newName: String) 修改列表 df.withColumnRenamed("name","names").show();
25、 withColumn(colName: String, col: Column) 增加一列
往df中新增一個名為aa的列,值與列name的一樣
df.withColumn("aa",df("name")).show();
將該列時間值計算加上時區偏移值
mergeDF = mergeDF.withColumn("dm_transct_date_hr_key", functions.lit(self.__datehandle(mergeDF["dm_transct_date_hr_key"], self.timezoneOffset)))
//blog.csdn.net/mtj66/article/details/52064827
