可视化网络流量之ELK网络大数据分析

故事背景

有一天你在吃着火锅唱着歌,手机里面传来zabbix 接口流量超过阀值的报警。作为一名资深网工,你可是随身带着电脑,分分钟打开电脑想登陆防火墙查看流量,想看看到底是那个ZZ 做了什么导致了这个问题,因为你深深的知道已经做了用户流量的限额策略,理论上是不会出现这个问题。可是当你2分钟后登陆设备后发现 流量已经正常,这个时候你对着屏幕你陷入了沉思 要是有个流量记录,分析系统该多好啊

架构简述

想着这个系统最好是可迭代可升级,最重要的要开源免费。网络流量信息采集协议常见包括 sFlow 和 NetFlow,不同厂商的网络设备支持不同的协议 比如说常见思科系列支持netflow,华为系列支持sflow。一开始想到的方法是通过设备发送flow流量到netflow collector 。

读到一篇软文作者分析的很好。

Why nProbe+JSON+ZMQ instead of native sFlow/NetFlow support in ntopng?

文章提到nprobe 是个非常好的软件


后来发现了具有nProbe功能但是的免费的软件fprobe。

最后采用交换机端口镜像,fprobe把镜像端口流量转化为为Netflow 数据,并发送至ELK。

简介

ELK简介

ELK 是ElasticSearch、Logstash、Kibana三个应用的缩写。 ElasticSearch简称ES,主要用来存储和检索数据。Logstash主要用来往ES中写入数据。Kibana主要用来展示数据。

Fprobe简介

Fprobe 是一款在 FreeBSD下运行的软件,它可以将其接口收到的数据转化为Netflow 数据,并发送至Netflow 分析端

环境搭建

镜像服务器操作系统推荐使用ubuntu,测试下来Fprobe在centos7运行不是很好,

fprobe 安装

apt install libpcap-devel
apt install fprobe

ELK的系统选择centos7
logstash,ElasticSearch 都需要java环境 官方推荐Oracle JDK version 1.8.0_73

注意一下logstash 还不支持java 9

Logstash requires Java 8. Java 9 is not supported. Use the official Oracle distribution or an open-source distribution such as OpenJDK.

安装java

java我说下自己的安装套路,

先卸载系统自带的java

rpm -qa | grep java | xargs rpm -e --nodeps

然后去oracle官方下载
jdk-8u73-linux-x64.rpm
然后一条命令搞定,其它什么都不要设置

rpm -ivh jdk-8u73-linux-x64.rpm

[root@es-node1 ~]# java -version
java version "1.8.0_73"
Java(TM) SE Runtime Environment (build 1.8.0_73-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.73-b02, mixed mode)
[root@es-node1 ~]# 

安装ELK

先去官网下载

logstash-5.2.1.rpm

elasticsearch-5.2.1.rpm

kibana-5.2.1-x86_64.rpm

然后一条命令搞定

rpm -ivh logstash-5.2.1.rpm elasticsearch-5.2.1.rpm kibana-5.2.1-x86_64.rpm

切记这个时候不要启动服务

配置

fprobe

首先配置fprobe 发送netflow 流量到ELK服务器9991端口
fprobe -i eno1 10.10.10.10:9991

logstash

在/etc/logstash/conf.d这个目录下面 新建一个netflow.conf 文件 配置如下:

[root@es-node1 ~]# cat /etc/logstash/conf.d/netflow.conf
input {
        udp {
                port => 9991
                type => "netflow_1"
                codec => netflow {
                        versions => [5, 9]
                }
        }
}
output {
        if[type] == "netflow_1"{
            elasticsearch {
                index => "logstash_netflow-%{+YYYY.MM.dd}"
                hosts => ["10.10.10.10:9200"]
            }
        }
}

elasticsearch

只要修改es配置文件中的 network.host 参数

[root@es-node1 ~]# vi /etc/elasticsearch/elasticsearch.yml

network.host: 10.10.10.10

添加索引模板

先启动elasticsearch服务

