现在有两组 JSON 日志需要上报到 Elasticsearch
| 12
 3
 4
 5
 6
 7
 8
 9
 
 | {"Topic": "download_failed",
 "Source": "192.168.1.158",
 "FailCode": "0500",
 "Title": "多米尼克大战",
 "Space": 0,
 "Time": 1494023823,
 "Speed": "84.82267",
 }
 
 | 
| 12
 3
 4
 5
 6
 7
 8
 
 | {"Topic": "download_success",
 "Source": "192.168.1.158",
 "Title": "爱送书",
 "Space": 1028,
 "Time": 1494090811,
 "Speed": "120.82",
 }
 
 | 
filter配置
配置好 logstash pipeline:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 
 | input {stdin{
 codec => json
 }
 }
 
 filter{
 # @timestamp 取 stdin 传入的 __time__ 字段
 date {
 match => ["Time", "UNIX"]
 }
 
 # 经纬度转换
 geoip {
 source => "Source"
 }
 }
 
 output {
 elasticsearch {
 hosts => ["http://localhost:9200"]
 index => "logstash-android-%{+YYYY.MM.dd}"
 document_type => "%{Topic}"
 }
 }
 
 | 
1.上面看到 投递到 Elasticsearch 的索引是 logstash- 为前缀,是为了让 geoIp 处理后的经纬度符合 geopint 类型,相关参考: https://github.com/elastic/logstash/issues/3137
2.将这两条日志通过 pipeline 投递到 Elasticsearch 时会自动创建 mapping,可以在 kibana > Dev Tools 通过命令查看 mapping
| 1
 | GET logstash-android-*/_mapping
 | 
如果你的 mapping 类型和你预期的不符合,可以调整filter,比如 原始数据 Speed 字段是 string 类型,而你想要 float 类型
| 12
 3
 4
 5
 
 | mutate {convert => {
 "Space" => "integer"
 }
 }
 
 | 
template 配置