怎么将nginx日志导入elasticsearch

nginx日志通过filebeat收集后传入logstash,经过logstash处理后写入elasticsearch。filebeat只负责收集工作,logstash完成日志的格式化,数据的替换,拆分 ,以及将日志写入elasticsearch后的索引的创建。

1、配置nginx日志格式

log_format main    '$remote_addr $http_x_forwarded_for [$time_local] $server_name $request '              '$status $body_bytes_sent $http_referer '              '"$http_user_agent" '             '"$connection" '             '"$http_cookie" '             '$request_time '             '$upstream_response_time';

2、安装配置filebeat,启用nginx module

tar -zxvf filebeat-6.2.4-linux-x86_64.tar.gz -c /usr/local cd /usr/local;ln -s filebeat-6.2.4-linux-x86_64 filebeat cd /usr/local/filebeat

启用nginx模块

./filebeat modules enable nginx

查看模块

./filebeat modules list

创建配置文件

vim /usr/local/filebeat/blog_module_logstash.yml filebeat.modules: - module: nginx  access:   enabled: true   var.paths: ["/home/weblog/blog.cnfol.com_access.log"]  #error:  # enabled: true  # var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"]   output.logstash:  hosts: ["192.168.15.91:5044"]

启动filebeat

./filebeat -c blog_module_logstash.yml -e

3、配置logstash

tar -zxvf logstash-6.2.4.tar.gz /usr/local cd /usr/local;ln -s logstash-6.2.4 logstash 创建一个nginx日志的pipline文件 cd /usr/local/logstash

logstash内置的模板目录

vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns

编辑 grok-patterns 添加一个支持多ip的正则

forword (?:%{ipv4}[,]?[ ]?)+|%{word}

官方grok

#

创建logstash pipline配置文件

