--- title: aiot-streams-section2 tags: DAS GA: UA-155999456-1 --- {%hackmd @docsharedstyle/default %} # 14.2.2 AIoT on Streams-flow 本章節將介紹如何利用Streams-flow完成介接感測器資料來源、將模型導入進行預測,並將預測結果傳出。 ## Streams-flow 1. 在專案下點擊 **新增至專案**,建立 **串流流程**。 ![Create Streams-flow-1.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-1.png) 2. 畫面將跳轉至 **新建串流流程**,輸入自訂 **名稱**,選擇 **Streams實例**,並以 **手動** 的方式來 **建立**。 :::danger 若 **Streams實例** 內無實例可供選擇,請聯繫系統管理員,否則無法接續以下步驟。 ::: ![Create Streams-flow-2.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-2.png) ### 選擇來源 3. 在本範例中,資料透過kafka傳輸,故在 **來源** 選擇 **Kafka** ,將滑鼠左鍵壓著拖曳至畫布中。 ![Create Streams-flow-3.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-3.png) 4. 點擊畫布中的 **Kafka**,會跳出右邊設定區,點擊 **新增連線** 以進行連線設定。 ![Create Streams-flow-4.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-4.png) 5. 設定來源 **kafka**。 :::info **Streams-flow** 中的 **Kafka** 需設有加密方式,相關資訊可參閱 [How to Set Up Authentication in Kafka Cluster](https://codeforgeek.com/how-to-set-up-authentication-in-kafka-cluster/) ::: * 點擊 **Apache Kafka Streams**,開始進行設定。首先,輸入 **name**,提供使用者辨認用。 * 接著往下滑依序輸入登入帳密、加密方式及連線位置等資訊。確認無誤後,點擊 **create**。 ![Create Streams-flow-5.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-5.png) ![Create Streams-flow-6.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-6.png) ![Create Streams-flow-7.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-7.png) * 接著選擇我們要訂閱伺服器上的哪個 **Topic**,最後則是定義要將甚麼資料往後面的區塊送,點開 **綱目** 後點擊 **編輯**。 ![Create Streams-flow-8.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-8.png) * 在這個對話框內可以設定要往後送的資料內容及型態。在設定之前,我們先確認收到的資料內容及格式,點擊 **預覽** 下的 **原始串流資料**。圖上顯示的為json格式。 ![Create Streams-flow-10.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-10.png) * 確認完接收到的資料。點擊 **編輯** 依序 **新增屬性**,並填滿屬性名稱的欄位。 :::info 本範例屬性名稱欄位說明: **sid** 為感測器的id,型態為 **Text**,對應在json內"sid",故路徑填入'/sid'。 **date_time** 為資料的輯錄時間,型態為 **Text**,對應在json內"date_time",故路徑填入"/date_time"。 **accX**, **accY**, **accZ** 分別為三軸的加速度資料,在json內為list的形式,但在**Streams-flow**中需把list型態填為**Text**。 ::: ![Create Streams-flow-9.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-9.png) ### 過濾資料 6. 在 **處理和分析** 中選擇 **過濾器**,將滑鼠左鍵壓著拖曳至畫布中,並將來源 **Kafka** 與 **過濾器** 連上,而後點擊 **過濾器** 填上過濾條件。 sid == '700001' 本處設定的用意為只讓 **sid** 為特定編號的資料往後送。 ![Create Streams-flow-11.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-11.png) ### 解析資料 7. 接著我們要來解開 **accX**, **accY**, **accZ** 這三個list。在此處選用 **處理和分析** 中的 **程式碼**,一樣將其拖到畫布中,並與上一個 **過濾器** 連上,點擊 **程式碼** 對其進行編輯。 :::info 圖例中的程式碼大略解說。 上個區塊的資料進來後,會經過__call__這個fuction進行處理,進來的資料為 **event** 這個變數,最後經由 **return** 傳給下個區塊。 ::: ![Create Streams-flow-13.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-13.png) 改寫其中的__call__ function以解開list,並對其加上對應的label。如下圖程式碼所示。 ![Create Streams-flow-14.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-14.png) def __call__(self, event): # event['word_count'] = len(event['phrase'].split()) accX = eval(event['accX']) accY = eval(event['accY']) accZ = eval(event['accZ']) for i in range(len(accX)): event['X' + str(i).zfill(2)] = accX[i] event['Y' + str(i).zfill(2)] = accY[i] event['Z' + str(i).zfill(2)] = accZ[i] return event 如此便將 **accX** 這個list中的值依序轉為 **X01**, **X02**, **X03**, **X04**, **X05** ......,**accY** 與 **accZ** 亦然。 8. 為了將值傳遞給下一個區塊,須定義好資料名稱及型態。點開 **Outout Schema** 後點擊 **編輯**。 ![Create Streams-flow-15.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-15.png) 因 **accX** 已經拆成 **X01**, **X02**, **X03**, **X04**, **X05** ......,故須將屬性名稱及類別依序填上。 因輸出較多(15(點)\*3(軸)),所以此處較為費工。 ![Create Streams-flow-16.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-16.png) ### 模型推論 9. 接著就要將資料送進上一章節的模型做推論了!點擊 **WML 部署** 內前一章節部署的模型,拖拉至畫布中。 ![Create Streams-flow-17.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-17.png) :::danger 這裡有個非常重要細節要注意,Streams-flow預設的檔名會有不符合的符號 \[ ' ],所以須修改這個區塊的預設名稱! ::: ![Create Streams-flow-18.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-18.png) 點開 **部署輸入參數**,確認從上一個區塊傳來的參數是否有正確填入模型的輸入區。 ![Create Streams-flow-19.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-19.png) 同樣的最後需要設定資料傳出去的格式。點開 **Outout Schema** 後點擊 **編輯**。 ![Create Streams-flow-20.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-20.png) 彈出**編輯輸出綱目**對話框後,這邊的設定比較特別,需較為注意。 對於從上個區塊來的資料要在 **模型欄位** 那欄填入 **From Input**,而對於模型的預測結果也分兩種: 其一、若只要預測的類別(還記得本範例的模型是多分類的模型嗎?),則在 **模型欄位** 那欄填入 **prediction** ,類型為 **Number**; 其二、若要各類別的預測機率的話,則在 **模型欄位** 那欄填入 **probability** ,因是一個list,類型為 **Text**。 確認無誤後,點擊 **套用** 退出即可, ![Create Streams-flow-21.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-21.png) ### 處理分析結果 10. 本範例希望針對多筆模型的預測結果取平均後再作為結果輸出,故將用 **視窗** 及 **聚集** 來達到此目的。 :::info **視窗** 的作用為累積一段時間或一定個數的數組後才往後面的區塊傳送資料。 **聚集** 則是針對 **視窗** 傳送來的數組做運用。 ::: 將 **處理和分析** 中的 **視窗** 與 **聚集** 拖拉至畫布中,並將其連線。 ![Create Streams-flow-22.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-22.png) 點擊畫布中的 **視窗**,將 **類型** 設成**tumbling**,**視窗定義方式** 設為 **值組數目**,**值組數目** 設為 **10**。上述設定的意思代表這個 **視窗** 累積10筆數據後就會全部往下一個區塊送,並清空暫存區。 ![Create Streams-flow-23.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-23.png) 點擊畫布中的 **聚集**,本範例選用 **Custom Code**,程式碼形式與上面步驟的 **程式碼** 雷同,差異之處在於資料 **tuples_in_window** 傳進來的時候已經是一個list。 在本範例中透過 **import numpy** 完成計算平均值計算最大值的部分。最後透過 **return** 將聚集結果傳出。 import numpy as np -- def __call__(self, tuples_in_window): # values = [item["id"] for item in tuples_in_window] # num_of_tuples = len(values) # return {"count": num_of_tuples} probability = [eval(item["Prediction_Result_Probability"]) for item in tuples_in_window] # 2D-array probability = np.array(probability) probability_mean = np.mean(probability, axis=0) # select max probability_mean index as prediction result max_label = np.argmax(probability_mean) output = {} output["Max_Label"] = max_label output["Probability_Mean"] = probability_mean.tolist() output["sid"] = tuples_in_window[0]["sid"] output["date_time"] = tuples_in_window[-1]["date_time"] return output ![Create Streams-flow-24.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-24.png) ![Create Streams-flow-25.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-25.png) 照慣例將 **綱目** 點開在 **Outout Schema** 點擊 **編輯**。 ![Create Streams-flow-26.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-26.png) 依照 **Custom Code** 的 **output** 變數內的屬性依序填入。 ![Create Streams-flow-27.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-27.png) ### 發送結果 11. 最後,本範例選用 **Kafka** 作為輸出。將 **目標** 中的 **Kafka** 拖拉至畫布中,並將其與 **聚集** 連線。與步驟4.到步驟5.相同,填入 **Kafka** 的連線資訊,並選擇要傳出的 **主題(Topic)**。 ![Create Streams-flow-28.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-28.png) ### 儲存與執行 12. 全部完成後,點擊 **儲存** 與 **執行**。 ![Create Streams-flow-29.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-29.png) 頁面會自動跳轉如下圖,並耐心等待數分鐘。 ![Create Streams-flow-30.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-30.png) 待其建置完成,將可看到資料流的畫面,點擊其管線的話亦可看到資料內容。 ![Create Streams-flow-31.png](https://cos.twcc.ai/cp4d/das3_5/cases/aiot-streams/image/Create-Streams-flow-31.png) ### End 下個章節將介紹如何利用 **Dashboard** 將結果呈現。