Réalisation d'un sniffer

Publié le par Satellite

Bon, on va pas réinventer la roue, et c'est pas ce sniffer qui va remplacer Wireshark pour les exploits réseaux, mais ce que je veux vous montrer, c'est que si on vous laisse avec un ordinateur et juste Scapy/python, vous pouvez créer vos propres outils de sécurité informatique. Réalisons donc un sniffer.

Projet : Sniffer en console.
Informations fournies : IP sources / destination , Ports Source/destinations (pour TCP et UDP), flags (pour TCP)....

C'est parti ! Nous allons d'abord créer une fonction nous retournant les informations de la couche IP


class snifferBot:
    def __init__(self,filtre,host): #On s'occupera de cette méthode plus tard.
    def returnIP(self,paquet): #Retourne 99 si le paquet n 'est pas encapsulé par IP
        """ Retourne les informations de la couche IP """
        try:
            return paquet[IP]
        except:
            return 99


Ajoutons maintenant une fonction nous retournant l'eventuelle chaine de caractère contenue dans le paquet

def returnRaw(self,paquet):
        try:
            return paquet[Raw]
        except:
            return 99

De la même manière, il nous faut des fonctions retournant les differents protocoles utilisés. On va en créer une pour chacun des protocoles suivants : ICMP, ARP, TCP,UDP

def isTCP(self,paquet):
        try:
            return paquet[TCP]
        except:
            return 99
    def isUDP(self,paquet):
        try:
            return paquet[UDP]
        except:
            return 99
    def isARP(self,paquet):
        try:
            return paquet[ARP]
        except:    
            return 99
    def isICMP(self,paquet):
        try:
            return paquet[ICMP]
        except:
            return 99

Jusque là, ça devrait aller. Ce sont des fonctions super-simple. Je ne m'attarde pas dessus. Créons maintenant une fonction retournant en toute lettre le type de message ARP (pour faciliter la lecture lors de l'utilisation du sniffer). En effet, l'attribut paquet[ARP].op retourne un nombre et pas une chaine de carractère. C'est pas trop un probleme:

def getARPop(self,paquet):
        t = paquet[ARP].op
        if t == 1:
            return 'Request (Who-has)'
        elif t == 2:
            return 'Reply'

On procède de même pour les paquets ICMP. C'est un peu long, mais c'est facile. Merci a wikipedia pour les informations au sujet des types et des codes ICMP

def getICMPType(self,paquet):
        t = paquet[ICMP].type
        u = paquet[ICMP].code
        if t == 0:
            return 'echo-reply'
        elif t == 3:
            if u == 0:
                return 'Reseau Inaccessible'
            elif u == 1:
                return 'Machine inaccessible'
            elif u == 2:
                return 'Protocole inaccessible'
            elif u == 3:
                return 'Port inacessible'
            elif u == 4:
                return 'Fragmentation necessaire mais impossible'
            elif u == 5:
                return 'Echec du routage'
            elif u == 6:
                return 'Reseau inconnu'
            elif u == 7:
                return 'Machine inconnue'
            elif u == 8:
                return 'Machine non connectée au reseau'
            elif u == 9:
                return 'Comunication avec le reseau interdite'
            elif u == 10:
                return 'Communcation avec la machine interdite'
            elif u == 11:
                return 'Reseau inaccessible pour ce service'
            elif u == 12:
                return 'Machine inacessible pour ce service'
            elif u == 13:
                return 'Communication interdite [Filtre]'
            elif u == 14:
                return 'Priorité d\'hôte violé'
            elif u == 15:
                return 'Limite de priorité atteinte'
        elif t == 4:
            return 'Extinction de la source'
        elif t == 5:
            return 'Redirection'
        elif t == 8:
            return 'Echo-request'
        elif t == 11:
            return 'Timeout'
        elif t == 12:
            return 'Entête erronée'
        elif t == 13:
            return 'Demande d\'heure'
        elif t ==  14:
            return 'Reponse heure'
        elif t == 15:
            return 'Demande IP'
        elif t == 16:
            return 'Reponse IP'
        elif t == 17:
            return 'Demande masque de sous réseau'
        elif t == 18:
            return 'Reponse masque de sous réseau'

Cette methode retourne par exemple 'Redirection' si le code du paquet ICMP est 4. Je l'ai écrite uniquement pour faciliter la lecture.
Passons maintenant à la gestion des filtres. La syntaxe que je me propose pour l'execution du script serait de la forme

