Sql-Server

將 csv 文件載入到 SQL Server 2016 中的最有效方法是什麼?

  • February 27, 2017

好的,所以這個問題看起來很簡單。好吧,它不僅僅是載入文件。

整個故事是這樣的:出於某種原因,我們的客戶向我們發送了只能被描述為關係數據庫的內容,被扁平化、壓縮成單個 csv 文件(但分隔符是波浪號而不是逗號)。實際上,這有點可怕;相同的數據在整個文件中無限重複。所以,為了給這些數據帶來一些秩序,我將它載入到一個實際的關係數據庫中。由於數據量很大,將其載入到數據庫中可以更輕鬆地檢查數據是否存在問題。它還使出口變得更加容易。

每條記錄有 53 行,每次傳輸大約有 250,000 條記錄。我想將其拆分為 6 個規範化表。我不確定是否要驗證 C# 程序或我正在使用的 SQL Server 2016 LocalDb 實例中的數據。

我不是經驗豐富的 DBA;我是一名涉足 SQL 的 C# 程序員。我對語法感到很舒服,但我想確保我做對了。

此外,一切都必須完全自動化。文件進來了,C#程序在收到文件時啟動,並將其載入到數據庫中。

讓我再解釋一下佈局。該文件有 53 個欄位,每行包含一個報表的詳細資訊行(他們被收取的費用、為項目收費的頻率、該項目的總成本或貸項等)。問題是每一行都有***整個郵件的資訊,包括付款人、居民、財產和匯款。***知道了這一點,讓我解釋一下我現在是如何做到的:

  1. 打開文件
  2. 對於文件的每一行,檢索描述郵寄、付款人、居民、財產和匯款目的地的表的鍵。
  3. 將該數據與記憶體的數據進行比較。如果記憶體的數據無效,請查詢數據庫以查看是否已添加該實體。如果沒有,請創建它。記憶體那個。
  4. 添加新的詳細資訊行並將其與郵件相關聯,該郵件與詳細資訊具有一對多的關係。(郵寄本身與付款人、財產和匯款有多對一的關係。)
  5. 完成後關閉文件。

這不是世界上最慢的事情,但它完全在 RAM 中完成。由於程序在目前狀態下接近耗盡 RAM 是很危險的,我們決定將數據載入到數據庫中,而不是將其全部保存在本地 RAM 中。希望這能為潛在的回答者提供更多的資訊。謝謝!

我首選的傳遞多個欄位的多行(即不是簡單的分隔列表)的方法是使用表值參數(TVP)。這個想法是,您將在 .NET 程式碼中逐行讀取文件,但是一次將數據全部流式傳輸,或者如果對於一個事務來說太多了,則分成批處理,使用 TVP 到 SQL Server 中。TVP 本質上是一個表變數,它是儲存過程的輸入參數,因此這將是一個基於集合的操作(就 SQL Server 而言),而不是逐行操作。從技術上講,不需要儲存過程,因為 TVP 可以作為參數發送到 ad hoc 查詢,但無論如何使用儲存過程只是一種更好的方法。

在讀取文件時使用 TVP 有兩種主要模式(每種模式都依賴於傳入一個返回IEnumerable<SqlDataRecord>而不是 a的方法DataTable):

  1. 執行“導入”儲存過程,啟動文件打開和讀取。所有行都被讀取(並且可以在此時驗證)並流式傳輸到儲存過程中。在這種方法中,儲存過程只執行一次,所有行都作為一個集合發送。這是更簡單的方法,但對於較大的數據集(即數百萬行),如果操作是將數據直接合併到活動表中而不是簡單地載入到臨時表中,則它可能不會執行最佳操作。這種方法所需的記憶體是 1 條記錄的大小。

  2. 為 創建一個變數int _BatchSize,打開文件,然後:

  3. 創建一個集合來保存這批記錄

    1. 循環_BatchSize或直到沒有更多行可以從文件中讀取

      1. 讀一行
      2. 證實
      3. 在集合中儲存有效條目
    2. 在每個循環結束時執行儲存過程,在集合中流式傳輸。

    3. 這種方法所需的記憶體是 1 條記錄 * 的大小_BatchSize

    4. 好處是數據庫中的事務不依賴於任何磁碟 I/O 延遲或業務邏輯延遲。

  4. 循環執行儲存過程,直到沒有更多行可以從文件中讀取

    1. 執行儲存過程

      1. 循環_BatchSize或直到沒有更多行可以從文件中讀取

        1. 讀一行
        2. 證實
        3. 將記錄流式傳輸到 SQL Server
    2. 這種方法所需的記憶體是 1 條記錄的大小。

    3. 缺點是數據庫中的事務取決於磁碟 I/O 延遲和/或業務邏輯延遲,因此可能會打開更長時間,因此更有可能阻塞。

