大数据之恶意域名监测

故事背景

传说中的一天,在听说了某公司的内网员工由于访问了恶意网站,导致机器中毒后,领导过来问了下。
领导:能知道我们的内网用户访问是安全的么?
我的内心旁白: 作为一个资深的工程师,当然不能说不能呀。
我:当然可以啊,只要把所有用户访问过一下安全检查,就知道了呀。
领导:嗯,那就你来弄这个事情吧。能发现出来,发个微信报警,顺便存下档方便以后分析。这样就行了。
我的内心旁白:我只是吹了个牛呀,用什么数据去检测呢,每天用户访问量那么大,该怎么弄啊。

我(想了下刚才吹的牛):好的领导。这会是一个非常有意义的项目。

架构简述

经过若干时间的思考,我打算这样做。架构简单粗暴,就是找安全组要了一个能监测恶意域名的接口,
然后把用户访问的域名都过一下这个接口。异常的信息发一份到微信,发一份到es。完美!!!

环境搭建

说干就干,首先你需要一台服务器。,然后你需要安装配置若干软件。

# cat /etc/redhat-release
CentOS Linux release 7.6.1810 (Core)
CPU:Intel(R) Xeon(R) CPU E5-2420 0 @ 1.90GHz
memory:32G
disk: 500G

packetbeat

安装,

sudo yum install libpcap
sudo rpm -vi packetbeat-7.0.0-x86_64.rpm

主要配置

packetbeat 这里只要注意下 ,不支持同时输出到es和logstash,要是有多输出需求,起多个实例。

# 网络接口配置一把。
vim /etc/packetbeat/packetbeat.yml
packetbeat.interfaces.device: em2
packetbeat.interfaces.snaplen: 1514
packetbeat.interfaces.type: af_packet
packetbeat.interfaces.buffer_size_mb: 2048
# 其它的注释掉,只留这个。
- type: dns
    # Configure the ports where to listen for DNS traffic. You can disable
    # the DNS protocol by commenting out the list of ports.
    ports: [53]

# 输出到Logstash
 #----------------------------- Logstash output --------------------------------
output.logstash:
    # The Logstash hosts
    hosts: ["localhost:5044"]

logstash

安装

rpm -ivh jdk-8u202-linux-x64.rpm # java环境变量配置一下。
rpm -ivh logstash-7.0.0.rpm

主要配置

# output 部分是对数据的筛减,后面会详解下。
vim /etc/logstash/conf.d/logstash.conf
input {
    beats {
    port => 5044
    }
}
output {
    if [client_ip] not in ['10.100.2.2','10.100.2.1','10.100.2.3']{
        if [resource] not in ['www.baidu.com.','www.baidu2.com.','www.baidu3.com.']{
        kafka {
            codec => json
            topic_id => "mytopic"
            }
        }
    }
}

logstash 优化和监控

# 这个地方不用多,2G够用,后面的监控可以看到
vim /etc/logstash/jvm.options
-Xms2g
-Xmx2g
# pipeline.workers 参考CPU核心数,pipeline.batch.size 可以参考监控调试一下。
vim /etc/logstash/logstash.yml
# pipeline.workers: 2
pipeline.workers: 6
#
# How many events to retrieve from inputs before sending to filters+workers
#
# pipeline.batch.size: 125
pipeline.batch.size: 2048
# 新版本福利,可见即所得。
# X-Pack Monitoring
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://127.0.0.1:9200",]

数据筛减

先让我们来看看  这张图。可以看到logstash 每秒 input output 7k+的数据。
我心里大概想了下,要是每秒钟进账7k 人名币,美滋滋。

查看了12小时内top:5的用户ip,一看都是服务器的IP,而且访问量还这么大,就用logstash把你给排除掉吧。

查看了12小时内top:5的用户访问的域名,对于可信任的域名也给排除掉。

可以看到 经过删减后,到kafka的数据 大概在1.54K/s

kafka

kafka 是个神器,应该多多学习它。

安装

# wget https://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz # kafka-python 支持列表kafka1.1

> tar -xzf kafka_2.12-2.2.0.tgz
# mv kafka_2.12-2.2.0 /usr/local/kafka

启动服务

# Kafka使用ZooKeeper,因此如果您还没有ZooKeeper服务器,则需要先启动它
# bin/zookeeper-server-start.sh config/zookeeper.properties
# bin/kafka-server-start.sh config/server.properties

后台运行

nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zookeeper-server.log 2>&1 &
nohup bin/kafka-server-start.sh config/server.properties > kafka-server.log 2>&1 &

ss -ntlp  看到9092,2181 端口就表示 启动完成了。

kafka-eagle

kafka-eagle 是kfka的监控工具,推荐新手使用,图形化显示,挺好用的。

在topic-list中我们就能看到Logstash 中指定的topic

Consumers

logstash 就是我们kafka的 producer,
我们用Python来做consumer.  推荐使用下面的方式 安装kafka-python,

git clone https://github.com/dpkp/kafka-python
pip install ./kafka-python

这个地方算是个难点, python的线程池了解一下。
解释下,在这个案例中 瓶颈在 网络I/O时间,多线程是比较合适的。
如果是密集计算任务,多进程是比较合适的。
python脚本会持续优化更新,有兴趣的小伙伴加QQ群 一起讨论:752774493


\#!/usr/bin/env python # -*- coding: utf-8 -*- import json import logging import requests from datetime import datetime from kafka import KafkaConsumer from elasticsearch import Elasticsearch from concurrent.futures import ThreadPoolExecutor from wechat import sendwechat logging.basicConfig(level=logging.INFO, filename='/var/log/security-check/dns.log') logging.basicConfig(level=logging.INFO) logger = logging.getLogger('dns') def check_dns(message): value = json.loads(message.value.decode('utf-8')) resource = value['resource'][:-1] client_ip = value['client_ip'] now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S") url = "http://127.0.0.1:8080/dns" querystring = {"token": "TOKEN", "param": resource} try: response = requests.request("GET", url, params=querystring) if response.json()['code'] == 0: body = response.json()['result'][0]['data'][0] body['@timestamp'] = now body['client_ip'] = client_ip index = 'security_check_url-{}'.format(datetime.now().strftime("%Y.%m.%d")) es = Elasticsearch("http://127.0.0.1", http_auth=('admin', 'admin')) result = es.index(index=index, doc_type="doc", body=body) sendwechat(jobnumber, client_ip, resource) logger.info(result) except Exception as e: logger.error('this is a error log {}'.format(e)) def security_check_dns(): executor = ThreadPoolExecutor(max_workers=20) try: consumer = KafkaConsumer('mytopic', bootstrap_servers=['127.0.0.1:9092'], group_id='MY_GROUP1', consumer_timeout_ms=1000) for message in consumer: executor.submit(check_dns, message) except Exception as e: logger.error('this is a error log {}'.format(e)) if __name__ == "__main__": security_check_dns()

查看Consumers

kafka 真是个好东西。谁用谁知道。

查看效果

你要是发现某个IP一直在报,那就要主动去了解下。查杀下电脑了。

查看比较详细的信息,kibana 当仁不让啊。

结束语

这是一个最好的时代,也是一个最坏的时代;
这是一个智慧的年代,这是一个愚蠢的年代;
这是一个信任的时期,这是一个怀疑的时期;
这是一个光明的季节,这是一个黑暗的季节;

 

企业的内网安全,是每个IT人永恒的追求。

大数据,可视化,这些利器让以前一些很难完成的事情,现在变得相对容易起来。