故事背景
又是一个吃着火锅唱着歌的日子,同事A过来吐槽说,某某交换机的风扇坏了,要不是今天过去例行巡检设备,发现设备声音不对劲还发现不了这个问题。
作为一名资深网工其实你深深的知道这种报错,设备的日志会有体现的,而且公司是有一台日志服务器滴,但是由于开打速度慢,查询速度慢,几乎没有分析展示功能,也没有报警功能。大家几乎很少去用它。
这个时候领导过来说: 我们要做一种日志分析系统,查询速度飞快,分析界面高端大气上档次,低调奢华有内涵,最重要是能主动发微信报警,要是以前听到这样的需求一般都是 下面的表情:
但是我们今天就是要好好聊聊这个日志分析
架构简述
日志系统首先要面临几个问题:
不同厂家设备的不同日志格式的处理,如何调用微信来发报警信息。
采用的解决办法是 不同厂商的设备发送日志的时候采用不同的端口,
日志先发送到logstash, logstash会先解析日志成标准格式,然后logstash会做2件事情,一个是存放日志到es里面,通过kibana做出展示。
另一个是发送符合条件的日志到python脚本,python脚本中调用微信接口发送报警信息。
环境搭建
Elk的搭建就省掉了 这不是今天讲的重点,推荐先百度,要是有疑问可以加QQ群:752774493
交换机配置
其实交换机的配置是一个痛点,几台设备手动配置一下也就算了,要是你的环境中有几百台,要是还没有统一配置工具 直接配到你怀疑人生啊。
cisco:
logging host 10.100.18.18 transport udp port 5002
H3C
info-center enable
info-center source default channel 2 trap state off
// 必要,不然日志会出现 不符合级别的 alert 日志
info-center loghost 10.100.18.18 port 5003
huawei
info-center enable
info-center loghost 10.100.18.18
info-center timestamp log short-date
info-center timestamp trap short-date
你要是熟悉ruby语言的话,我是推荐你使用oxidized系统来做统一配置的。
我是比较熟悉python,就用python的netmiko库写了一个脚本,用了多线程 速度还是挺快的,有一种型号的交换机200多台,也是数秒中刷完配置。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from
netmiko
import ConnectHandler
from
openpyxl
import load_workbook
import
threading
def
conf_syslog(ip):
cisco_881 = {
‘device_type’: ‘cisco_ios’,
‘ip’: ip,
‘username’: ‘admin’,
‘password’: ‘admin@123’,
}
net_connect = ConnectHandler(**cisco_881)
commands = [‘logging on’,
‘logging host 10.100.18.18 transport udp port 5002’,
‘end’,
‘write memory’]
output = net_connect.send_config_set(commands)
return output
def
get_host():
wb = load_workbook(filename=‘hosts.xlsx’)
sheetnames = wb.sheetnames
ws = wb[sheetnames[0]]
for i in
range(2, ws.max_row +
1):
ip = ws.cell(row=i, column=1).value
res = threading.Thread(target=conf_syslog, args=(ip,))
res.start()
return res
print(get_host())
上面是思科设备的脚本,H3C, 华为的脚本 ,参考netmiko案例写就行了。
配置
设置logstash用root账户启动
我这里有华为的5720,和2750型号的设备,发现华为的设备在指定syslog的时候没有办法自定义端口,就只好用默认的端口514,就会产生一个问题,1024以下的端口需要root用户才能用。就简单暴力的使用root启动logstash
[root@xdl-18-18 ~]# vim /etc/systemd/system/logstash.service
[Unit]
Description=logstash
[Service]
Type=simple
User=root
Group=root
Logstash 的配置:
不同厂商的日志 gork我都写好了,复制过去就能用。
input{
tcp {port =>
5002 type =>
"Cisco"}
udp {port =>
514 type =>
"HUAWEI"}
udp {port =>
5002 type =>
"Cisco"}
udp {port =>
5003 type =>
"H3C"}
}
filter {
if [type] ==
"Cisco"{
grok{
match => { "message"
=>
"<%{BASE10NUM:syslog_pri}>%{NUMBER:log_sequence}: .%{SYSLOGTIMESTAMP:timestamp}: %%{DATA:facility}-%{POSINT:severity}-%{CISCO_REASON:mnemonic}: %{GREEDYDATA:message}" }
match => { "message"
=>
"<%{BASE10NUM:syslog_pri}>%{NUMBER:log_sequence}: %{SYSLOGTIMESTAMP:timestamp}: %%{DATA:facility}-%{POSINT:severity}-%{CISCO_REASON:mnemonic}: %{GREEDYDATA:message}" }
add_field => {"severity_code"
=>
"%{severity}"}
overwrite => ["message"]
}
}
else
if [type] ==
"H3C"{
grok {
match => { "message"
=>
"<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{YEAR:year} %{DATA:hostname} %%%{DATA:vvmodule}/%{POSINT:severity}/%{DATA:digest}: %{GREEDYDATA:message}" }
remove_field => [ "year" ]
add_field => {"severity_code"
=>
"%{severity}"}
overwrite => ["message"]
}
}
else
if [type] ==
"HUAWEI"{
grok {
match => { "message"
=>
"<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{DATA:hostname} %%%{DATA:ddModuleName}/%{POSINT:severity}/%{DATA:Brief}:%{GREEDYDATA:message}"}
match => { "message"
=>
"<%{BASE10NUM:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{DATA:hostname} %{DATA:ddModuleName}/%{POSINT:severity}/%{DATA:Brief}:%{GREEDYDATA:message}"}
remove_field => [ "timestamp" ]
add_field => {"severity_code"
=>
"%{severity}"}
overwrite => ["message"]
}
}
mutate {
gsub => [
"severity", "0", "Emergency",
"severity", "1", "Alert",
"severity", "2", "Critical",
"severity", "3", "Error",
"severity", "4", "Warning",
"severity", "5", "Notice",
"severity", "6", "Informational",
"severity", "7", "Debug"
]
}
}
output{
elasticsearch {
index =>
"syslog-%{+YYYY.MM.dd}"
hosts => ["your_ipaddress:9200"]
}
}
日志分析
以下是我自己平时想看的数据,你要是有其它需求 可以一起交流呢。
日志量top 10的设备
设备产生日志量很大,需要关注一下呢
日志分类的占比
不同级别的日志 一目了然,
日志分类数量占比
可以在一张图上看到日志级别和占比最多的主机
每小时的日志量
讲道理 日志量会有一定的规律 如果有异常高的日志量的时候 需要关注一下
Dashboard
创建一个Dashboard 在一个页面中完成日志的展示,查询,ELK的查询速度快到飞起。
发送报警
讲道理 日志系统查询再快,分析再好,要是不能发送报警都是耍流氓。
Logstash 配置
我的设计是 日志等级在2以上的 发送微信 通知管理员。
mutate {
gsub => [
"severity", "0", "Emergency",
"severity", "1", "Alert",
"severity", "2", "Critical",
"severity", "3", "Error",
"severity", "4", "Warning",
"severity", "5", "Notice",
"severity", "6", "Informational",
"severity", "7", "Debug"
]
convert => ["severity_code", "integer"]
}
}
output{
if [severity_code] <=
2{
stdout { codec => rubydebug }
exec {
command =>
"python /usr/local/script/syslog.py \"%{[host]}\"
\"%{[severity]}\"
\"%{[message]}\" "
}
}
elasticsearch {
index =>
"syslog-%{+YYYY.MM.dd}"
hosts => ["your_ipaddress:9200"]
}
}
发送报警的Python脚本
我这里有专门的部门提供 写好了的微信接口,我是直接调用。
如果你的环境中没有微信接口 我推荐你使用邮件来发送报警信息。
在oxidized的讲解文章中 有关于发送邮件的部分 可以参考。
# 调用微信接口
def sendwechat(jobnumbers, host, severity, message):
send_wechat = SendWechat(jobnumbers, host, severity, message)
send_wechat.start()
host = sys.argv[1]
print(‘host=‘, host)
severity = sys.argv[2]
print(‘severity=‘, severity)
msg = sys.argv[3:]
message =
” “.join(msg)
print(‘message=‘, message)
jobnumbers =
“30564”
sendwechat(jobnumbers, host, severity, message)
效果演示
还是及时的微信通知 让人安心呢。
结束语
作为一个网工,网络设备的日志,备份其实是一直困扰我的问题。值得欣喜的是Oxidized系统的出现,完美的解决了交换机等网络设备备份的问题, 日志分析报警也伴随这ELK和python的加持,变的更加方便实用,给网工们带来真正的利好。
下一篇 文章我们再来谈谈流量分析的问题,敬请期待。