# pylint: disable=no-name-in-module
# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from scapy.all import Ether, IP, IPv6, TCP
+
+from scapy.all import Ether
+from scapy.layers.inet import IP, TCP
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from ipaddress import ip_address
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(IP):
- raise RuntimeError(
- 'Not an IPv4 packet received: {0}'.format(pkt_recv.__repr__()))
+ raise RuntimeError('Not an IPv4 packet received: {0}'.
+ format(pkt_recv.__repr__()))
- rx_dscp = pkt_recv['IP'].tos >> 2
+ rx_dscp = pkt_recv[IP].tos >> 2
if rx_dscp != dscp:
- raise RuntimeError(
- 'Invalid DSCP {0} should be {1}'.format(rx_dscp, dscp))
+ raise RuntimeError('Invalid DSCP {0} should be {1}'.
+ format(rx_dscp, dscp))
if not pkt_recv.haslayer(TCP):
- raise RuntimeError(
- 'Not a TCP packet received: {0}'.format(pkt_recv.__repr__()))
+ raise RuntimeError('Not a TCP packet received: {0}'.
+ format(pkt_recv.__repr__()))
def check_ipv6(pkt_recv, dscp):
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(IPv6):
- raise RuntimeError(
- 'Not an IPv6 packet received: {0}'.format(pkt_recv.__repr__()))
+ raise RuntimeError('Not an IPv6 packet received: {0}'.
+ format(pkt_recv.__repr__()))
- rx_dscp = pkt_recv['IPv6'].tc >> 2
+ rx_dscp = pkt_recv[IPv6].tc >> 2
if rx_dscp != dscp:
- raise RuntimeError(
- 'Invalid DSCP {0} should be {1}'.format(rx_dscp, dscp))
+ raise RuntimeError('Invalid DSCP {0} should be {1}'.
+ format(rx_dscp, dscp))
if not pkt_recv.haslayer(TCP):
- raise RuntimeError(
- 'Not a TCP packet received: {0}'.format(pkt_recv.__repr__()))
+ raise RuntimeError('Not a TCP packet received: {0}'.
+ format(pkt_recv.__repr__()))
# pylint: disable=too-many-locals
sent_packets = []
if is_ipv4:
- ip_pkt = IP(src=src_ip, dst=dst_ip) / \
- TCP()
+ ip_pkt = (IP(src=src_ip, dst=dst_ip) /
+ TCP())
else:
- ip_pkt = IPv6(src=src_ip, dst=dst_ip) / \
- TCP()
+ ip_pkt = (IPv6(src=src_ip, dst=dst_ip) /
+ TCP())
- pkt_send = Ether(src=src_mac, dst=dst_mac) / \
- ip_pkt
+ pkt_send = (Ether(src=src_mac, dst=dst_mac) /
+ ip_pkt)
sent_packets.append(pkt_send)
txq.send(pkt_send)
- pkt_recv = rxq.recv(2, sent_packets)
+ while True:
+ pkt_recv = rxq.recv(2, sent_packets)
+ if pkt_recv is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if pkt_recv.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if pkt_recv is None:
raise RuntimeError('Rx timeout')
sys.exit(0)
+
if __name__ == "__main__":
main()