Motivation
Dashboard
ElasticSearch Install
$ brew install elasticsearch
$ elasticsearch -v
Version: 1.4.4, Build: c88f77f/2015-02-19T13:05:36Z, JVM: 1.7.0_72
Download Kibana
$ wget https://download.elasticsearch.org/kibana/kibana/kibana-4.0.1-darwin-x64.tar.gz
$ tar zxvf kibana-4.0.1-darwin-x64.tar.gz
Python Library
$ pip install pyshark elasticsearch requests
Execute
$ python packet_cap_es.py <interface>
Python Script
"""
This app captures packets and extracts their five-tuples.
It stores the extracted data in Elasticsearch.
Elasticsearch and Kibana then provide a real-time packet monitoring
dashboard.
"""
import json
import sys
import datetime
import time
import pyshark
import requests
from elasticsearch import Elasticsearch
from elasticsearch import helpers
# Base endpoint of the local Elasticsearch node, and the REST paths derived
# from it for the "packets" index and its "packet" document type.
URL = "http://localhost:9200"
INDEX_URL = "%s/packets" % URL
TYPE_URL = "%s/packet" % INDEX_URL

# Template for one bulk-helper action targeting the packets index; the
# document body goes under "_source".
ACTION = dict(_index="packets", _type="packet", _source={})
def delete_index():
    """Issue an HTTP DELETE against INDEX_URL to drop the packets index.

    The response is not inspected.
    """
    requests.delete(INDEX_URL)
def create_index():
    """Create the packets index and install the mapping for the packet type.

    The mapping enables ``_timestamp`` (read from the ``capture_timestamp``
    field), turns numeric detection off, and stores ``dstip`` / ``srcip`` as
    non-analyzed strings.

    The mapping request is attempted up to 99 times, sleeping one second
    after each failed attempt. If every attempt fails the function returns
    without raising, as the original did.
    """
    requests.put(INDEX_URL)

    ip_field = {"type": "string", "index": "not_analyzed", "store": True}
    setting = {
        "packet": {
            "_timestamp": {
                "enabled": True,
                "path": "capture_timestamp",
            },
            "numeric_detection": False,
            "properties": {
                "dstip": dict(ip_field),
                "srcip": dict(ip_field),
            },
        }
    }
    # Serialise once; the payload is identical on every retry.
    payload = json.dumps(setting)

    for _ in range(1, 100):
        try:
            requests.put(TYPE_URL + "/_mapping", data=payload)
        except requests.exceptions.RequestException:
            # Only transport-level failures are retried. Anything else
            # (including Ctrl-C) propagates instead of being swallowed.
            time.sleep(1)
        else:
            break
def _parse_packet(packet):
    """Build the document dict (five-tuple plus timestamps) for one packet.

    Layer 1 of the packet is read as the IP layer and layer 2 as the
    transport layer, as in the original code. Any exception raised while
    reading a field propagates to the caller.
    """
    # sniff_timestamp is converted as epoch seconds. Formatting it in UTC
    # makes the trailing "Z" truthful on any host; the previous code
    # subtracted a hard-coded 9 hours from local time, which is only
    # correct on a GMT+9 machine.
    # NOTE(review): the "extra 9 hours" seen in Kibana is presumably its
    # conversion of UTC to the browser's time zone - confirm.
    captured_at = datetime.datetime.utcfromtimestamp(
        float(packet.sniff_timestamp))
    timestamp = captured_at.strftime("%Y-%m-%dT%H:%M:%SZ")

    ip_layer = packet[1]
    transport = packet[2]

    version = int(ip_layer.version)
    # IPv6 has no protocol field; it carries "next header" instead.
    if version == 4:
        protocol = int(ip_layer.proto)
    elif version == 6:
        protocol = int(ip_layer.nxt)
    else:
        protocol = None

    return {
        "version": version,
        "protocol": protocol,
        "dstip": ip_layer.dst,
        "srcip": ip_layer.src,
        "dstport": int(transport.dstport),
        "srcport": int(transport.srcport),
        "capture_timestamp": timestamp,
        # For historical graph
        "@timestamp": timestamp,
    }


def main():
    """Capture packets on the interface in sys.argv[1] and bulk-index them.

    Only packets whose transport layer is UDP or TCP are indexed. Parsed
    packets are queued and sent with ``helpers.bulk``: the first one is sent
    immediately, later ones once at least 3 seconds have passed since the
    previous successful flush. A flush is only attempted when a packet
    arrives.

    Failures are reported on stderr rather than silently swallowed. A packet
    that cannot be parsed is skipped; a failed bulk request keeps the queue
    so it is retried on a later packet.
    """
    capture = pyshark.LiveCapture(interface=sys.argv[1])
    packet_que = []
    es = Elasticsearch()
    end_time = None  # time of the last successful flush

    for packet in capture.sniff_continuously():
        if packet.transport_layer not in ("UDP", "TCP"):
            continue

        try:
            parsed_packet = _parse_packet(packet)
        except Exception as exc:
            # Best effort: one odd packet must not stop the capture loop.
            sys.stderr.write("skipping packet: %r\n" % (exc,))
            continue

        # Give every action its own "_source". ACTION.copy() is shallow, so
        # updating action["_source"] in place made all queued actions share
        # (and overwrite) a single dict.
        packet_que.append(dict(ACTION, _source=parsed_packet))

        if end_time is not None and time.time() - end_time < 3:
            continue

        try:
            helpers.bulk(es, packet_que)
        except Exception as exc:
            sys.stderr.write("bulk indexing failed: %r\n" % (exc,))
            # Back off briefly; the queue is kept for the next attempt.
            time.sleep(1)
            continue

        del packet_que[:]
        end_time = time.time()
# Script entry point: expects exactly one argument, the capture interface.
if __name__ == "__main__":
    if len(sys.argv) != 2:
        # sys.stderr.write / sys.exit behave the same on Python 2 and 3,
        # unlike the print statement and the site-provided exit().
        sys.stderr.write("python packet_cap_es.py <interface>\n")
        sys.exit(1)
    # Start from a fresh index on every run, then capture until interrupted.
    delete_index()
    create_index()
    main()
Recommended Posts