!/sniffer.py <filtre_protocolaire> <filtre_ip>
Etudions la fonction gérant le filtre protcolaire. On va creer quatre attributs pouvant prendre la valeur 0 ou 1.  Les attributs seront TCP ARP ICMP UDP. Seuls les protocoles ayant pour valeur 1 seront affichés. Voici la méthode gérant les protocoles.

def setProto(self,filtre):  #Argument demandant le protocole
        if filtre != '-a':
            self.TCP = 0
            self.UDP = 0
            self.ICMP = 0
            self.ARP = 0
            if 'tcp' in filtre:
                self.TCP = 1
            if 'udp' in filtre:
                self.UDP = 1
            if 'icmp' in filtre:
                self.ICMP = 1
            if 'arp' in filtre:
                self.ARP = 1
        else:
            self.TCP = 1
            self.UDP = 1
            self.ICMP = 1
            self.ARP = 1


Ainsi, le flags -a signifie 'All' et positionne tous les attributs à 1. Sinon, il suffit d'indiquer les protocoles qu'on veut afficher. Voici maintenant la méthode __init__. Elle apelle la fonction run qui constituera le corps de notre sniffer. Je vous la décrirais dans un instant.

def __init__(self,filtre,host):
        self.TCP = 0
        self.UDP = 0
        self.ICMP = 0
        self.ARP = 0
        self.host = host
        self.setProto(filtre)
        self.run()


On initialise les attributs puis on les positionne en fonction du filtre choisi par l'utilisateur grace à la fonction qu'on vient de créer. Puis on invoque la fonction run. Cette fonction constitue une boucle evenementielle. Elle se subdivise en deux étapes.
1) On récupere un paquet
2) On verifie si il correspond aux filtres
3) On l'affiche avec les informations de son entête.

Voici la premiere partie du code. Je pense l'avoir suffisament commenté.

def run(self):
        while 1:
            infos = [] #On crée une variable info qui contiendra ip source/dst
            a = sniff(count = 1)[0] #On récupere un paquet
            isIP = self.returnIP(a) #On test si il est encapsulé IP
        #---------------------------------------------
            #Récupération ipSource / ipDst
        #---------------------------------------------
            if isIP != 99: #Si c'est le cas, on récpere les informations
                infos.append(a[IP].src)
                infos.append(a[IP].dst)
            else: #Sinon, on l'indique
                infos.append(0)
                infos.append(0)



Appliquons maintenant le filtre IP. Nous ne voulons que les paquets en provenance ou a destination de l'ip indiquée. Voici donc la condition:

if infos[0] == self.host or infos[1] == self.host or self.host == '-a':
       
raw = self.returnRaw(a)
Si le filtre est -a, on accepte toutes les IP. Sinon, il faut qu'au moins l'une des IP parmis la source et la destination corresponde à celle du filtre. SI c'est le cas, on traite le paquet. La suite du code consiste à l'etablissement des protocoles. On teste d'abord si il s'agit d'un paquet encapsulé via TCP:

if self.isTCP(a) != 99 and self.TCP == 1:
En effet, on traite le paquet comme un paquet TCP si et seulement si c'est bien un paquet TCP, et si le filtre TCP est activé. Puis on affiche les informations suivante, si c'est le cas
ip_source : port_source  -> ip_dst : port_dst / flags / numero_sequence, numéro_ack/ raw (si il y a une raw)
Ainsi, cela donne :

if self.isTCP(a) != 99 and self.TCP == 1:
                    msg = '[\033[31mTCP\033[00m]\033[34m '
                    msg += infos[0] + '\033[00m:\033[35m' + str(a[TCP].sport) + '\033[00m -> \033[34m' + infos[1] + '\033[00m:\033[35m' + str(a[TCP].dport) + '\033[00m/ \033[31mflags\033[00m : \033[35m' + str(a[TCP].flags) + '\033[00m / \033[31mseq\033[00m = \033[35m' + str(a[TCP].seq) + '\033[00m\033[31m ack \033[00m= \033[35m' + str(a[TCP].ack) + '\033[00m'
                    if raw != 99:
                        msg += '/\033[31m Raw \033[00m:\033[01m ' + raw.load + '\033[00m'
                        print msg


