Elasticsearch

JDBC Sqlserver 到彈性搜尋:一個輸入到不同的輸出?

  • September 1, 2017

我有一個 sqlserver2016 數據庫,我想使用 logstash 在 elasticsearch 中建立索引。

這是我的 logstash 配置文件,它有點工作:

input {
jdbc {
jdbc_driver_library => "C:\elastic\Microsoft-JDBC-Driver-6.0-for-SQL-Server\sqljdbc_6.0\enu\jre8\sqljdbc42.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://10.11.12.13:1433;databaseName=testdb1;integratedSecurity=false;user=ElasticExtractor;password=flyingweisels;"
jdbc_user => "ElasticExtractor"
jdbc_password => "flyingweisels"
statement => "select top 150000 Item_ID itemid,merchant_id merchantid,modelnumber,language_id from items order by Item_Id desc"
}
}
output {
elasticsearch {
hosts => "localhost:9200"
index => "testdata"
document_type => "testtype"
document_id => "%{itemid}"
}
}

所以這個文件應該做什麼,配置是在 elasticSearch 中插入 150k 個項目。在某種程度上,它只導入了其中的三分之一,例如在這種情況下為 62 382。如果我嘗試插入 50k,它只會插入大約 20k。有明顯的理由為什麼會這樣做?

這是目前的執行日誌:

[2017-09-01T08:16:31,923][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2017-09-01T08:16:31,927][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2017-09-01T08:16:32,006][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2017-09-01T08:16:32,007][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-09-01T08:16:32,042][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-09-01T08:16:32,050][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2017-09-01T08:16:32,053][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-09-01T08:16:32,219][INFO ][logstash.pipeline        ] Pipeline main started
[2017-09-01T08:16:32,313][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-09-01T08:16:32,643][INFO ][logstash.inputs.jdbc     ] (0.050000s) select top 150000 Item_ID itemid,merchant_id merchantid,modelnumber,language from items order by Item_Id desc
[2017-09-01T08:16:49,805][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}

第二件事是,假設我想從來自該輸入的 SQL 伺服器插入一行,我可以使用什麼外掛,以便如果該行具有特定的“merchant_id”,它將進入以該 ID 命名的彈性 TYPE。此外,如果它具有特定的“語言”,它會以該語言作為名稱進入彈性索引。可以這樣做嗎?我是否應該簡單地創建多個 Logstash 配置文件,每個任務一個?

所以我弄清楚我做錯了什麼,有兩個不同的問題。

1.錯誤的插入行數

這是由我使用的 SQL 查詢引起的。Elastic 需要唯一的document_id值,而left join我使用的導致多行返回相同的值。在這種情況下,彈性的作用是用新值覆蓋現有行。

  1. 將行分配給不同的索引和類型 ================

document_id我曾嘗試使用與最終看起來像這樣的相同語法:

index => "%{Language}"
document_type => "%{MerchantID}"

這將導致行被插入一個名為的索引%{Language}中,這顯然不是所需的結果。

問題很簡單:彈性不喜歡大寫字母。

所以我的最終配置文件最終看起來像這樣:

input {
jdbc {
jdbc_driver_library => "C:\elastic\Microsoft-JDBC-Driver-6.0-for-SQL-Server\sqljdbc_6.0\enu\jre8\sqljdbc42.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://10.11.12.13:1433;databaseName=testdb1;integratedSecurity=false;user=ElasticExtractor;password=flyingweisels;"
jdbc_user => "ElasticExtractor"
jdbc_password => "flyingweisels"
statement => "select top 50000 id itemid,item_id itemid,Merchant_ID merchantid,model,case when (ID%2=0) then 'germanindex' else 'englishindex' end language from items order by id desc"
}
}
output {
elasticsearch {
hosts => "localhost:9200"
index => "%{language}"
document_type => "%{merchantid}"
document_id => "%{itemid}"
}
}

效果很好!

引用自:https://dba.stackexchange.com/questions/184897