故事背景
传说中的一天,在听说了某公司的内网员工由于访问了恶意网站,导致机器中毒后,领导过来问了下。
领导:能知道我们的内网用户访问是安全的么?
我的内心旁白: 作为一个资深的工程师,当然不能说不能呀。
我:当然可以啊,只要把所有用户访问过一下安全检查,就知道了呀。
领导:嗯,那就你来弄这个事情吧。能发现出来,发个微信报警,顺便存下档方便以后分析。这样就行了。
我的内心旁白:我只是吹了个牛呀,用什么数据去检测呢,每天用户访问量那么大,该怎么弄啊。
我(想了下刚才吹的牛):好的领导。这会是一个非常有意义的项目。
架构简述
经过若干时间的思考,我打算这样做。架构简单粗暴,就是找安全组要了一个能监测恶意域名的接口,
然后把用户访问的域名都过一下这个接口。异常的信息发一份到微信,发一份到es。完美!!!
环境搭建
说干就干,首先你需要一台服务器。,然后你需要安装配置若干软件。
# cat /etc/redhat-release
CentOS Linux release 7.6.1810 (Core)
CPU:Intel(R) Xeon(R) CPU E5-2420 0 @ 1.90GHz
memory:32G
disk: 500G
packetbeat
安装,
sudo yum install libpcap
sudo rpm -vi packetbeat-7.0.0-x86_64.rpm
主要配置
packetbeat 这里只要注意下 ,不支持同时输出到es和logstash,要是有多输出需求,起多个实例。
# 网络接口配置一把。
vim /etc/packetbeat/packetbeat.yml
packetbeat.interfaces.device: em2
packetbeat.interfaces.snaplen: 1514
packetbeat.interfaces.type: af_packet
packetbeat.interfaces.buffer_size_mb: 2048
# 其它的注释掉,只留这个。
- type: dns
# Configure the ports where to listen for DNS traffic. You can disable
# the DNS protocol by commenting out the list of ports.
ports: [53]
# 输出到Logstash
#----------------------------- Logstash output --------------------------------
output.logstash:
# The Logstash hosts
hosts: ["localhost:5044"]
logstash
安装
rpm -ivh jdk-8u202-linux-x64.rpm # java环境变量配置一下。
rpm -ivh logstash-7.0.0.rpm
主要配置
# output 部分是对数据的筛减,后面会详解下。
vim /etc/logstash/conf.d/logstash.conf
input {
beats {
port => 5044
}
}
output {
if [client_ip] not in ['10.100.2.2','10.100.2.1','10.100.2.3']{
if [resource] not in ['www.baidu.com.','www.baidu2.com.','www.baidu3.com.']{
kafka {
codec => json
topic_id => "mytopic"
}
}
}
}
logstash 优化和监控
# 这个地方不用多,2G够用,后面的监控可以看到
vim /etc/logstash/jvm.options
-Xms2g
-Xmx2g
# pipeline.workers 参考CPU核心数,pipeline.batch.size 可以参考监控调试一下。
vim /etc/logstash/logstash.yml
# pipeline.workers: 2
pipeline.workers: 6
#
# How many events to retrieve from inputs before sending to filters+workers
#
# pipeline.batch.size: 125
pipeline.batch.size: 2048
# 新版本福利,可见即所得。
# X-Pack Monitoring
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://127.0.0.1:9200",]
数据筛减
先让我们来看看 这张图。可以看到logstash 每秒 input output 7k+的数据。
我心里大概想了下,要是每秒钟进账7k 人名币,美滋滋。
查看了12小时内top:5的用户ip,一看都是服务器的IP,而且访问量还这么大,就用logstash把你给排除掉吧。
查看了12小时内top:5的用户访问的域名,对于可信任的域名也给排除掉。
可以看到 经过删减后,到kafka的数据 大概在1.54K/s
kafka
kafka 是个神器,应该多多学习它。
安装
# wget https://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz # kafka-python 支持列表kafka1.1
> tar -xzf kafka_2.12-2.2.0.tgz
# mv kafka_2.12-2.2.0 /usr/local/kafka
启动服务
# Kafka使用ZooKeeper,因此如果您还没有ZooKeeper服务器,则需要先启动它
# bin/zookeeper-server-start.sh config/zookeeper.properties
# bin/kafka-server-start.sh config/server.properties
后台运行
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper-server.log 2>&1 &
nohup bin/kafka-server-start.sh config/server.properties > kafka-server.log 2>&1 &
ss -ntlp 看到9092,2181 端口就表示 启动完成了。
kafka-eagle
kafka-eagle 是kfka的监控工具,推荐新手使用,图形化显示,挺好用的。
在topic-list中我们就能看到Logstash 中指定的topic
Consumers
logstash 就是我们kafka的 producer,
我们用Python来做consumer. 推荐使用下面的方式 安装kafka-python,
git clone https://github.com/dpkp/kafka-python
pip install ./kafka-python
这个地方算是个难点, python的线程池了解一下。
解释下,在这个案例中 瓶颈在 网络I/O时间,多线程是比较合适的。
如果是密集计算任务,多进程是比较合适的。
python脚本会持续优化更新,有兴趣的小伙伴加QQ群 一起讨论:752774493
\#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import logging
import requests
from datetime import datetime
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch
from concurrent.futures import ThreadPoolExecutor
from wechat import sendwechat
logging.basicConfig(level=logging.INFO, filename='/var/log/security-check/dns.log')
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('dns')
def check_dns(message):
value = json.loads(message.value.decode('utf-8'))
resource = value['resource'][:-1]
client_ip = value['client_ip']
now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
url = "http://127.0.0.1:8080/dns"
querystring = {"token": "TOKEN", "param": resource}
try:
response = requests.request("GET", url, params=querystring)
if response.json()['code'] == 0:
body = response.json()['result'][0]['data'][0]
body['@timestamp'] = now
body['client_ip'] = client_ip
index = 'security_check_url-{}'.format(datetime.now().strftime("%Y.%m.%d"))
es = Elasticsearch("http://127.0.0.1", http_auth=('admin', 'admin'))
result = es.index(index=index, doc_type="doc", body=body)
sendwechat(jobnumber, client_ip, resource)
logger.info(result)
except Exception as e:
logger.error('this is a error log {}'.format(e))
def security_check_dns():
executor = ThreadPoolExecutor(max_workers=20)
try:
consumer = KafkaConsumer('mytopic',
bootstrap_servers=['127.0.0.1:9092'],
group_id='MY_GROUP1',
consumer_timeout_ms=1000)
for message in consumer:
executor.submit(check_dns, message)
except Exception as e:
logger.error('this is a error log {}'.format(e))
if __name__ == "__main__":
security_check_dns()
查看Consumers
查看效果
你要是发现某个IP一直在报,那就要主动去了解下。查杀下电脑了。
结束语
这是一个最好的时代,也是一个最坏的时代;
这是一个智慧的年代,这是一个愚蠢的年代;
这是一个信任的时期,这是一个怀疑的时期;
这是一个光明的季节,这是一个黑暗的季节;
企业的内网安全,是每个IT人永恒的追求。
大数据,可视化,这些利器让以前一些很难完成的事情,现在变得相对容易起来。