#input { # stdin {} #} # 从filebeat接受数据 input {  beats {  port =&gt; 5044  host =&gt; "0.0.0.0"  } }  filter {  # 添加一个调试的开关  mutate{add_field =&gt; {"[@metadata][debug]"=&gt;true}}  grok {  # 过滤nginx日志  #match =&gt; { "message" =&gt; "%{nginxaccess_test2}" }  #match =&gt; { "message" =&gt; '%{iporhost:clientip} # (?<http_x_forwarded_for>[^#]*) # [%{httpdate:[@metadata][webtime]}] # %{notspace:hostname} # %{word:verb} %{uripathparam:request} http/%{number:httpversion} # %{number:response} # (?:%{number:bytes}|-) # (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) # (?:"(?<http_user_agent>[^#]*)") # (?:"(?:%{number:connection}|-)"|%{number:connection}|-) # (?:"(?<cookies>[^#]*)") # %{number:request_time:float} # (?:%{number:upstream_response_time:float}|-)' }  #match =&gt; { "message" =&gt; '(?:%{iporhost:clientip}|-) (?:%{two_ip:http_x_forwarded_for}|%{ipv4:http_x_forwarded_for}|-) [%{httpdate:[@metadata][webtime]}] (?:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (?:%{number:bytes}|-) (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) %{qs:agent} (?:"(?:%{number:connection}|-)"|%{number:connection}|-) (?:"(?<cookies>[^#]*)") %{number:request_time:float} (?:%{number:upstream_response_time:float}|-)' }     match =&gt; { "message" =&gt; '(?:%{iporhost:clientip}|-) %{forword:http_x_forwarded_for} [%{httpdate:[@metadata][webtime]}] (?:%{hostname:hostname}|-) %{word:method} %{uripathparam:request} http/%{number:httpversion} %{number:response} (?:%{number:bytes}|-) (?:"(?:%{notspace:referrer}|-)"|%{notspace:referrer}|-) %{qs:agent} (?:"(?:%{number:connection}|-)"|%{number:connection}|-) %{qs:cookie} %{number:request_time:float} (?:%{number:upstream_response_time:float}|-)' }  }  # 将默认的@timestamp(beats收集日志的时间)的值赋值给新字段@read_tiimestamp  ruby {   #code =&gt; "event.set('@read_timestamp',event.get('@timestamp'))"  #将时区改为东8区  code =&gt; "event.set('@read_timestamp',event.get('@timestamp').time.localtime + 8*60*60)"  }  # 将nginx的日志记录时间格式化  # 格式化时间 20/may/2015:21:05:56 +0000  date {  locale =&gt; "en"  match =&gt; ["[@metadata][webtime]","dd/mmm/yyyy:hh:mm:ss z"]  }  # 将bytes字段由字符串转换为数字  mutate {  convert =&gt; {"bytes" =&gt; "integer"}  }  # 将cookie字段解析成一个json  #mutate {  # gsub =&gt; ["cookies",';',',']  #}   # 如果有使用到cdn加速http_x_forwarded_for会有多个ip,第一个ip是用户真实ip  if[http_x_forwarded_for] =~ ", "{      ruby {          code =&gt; 'event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])'         }     }  # 解析ip,获得ip的地理位置  geoip {  source =&gt; "http_x_forwarded_for"  # # 只获取ip的经纬度、国家、城市、时区  fields =&gt; ["location","country_name","city_name","region_name"]   }  # 将agent字段解析,获得浏览器、系统版本等具体信息  useragent {  source =&gt; "agent"  target =&gt; "useragent"  }  #指定要删除的数据  #mutate{remove_field=&gt;["message"]}  # 根据日志名设置索引名的前缀  ruby {  code =&gt; 'event.set("@[metadata][index_pre]",event.get("source").split("/")[-1])'  }   # 将@timestamp 格式化为2019.04.23  ruby {  code =&gt; 'event.set("@[metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%y.%m.%d"))'  }  # 设置输出的默认索引名  mutate {  add_field =&gt; {   #"[@metadata][index]" =&gt; "%{@[metadata][index_pre]}_%{+yyyy.mm.dd}"   "[@metadata][index]" =&gt; "%{@[metadata][index_pre]}_%{@[metadata][index_day]}"  }  }  # 将cookies字段解析成json # mutate { # gsub =&gt; [ #  "cookies", ";", ",", #  "cookies", "=", ":" # ] # #split =&gt; {"cookies" =&gt; ","} # } # json_encode { # source =&gt; "cookies" # target =&gt; "cookies_json" # } # mutate { # gsub =&gt; [ #  "cookies_json", ',', '","', #  "cookies_json", ':', '":"' # ] # } # json { # source =&gt; "cookies_json" # target =&gt; "cookies2" # }  # 如果grok解析存在错误,将错误独立写入一个索引  if "_grokparsefailure" in [tags] {  #if "_dateparsefailure" in [tags] {  mutate {   replace =&gt; {   #"[@metadata][index]" =&gt; "%{@[metadata][index_pre]}_failure_%{+yyyy.mm.dd}"   "[@metadata][index]" =&gt; "%{@[metadata][index_pre]}_failure_%{@[metadata][index_day]}"   }  }  # 如果不存在错误就删除message  }else{  mutate{remove_field=&gt;["message"]}  } }  output {  if [@metadata][debug]{  # 输出到rubydebuyg并输出metadata  stdout{codec =&gt; rubydebug{metadata =&gt; true}}  }else{  # 将输出内容转换成 "."  stdout{codec =&gt; dots}   # 将输出到指定的es  elasticsearch {   hosts =&gt; ["192.168.15.160:9200"]   index =&gt; "%{[@metadata][index]}"   document_type =&gt; "doc"  }   } }</cookies></cookies></http_user_agent></http_x_forwarded_for>

启动logstash

nohup bin/logstash -f test_pipline2.conf &amp;

© 版权声明
THE END
喜欢就支持一下吧
点赞6 分享