Import
將 CSV 或 JSON 文件導入 DynamoDB
我有 1000 個 CSV 文件。每個 CSV 文件大小在 1 到 500 MB 之間,格式相同(即相同的列順序)。我有一個列標題的標頭檔,它與我的 DynamoDB 表的列名匹配。我需要將這些文件導入到 DynamoDB 表中。這樣做的最佳方式/工具是什麼?
我可以將這些 CSV 文件連接成一個巨大的文件(我寧願避免這樣做),或者在需要時將它們轉換為 JSON。我知道BatchWriteItem的存在,所以我想一個好的解決方案將涉及批量寫入。
例子:
- DynamoDB 表有兩列:first_name、last_name
- 標頭檔只包含:
first_name,last_name
- 一個 CSV 文件看起來像
:
John,Doe Bob,Smith Alice,Lee Foo,Bar
最後,我編寫了一個 Python 函式
import_csv_to_dynamodb(table_name, csv_file_name, colunm_names, column_types)
,將 CSV 導入到 DynamoDB 表中。必須指定列名和列。它使用boto ,並從這個要點中獲得了很多靈感。下面是函式以及使用的展示 (main()
) 和 CSV 文件。在 Windows 7 x64 上使用 Python 2.7.5 進行了測試,但它應該適用於任何具有 boto 和 Python 的作業系統。import boto MY_ACCESS_KEY_ID = 'copy your access key ID here' MY_SECRET_ACCESS_KEY = 'copy your secrete access key here' def do_batch_write(items, table_name, dynamodb_table, dynamodb_conn): ''' From https://gist.github.com/griggheo/2698152#file-gistfile1-py-L31 ''' batch_list = dynamodb_conn.new_batch_write_list() batch_list.add_batch(dynamodb_table, puts=items) while True: response = dynamodb_conn.batch_write_item(batch_list) unprocessed = response.get('UnprocessedItems', None) if not unprocessed: break batch_list = dynamodb_conn.new_batch_write_list() unprocessed_list = unprocessed[table_name] items = [] for u in unprocessed_list: item_attr = u['PutRequest']['Item'] item = dynamodb_table.new_item( attrs=item_attr ) items.append(item) batch_list.add_batch(dynamodb_table, puts=items) def import_csv_to_dynamodb(table_name, csv_file_name, colunm_names, column_types): ''' Import a CSV file to a DynamoDB table ''' dynamodb_conn = boto.connect_dynamodb(aws_access_key_id=MY_ACCESS_KEY_ID, aws_secret_access_key=MY_SECRET_ACCESS_KEY) dynamodb_table = dynamodb_conn.get_table(table_name) BATCH_COUNT = 2 # 25 is the maximum batch size for Amazon DynamoDB items = [] count = 0 csv_file = open(csv_file_name, 'r') for cur_line in csv_file: count += 1 cur_line = cur_line.strip().split(',') row = {} for colunm_number, colunm_name in enumerate(colunm_names): row[colunm_name] = column_types[colunm_number](cur_line[colunm_number]) item = dynamodb_table.new_item( attrs=row ) items.append(item) if count % BATCH_COUNT == 0: print 'batch write start ... ', do_batch_write(items, table_name, dynamodb_table, dynamodb_conn) items = [] print 'batch done! (row number: ' + str(count) + ')' # flush remaining items, if any if len(items) > 0: do_batch_write(items, table_name, dynamodb_table, dynamodb_conn) csv_file.close() def main(): ''' Demonstration of the use of import_csv_to_dynamodb() We assume the existence of a table named `test_persons`, with - Last_name as primary hash key (type: string) - First_name as primary range key (type: string) ''' colunm_names = 'Last_name First_name'.split() table_name = 'test_persons' csv_file_name = 'test.csv' column_types = [str, str] import_csv_to_dynamodb(table_name, csv_file_name, colunm_names, column_types) if __name__ == "__main__": main() #cProfile.run('main()') # if you want to do some profiling
test.csv
的內容(必須與 Python 腳本位於同一文件夾中):John,Doe Bob,Smith Alice,Lee Foo,Bar a,b c,d e,f g,h i,j j,l