wso2~event-flow的介紹(shao)
我們來詳細分析和介紹一下 WSO2 平臺(特別是 WSO2 Stream Processor / WSO2 SI / Siddhi Runner)中 Event Receivers、Event Streams、Execution Plans 和(he) Event Publishers 這些核心(xin)概(gai)念。它(ta)們(men)是構建實時流處(chu)理(li)和(he)復雜(za)事件(jian)處(chu)理(li)(CEP)應用的基礎組件(jian)。
核心思想:事件驅動架構 (EDA)
這(zhe)些(xie)組件共同構成了一個事件處理管道(Event Processing Pipeline),其(qi)核心流程是:
- 接收事件: 從外部來源獲取原始事件數據 (
Event Receivers)。 - 定義結構: 為事件數據定義一個明確的格式和含義 (
Event Streams)。 - 處理邏輯: 對事件流進行過濾、轉換、關聯、聚合、模式匹配等計算 (
Execution Plans)。 - 發布結果: 將處理結果(可能是新事件、告警、聚合值等)發送到下游系統 (
Event Publishers)。
組件之間的關系:
graph TD;
A[Event Receivers] -->|接收事件| B[Event Streams]
B -->|流轉事件| C[Execution Plans]
C -->|執行計劃| D[Event Publishers]
D -->|發布事件| A
核心概念詳解:
-
事件接收器 (Event Receivers)
- 功能: 它們是事件處理管道的入口點。負責監聽、接收、解析和轉換來自各種外部源的事件數據,并將其注入到 WSO2 平臺內部的 Event Streams 中。
- 作用:
- 連接外部世界: 連接到消息隊列(Kafka, JMS, RabbitMQ)、數據庫(通過輪詢)、HTTP/S 端點(REST, WebSocket)、TCP/UDP 套接字、文件、郵件等。
- 協議適配: 理解不同來源的協議和傳輸格式。
- 數據轉換: 將接收到的原始數據(如 JSON, XML, CSV, Binary)解析并映射到內部事件流 (
Event Stream) 定義的格式(通常是鍵值對或對象)。這個過程通常涉及定義映射規則(如@map注解)。 - 注入事件: 將轉換后的、格式統一的事件對象發布到指定的內部事件流中,供后續處理。
- 關鍵點:
- 一個 Receiver 通常對應一個特定的外部源和一種協議。
- 定義了輸入事件如何映射到目標 Event Stream 的 schema。
- 是平臺與外部事件生產者(如傳感器、應用程序、日志文件)的橋梁。
-
事件流 (Event Streams)
- 功能: 定義了在 WSO2 平臺內部流動的事件數據的結構和元數據。它們是事件在管道中傳輸和處理的載體和契約。
- 作用:
- 數據模型: 明確規定一個事件包含哪些屬性(字段),每個屬性的數據類型是什么(e.g.,
string,int,long,float,double,bool)。 - 唯一標識: 每個流有一個唯一的名稱(通常由
Stream ID和Stream Version組成,如FooStream:1.0.0)。 - 數據承載: 實際的事件數據按照這個定義的結構在系統中流動。
- 組件間連接: Event Receivers 將事件注入特定的 Stream, Execution Plans 從 Streams 讀取事件進行處理,并將結果寫入新的 Streams, Event Publishers 從 Streams 讀取事件進行發布。
- 數據模型: 明確規定一個事件包含哪些屬性(字段),每個屬性的數據類型是什么(e.g.,
- 關鍵點:
- 類似于數據庫表定義或消息隊列中消息的 Schema。
- 確保處理邏輯 (
Execution Plans) 明確知道它正在處理的數據是什么樣子。 - 允許多個組件(Receivers, Execution Plans, Publishers)通過共享的 Stream 進行解耦的通信。
- 流中的事件通常是不可變的。
-
執行計劃 (Execution Plans)
- 功能: 這是事件處理管道的大腦和核心邏輯單元。它定義了如何處理一個或多個輸入事件流 (
Event Streams) 中的數據,執行計算、分析、轉換,并產生結果輸出到一個或多個輸出事件流 (Event Streams)。 - 作用:
- 處理邏輯容器: 包含一個或多個Siddhi 查詢 (SiddhiQL Queries)。SiddhiQL 是一種專門為流處理和 CEP 設計的 SQL-like 語言。
- 復雜計算: 執行各種操作,包括:
- 過濾:
select * from InputStream[price > 100] - 投影:
select symbol, price from InputStream - 窗口操作: 基于時間(
time window)或數量(length window)對事件進行分組聚合(avg,sum,min,max,count)或排序。 - 模式匹配: 檢測事件序列中的特定模式(
every e1=InputStream1 -> e2=InputStream2[e2.value > e1.value])。 - 關聯/連接: 將不同流的事件基于條件關聯起來(
join,union)。 - 機器學習/預測: 使用內置或擴展的函數進行簡單預測或異常檢測。
- 外部調用: 調用外部服務或函數(通過擴展)。
- 過濾:
- 狀態管理: 維護處理過程中需要的狀態(如窗口內的數據、計數器、會話信息)。
- 輸出生成: 處理的結果會以新事件的形式寫入到定義的輸出事件流中。
- 關鍵點:
- 一個 Execution Plan 是處理邏輯的部署單元。它被打包和部署到 WSO2 SP/SI/Siddhi Runner 運行時中。
- 使用強大的 SiddhiQL 語言定義業務邏輯。
- 消費輸入流,生產輸出流。
- 可以非常復雜,實現實時分析、告警生成、實時儀表盤數據準備等。
- 替代術語注意: 在較新版本的 WSO2 SP/Siddhi Runner 中,“Execution Plan” 的概念有時直接被稱為 “Siddhi App”。一個 Siddhi App 文件 (
.siddhi) 包含了 Stream 定義、Receiver/Publisher 配置(通過注解)和 SiddhiQL 查詢,本質上就是一個自包含的 Execution Plan。
- 功能: 這是事件處理管道的大腦和核心邏輯單元。它定義了如何處理一個或多個輸入事件流 (
-
事件發布器 (Event Publishers)
- 功能: 它們是事件處理管道的出口點。負責從內部的 Event Streams 中讀取處理完成的事件結果,將其轉換成目標系統所需的格式,并發布/傳輸到各種外部接收系統(Sinks)。
- 作用:
- 連接下游系統: 連接到消息隊列(Kafka, JMS)、數據庫(JDBC 插入/更新)、HTTP/S 端點(REST API 調用)、TCP/UDP 套接字、文件、郵件、日志等。
- 協議適配: 使用下游系統理解的協議進行通信。
- 數據轉換: 將內部事件流格式的事件對象映射并序列化成外部系統要求的格式(如 JSON, XML, CSV, Binary)。這個過程通常也涉及定義映射規則(如
@map注解)。 - 傳輸事件: 將轉換后的數據可靠地(或盡力而為)發送到目標系統。
- 關鍵點:
- 一個 Publisher 通常對應一個特定的外部目的地和一種協議。
- 定義了如何將輸出 Event Stream 的數據映射到目標格式。
- 是平臺與外部事件消費者(如數據庫、儀表盤、告警系統、其他應用程序)的橋梁。
組件間關系與數據流:
想象一個實時的股票價(jia)格監控(kong)和告(gao)警系統:
- Event Receiver (HTTP): 監聽一個 HTTP 端口,接收來自多個股票數據源的 JSON 格式的價格更新事件。它配置了
@map(type='json'),將 JSON 字段映射到名為StockTickStream的內部事件流的屬性 (symbol,price,timestamp)。 - Event Stream (
StockTickStream): 定義了symbol (string),price (float),timestamp (long)。 - Execution Plan (Siddhi App):
- 定義輸入流:
StockTickStream - 編寫 SiddhiQL 查詢:
- 計算每個股票 1 分鐘滾動平均價:
... select symbol, avg(price) as avgPrice ... group by symbol ... - 檢測價格瞬間暴漲(比如 5 秒內漲幅超過 10%):
... every (e1=StockTickStream) -> e2=StockTickStream[e1.symbol==e2.symbol and (e2.price - e1.price)/e1.price > 0.1] within 5 sec ...
- 計算每個股票 1 分鐘滾動平均價:
- 定義輸出流:
AvgPriceStream(schema:symbol,avgPrice)SpikeAlertStream(schema:symbol,startPrice,endPrice,increasePct)
- 定義輸入流:
- Event Publishers:
- Publisher 1 (Kafka): 訂閱
AvgPriceStream。配置@map(type='json'),將事件轉換為 JSON 并發布到 Kafka 的stock-avg-prices主題,供實時儀表盤消費。 - Publisher 2 (Email/JMS): 訂閱
SpikeAlertStream。配置映射,將事件內容格式化為告警文本,通過 Email 發送給交易員或發送到 JMS 隊列供告警系統處理。
- Publisher 1 (Kafka): 訂閱
總結:
- Event Receivers & Publishers: 處理與外部系統的連接、協議適配和數據格式轉換(輸入/輸出適配器)。它們是邊界組件。
- Event Streams: 定義了在系統內部流動的事件數據的標準結構和契約。它們是數據流動的管道和連接組件的紐帶。
- Execution Plans (Siddhi Apps): 包含了使用 SiddhiQL 編寫的核心業務邏輯和處理規則。它們消費輸入流,進行處理(過濾、聚合、模式匹配等),并將結果寫入輸出流。這是實現實時分析和 CEP 功能的引擎。
理解這些概念的關鍵:
- 管道化: 數據從 Receiver 進入 -> 流入 Stream -> 被 Execution Plan 處理 -> 結果流入另一個 Stream -> 由 Publisher 發送出去。
- 解耦: Streams 使得 Receivers、Execution Plans 和 Publishers 可以獨立定義、修改和擴展,只要它們遵守 Stream 的契約(Schema)。
- 聲明式處理: Execution Plans 使用 SiddhiQL 這種高級的聲明式語言來描述“做什么”(如找出平均值或模式),而不是詳細指定“怎么做”(如遍歷循環),由運行時引擎優化執行。
掌握這四個核心概(gai)念(nian)是有效設計、開發和(he)部署(shu)基于 WSO2 Stream Processor / Siddhi 的實時流處理應用程序的基礎。