为了让采集到的数据的类型能被 elasticsearch 正确处理,添加如下索引模板,自动对所有 logstash_netflow- 开头的索引采取指定的类型解析

curl -XPUT http://localhost:9200/_template/logstash_netflow -d '{
    "template" : "logstash_netflow-*",
    "settings": {
      "refresh_interval" : "5s"
    },
    "mappings" : {
        "_default_" : {
           "_all" : {"enabled" : false},
           "properties" : {
              "@message":     { "index": "analyzed", "type": "string"  },
              "@source":      { "index": "not_analyzed", "type": "string"  },
              "@source_host": { "index": "not_analyzed", "type": "string" },
              "@source_path": { "index": "not_analyzed", "type": "string" },
              "@tags":        { "index": "not_analyzed", "type": "string" },
              "@timestamp":   { "index": "not_analyzed", "type": "date" },
              "@type":        { "index": "not_analyzed", "type": "string" },
              "netflow": {
                   "dynamic": true,
                   "type": "object",
                   "properties": {
                       "version": { "index": "analyzed", "type": "integer" },
                       "first_switched": { "index": "not_analyzed", "type": "date" },
                       "last_switched": { "index": "not_analyzed", "type": "date" },
                       "direction": { "index": "not_analyzed", "type": "integer" },
                       "flowset_id": { "index": "not_analyzed", "type": "integer" },
                       "flow_sampler_id": { "index": "not_analyzed", "type": "integer" },
                       "flow_seq_num": { "index": "not_analyzed", "type": "long" },
                       "src_tos": { "index": "not_analyzed", "type": "integer" },
                       "tcp_flags": { "index": "not_analyzed", "type": "integer" },
                       "protocol": { "index": "not_analyzed", "type": "integer" },
                       "ipv4_next_hop": { "index": "analyzed", "type": "ip" },
                       "in_bytes": { "index": "not_analyzed", "type": "long" },
                       "in_pkts": { "index": "not_analyzed", "type": "long" },
                       "out_bytes": { "index": "not_analyzed", "type": "long" },
                       "out_pkts": { "index": "not_analyzed", "type": "long" },
                       "input_snmp": { "index": "not_analyzed", "type": "long" },
                       "output_snmp": { "index": "not_analyzed", "type": "long" },
                       "ipv4_dst_addr": { "index": "analyzed", "type": "ip" },
                       "ipv4_src_addr": { "index": "analyzed", "type": "ip" },
                       "dst_mask": { "index": "analyzed", "type": "integer" },
                       "src_mask": { "index": "analyzed", "type": "integer" },
                       "dst_as": { "index": "analyzed", "type": "integer" },
                       "src_as": { "index": "analyzed", "type": "integer" },
                       "l4_dst_port": { "index": "not_analyzed", "type": "long" },
                       "l4_src_port": { "index": "not_analyzed", "type": "long" }
                   }
               }
            }
        }
   }
}'

kibana

只需要修改kibana配置文件中两个参数就好

[root@es-node1 ~]# vi /etc/kibana/kibana.yml
server.host: "0.0.0.0"
elasticsearch.url: "http://10.10.10.10:9200"

启动服务

systemctl restart logstash.service
systemctl restart elasticsearch.service
systemctl restart kibana.service

这个时候就可以通过 10.10.10.10:5601 来访问kibana了

这个时候会提示你配置第一个index
按照红色框子里面内容填和选

kibana 结果演示

下载 kibana 可视化模板的 json 文件,通过 kibana 界面直接导入即可看到效果。

然后可以很直观的看到 导致突发流量的源IP地址

你轻轻的点击一下鼠标,然后看到了源目IP地址和端口

对着屏幕你又陷入了沉思

最近有同事向你抱怨说访问网站慢 你查看出口流量也在合理范围 根据你的经验这很有可能不是你的锅。

但是公司的网络质量(延迟,抖动,丢包)
到底怎么样呢?没有办法拿出证据 让你很被动。

欲知后事如何 且听下回分解