Jak zbudować pulpit nawigacyjny ruchu sieciowego w czasie rzeczywistym za pomocą języka Python i Streamlit
Czy kiedykolwiek chciałeś wizualizować ruch sieciowy w czasie rzeczywistym? W tym samouczku dowiesz się, jak zbudować interaktywny pulpit do analizy ruchu sieciowego za pomocą Pythona i Streamlit
. Streamlit
to framework w języku Python typu open source, którego można używać do tworzenia aplikacji internetowych do analizy i przetwarzania danych.
Pod koniec tego samouczka będziesz wiedział, jak przechwytywać surowe pakiety sieciowe z NIC (karta interfejsu sieciowego) komputera, przetwarzać dane i tworzyć piękne wizualizacje, które będą aktualizować w czasie rzeczywistym.
Spis treści
Dlaczego analiza ruchu sieciowego jest ważna?
Warunki wstępne
-
Jak skonfigurować swój projekt
Jak budować podstawowe funkcje
Jak stworzyć rozładowane wizualizacje
Jak przechwytywać pakiety sieciowe
Składanie wszystkiego razem
Przyszłe ulepszenia
Wniosek
Dlaczego analiza ruchu sieciowego jest ważna?
Analiza ruchu sieciowego jest krytycznym wymogiem w przedsiębiorstwach, w których sieci stanowią szkielet niemal każdej aplikacji i usługi. U podstaw leży analiza pakietów sieciowych, która obejmuje monitorowanie sieci, przechwytywanie całego ruchu (przychodzącego i wychodzącego) oraz interpretację tych pakietów podczas ich przepływu przez sieć. Techniki tej można używać do identyfikowania wzorców zabezpieczeń, wykrywania anomalii oraz zapewniania bezpieczeństwa i wydajności sieci.
Ten projekt dowodu koncepcji, nad którym będziemy pracować w tym samouczku, jest szczególnie przydatny, ponieważ pomaga wizualizować i analizować aktywność sieci w czasie rzeczywistym. To pozwoli ci zrozumieć, w jaki sposób rozwiązywanie problemów, optymalizacje wydajności i analiza bezpieczeństwa odbywają się w systemach korporacyjnych.
Wymagania wstępne
Python 3.8 lub nowsza wersja zainstalowana w Twoim systemie.
-
Podstawowa znajomość koncepcji sieci komputerowych.
Znajomość języka programowania Pythona i jego powszechnie używanych bibliotek.
Podstawowa znajomość technik i bibliotek wizualizacji danych.
Jak skonfigurować swój projekt
Aby rozpocząć, utwórz strukturę projektu i zainstaluj niezbędne narzędzia z PIP za pomocą następujących poleceń:
mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly
Będziemy używać Streamlit
do wizualizacji dashboardu, Pandas
do przetwarzania danych, Scapy
do przechwytywania i przetwarzania pakietów sieciowych, a na końcu Plotly
do tworzenia wykresów z wykorzystaniem naszych zebranych danych.
Jak zbudować podstawowe funkcjonalności
Cały kod umieścimy w jednym pliku o nazwie dashboard.py
. Na początek zacznijmy od zaimportowania wszystkich elementów, których będziemy używać:
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket
Teraz skonfigurujmy rejestrowanie, konfigurując podstawową konfigurację rejestrowania. Zostanie to wykorzystane do śledzenia zdarzeń i uruchamiania naszej aplikacji w trybie debugowania. Obecnie ustawiliśmy poziom rejestrowania na Informacje , co oznacza, że wyświetlane zostaną zdarzenia z poziomem lub wyżej. Jeśli nie znasz logowania w Pythonie, polecam sprawdzić ten fragment dokumentacji, który jest dogłębny.
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
Następnie zbudujemy nasz procesor pakietów. W tej klasie zaimplementujemy funkcjonalność przetwarzania naszych przechwyconych pakietów.
class PacketProcessor:
"""Process and analyze network packets"""
def __init__(self):
self.protocol_map = {
1: 'ICMP',
6: 'TCP',
17: 'UDP'
}
self.packet_data = []
self.start_time = datetime.now()
self.packet_count = 0
self.lock = threading.Lock()
def get_protocol_name(self, protocol_num: int) -> str:
"""Convert protocol number to name"""
return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')
def process_packet(self, packet) -> None:
"""Process a single packet and extract relevant information"""
try:
if IP in packet:
with self.lock:
packet_info = {
'timestamp': datetime.now(),
'source': packet[IP].src,
'destination': packet[IP].dst,
'protocol': self.get_protocol_name(packet[IP].proto),
'size': len(packet),
'time_relative': (datetime.now() - self.start_time).total_seconds()
}
# Add TCP-specific information
if TCP in packet:
packet_info.update({
'src_port': packet[TCP].sport,
'dst_port': packet[TCP].dport,
'tcp_flags': packet[TCP].flags
})
# Add UDP-specific information
elif UDP in packet:
packet_info.update({
'src_port': packet[UDP].sport,
'dst_port': packet[UDP].dport
})
self.packet_data.append(packet_info)
self.packet_count += 1
# Keep only last 10000 packets to prevent memory issues
if len(self.packet_data) > 10000:
self.packet_data.pop(0)
except Exception as e:
logger.error(f"Error processing packet: {str(e)}")
def get_dataframe(self) -> pd.DataFrame:
"""Convert packet data to pandas DataFrame"""
with self.lock:
return pd.DataFrame(self.packet_data)
Ta klasa zbuduje naszą podstawową funkcjonalność i ma kilka funkcji użyteczności, które będą używane do przetwarzania pakietów.
Pakiety sieciowe są podzielone na dwa na poziomie transportu (TCP i UDP) oraz protokół ICMP na poziomie sieci. Jeśli nie znasz koncepcji TCP/IP, polecam sprawdzić ten artykuł na temat Freecodecamp News.
Nasz konstruktor będzie śledzić wszystkie widziane pakiety, które są podzielone na te wiadra typu TCP/IP, które zdefiniowaliśmy. Zanotujemy również czas przechwytywania pakietów, przechwycone dane i liczbę przechwyconych pakietów.
Będziemy również wykorzystać blokadę wątków, aby upewnić się, że tylko jeden pakiet jest przetwarzany w jednym czasie. Można to dalej rozszerzyć, aby projekt miał równoległe przetwarzanie pakietów.
Funkcja Inne (
.
Funkcja process_packet
obsługuje naszą podstawową funkcjonalność, która przetwarza te indywidualne pakiety. Jeśli pakiet zawiera warstwę IP, zanotuje źródłowy i docelowy adres IP, typ protokołu, rozmiar pakietu i czas, jaki upłynął od rozpoczęcia przechwytywania pakietu.
W przypadku pakietów z określonymi protokołami warstwy transportowej (takimi jak TCP i UDP) przechwycimy port źródłowy i docelowy wraz z flagami TCP dla pakietów TCP. Wyodrębnione szczegóły zostaną zapisane w pamięci na liście packet_data
. Będziemy także śledzić packet_count
, kiedy i kiedy te pakiety są przetwarzane.
Funkcja pand
ramka danych, która będzie następnie używana do naszej wizualizacji.
Jak stworzyć rozładowane wizualizacje
Teraz nadszedł czas, abyśmy zbudowali nasz interaktywny Streamlit Dashboard. Zdefiniujemy funkcję o nazwie create_visualization
w skrypcie dashboard.py
(poza naszą klasą przetwarzania pakietów).
def create_visualizations(df: pd.DataFrame):
"""Create all dashboard visualizations"""
if len(df) > 0:
# Protocol distribution
protocol_counts = df['protocol'].value_counts()
fig_protocol = px.pie(
values=protocol_counts.values,
names=protocol_counts.index,
title="Protocol Distribution"
)
st.plotly_chart(fig_protocol, use_container_width=True)
# Packets timeline
df['timestamp'] = pd.to_datetime(df['timestamp'])
df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
fig_timeline = px.line(
x=df_grouped.index,
y=df_grouped.values,
title="Packets per Second"
)
st.plotly_chart(fig_timeline, use_container_width=True)
# Top source IPs
top_sources = df['source'].value_counts().head(10)
fig_sources = px.bar(
x=top_sources.index,
y=top_sources.values,
title="Top Source IP Addresses"
)
st.plotly_chart(fig_sources, use_container_width=True)
Ta funkcja pobierze ramkę danych jako dane wejściowe i pomoże nam wykreślić trzy wykresy/wykresy:
Wykres dystrybucji protokołów: Ten wykres przedstawia proporcje różnych protokołów (na przykład TCP, UDP, ICMP) w przechwyconym ruchu pakietowym.
Tabela osi czasu pakietów: Ten wykres pokaże liczbę pakietów przetwarzanych na sekundę w okresie czasowym.
Najlepsze źródło adresów IP: Ten wykres wyróżni 10 najlepszych adresów IP, które wysyłały najwięcej pakietów w przechwyconym ruchu.
Wykres dystrybucji protokołu jest po prostu wykresem kołowym liczby protokołu dla trzech różnych typów (wraz z innymi). Używamy SPREASTIL
i
W przypadku drugiego wykresu wykonamy operację na danych i otrzymamy liczbę pakietów przechwyconych w każdej sekundzie ( s
stoi na sekundy), a potem wreszcie będziemy Wykonaj wykres.
Wreszcie, dla trzeciego wykresu, policzymy odrębne źródło obserwowane IPS, a wykres wykresu IP liczy się, aby pokazać 10 najlepszych IP.
Jak przechwytywać pakiety sieciowe
Teraz zbudujmy funkcjonalność, która pozwoli nam przechwytywać dane pakietów sieciowych.
def start_packet_capture():
"""Start packet capture in a separate thread"""
processor = PacketProcessor()
def capture_packets():
sniff(prn=processor.process_packet, store=False)
capture_thread = threading.Thread(target=capture_packets, daemon=True)
capture_thread.start()
return processor
Jest to prosta funkcja, która tworzy instancję klasy PacketProcessor
, a następnie używa funkcji sniff
w module scapy
w celu rozpoczęcia przechwytywania pakietów.
Używamy tutaj wątków, aby umożliwić nam przechwytywanie pakietów niezależnie od głównego przepływu programu. Dzięki temu operacja przechwytywania pakietów nie blokuje innych operacji, takich jak aktualizacja pulpitu nawigacyjnego w czasie rzeczywistym. Zwracamy także utworzoną instancję PacketProcessor
, aby można ją było wykorzystać w naszym głównym programie.
Składając wszystko razem
Teraz połączmy wszystkie te elementy razem za pomocą naszej funkcji main
, która będzie działać jako funkcja sterownika naszego programu.
def main():
"""Main function to run the dashboard"""
st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
st.title("Real-time Network Traffic Analysis")
# Initialize packet processor in session state
if 'processor' not in st.session_state:
st.session_state.processor = start_packet_capture()
st.session_state.start_time = time.time()
# Create dashboard layout
col1, col2 = st.columns(2)
# Get current data
df = st.session_state.processor.get_dataframe()
# Display metrics
with col1:
st.metric("Total Packets", len(df))
with col2:
duration = time.time() - st.session_state.start_time
st.metric("Capture Duration", f"{duration:.2f}s")
# Display visualizations
create_visualizations(df)
# Display recent packets
st.subheader("Recent Packets")
if len(df) > 0:
st.dataframe(
df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
use_container_width=True
)
# Add refresh button
if st.button('Refresh Data'):
st.rerun()
# Auto refresh
time.sleep(2)
st.rerun()
Ta funkcja utworzy także pulpit nawigacyjny Streamlit
i zintegruje ze sobą wszystkie nasze komponenty. Najpierw ustawiamy tytuł strony naszego panelu Streamlit
, a następnie inicjujemy nasz PacketProcessor
. Używamy stanu sesji w Streamlit
, aby mieć pewność, że utworzona zostanie tylko jedna instancja przechwytywania pakietów, a jej stan zostanie zachowany.
Teraz dynamicznie otrzymamy ramkę danych ze stanu sesji za każdym razem, gdy dane są przetwarzane i zaczniemy wyświetlać wskaźniki i wizualizacje. Będziemy również wyświetlać niedawno przechwycone pakiety wraz z informacjami takimi jak znacznik czasu, źródło i docelowe IP, protokół i rozmiar pakietu. Dodamy również możliwość ręcznego odświeżenia danych z deski rozdzielczej, a także automatycznie odświeżamy je co dwie sekundy.
Wreszcie uruchommy program z następującym poleceniem:
sudo streamlit run dashboard.py
Pamiętaj, że będziesz musiał uruchomić program za pomocą sudo
, ponieważ możliwości przechwytywania pakietów wymagają uprawnień administratora. Jeśli korzystasz z systemu Windows, otwórz terminal jako Administrator, a następnie uruchom program bez przedrostka sudo
.
Daj mu chwilę, aby program rozpoczął przechwytywanie pakietów. Jeśli wszystko pójdzie dobrze, powinieneś zobaczyć coś takiego:
To są wszystkie wizualizacje, które właśnie zaimplementowaliśmy w naszym programie dashboardów Streamlit
.
Przyszłe ulepszenia
Dzięki temu oto kilka przyszłych pomysłów na ulepszenie, których możesz użyć, aby rozszerzyć funkcje pulpitu nawigacyjnego:
Dodaj możliwości uczenia maszynowego na potrzeby wykrywania anomalii
Wdrażaj geograficzne mapowanie adresów IP
Twórz niestandardowe alerty w oparciu o wzorce analizy ruchu
Dodaj opcje analizy ładunku pakietów
Wniosek
Gratulacje! Teraz z powodzeniem zbudowałeś pulpit analizy ruchu w czasie rzeczywistym z Pythonem i
Mam nadzieję, że dzięki temu nauczyłeś się podstaw analizy ruchu sieciowego i programowania w języku Python. Dziękuję za przeczytanie!