在以下答案中,我在 StackOverflow 上有一個模式 #1 的完整範例:如何在最短的時間內插入 1000 萬條記錄?

對於模式 #2.1(一種高度可擴展的方法),我在下面有一個部分範例:

所需的數據庫對象(使用人為的結構):

首先,您需要一個使用者定義的表類型 (UDTT)。

請注意,在記錄到達 SQL Server 之前,使用UNIQUEDEFAULTCHECKConstraints 來強制數據完整性。唯一約束也是您在表變數上創建索引的方式:)。

CREATE TYPE [ImportStructure] AS TABLE
(
   BatchRecordID INT IDENTITY(1, 1) NOT NULL,
   Name NVARCHAR(200) NOT NULL,
   SKU VARCHAR(50) NOT NULL UNIQUE,
   LaunchDate DATETIME NULL,
   Quantity INT NOT NULL DEFAULT (0),
   CHECK ([Quantity] >= 0)
);
GO

接下來,使用 UDTT 作為導入儲存過程的輸入參數(因此是“表值參數”)。

CREATE PROCEDURE dbo.ImportData (
  @CustomerID     INT,
  @ImportTable    dbo.ImportStructure READONLY
)
AS
SET NOCOUNT ON;

UPDATE prod
SET    prod.[Name] = imp.[Name],
      prod.[LaunchDate] = imp.[LaunchDate],
      prod.[Quantity] = imp.[Quantity]
FROM   [Inventory].[Products] prod
INNER JOIN @ImportTable imp
       ON imp.[SKU] = prod.[SKU]
WHERE  prod.CustomerID = @CustomerID;

INSERT INTO [Inventory].[Products] ([CustomerID], [SKU], [Name], [LaunchDate], [Quantity])
   SELECT  @CustomerID, [SKU], [Name], [LaunchDate], [Quantity]
   FROM    @ImportTable imp
   WHERE   NOT EXISTS (SELECT prod.[SKU]
                       FROM   [Inventory].[Products] prod
                       WHERE  prod.[SKU] = imp.[SKU]
                      );
GO

應用程式碼:

首先,我們將定義用於儲存批次記錄的類:

using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using Microsoft.SqlServer.Server;

private class ImportBatch
{
 string Name;
 string SKU;
 DateTime LaunchDate;
 int Quantity;
}

接下來,我們定義用於將數據從集合流式傳輸到 SQL Server 的方法。請注意:

  • 即使它是一個欄位,也定義了SqlMetaData條目。但是,它的定義方式表明該值是伺服器生成的。BatchRecordID``IDENTITY
  • 返回記錄,yield return然後控制權返回到下一行(返回到循環的頂部)。