Ne soyez pas effrayés ! Les carractères \033[XXm correspondent à un code de couleurs (tant qu'a faire autant que ce soit beau). Ce code barbare affichera les informations sous la forme indiquée si dessus. Notez qu'on affiche l'onglet "Raw" uniquement si la variable raw est differente de 99, autrement dit, uniquement si il y a une Raw dans le message.


On procède de même pour les autres protocoles. Voici les codes



 #si c'est UDP
                elif self.isUDP(a) != 99 and self.UDP == 1:
                    msg = '[\033[31mUDP\033[00m]\033[34m '
                    msg += infos[0] + '\033[00m:\033[35m' + str(a[UDP].sport) + '\033[00m -> \033[34m' + infos[1] + '\033[00m:\033[35m' + str(a[UDP].dport)
                    if raw != 99:
                        msg += '/\033[31m Raw \033[00m:\033[01m ' + raw.load + '\033[00m'
                        print msg
            #Si c'est ICMP
                elif self.isICMP(a) != 99 and self.ICMP == 1:
                    msg = '[\033[31mICMP\033[00m]\033[32m '
                    msg += infos[0] + ' -> ' + infos[1] + '/'
                    msg += 'type = ' + self.getICMPType(a) + '\033[00m'
                    print msg
            #Si c'est ARP
                elif self.isARP(a) != 99 and self.ARP == 1:
                    msg = '[\033[31mARP\033[00m] '
                    msg += '\033[34m' + a[ARP].psrc + '\033[00m (\033[35m' + a[Ether].src + '\033[00m) -> \033[34m' + a[ARP].pdst + '\033[00m (\033[35m' + a[Ether].dst + '\033[00m) / Op = \033[01m' + self.getARPop(a) + '\033[00m'
                    print msg

Remarquez que j'apelle les fonctions getARPop et getICMPType pour les protocoles ICMP et ARP (les fonctions me retournant le nom de l'action effectuée par le protocole, et non pas le numéro).

Et bien, c'est fini pour ce gros dossier ! C'etait pas facile je vous l'accorde, mais avouez que c'est assez stylé ! Voici le code complet, si vous avez des questions, n'hesitez pas.


#-*-coding:Utf-8-*-
#---------------------------------------------
#       iPsilon Trame Sniffer
# by iZy_TeH_PariaH
# version 1.0
# python 2.6, Ubuntu
#---------------------------------------------
# For CyberPeace and Harmony ;-)
#---------------------------------------------
from scapy.all import *

class snifferBot:
    def __init__(self,filtre,host):
        self.TCP = 0
        self.UDP = 0
        self.ICMP = 0
        self.ARP = 0
        self.host = host
        self.setProto(filtre)
        self.run()
    def setProto(self,filtre):  #Argument demandant le protocole
        if filtre != '-a':
            self.TCP = 0
            self.UDP = 0
            self.ICMP = 0
            self.ARP = 0
            if 'tcp' in filtre:
                self.TCP = 1
            if 'udp' in filtre:
                self.UDP = 1
            if 'icmp' in filtre:
                self.ICMP = 1
            if 'arp' in filtre:
                self.ARP = 1
        else:
            self.TCP = 1
            self.UDP = 1
            self.ICMP = 1
            self.ARP = 1
        
    def run(self):
        while 1:
            infos = []
            a = sniff(count = 1)[0]
            isIP = self.returnIP(a)
        #---------------------------------------------
            #Récupération ipSource / ipDst
        #---------------------------------------------
            if isIP != 99:
                infos.append(a[IP].src)
                infos.append(a[IP].dst)
            else:
                infos.append(0)
                infos.append(0)
            if infos[0] == self.host or infos[1] == self.host or self.host == '-a':
        #---------------------------------------------
            #Récupération Raw
        #---------------------------------------------
                raw = self.returnRaw(a)
                #si c'est TCP
                if self.isTCP(a) != 99 and self.TCP == 1:
                    msg = '[\033[31mTCP\033[00m]\033[34m '
                    msg += infos[0] + '\033[00m:\033[35m' + str(a[TCP].sport) + '\033[00m -> \033[34m' + infos[1] + '\033[00m:\033[35m' + str(a[TCP].dport) + '\033[00m/ \033[31mflags\033[00m : \033[35m' + str(a[TCP].flags) + '\033[00m / \033[31mseq\033[00m = \033[35m' + str(a[TCP].seq) + '\033[00m\033[31m ack \033[00m= \033[35m' + str(a[TCP].ack) + '\033[00m'
                    if raw != 99:
                        msg += '/\033[31m Raw \033[00m:\033[01m ' + raw.load + '\033[00m'
                        print msg
            #si c'est UDP
                elif self.isUDP(a) != 99 and self.UDP == 1:
                    msg = '[\033[31mUDP\033[00m]\033[34m '
                    msg += infos[0] + '\033[00m:\033[35m' + str(a[UDP].sport) + '\033[00m -> \033[34m' + infos[1] + '\033[00m:\033[35m' + str(a[UDP].dport)
                    if raw != 99:
                        msg += '/\033[31m Raw \033[00m:\033[01m ' + raw.load + '\033[00m'
                        print msg
            #Si c'est ICMP
                elif self.isICMP(a) != 99 and self.ICMP == 1:
                    msg = '[\033[31mICMP\033[00m]\033[32m '
                    msg += infos[0] + ' -> ' + infos[1] + '/'
                    msg += 'type = ' + self.getICMPType(a) + '\033[00m'
                    print msg
            #Si c'est ARP
                elif self.isARP(a) != 99 and self.ARP == 1:
                    msg = '[\033[31mARP\033[00m] '
                    msg += '\033[34m' + a[ARP].psrc + '\033[00m (\033[35m' + a[Ether].src + '\033[00m) -> \033[34m' + a[ARP].pdst + '\033[00m (\033[35m' + a[Ether].dst + '\033[00m) / Op = \033[01m' + self.getARPop(a) + '\033[00m'
                    print msg
    
    def isTCP(self,paquet):
        try:
            return paquet[TCP]
        except:
            return 99
    def isUDP(self,paquet):
        try:
            return paquet[UDP]
        except:
            return 99
    def isARP(self,paquet):
        try:
            return paquet[ARP]
        except:    
            return 99
    def isICMP(self,paquet):
        try:
            return paquet[ICMP]
        except:
            return 99
    
    def returnRaw(self,paquet):
        try:
            return paquet[Raw]
        except:
            return 99
    
    def returnIP(self,paquet):
        try:
            return paquet[IP]
        except:
            return 99
    def getARPop(self,paquet):
        t = paquet[ARP].op
        if t == 1:
            return 'Request (Who-has)'
        elif t == 2:
            return 'Reply'
    def getICMPType(self,paquet):
        t = paquet[ICMP].type
        u = paquet[ICMP].code
        if t == 0:
            return 'echo-reply'
        elif t == 3:
            if u == 0:
                return 'Reseau Inaccessible'
            elif u == 1:
                return 'Machine inaccessible'
            elif u == 2:
                return 'Protocole inaccessible'
            elif u == 3:
                return 'Port inacessible'
            elif u == 4:
                return 'Fragmentation necessaire mais impossible'
            elif u == 5:
                return 'Echec du routage'
            elif u == 6:
                return 'Reseau inconnu'
            elif u == 7:
                return 'Machine inconnue'
            elif u == 8:
                return 'Machine non connectée au reseau'
            elif u == 9:
                return 'Comunication avec le reseau interdite'
            elif u == 10:
                return 'Communcation avec la machine interdite'
            elif u == 11:
                return 'Reseau inaccessible pour ce service'
            elif u == 12:
                return 'Machine inacessible pour ce service'
            elif u == 13:
                return 'Communication interdite [Filtre]'
            elif u == 14:
                return 'Priorité d\'hôte violé'
            elif u == 15:
                return 'Limite de priorité atteinte'
        elif t == 4:
            return 'Extinction de la source'
        elif t == 5:
            return 'Redirection'
        elif t == 8:
            return 'Echo-request'
        elif t == 11:
            return 'Timeout'
        elif t == 12:
            return 'Entête erronée'
        elif t == 13:
            return 'Demande d\'heure'
        elif t ==  14:
            return 'Reponse heure'
        elif t == 15:
            return 'Demande IP'
        elif t == 16:
            return 'Reponse IP'
        elif t == 17:
            return 'Demande masque de sous réseau'
        elif t == 18:
            return 'Reponse masque de sous réseau'
if __name__ == '__main__':
    if len(sys.argv) < 3:
        print 'Syntaxe : !/.py -filtre -host'
        print '-a = all'
        print 'filtres : tcp udp icmp arp'
        exit(0)
    filtre = sys.argv[1]
    host = sys.argv[2]
    a = snifferBot(filtre,host)


Et pour finir une petite capture d'écran parce que bon, c'est quand même assez beau !


Publié dans Python

Pour être informé des derniers articles, inscrivez vous :
Commenter cet article