胡志寶 陸會明
(華北電力大學控制與計算機工程學院,北京102206)
工業(yè)生產(chǎn)過程中,我們希望所有生產(chǎn)環(huán)節(jié)都能夠安全高效進行,但是實際生產(chǎn)中,某些物理量可能會出現(xiàn)較大波動,使得生產(chǎn)過程不夠穩(wěn)定甚至發(fā)生危險,所以在工業(yè)生產(chǎn)過程中,我們會對某些環(huán)節(jié)進行監(jiān)測記錄,以便在出現(xiàn)特殊情況時及時采取措施來保證生產(chǎn)過程的安全。當前傳感器、多媒體、數(shù)據(jù)庫和無線網(wǎng)絡技術在工業(yè)生產(chǎn)中得到了廣泛應用,由此開發(fā)了工業(yè)生產(chǎn)監(jiān)控系統(tǒng)、工業(yè)生產(chǎn)流程處理系統(tǒng)等,這些系統(tǒng)在運行中會積累海量的數(shù)據(jù)。通過對這些數(shù)據(jù)進行挖掘,我們可以知道生產(chǎn)過程是否穩(wěn)定,哪些量容易超過限制,哪些環(huán)節(jié)需要重點關注,找出生產(chǎn)過程的薄弱點對方案進行改進。由此可見,數(shù)據(jù)的挖掘分析可以提高工業(yè)生產(chǎn)決策的準確度,進一步改進工業(yè)生產(chǎn)效率[1]。
Spark[2]是專為大數(shù)據(jù)處理而設計的快速通用的計算引擎,是一個基于內(nèi)存計算的可擴展的開源集群計算系統(tǒng),解決了MapReduce[3]在大量的網(wǎng)絡傳輸和磁盤I/O 時效率低的問題。Spark 具有運行速度快、易用性、通用性、兼容性強的四大特點[4]。Spark SQL[5]是Spark 用來處理結構化數(shù)據(jù)的一個模塊,它提供了一個編程抽象叫做DataFrame,并且具有作為分布式SQL查詢引擎的作用,其使用類SQL 的語法作為高層的數(shù)據(jù)操縱API,極大地降低了數(shù)據(jù)分析工作的難度。
Spark SQL 是Spark 框架的重要組成部分,主要用于結構化數(shù)據(jù)處理和對Spark 數(shù)據(jù)執(zhí)行類SQL 的查詢,其數(shù)據(jù)處理功能可以從以下三個過程來體現(xiàn):
2.1.1 抽 ?。‥xtract):Spark SQL 可 以 從 多 種 文 件 系 統(tǒng)(HDFS[6]、S3. 本地文件系統(tǒng)等)、關系型數(shù)據(jù)庫(MySQL、Oracle等)或NoSQL 數(shù)據(jù)庫(Cassandra、HBase[7]、Druid 等)中獲取數(shù)據(jù),Spark SQL 支持的文件類型可以是CSV、JSON、XML 等。
2.1.2 轉換(Transform):在數(shù)據(jù)清洗方面,比如空值處理、拆分數(shù)據(jù)、規(guī)范化數(shù)據(jù)格式、數(shù)據(jù)替換等,Spark SQL 能夠準確高效地完成這類轉換操作。
2.1.3 加載(Load):在將數(shù)據(jù)處理完成之后,Spark SQL 還可以將數(shù)據(jù)存儲到各種數(shù)據(jù)源中。
除了上述功能,SparkSQL 還可以和Spark 的其他模塊搭配使用,完成各種各樣更為復雜的工作,比如和SparkStreaming[8]搭配處理實時的數(shù)據(jù)流,和MLlib[9]搭配完成一些機器學習的應用[10]。Spark SQL 無縫地將SQL 查詢與Spark 程序混合,允許開發(fā)人員將結構化數(shù)據(jù)作為Spark 中的分布式數(shù)據(jù)集進行查詢,在Python,Scala 和Java 中集成了API,使得開發(fā)人員可以輕松地運行SQL 查詢以及復雜的分析算法。
Spark SQL 對SQL 語句的處理過程和關系型數(shù)據(jù)庫是類似的,會先對SQL 語句進行解析形成Tree,然后使用Rule 對Tree進行綁定、優(yōu)化等處理,通過模式匹配對不同類型的節(jié)點采用不同的操作。Spark SQL 模塊劃分為Core、Catalyst、Hive[11]和Hive-ThriftServer 四大模塊,每個模塊的作用如下:
2.2.1 Core:負責處理數(shù)據(jù)的輸入/輸出,從不同的數(shù)據(jù)源獲取數(shù)據(jù),然后將查詢結果輸出成DataFrame。
2.2.2 Catalyst:負責處理查詢語句的整個過程,包括解析、綁定、優(yōu)化、物理計劃等,是Spark SQL 最為核心的部分,其性能優(yōu)劣將決定整體的性能。
2.2.3 Hive:負責對Hive 數(shù)據(jù)的處理。
2.2.4 Hive-ThriftServer:提供CLI 和JDBC/ODBC 接口等。
圖1 Spark SQL 執(zhí)行過程
圖2 讀取的數(shù)據(jù)
這四個模塊共同工作,完成數(shù)據(jù)的獲取和SQL 語句的執(zhí)行任務,圖1 所示即為Spark SQL 執(zhí)行SQL 語句的過程。
對于得到的工業(yè)生產(chǎn)中的數(shù)據(jù),通過對數(shù)據(jù)進行處理挖掘,得到我們需要的統(tǒng)計量,常見的統(tǒng)計量有均值、均方差、極值等。超限統(tǒng)計也是非常重要的一項統(tǒng)計內(nèi)容,包括統(tǒng)計各個超限時間段及超限的持續(xù)時間。
本文采用的是Java+ Spark SQL 的方式來實現(xiàn)數(shù)據(jù)的讀取與處理,數(shù)據(jù)讀取需要先創(chuàng)建SparkSession 服務,然后使用SparkSession 服務讀取數(shù)據(jù)。數(shù)據(jù)的第一列是日期列,讀取時將日期轉化為時間戳的形式,單位是毫秒。數(shù)據(jù)讀取的代碼如下:
圖2 為數(shù)據(jù)讀取結果。
timestamp 是時間列,對應著獲得每個數(shù)據(jù)的采樣時間點,其余列是在采樣時間點各個過程量的值。
3.2.1 常規(guī)統(tǒng)計量
常規(guī)統(tǒng)計量包括均值、均方差、最大值、最小值。通過均值和均方差我們可以知道過程量是否穩(wěn)定在我們設置的輸入附近,生產(chǎn)過程是否穩(wěn)定。這些統(tǒng)計量可以直接調(diào)用API 函數(shù)即可實現(xiàn),過程如圖3。
圖3 常規(guī)統(tǒng)計流程
獲取常規(guī)統(tǒng)計量時與時間列無關且對時間列進行常規(guī)統(tǒng)計并沒有實際意義,所以先去掉時間列,然后對剩余列進行常規(guī)統(tǒng)計,實現(xiàn)的代碼如下,
data = data.drop("timestamp");
data = data.describe()。
3.2.2 超限統(tǒng)計
生產(chǎn)過程中一些過程量會有對應的上限值,過程量超過上限值時系統(tǒng)就會報警,就需要進行調(diào)節(jié)使過程量回歸正常值,一旦超限時間持續(xù)太長就會出現(xiàn)危險。超限統(tǒng)計就是對這種情況進行統(tǒng)計,統(tǒng)計超過限定值的持續(xù)時間以及最大超限時長。實際生產(chǎn)過程中會設置死區(qū)值,即過程量的值從超限狀態(tài)回歸正常值時,如果僅是下降穿過設定的上限值并不算回歸正常,還需要繼續(xù)下降一定的量才算回歸正常,繼續(xù)下降的這部分為死區(qū)。
超限統(tǒng)計牽扯到了時間段的統(tǒng)計,難點在于如何確定每一段超限時間段的起始時間點和終止時間點,只要得到起始時間點和終止時間點就可以計算出持續(xù)時長??梢韵炔豢紤]死區(qū)的存在,僅考慮上限值,找出所有超過上限值的數(shù)據(jù),然后再判斷這些數(shù)據(jù)相鄰行之間對應的時間點在時間上是否以采樣周期等距連續(xù)的,從而找出每一段連續(xù)的超限數(shù)據(jù),記錄下來起始時間點。然后將上限值減去死區(qū)值得到的值作為新的上限值,以相同方法找出每一段超限數(shù)據(jù),記錄下來結束時間點。將兩次記錄下來的時間點進行比較,可以得到考慮死區(qū)時每一段超限時間的起始時間點和結束時間點。解決步驟如下:
(1)將統(tǒng)計的數(shù)據(jù)與上限值進行比較,取出所有超過上限值的數(shù)據(jù)。
(2)判斷這些數(shù)據(jù)對應的時間點是否以采樣周期等距連續(xù),找出連續(xù)的各個時間段。
(3)記錄找出的各個時間段的起始時間點,用列表list1 保存下來。
(4)上限值減去死區(qū)值作為新的上限值,重復(1)、(2)得到新的連續(xù)時間段,然后判斷得到的各個時間段內(nèi)的數(shù)據(jù)是否存在超限數(shù)據(jù),保留存在超限數(shù)據(jù)的時間段,記錄保留的時間段的結束時間點,用列表list2 保存下來。
(5)將兩個列表中的元素進行比較,確定在考慮死區(qū)時的每一段超限時間段的起始時間點列表和終止時間點列表,從而得到超限時間段。
得到超限時間段之后,即可計算出每段超限時間段的持續(xù)時長,通過比較得到最大時長,通過求和求出總的超限時長。超限統(tǒng)計流程如圖4 所示。
圖5 常規(guī)統(tǒng)計結果
圖4 超限統(tǒng)計流程
超限統(tǒng)計的代碼主要包括列表list1 和列表list2 的獲取,列表的獲取分別編寫了一個函數(shù)來實現(xiàn)。
(1)獲取list1 的函數(shù)及其參數(shù)如下:
public List<List<Long>>limit(Dataset data, Float set, String tag, Long T, SparkSession spark);
函數(shù)的參數(shù)依次分別是被統(tǒng)計的數(shù)據(jù)、設定值、被統(tǒng)計的列的名稱、采樣周期、SparkSession 服務。返回值為步驟(2)中各個時間段的開始時間點組成的列表和結束時間點組成的列表,list1 為開始時間點組成的列表。函數(shù)的主要代碼語句與說明如下,
data = spark.sql (“select timestamp from tempTable where ”+tag+“>”+set+“”); //取出超過設定值的數(shù)據(jù)
data = spark.sql(“select *, timestamp - lag(timestamp,1)over(order by timestamp) as diff from tempTable”); //將取出來的數(shù)據(jù)對應的采樣時間點相鄰行做差
data = spark.sql(“select timestamp,(case when diff = ”+T+“then 1 else 0 end) as status from tempTable”); //若上一步求得的差值等于采樣周期,則標記該行狀態(tài)為否則為0,生成狀態(tài)列data = spark.sql(“select timestamp,status,lead(status)over(order by timestamp) as lead from tempTable”); //將狀態(tài)列向前移動一行得到新的狀態(tài)列
根據(jù)兩列狀態(tài)列找出各個連續(xù)時間段的開始時間點和結束時間點
Dataset starttime = data.select(“timestamp”).filter(“status = 0 and lead = 1”);//開始時間表
Dataset endtime = data.select(“timestamp”).filter(“status = 1 and lead = 0”);//結束時間表
獲得的開始時間表和結束時間表是dataframe,還需要將其轉換為列表,開始時間表轉化的列表即為list1。
(2)獲取list2 的函數(shù)及其參數(shù)說明如下:
public List<Long>death(Dataset data, Float set, Float value,String tag, Long T, SparkSession spark)
函數(shù)的參數(shù)依次分別是被統(tǒng)計的數(shù)據(jù)、設定值、設定值減去死區(qū)值得到的數(shù)值,被統(tǒng)計的列的名稱、采樣周期、SparkSession服務。返回值分別為步驟(3)中保留的各個時間段的結束時間點組成的列表,即為list2 的值。函數(shù)主要代碼語句與說明如下:
List<Long>s = limit(data,value,tag,T,spark).get(0); //調(diào)用limit函數(shù)求出各個連續(xù)時間段的開始時間點
List<Long>e = limit (data,value,tag,T,spark).get (1); //調(diào)用limit 函數(shù)求出各個連續(xù)時間段的開始時間點
確定各個時間段是否存在超限數(shù)據(jù),保留存在超限數(shù)據(jù)的時間段的結束時間點的集合作為返回值,即為list2 的值。
要想求出考慮死區(qū)情況的超限時間段,還需要將已經(jīng)求出的list1 和list2 進行比較。list2 中的元素就是最終要求的超限時間段的結束時間點,然后將list1 中的元素與list2 中的元素依次比較來確定超限時間段的開始時間點。在得到各個超限時間段的開始和結束時間點之后,將列表轉換為DataFrame,開始列與結束列做差得到持續(xù)時長。
常規(guī)統(tǒng)計的結果如圖5 所示,超限統(tǒng)計選擇的是DOMAIN1:UNITMW 列,設定值為520,死區(qū)值為1,統(tǒng)計結果如圖6 所示。
圖6 超限時間段及其持續(xù)時長
本文利用大數(shù)據(jù)處理技術Spark SQL 對數(shù)據(jù)進行統(tǒng)計,不僅統(tǒng)計了常用的一些統(tǒng)計量如均值、最值、均方差等,還進行了超限統(tǒng)計,有助于我們了解統(tǒng)計的數(shù)據(jù)所在的時間段內(nèi)工業(yè)生產(chǎn)的狀態(tài)是否穩(wěn)定,有沒有按照預期進行工作。對于得到的工業(yè)生產(chǎn)的數(shù)據(jù),我們還可以利用這些數(shù)據(jù)進行建模,然后對模型進行優(yōu)化、提高生產(chǎn)效率。同時Spark SQL 技術在數(shù)據(jù)處理方面有著很大的優(yōu)勢,計算速度快、通用性高,還可以用于數(shù)據(jù)管理查詢系統(tǒng)。