private static IEnumerable<SqlDataRecord> SendImportBatch(List<ImportBatch> RecordsToSend)
{
  SqlMetaData[] _TvpSchema = new SqlMetaData[] {
     new SqlMetaData("BatchRecordID", SqlDbType.Int, true, false,
                     SortOrder.Unspecified, -1),
     new SqlMetaData("Name", SqlDbType.NVarChar, 200),
     new SqlMetaData("SKU", SqlDbType.VarChar, 50),
     new SqlMetaData("LaunchDate", SqlDbType.DateTime),
     new SqlMetaData("Quantity", SqlDbType.Int)
  };

  SqlDataRecord _DataRecord = new SqlDataRecord(_TvpSchema);

  // Stream the collection into SQL Server without first
  // copying it into a DataTable.
  foreach (ImportBatch _RecordToSend in RecordsToSend)
  {
     // we don't set field 0 as that is the IDENTITY field
     _DataRecord.SetString(1, _RecordToSend.Name);
     _DataRecord.SetString(2, _RecordToSend.SKU);
     _DataRecord.SetDateTime(3, _RecordToSend.LaunchDate);
     _DataRecord.SetInt32(4, _RecordToSend.Quantity);

     yield return _DataRecord;
  }
}

最後,我們定義了整體的導入處理操作。它打開與 SQL Server 和文件的連接,然後循環讀取文件並驗證_BatchSize每個循環的記錄數。儲存過程參數只定義一次,因為它們不會改變:CustomerID值不會改變,TVP 參數值只是對方法的引用SendImportBatch——只有在儲存過程通過ExecuteNonQuery. 作為 TVP 值傳入的方法的輸入參數是引用類型,因此它應該始終反映該變數/對象的目前值。

public static void ProcessImport(int CustomerID)
{
  int _BatchSize = GetBatchSize();
  string _ImportFilePath = GetImportFileForCustomer(CustomerID);

  List<ImportBatch> _CurrentBatch = new List<ImportBatch>();
  ImportBatch _CurrentRecord;

  SqlConnection _Connection = new SqlConnection("{connection string}");
  SqlCommand _Command = new SqlCommand("ImportData", _Connection);
  _Command.CommandType = CommandType.StoredProcedure;

  // Parameters do not require leading "@" when using CommandType.StoredProcedure
  SqlParameter _ParamCustomerID = new SqlParameter("CustomerID", SqlDbType.Int);
  _ParamCustomerID.Value = CustomerID;
  _Command.Parameters.Add(_ParamCustomerID);

  SqlParameter _ParamImportTbl = new SqlParameter("ImportTable", SqlDbType.Structured);
  // TypeName is not needed when using CommandType.StoredProcedure
  //_ParamImportTbl.TypeName = "dbo.ImportStructure";
  // Parameter value is method that returns streamed data (IEnumerable)
  _ParamImportTbl.Value = SendImportBatch(_CurrentBatch);
  _Command.Parameters.Add(_ParamImportTbl);

  StreamReader _FileReader = null;

  try
  {
     int _RecordCount;
     string[] _InputLine = new string[4];

     _Connection.Open();

     _FileReader = new StreamReader(_ImportFilePath);

      // process the file
      while (!_FileReader.EndOfStream)
      {
         _RecordCount = 1;

         // process a batch
         while (_RecordCount <= _BatchSize
                 && !_FileReader.EndOfStream)
         {
            _CurrentRecord = new ImportBatch();

            _InputLine = _FileReader.ReadLine().Split(new char[]{','});

            _CurrentRecord.Name = _InputLine[0];
            _CurrentRecord.SKU = _InputLine[1];
            _CurrentRecord.LaunchDate = DateTime.Parse(_InputLine[2]);
            _CurrentRecord.Quantity = Int32.Parse(_InputLine[3]);

            // Do validations, transformations, etc
            if (record is not valid)
            {
               _CurrentRecord = null;
               continue; // skip to next line in the file
            }

            _CurrentBatch.Add(_CurrentRecord);
            _RecordCount++; // only increment for valid records
         }

         _Command.ExecuteNonQuery(); // send batch to SQL Server

         _CurrentBatch.Clear();
      }
  }
  finally
  {
     _FileReader.Close();

     _Connection.Close();
  }

  return;
}

如果READONLYTVP 的性質禁止在合併到目標表之前需要進行一些驗證和/或轉換,那麼 TVP 中的數據可以在儲存過程開始時輕鬆傳輸到本地臨時表。

引用自:https://dba.stackexchange.com/questions/145089