Hillstone Logstash Configuration Guide

Introduction to Logstash

Logstash is an open source data collection engine with real-time pipelining capabilities. It can dynamically unify data from disparate sources and normalize it into the destinations of your choice, cleansing and democratizing all of your data for a wide range of advanced downstream analytics and visualization use cases.

Although Logstash originally drove innovation in log collection, its capabilities extend well beyond that use case. Any type of event can be enriched and transformed with a broad array of input, filter, and output plugins, and many native codecs further simplify the ingestion process. Logstash sharpens your insights by letting you tap into a greater volume and variety of data.

Put simply, Logstash is a seriously impressive open source data collection engine that lets you take messy, inconsistent log fields and reshape them into exactly the structure you want.
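As a bare-bones illustration of that input -> filter -> output pipeline (a toy sketch, not specific to Hillstone), the following configuration reads lines from stdin, lowercases the message with a mutate filter, and prints the structured event:

# Toy pipeline: stdin in, one filter, pretty-printed events out
input { stdin { } }

filter {
  # Any filter plugin can sit here; mutate simply lowercases the raw line
  mutate { lowercase => ["message"] }
}

output {
  stdout { codec => rubydebug }
}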

Hillstone Configuration

System version: 5.5
Path: Monitor - Log - Log Management - (the log type you want to collect) - Log Server - Log Distribution Method - Plain Text Log (make sure plain text is selected; the default is binary logging, which is awkward for Logstash to process).
(screenshot)

Since there are quite a few log types to choose from, we will use session logs as the example: check the box so that session logs are forwarded to Logstash.
(screenshot)

Logstash Configuration

  1. Create a new test configuration file, test-hillstone.conf:
[root@localhost config]# vim test-hillstone.conf

# Listen for Hillstone syslog messages on UDP/5010 and print them for inspection
input {
  udp { port => 5010 type => "Hillstone" }
}

output {
  stdout { codec => rubydebug }
}
  2. Open the firewall port so that the Hillstone device can reach the Logstash listener. For convenience during testing you can also temporarily disable the Linux firewall entirely; a sketch of both options follows.
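A minimal sketch, assuming a CentOS/RHEL 7 host with firewalld and the UDP port 5010 used above (adjust for your distribution):

# Option 1: open only the Logstash UDP port
[root@localhost ~]# firewall-cmd --permanent --add-port=5010/udp
[root@localhost ~]# firewall-cmd --reload

# Option 2: temporarily stop the host firewall altogether (test environments only)
[root@localhost ~]# systemctl stop firewalld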

  3. Run the configuration file. (A Logstash data directory can only be used by one running instance, so we give this instance its own data and log paths.)

[root@localhost logstash-6.2.2]# ./bin/logstash -f config/test-hillstone.conf --path.data data/hillstone-log --path.logs logs/hillstone-log
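If nothing shows up in the rubydebug output, first confirm that the syslog packets are actually reaching the host. A quick check, assuming tcpdump is installed:

# Watch for incoming Hillstone syslog packets on UDP port 5010
[root@localhost ~]# tcpdump -n -i any udp port 5010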
  4. As we can see, most of the logs arrive in one of two formats, so we will start the analysis with those.
    (screenshot)

  5. Use Logstash's built-in grok patterns to analyze the logs, working out how to split each message with the Grok Debugger. (In my experience so far, anything the Grok Debugger cannot parse, Logstash cannot parse either, and the Grok Debugger is fast and efficient, so it is well worth experimenting with.)

Example 1:
<190>May 10 11:08:48 2812242182000133(root) 44243630 Traffic@FLOW: SESSION: 10.2.20.232:53550->140.143.254.155:443(TCP), application HTTPS, interface ethernet0/1.10, vr trust-vr, policy 10, user -@-, host -, mac 0000.0000.0000, send packets 3, send bytes 176, receive packets 2, receive bytes 120, start time 2019-05-10 11:08:47, close time 2019-05-10 11:08:48, session end, Block\n\u0000

\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Sort}@%{DATA:Class}\: %{DATA:module}\: %{IPV4:srcip}\:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}\(%{DATA:protocol}\), application\ %{DATA:app}\, interface %{DATA:interface}\, vr %{DATA:vr}\, policy %{DATA:policy}\, user %{USERNAME:user}\@%{DATA:AAAserver}\, host %{DATA:HOST}\, mac %{CISCOMAC:mac}\, send packets %{BASE10NUM:sendPackets}\, send bytes %{BASE10NUM:sendBytes}\, receive packets %{BASE10NUM:receivePackets}\, receive bytes %{BASE10NUM:receiveBytes}\, start time %{TIMESTAMP_ISO8601:startTime}\, close time %{TIMESTAMP_ISO8601:closeTime}\, %{GREEDYDATA:reason}

(screenshot)
We capture every field of interest and shape the log into the structure we want; later, fields can be trimmed or dropped as needed.
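Before wiring a pattern into the full configuration, it can also be convenient to test it inside Logstash itself by pasting sample lines into a throwaway pipeline. A minimal sketch, using a hypothetical config file test-grok.conf (substitute the grok pattern shown above for the placeholder):

# test-grok.conf: paste raw Hillstone log lines into stdin and inspect the parsed fields
input { stdin { } }

filter {
  grok {
    # Replace PASTE_PATTERN_HERE with the session pattern shown above
    match => { "message" => "PASTE_PATTERN_HERE" }
  }
}

output {
  stdout { codec => rubydebug }
}

Run it with ./bin/logstash -f config/test-grok.conf (with its own --path.data if another instance is already running), paste a raw log line, and check that every field is extracted as expected.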

  6. That is the general approach, so without further ado, the full .conf file is pasted below; if you run into problems, feel free to get in touch.
[root@localhost config]# cat hillstone.conf
input{
udp {port => 5010 type => "Hillstone"}
# udp {port => 5003 type => "H3C"}
}

# kv{ }

#input { stdin { } }
filter {
grok {
## Traffic logs
# PBR (policy-based routing) logs, adjusted pattern
match => { "message" => "\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Sort}@%{DATA:Class}\: PBR\: %{IPV4:srcip}\:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}\(%{DATA:protocol}\), app %{DATA:app}\, in-interface\ %{DATA:in-interface}\, vr\ %{DATA:vr}\, pbr-policy\ %{DATA:pbrpolicy}\, pbr id\ %{DATA:pbrID}\, out-interface\ %{DATA:out-interface}\, nexthop\ %{IPV4:nexthop}, user\ %{USERNAME:user}\@%{DATA:AAAserver}\, host %{DATA:HOST}\, %{GREEDYDATA:reason}"}

# Session end logs
match => { "message" => "\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Sort}@%{DATA:Class}\: %{DATA:module}\: %{IPV4:srcip}\:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}\(%{DATA:protocol}\), application\ %{DATA:app}\, interface %{DATA:interface}\, vr %{DATA:vr}\, policy %{DATA:policy}\, user %{USERNAME:user}\@%{DATA:AAAserver}\, host %{DATA:HOST}\, mac %{CISCOMAC:mac}\, send packets %{BASE10NUM:sendPackets}\, send bytes %{BASE10NUM:sendBytes}\, receive packets %{BASE10NUM:receivePackets}\, receive bytes %{BASE10NUM:receiveBytes}\, start time %{TIMESTAMP_ISO8601:startTime}\, close time %{TIMESTAMP_ISO8601:closeTime}\, %{GREEDYDATA:reason}"}
# Session start logs
match => { "message" => "\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Sort}@%{DATA:Class}\: %{DATA:module}\: %{IPV4:srcip}\:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}\(%{DATA:protocol}\), interface %{DATA:interface}\, vr %{DATA:vr}\, policy %{DATA:policy}\, user %{USERNAME:user}\@%{DATA:AAAserver}\, host %{DATA:HOST}\, mac %{CISCOMAC:mac}\, %{GREEDYDATA:reason}"}

# SNAT logs
match => { "message" => "\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Sort}@%{DATA:Class}\: %{DATA:module}\: %{IPV4:srcip}\:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}\(%{DATA:protocol}\), snat to %{IPV4:snatip}\:%{BASE10NUM:snatport}\, vr\ %{DATA:vr}\, user\ %{USERNAME:user}\@%{DATA:AAAserver}\, host\ %{DATA:HOST}\, rule\ %{BASE10NUM:rule}"}

# DNAT logs

match => { "message" => "\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Sort}@%{DATA:Class}\: %{DATA:module}\: %{IPV4:srcip}\:%{BASE10NUM:srcport}->%{IPV4:dstip}:%{WORD:dstport}\(%{DATA:protocol}\), dnat to %{IPV4:dnatip}\:%{BASE10NUM:dnatport}\, vr\ %{DATA:vr}\, user\ %{USERNAME:user}\@%{DATA:AAAserver}\, host\ %{DATA:HOST}\, rule\ %{BASE10NUM:rule}"}

## Content audit logs
# URL logs
match => { "message" => "\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Event}@%{DATA:Class}\: %{DATA:module}: IP %{IPV4:srcip}\:%{BASE10NUM:srcport}\(%{IPV4:snatip}\:%{BASE10NUM:snatport}\)->%{IPV4:dstip}\:%{BASE10NUM:dstport}\(%{IPV4:dnatip}\:%{BASE10NUM:dnatport}\), user %{USERNAME:user}, VR %{DATA:vr}\, URL\ %{DATA:url}\, category\ %{DATA:category}\, method\ %{DATA:method}\, action\ %{DATA:action}\, reason\ %{GREEDYDATA:reason}"}

# IM (instant messaging) logs
match => { "message" => "\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Sort}@%{DATA:Class}\: %{DATA:module}: IP %{IPV4:srcip}\:%{BASE10NUM:srcport}\(%{IPV4:snatip}\:%{BASE10NUM:snatport}\)->%{IPV4:dstip}\:%{BASE10NUM:dstport}\(%{IPV4:dnatip}\:%{BASE10NUM:dnatport}\), user %{USERNAME:user}\@%{DATA:AAAserver}\, VR %{DATA:vr}\, %{DATA:app}\, %{WORD:content}\, %{DATA:state}\, user mac\ %{BASE10NUM:mac}"}

## System logs
match => { "message" => "\<%{BASE10NUM:syslog_pri}\>%{SYSLOGTIMESTAMP:timestamp}\ %{BASE10NUM:serial}\(%{WORD:ROOT}\) %{DATA:logid}\ %{DATA:Sort}@%{DATA:Class}\: %{GREEDYDATA:reason}"}


}

# if [message] !~ "^127\.|^192\.168\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[01]\.|^10\."
# {
# geoip {
# source => "dstip"
# target => "geoip"
# database => "/opt/elk/logstash-6.2.2/geoip/GeoLite2-City.mmdb"
# fields => ["country_name", "continent_code", "region_name", "city_name", "location", "latitude", "longitude"]
# remove_field => ["[geoip][latitude]", "[geoip][longitude]"]
# }
#
# geoip {
# source => "dstip"
# target => "geoip"
# database => "/opt/elk/logstash-6.2.2/geoip/GeoLite2-ASN.mmdb"
# remove_field => ["[geoip][ip]"]
# }
# }




mutate {
# add_field => ["logTimestamp", "%{date} %{time}"]
# remove_field => ["logTimestamp", "year", "month", "day", "time", "date"]
# Drop raw and duplicate fields that are no longer needed once parsing has succeeded
remove_field => ["type", "host", "message", "ROOT", "HOST", "serial", "syslog_pri", "timestamp", "mac"]
}


}
output {
elasticsearch {
hosts => "10.2.3.193:9200" # Elasticsearch server address
index => "logstash-hillstone-%{+YYYY.MM.dd}"
}
# stdout { codec=> rubydebug }
}
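Once Logstash is running with this configuration, a quick way to confirm that events are reaching Elasticsearch (assuming the address used above) is to list the daily Hillstone indices and their document counts:

# List the logstash-hillstone-* indices and their document counts
[root@localhost ~]# curl -s 'http://10.2.3.193:9200/_cat/indices/logstash-hillstone-*?v'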