import sys
import ipaddress
-from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
from scapy.all import Ether
+from scapy.layers.inet import IP, UDP, TCP
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def main():
"""Send TCP or UDP packet from one traffic generator interface to the other.
"""
- args = TrafficScriptArg(
- ['tx_mac', 'rx_mac', 'src_ip', 'dst_ip', 'protocol',
- 'source_port', 'destination_port'])
+ args = TrafficScriptArg(['tx_mac', 'rx_mac', 'src_ip', 'dst_ip', 'protocol',
+ 'source_port', 'destination_port'])
src_mac = args.get_arg('tx_mac')
dst_mac = args.get_arg('rx_mac')
source_port = args.get_arg('source_port')
destination_port = args.get_arg('destination_port')
- ip_version = None
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
elif protocol.upper() == 'UDP':
protocol = UDP
else:
- raise ValueError("Invalid type of protocol!")
+ raise ValueError("Invalid protocol type!")
rxq = RxQueue(rx_if)
txq = TxQueue(tx_if)
protocol(sport=int(source_port), dport=int(destination_port)))
txq.send(pkt_raw)
- ether = rxq.recv(2)
- if ether is None:
- raise RuntimeError("TCP/UDP Rx timeout")
+ while True:
+ ether = rxq.recv(2)
+ if ether is None:
+ raise RuntimeError('TCP/UDP Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
- if 'TCP' in ether:
+ if TCP in ether:
print ("TCP packet received.")
- elif 'UDP' in ether:
+ elif UDP in ether:
print ("UDP packet received.")
else:
- raise RuntimeError("Not an TCP or UDP packet received {0}"
- .format(ether.__repr__()))
+ raise RuntimeError("Not an TCP or UDP packet received {0}".
+ format(ether.__repr__()))
sys.exit(0)