Etl

ADF 模式可最大限度地減少批處理執行時間

  • September 5, 2021

我們使用 Azure 數據工廠 (ADF) 將大量源表從本地 SQL Server DB 拉入 Azure Data Lake (DL)。我們使用 Lookup-ForEach 模式使這個數據驅動。

有一張大桌子,幾張大桌子和幾張小桌子。它們的範圍從 400GB 到 1MB。

源表的大小(以 GB 為單位) 圖 1:桌子的尺寸。分佈非常傾斜。

執行時並行度由 ADF ForEach 活動的 Ba​​tch Count 參數控制。我認為這是可用的工作隊列或“工人”的數量。

標準實現在執行之前以循環方式將項目分配給工作人員。這意味著總是有任務在最大的表後面排隊。這不必要地增加了總經過時間。人工調查表明,如果最大的桌子有自己的工人,那麼所有其他桌子都可以容納其他三個工人,並且經過的時間有點統一。

在此處輸入圖像描述 圖 2:安排觀察到的經過時間,使最大的表(深藍色)有自己的工人產出相當均勻的結束時間。

我可以實施哪些技術或模式來將工作分配給員工,從而最大限度地減少總體執行時間?

我們可以隨心所欲地改變 ADF 和 DL。然而,相應的集成執行時 (IR) 是有限的。我們不能只是擴大規模來解決這種情況。

源系統是第三方的。可以進行小的修改,但無法進行主要的源表重構。

我們將添加更多源表。在添加源時需要最少重新編碼的解決方案將是可取的。該系統正在積極維護中,因此將實施必要的更改。

每個源表的大小每天都會有所不同,但不會很大。如果表今天有 20GB,明天可能是 19GB 或 21GB,但不會是 TB。

所以,是的,這是一個項目數量、相對大小和隊列數量相當穩定的調度問題。

有關的

選擇按值均勻分佈的分組數據

選擇按值均勻分佈的分組數據這個答案將有助於將權重分配給 4 個工人。但是,每個工人仍然有相同數量的作業要執行。

稍微修改一下:

UPDATE w
   SET w.grp=(CASE w._row
              WHEN x._pos_row THEN x._neg_grp
              ELSE w.grp END), 
       w.moved=(CASE w._row
              WHEN x._pos_row THEN w.moved+1
              ELSE w.moved END) -- only change group for possitive one
   FROM @work AS w
   INNER JOIN (
       SELECT TOP 1 pos._row AS _pos_row, pos.grp AS _pos_grp,
                    neg._row AS _neg_row, neg.grp AS _neg_grp
       FROM cte AS pos
       INNER JOIN cte AS neg ON
           pos._grpoffset>0 AND
           neg._grpoffset<0 AND
           --- To prevent infinite recursion:
           pos.moved<4 AND
           neg.moved<4
       WHERE  --- trade off between move data from positive to negative side
           ABS(neg.[time]+pos.[time])<=ABS(pos.[time])
       --- Largest changes first:
       ORDER BY ABS(pos.[time]+neg.[time]) DESC
       ) AS x ON w._row IN (x._pos_row, x._neg_row);

現在,工人沒有同等數量的工作。但是,重量實際上是作為您的最佳包裝圖分佈的。

之後,我們可以使用 forEach 呼叫執行的管道 4 次(並行,使用組作為參數)。對於每個組,您可以按順序執行您的管道。 在此處輸入圖像描述

在此處輸入圖像描述

最後,4條並行流水線將在最佳時間執行。

如果我們將問題理解為 ForEach 將額外的任務推到非常大任務後面的隊列中,則可以通過實現拉機制來解決這個問題。

我們可以通過避開 Lookup-ForEach 模式中內置的隊列來實現這一點。相反,我們只啟動足夠多的工人來提供我們想要的並行度,並擁有這些拉任務。

ForEach 的並行性由其 Batch Count 控制。在撰寫本文時,最大值為 50,無法在執行時動態設置。我們將此值設置為最大值 (50) 並通過應用程式碼控制有效的執行時並行性。讓我們呼叫我們的應用程序值 AppBatchCount。所需的執行時值可以保存在全域參數、管道參數、DBMS 配置表或任何方便的機制中(下圖中的活動“A”)。如果我們想要 ForEach AppBatchCount 中的四個並行工作人員將具有值 4。

