ADF 模式可最大限度地減少批處理執行時間
我們使用 Azure 數據工廠 (ADF) 將大量源表從本地 SQL Server DB 拉入 Azure Data Lake (DL)。我們使用 Lookup-ForEach 模式使這個數據驅動。
有一張大桌子,幾張大桌子和幾張小桌子。它們的範圍從 400GB 到 1MB。
執行時並行度由 ADF ForEach 活動的 Batch Count 參數控制。我認為這是可用的工作隊列或“工人”的數量。
標準實現在執行之前以循環方式將項目分配給工作人員。這意味著總是有任務在最大的表後面排隊。這不必要地增加了總經過時間。人工調查表明,如果最大的桌子有自己的工人,那麼所有其他桌子都可以容納其他三個工人,並且經過的時間有點統一。
圖 2:安排觀察到的經過時間,使最大的表(深藍色)有自己的工人產出相當均勻的結束時間。
我可以實施哪些技術或模式來將工作分配給員工,從而最大限度地減少總體執行時間?
我們可以隨心所欲地改變 ADF 和 DL。然而,相應的集成執行時 (IR) 是有限的。我們不能只是擴大規模來解決這種情況。
源系統是第三方的。可以進行小的修改,但無法進行主要的源表重構。
我們將添加更多源表。在添加源時需要最少重新編碼的解決方案將是可取的。該系統正在積極維護中,因此將實施必要的更改。
每個源表的大小每天都會有所不同,但不會很大。如果表今天有 20GB,明天可能是 19GB 或 21GB,但不會是 TB。
所以,是的,這是一個項目數量、相對大小和隊列數量相當穩定的調度問題。
有關的
選擇按值均勻分佈的分組數據這個答案將有助於將權重分配給 4 個工人。但是,每個工人仍然有相同數量的作業要執行。
稍微修改一下:
UPDATE w SET w.grp=(CASE w._row WHEN x._pos_row THEN x._neg_grp ELSE w.grp END), w.moved=(CASE w._row WHEN x._pos_row THEN w.moved+1 ELSE w.moved END) -- only change group for possitive one FROM @work AS w INNER JOIN ( SELECT TOP 1 pos._row AS _pos_row, pos.grp AS _pos_grp, neg._row AS _neg_row, neg.grp AS _neg_grp FROM cte AS pos INNER JOIN cte AS neg ON pos._grpoffset>0 AND neg._grpoffset<0 AND --- To prevent infinite recursion: pos.moved<4 AND neg.moved<4 WHERE --- trade off between move data from positive to negative side ABS(neg.[time]+pos.[time])<=ABS(pos.[time]) --- Largest changes first: ORDER BY ABS(pos.[time]+neg.[time]) DESC ) AS x ON w._row IN (x._pos_row, x._neg_row);
現在,工人沒有同等數量的工作。但是,重量實際上是作為您的最佳包裝圖分佈的。
之後,我們可以使用 forEach 呼叫執行的管道 4 次(並行,使用組作為參數)。對於每個組,您可以按順序執行您的管道。
最後,4條並行流水線將在最佳時間執行。
如果我們將問題理解為 ForEach 將額外的任務推到非常大任務後面的隊列中,則可以通過實現拉機制來解決這個問題。
我們可以通過避開 Lookup-ForEach 模式中內置的隊列來實現這一點。相反,我們只啟動足夠多的工人來提供我們想要的並行度,並擁有這些拉任務。
ForEach 的並行性由其 Batch Count 控制。在撰寫本文時,最大值為 50,無法在執行時動態設置。我們將此值設置為最大值 (50) 並通過應用程式碼控制有效的執行時並行性。讓我們呼叫我們的應用程序值 AppBatchCount。所需的執行時值可以保存在全域參數、管道參數、DBMS 配置表或任何方便的機制中(下圖中的活動“A”)。如果我們想要 ForEach AppBatchCount 中的四個並行工作人員將具有值 4。
使用表達式填充數組變數
@range(1, AppBatchCount)
(下面的活動“B”)。正是這個數組變數映射到 ForEach 的項目(下面的活動“C”)。通過這種方式,AppBatchCount 的值直接影響給定要執行的活動的並行工作人員的數量。ForEach 內部是執行完成任務所需工作的活動。將它們打包在單獨的管道中通常很方便。這個管道將有一個由變數控制的直到活動。我們稱該變數為“LoopAgain”。在直到任務出列。如果獲得任務,則將 LoopAgain 設置為“true”並執行該任務。如果沒有任務出隊,則 LoopAgain 設置為“false”並且直到退出,終止此工作程序。
任務如何出隊將取決於用於保存它的技術。對於 SQL Server,這是一項眾所周知的任務。其他技術將有自己的技術。任務將按大小順序出列,首先是最大的(即執行時間最長的)。
以問題中圖 2 的分佈為例,使用 4 個 worker,首先啟動最大的 4 個任務,每個 worker 一個。第 4 大任務將首先完成(根據定義)並開始執行第 5 大任務。這將一直持續到工人 2-4 用盡隊列並終止。最終工人 1 將完成最大的任務,尋找更多的工作,發現沒有什麼可做的,也終止了。因此,最大任務後面沒有工作排隊,並且經過的時間最小化。
可以通過將新的源表插入到隊列中來添加新的源表,並適當估計它們的大小。事實上,可以記錄實際處理時間,並隨著時間的推移自動調整處理順序。
ADF 管道定義
{ "name": "ForEach pull", "properties": { "activities": [ { "name": "BatchCountArray", "type": "SetVariable", "dependsOn": [ { "activity": "Get AppBatchCount", "dependencyConditions": [ "Succeeded" ] } ], "userProperties": [], "typeProperties": { "variableName": "BatchCountArray", "value": { "value": "@range(1, int(activity('Get AppBatchCount').output.firstRow.AppBatchCount))", "type": "Expression" } } }, { "name": "Pull tasks", "description": "Set Batch Count to the maximum permitted. The actual number will be controlled by the size of BatchCountArray.", "type": "ForEach", "dependsOn": [ { "activity": "BatchCountArray", "dependencyConditions": [ "Succeeded" ] } ], "userProperties": [], "typeProperties": { "items": { "value": "@variables('BatchCountArray')", "type": "Expression" }, "isSequential": false, "batchCount": 50, "activities": [ { "name": "ForEach pull worker", "type": "ExecutePipeline", "dependsOn": [], "userProperties": [], "typeProperties": { "pipeline": { "referenceName": "ForEach pull worker", "type": "PipelineReference" }, "waitOnCompletion": true, "parameters": { "WorkerNumber": { "value": "1", "type": "Expression" } } } } ] } }, { "name": "Get AppBatchCount", "type": "Lookup", "dependsOn": [], "policy": { "timeout": "7.00:00:00", "retry": 0, "retryIntervalInSeconds": 30, "secureOutput": false, "secureInput": false }, "userProperties": [] } ], "variables": { "Worker": { "type": "String", "defaultValue": "4" }, "BatchCountArray": { "type": "Array" } }, "annotations": [] } }