Import a CSV or JSON file into DynamoDB

  • February 6, 2019

I have 1000 CSV files. Each CSV file is between 1 and 500 MB and is formatted identically (i.e., the same column order). I have a header file with the column headers, which match my DynamoDB table's column names. I need to import those files into a DynamoDB table. What is the best way / tool to do that?

I could concatenate these CSV files into one giant file (which I would rather avoid), or convert them to JSON if needed. I am aware of the existence of BatchWriteItem, so I am guessing that a good solution will involve batch writing.
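
For reference, this is roughly what batch writing looks like with the newer boto3 SDK (the accepted answer below uses the legacy boto library). A minimal sketch only, assuming a table named test_persons with the two columns from the example below:

import boto3

# Sketch, not the accepted answer's code: boto3's batch_writer() buffers
# put_item calls, sends BatchWriteItem requests of up to 25 items, and
# automatically resends any UnprocessedItems.
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('test_persons')  # assumed table name

with table.batch_writer() as batch:
    batch.put_item(Item={'first_name': 'John', 'last_name': 'Doe'})
    batch.put_item(Item={'first_name': 'Bob', 'last_name': 'Smith'})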


Example:

  • The DynamoDB table has two columns: first_name, last_name
  • The header file only contains: first_name,last_name
  • A CSV file looks like:

John,Doe
Bob,Smith
Alice,Lee
Foo,Bar

In the end, I wrote a Python function import_csv_to_dynamodb(table_name, csv_file_name, column_names, column_types) that imports a CSV into a DynamoDB table. Column names and column types have to be specified. It uses boto, and takes a lot of inspiration from this gist (the URL appears in the docstring below). Below is the function, as well as a demo (main()) and the CSV file used. Tested on Windows 7 x64 with Python 2.7.5, but it should work on any OS that has boto and Python.

import boto

MY_ACCESS_KEY_ID = 'copy your access key ID here'
MY_SECRET_ACCESS_KEY = 'copy your secret access key here'


def do_batch_write(items, table_name, dynamodb_table, dynamodb_conn):
    '''
    From https://gist.github.com/griggheo/2698152#file-gistfile1-py-L31
    '''
    batch_list = dynamodb_conn.new_batch_write_list()
    batch_list.add_batch(dynamodb_table, puts=items)
    while True:
        response = dynamodb_conn.batch_write_item(batch_list)
        # DynamoDB may only write part of a batch; anything it could not
        # handle comes back under 'UnprocessedItems' and must be resent.
        unprocessed = response.get('UnprocessedItems', None)
        if not unprocessed:
            break
        # Rebuild a batch list containing only the unprocessed items.
        batch_list = dynamodb_conn.new_batch_write_list()
        unprocessed_list = unprocessed[table_name]
        items = []
        for u in unprocessed_list:
            item_attr = u['PutRequest']['Item']
            item = dynamodb_table.new_item(attrs=item_attr)
            items.append(item)
        batch_list.add_batch(dynamodb_table, puts=items)


def import_csv_to_dynamodb(table_name, csv_file_name, column_names, column_types):
    '''
    Import a CSV file to a DynamoDB table
    '''
    dynamodb_conn = boto.connect_dynamodb(aws_access_key_id=MY_ACCESS_KEY_ID,
                                          aws_secret_access_key=MY_SECRET_ACCESS_KEY)
    dynamodb_table = dynamodb_conn.get_table(table_name)
    BATCH_COUNT = 2  # demo value; 25 is the maximum batch size for Amazon DynamoDB

    items = []

    count = 0
    csv_file = open(csv_file_name, 'r')
    for cur_line in csv_file:
        count += 1
        # Naive CSV parsing: assumes no quoted fields or embedded commas.
        cur_line = cur_line.strip().split(',')

        # Map each CSV field to its column name, casting to the given type.
        row = {}
        for column_number, column_name in enumerate(column_names):
            row[column_name] = column_types[column_number](cur_line[column_number])

        item = dynamodb_table.new_item(attrs=row)
        items.append(item)

        if count % BATCH_COUNT == 0:
            print 'batch write start ... ',
            do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)
            items = []
            print 'batch done! (row number: ' + str(count) + ')'

    # flush remaining items, if any
    if len(items) > 0:
        do_batch_write(items, table_name, dynamodb_table, dynamodb_conn)

    csv_file.close()


def main():
    '''
    Demonstration of the use of import_csv_to_dynamodb()
    We assume the existence of a table named `test_persons`, with
    - Last_name as primary hash key (type: string)
    - First_name as primary range key (type: string)
    '''
    column_names = 'Last_name First_name'.split()
    table_name = 'test_persons'
    csv_file_name = 'test.csv'
    column_types = [str, str]
    import_csv_to_dynamodb(table_name, csv_file_name, column_names, column_types)


if __name__ == "__main__":
   main()
   #cProfile.run('main()') # if you want to do some profiling
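
One caveat about do_batch_write(): it resends UnprocessedItems in a tight loop, which can keep hammering a throttled table. A minimal sketch of exponential backoff that could pace those retries (an assumption, not part of the original answer):

import time

# Hypothetical helper: seconds to sleep before the n-th resend of a batch
# (0-based), doubling each time up to a cap.
def backoff_delay(retry_number, base=0.05, cap=5.0):
    return min(cap, base * (2 ** retry_number))

# In do_batch_write(), one could call time.sleep(backoff_delay(retry_number))
# before re-entering the while loop with the rebuilt batch_list.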

Content of test.csv (it must be in the same folder as the Python script):

John,Doe
Bob,Smith
Alice,Lee
Foo,Bar
a,b
c,d
e,f
g,h
i,j
j,l
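
Since the question involves 1000 CSV files, a thin driver can apply import_csv_to_dynamodb() to every file in a directory. A sketch, assuming the files sit in a hypothetical data/ folder and share the column layout from main():

import glob

def import_all_csv_files(table_name, column_names, column_types, pattern='data/*.csv'):
    # Hypothetical driver for the 1000-file scenario: one import call per file.
    for csv_file_name in sorted(glob.glob(pattern)):
        print 'importing ' + csv_file_name + ' ...'
        import_csv_to_dynamodb(table_name, csv_file_name, column_names, column_types)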

Quoted from: https://dba.stackexchange.com/questions/91971