使用表達式填充數組變數@range(1, AppBatchCount)(下面的活動“B”)。正是這個數組變數映射到 ForEach 的項目(下面的活動“C”)。通過這種方式,AppBatchCount 的值直接影響給定要執行的活動的並行工作人員的數量。

ForEach 內部是執行完成任務所需工作的活動。將它們打包在單獨的管道中通常很方便。這個管道將有一個由變數控制的直到活動。我們稱該變數為“LoopAgain”。在直到任務出列。如果獲得任務,則將 LoopAgain 設置為“true”並執行該任務。如果沒有任務出隊,則 LoopAgain 設置為“false”並且直到退出,終止此工作程序。

任務如何出隊將取決於用於保存它的技術。對於 SQL Server,這是一項眾所周知的任務。其他技術將有自己的技術。任務將按大小順序出列,首先是最大的(即執行時間最長的)。

以問題中圖 2 的分佈為例,使用 4 個 worker,首先啟動最大的 4 個任務,每個 worker 一個。第 4 大任務將首先完成(根據定義)並開始執行第 5 大任務。這將一直持續到工人 2-4 用盡隊列並終止。最終工人 1 將完成最大的任務,尋找更多的工作,發現沒有什麼可做的,也終止了。因此,最大任務後面沒有工作排隊,並且經過的時間最小化。

可以通過將新的源表插入到隊列中來添加新的源表,並適當估計它們的大小。事實上,可以記錄實際處理時間,並隨著時間的推移自動調整處理順序。

在此處輸入圖像描述

ADF 管道定義

{
   "name": "ForEach pull",
   "properties": {
       "activities": [
           {
               "name": "BatchCountArray",
               "type": "SetVariable",
               "dependsOn": [
                   {
                       "activity": "Get AppBatchCount",
                       "dependencyConditions": [
                           "Succeeded"
                       ]
                   }
               ],
               "userProperties": [],
               "typeProperties": {
                   "variableName": "BatchCountArray",
                   "value": {
                       "value": "@range(1, int(activity('Get AppBatchCount').output.firstRow.AppBatchCount))",
                       "type": "Expression"
                   }
               }
           },
           {
               "name": "Pull tasks",
               "description": "Set Batch Count to the maximum permitted. The actual number will be controlled by the size of BatchCountArray.",
               "type": "ForEach",
               "dependsOn": [
                   {
                       "activity": "BatchCountArray",
                       "dependencyConditions": [
                           "Succeeded"
                       ]
                   }
               ],
               "userProperties": [],
               "typeProperties": {
                   "items": {
                       "value": "@variables('BatchCountArray')",
                       "type": "Expression"
                   },
                   "isSequential": false,
                   "batchCount": 50,
                   "activities": [
                       {
                           "name": "ForEach pull worker",
                           "type": "ExecutePipeline",
                           "dependsOn": [],
                           "userProperties": [],
                           "typeProperties": {
                               "pipeline": {
                                   "referenceName": "ForEach pull worker",
                                   "type": "PipelineReference"
                               },
                               "waitOnCompletion": true,
                               "parameters": {
                                   "WorkerNumber": {
                                       "value": "1",
                                       "type": "Expression"
                                   }
                               }
                           }
                       }
                   ]
               }
           },
           {
               "name": "Get AppBatchCount",
               "type": "Lookup",
               "dependsOn": [],
               "policy": {
                   "timeout": "7.00:00:00",
                   "retry": 0,
                   "retryIntervalInSeconds": 30,
                   "secureOutput": false,
                   "secureInput": false
               },
               "userProperties": []
           }
       ],
       "variables": {
           "Worker": {
               "type": "String",
               "defaultValue": "4"
           },
           "BatchCountArray": {
               "type": "Array"
           }
       },
       "annotations": []
   }
}

引用自:https://dba.stackexchange.com/questions/296168