最新 Airflow 2.6 版本有哪些更新?簡介 Notifiers, Grid View, Trigger, CLI 等多項更新內容(附程式碼)

Airflow 在最近釋出了最新的 2.6 版本,Astronomer 也在第一時間舉辦了一個講座,這篇文章記錄我在講座中看到的各種 Airflow 新功能和新修正,講座影片我放在本文最後面的參考資料,若有興趣的話也可以看看!

這次 Airflow 2.6 版本的更新包含了:

  • 35+ 個新功能
  • 50+ 個功能改進
  • 27 個 Bug 修正

以下是本篇文章的大綱:

  1. Notifiers:更方便的 Pipeline 示警功能
    1. 內建 Slack notifier
  2. Continuous Timetable:可以讓 Sensor 持續運作的功能
    1. Continuous 使用範例
  3. 可用 CLI 來測試 Connections 和查看 DAG 詳細資訊
  4. 新增部分 Async 非同步設定,釋放你的 Airflow 壓力
  5. 修正 Task 卡在 Queue 狀態的問題
  6. UI 介面的更新
    1. Grid View 更新:與 Graph View 有了更佳的整合
    2. 更新 Trigger DAG with Parameters 功能:可直接帶入歷史參數設定
  7. Pandas Serializer for XCOM:讓 Task 可以直接回傳 Pandas DataFrame
  8. 其他更新
  9. 總結
  10. 參考資料

Notifiers:更方便的 Pipeline 示警功能

這次多了內建的 Notifier 可以用
主要是搭配 DAG/Task 的 on_success_callback 或是 on_failure_callback 來使用

我覺得把通知直接整合進 Airflow 是一個很重要的功能
因為如果 Pipeline 壞掉卻沒有收到通知,那個後果真的非常可怕
尤其是常常 Pipeline 之間會有相依性
一個壞掉若沒有提早修復,就會造成很恐怖的連鎖反應
(例如:Data Engineer 修到瘋掉)

內建 Slack notifier

目前只有 Slack 是可以直接內建使用的,後續版本應該是還會再新增
設定的程式碼可以參考以下:

https://github.com/astronomer/2-6-example-dags/blob/main/dags/notifier_slack.py

如果想要用其他的通知,就需要另外自定義
自定義的方式也可以參考他們寫的程式碼: 2-6-example-dags/dags/notifier_file_toy.py


Continuous Timetable:可以讓 Sensor 持續運作的功能

在 DAG 的參數 schedule 中新增了 @continuous 的選項

  • 使用情境:如果想要 Sensor 一直維持等待狀態的話
  • 並非 streaming 功能:若想要 streaming 還是建議使用 Kakfa(Airflow 也可以搭配 Kafka 使用)
  • 一定要加上 max_active_runs=1,否則會報錯

Continuous 使用範例

  • S3KeySensorAsync 負責等待 S3 的檔案出現後,開始執行後續 task
  • 設定 DAG schedule 為 @continuous 就可以讓這個 task 一直持續等待檔案的出現並觸發下游
https://github.com/astronomer/2-6-example-dags/blob/main/dags/continuous_S3.py

可用 CLI 來測試 Connections 和查看 DAG 詳細資訊

在過去的版本中,如果想要測試 Connections 的話就要用 UI 介面 Admin > Connections > Test

而這次 2.6 版本可以直接用 CLI 測試 Connections 是否正常運作
而不用實際執行導致 Task 失敗後才知道

這功能可以用來測試 DB 連線或是 Slack 連線是否正常


新增部分 Async 非同步設定,釋放你的 Airflow 壓力

這次也讓部分 Operators 新增了非同步的設定 (Deferrable Operators)
其中包含可以觸發其他 DAG 啟動的 TriggerDagRunOperator

  • 可應用於會跑很長時間的 Task
  • 非同步的好處: 可以釋放 worker slot,讓你的 Airflow 可以有更多空間去跑其他 Task

修正 Task 卡在 Queue 狀態的問題

過去 Airflow 有幾次小 release 在解決這個問題
直到現在的 2.6 版本釋出
算是完全解決了卡在 Queue 的問題

過去我的 Data 團隊也曾遇過這狀況
當時因為 Airflow 故障,所以同時手動重啟了很多個 DAG 去補資料
結果全部都一直卡在灰色的 Queue 狀態,等了好久絲毫沒有動靜
希望這次版本更新之後不會再讓大家陷入這個可怕的排隊地獄


UI 介面的更新

接著是一些關於 UI 介面的更新

Grid View 更新:與 Graph View 有了更佳的整合

新增了可以直接在右側看到 Graph 的功能
不需要像以前一樣在 Grid 和 Graph 分頁之間切換

還可以直接點擊左側欄的 Task
右側會直接將該 Task 顯示藍色底,方便我們快速尋找
右下角也有小地圖,顯示目前畫面位置

如果你的 Airflow 一個 DAG 有上千個 Tasks 時,這功能就會非常實用
(如下圖就是按了 tg_1 這個 Task)

另外右上角還有 Filter 的功能
可以篩選 upstream / downstream 或是上下游皆顯示

如下圖是只篩選了某個 Task Group 的上下游來顯示的 Graph

更新 Trigger DAG with Parameters 功能:可直接帶入歷史參數設定

在原本 Trigger DAG 的播放按鈕
點下去後有一個可以輸入 config 的觸發選項:Trigger DAG w/ config


點擊之後會進入到填寫參數的畫面

之後如果想要再次執行的時候
不需要重新填寫參數
Select Recent Configurations 的下拉式選單中
就可以找到過去參數的歷史紀錄
點選之後就會幫我們自動填入

如果在執行過程有什麼需要自訂的參數(例如:特定日期時間)
就可以在 UI 使用這個方便的新功能

而 DAG config 的設定可以參考這段程式碼

https://github.com/astronomer/2-6-example-dags/blob/main/dags/trigger_with_params.py

若想要讀取自己設定的 Params
可以用 Jinja 的方式取得(例如:{{ params.dog_name }}
(可參考:Params – Airflow Docs)


Pandas Serializer for XCOM:讓 Task 可以直接回傳 Pandas DataFrame

過往的版本如果在 Task 中 return Pandas DataFrame
在沒有另外序列化的狀況下就會報錯

但在 2.6 版本有內建的序列化功能
所以可以直接在 Task 中 return Pandas DataFrame

然後在 UI 的 XCOM 頁籤就可以看到如下圖
如果是常常使用 Pandas DataFrame 的話這功能會非常實用


其他更新

這次的更新也包含一些小的新功能,以及像是效能方面的改善


總結

以上就是 Airflow 最新的 2.6 版本的更新資訊啦!

個人覺得最新的 Slack Notifier、Trigger DAG w/ config、CLI connections test 都是非常實用的功能,新的 UI 介面 Grid View 也很方便檢視所有 Task,所以我覺得還滿值得把 Airflow 版本更新上去!


參考資料

發表迴響

在下方填入你的資料或按右方圖示以社群網站登入:

WordPress.com 標誌

您的留言將使用 WordPress.com 帳號。 登出 /  變更 )

Facebook照片

您的留言將使用 Facebook 帳號。 登出 /  變更 )

連結到 %s