We need to adapt all functional traffic scripts related to functional
IPv6 tests to ingore receiving of unexpected ICMPv6ND_NS
(ICMPv6 Neighbor Discovery - Neighbor Solicitation) packets that are
sent automatically and we cannot avoid to receive them.
The reason is to prevent false negative test results in case of csit
functional tests that could block creation of new operational branch
(csit weekly jobs), usage of new vpp builds (csit semiweekly jobs)
and merging patches - csit as well as vpp.
Change-Id: I43c90e7c766762fa769a81661338759a11b401a1
Signed-off-by: Jan Gelety <jgelety@cisco.com>
22 files changed:
import sys
import ipaddress
import sys
import ipaddress
+from scapy.layers.inet6 import IPv6, ICMPv6ND_RA, ICMPv6ND_NS
+
from resources.libraries.python.PacketVerifier import RxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
interval = int(args.get_arg('interval'))
rxq = RxQueue(rx_if)
interval = int(args.get_arg('interval'))
rxq = RxQueue(rx_if)
- ether = rxq.recv(max(5, interval))
+ # receive ICMPv6ND_RA packet
+ while True:
+ ether = rxq.recv(max(5, interval))
+ if ether is None:
+ raise RuntimeError('ICMP echo Rx timeout')
- # Check whether received packet contains layer RA and check other values
- if ether is None:
- raise RuntimeError('ICMP echo Rx timeout')
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
- if not ether.haslayer('ICMPv6ND_RA'):
- raise RuntimeError('Not an RA packet received {0}'
- .format(ether.__repr__()))
+ # Check if received packet contains layer RA and check other values
+ if not ether.haslayer(ICMPv6ND_RA):
+ raise RuntimeError('Not an RA packet received {0}'.
+ format(ether.__repr__()))
src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src))
dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst))
src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src))
dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst))
all_nodes_multicast = ipaddress.IPv6Address(u'ff02::1')
if src_address != link_local:
all_nodes_multicast = ipaddress.IPv6Address(u'ff02::1')
if src_address != link_local:
- raise RuntimeError(
- 'Source address ({0}) not matching link local address({1})'.format(
- src_address, link_local))
+ raise RuntimeError('Source address ({0}) not matching link local '
+ 'address ({1})'.format(src_address, link_local))
if dst_address != all_nodes_multicast:
if dst_address != all_nodes_multicast:
- raise RuntimeError('Packet destination address ({0}) is not the all '
- 'nodes multicast address ({1}).'.format(
- dst_address, all_nodes_multicast))
- if ether['IPv6'].hlim != 255:
- raise RuntimeError('Hop limit not correct: {0}!=255'.format(
- ether['IPv6'].hlim))
-
- ra_code = ether['ICMPv6 Neighbor Discovery - Router Advertisement'].code
+ raise RuntimeError('Packet destination address ({0}) is not the all'
+ ' nodes multicast address ({1}).'.
+ format(dst_address, all_nodes_multicast))
+ if ether[IPv6].hlim != 255:
+ raise RuntimeError('Hop limit not correct: {0}!=255'.
+ format(ether[IPv6].hlim))
+
+ ra_code = ether[ICMPv6ND_RA].code
if ra_code != 0:
raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code))
sys.exit(0)
if ra_code != 0:
raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code))
sys.exit(0)
if __name__ == "__main__":
main()
if __name__ == "__main__":
main()
import sys
import logging
import sys
import logging
+
+# pylint: disable=no-name-in-module
+# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\
- checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
-from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
from scapy.all import Ether
from scapy.all import Ether
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
+from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+
+from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
+from resources.libraries.python.PacketVerifier import checksum_equal
+from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
# send ICMPv6 neighbor advertisement message
pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
# send ICMPv6 neighbor advertisement message
pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
- IPv6(src=src_ip, dst='ff02::1:ff00:2') /
- ICMPv6ND_NA(tgt=src_ip, R=0) /
- ICMPv6NDOptDstLLAddr(lladdr=src_mac))
+ IPv6(src=src_ip, dst='ff02::1:ff00:2') /
+ ICMPv6ND_NA(tgt=src_ip, R=0) /
+ ICMPv6NDOptDstLLAddr(lladdr=src_mac))
sent_packets.append(pkt_send)
txq.send(pkt_send)
# send ICMPv6 echo request
pkt_send = (Ether(src=src_mac, dst=dst_mac) /
sent_packets.append(pkt_send)
txq.send(pkt_send)
# send ICMPv6 echo request
pkt_send = (Ether(src=src_mac, dst=dst_mac) /
- IPv6(src=src_ip, dst=dst_ip) /
- ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
+ IPv6(src=src_ip, dst=dst_ip) /
+ ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
sent_packets.append(pkt_send)
txq.send(pkt_send)
# receive ICMPv6 echo reply
sent_packets.append(pkt_send)
txq.send(pkt_send)
# receive ICMPv6 echo reply
- ether = rxq.recv(2, sent_packets)
- if ether is None:
- raise RuntimeError('ICMPv6 echo reply Rx timeout')
+ while True:
+ ether = rxq.recv(2, sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if not ether.haslayer(IPv6):
if not ether.haslayer(IPv6):
- raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
- ether.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 received {0}'.
+ format(ether.__repr__()))
if not ipv6.haslayer(ICMPv6EchoReply):
if not ipv6.haslayer(ICMPv6EchoReply):
- raise RuntimeError(
- 'Unexpected packet with no IPv6 ICMP received {0}'.format(
- ipv6.__repr__()))
+ raise RuntimeError('Unexpected packet with no ICMPv6 echo reply '
+ 'received {0}'.format(ipv6.__repr__()))
- icmpv6 = ipv6['ICMPv6 Echo Reply']
+ icmpv6 = ipv6[ICMPv6EchoReply]
# check identifier and sequence number
if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
# check identifier and sequence number
if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
- raise RuntimeError(
- 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \
- 'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
+ raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} seq {1} '
+ 'should be ID {2} seq {3}'.
+ format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
# verify checksum
cksum = icmpv6.cksum
del icmpv6.cksum
tmp = ICMPv6EchoReply(str(icmpv6))
if not checksum_equal(tmp.cksum, cksum):
# verify checksum
cksum = icmpv6.cksum
del icmpv6.cksum
tmp = ICMPv6EchoReply(str(icmpv6))
if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError(
- 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
+ raise RuntimeError('Invalid checksum {0} should be {1}'.
+ format(cksum, tmp.cksum))
if __name__ == "__main__":
main()
if __name__ == "__main__":
main()
import sys
import logging
import sys
import logging
+
+# pylint: disable=no-name-in-module
+# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\
- checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
+
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
+from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr
from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
from scapy.all import Ether
from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
from scapy.all import Ether
+from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
+from resources.libraries.python.PacketVerifier import checksum_equal
+from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
+
def main():
args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_nh_mac', 'dst_nh_mac',
def main():
args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_nh_mac', 'dst_nh_mac',
# send ICMPv6 neighbor advertisement message
pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
# send ICMPv6 neighbor advertisement message
pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
- IPv6(src=src_ip, dst='ff02::1:ff00:2') /
- ICMPv6ND_NA(tgt=src_ip, R=0) /
- ICMPv6NDOptDstLLAddr(lladdr=src_mac))
+ IPv6(src=src_ip, dst='ff02::1:ff00:2') /
+ ICMPv6ND_NA(tgt=src_ip, R=0) /
+ ICMPv6NDOptDstLLAddr(lladdr=src_mac))
src_sent_packets.append(pkt_send)
src_txq.send(pkt_send)
pkt_send = (Ether(src=dst_mac, dst='ff:ff:ff:ff:ff:ff') /
src_sent_packets.append(pkt_send)
src_txq.send(pkt_send)
pkt_send = (Ether(src=dst_mac, dst='ff:ff:ff:ff:ff:ff') /
- IPv6(src=dst_ip, dst='ff02::1:ff00:2') /
- ICMPv6ND_NA(tgt=dst_ip, R=0) /
- ICMPv6NDOptDstLLAddr(lladdr=dst_mac))
+ IPv6(src=dst_ip, dst='ff02::1:ff00:2') /
+ ICMPv6ND_NA(tgt=dst_ip, R=0) /
+ ICMPv6NDOptDstLLAddr(lladdr=dst_mac))
dst_sent_packets.append(pkt_send)
dst_txq.send(pkt_send)
# send ICMPv6 echo request from first TG interface
pkt_send = (Ether(src=src_mac, dst=src_nh_mac) /
dst_sent_packets.append(pkt_send)
dst_txq.send(pkt_send)
# send ICMPv6 echo request from first TG interface
pkt_send = (Ether(src=src_mac, dst=src_nh_mac) /
- IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) /
- ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
+ IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) /
+ ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
src_sent_packets.append(pkt_send)
src_txq.send(pkt_send)
# receive ICMPv6 echo request on second TG interface
src_sent_packets.append(pkt_send)
src_txq.send(pkt_send)
# receive ICMPv6 echo request on second TG interface
- ether = dst_rxq.recv(2, dst_sent_packets)
- if ether is None:
- raise RuntimeError('ICMPv6 echo reply Rx timeout')
+ while True:
+ ether = dst_rxq.recv(2, dst_sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if not ether.haslayer(IPv6):
if not ether.haslayer(IPv6):
- raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
- ether.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 received: {0}'.
+ format(ether.__repr__()))
# verify hop limit processing
if ipv6.hlim != (hop_limit - hop_num):
# verify hop limit processing
if ipv6.hlim != (hop_limit - hop_num):
- raise RuntimeError(
- 'Invalid hop limit {0} should be {1}'.format(ipv6.hlim,
- hop_limit - hop_num))
+ raise RuntimeError('Invalid hop limit {0} should be {1}'.
+ format(ipv6.hlim,hop_limit - hop_num))
if not ipv6.haslayer(ICMPv6EchoRequest):
if not ipv6.haslayer(ICMPv6EchoRequest):
- raise RuntimeError(
- 'Unexpected packet with no IPv6 ICMP received {0}'.format(
- ipv6.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'.
+ format(ipv6.__repr__()))
- icmpv6 = ipv6['ICMPv6 Echo Request']
+ icmpv6 = ipv6[ICMPv6EchoRequest]
# check identifier and sequence number
if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
# check identifier and sequence number
if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
- raise RuntimeError(
- 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \
- 'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
+ raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} '
+ 'seq {1} should be ID {2} seq {3}'.
+ format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
# verify checksum
cksum = icmpv6.cksum
del icmpv6.cksum
tmp = ICMPv6EchoRequest(str(icmpv6))
if not checksum_equal(tmp.cksum, cksum):
# verify checksum
cksum = icmpv6.cksum
del icmpv6.cksum
tmp = ICMPv6EchoRequest(str(icmpv6))
if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError(
- 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
+ raise RuntimeError('Invalid checksum {0} should be {1}'.
+ format(cksum, tmp.cksum))
# send ICMPv6 echo reply from second TG interface
pkt_send = (Ether(src=dst_mac, dst=dst_nh_mac) /
# send ICMPv6 echo reply from second TG interface
pkt_send = (Ether(src=dst_mac, dst=dst_nh_mac) /
- IPv6(src=dst_ip, dst=src_ip) /
- ICMPv6EchoReply(id=echo_id, seq=echo_seq))
+ IPv6(src=dst_ip, dst=src_ip) /
+ ICMPv6EchoReply(id=echo_id, seq=echo_seq))
dst_sent_packets.append(pkt_send)
dst_txq.send(pkt_send)
# receive ICMPv6 echo reply on first TG interface
dst_sent_packets.append(pkt_send)
dst_txq.send(pkt_send)
# receive ICMPv6 echo reply on first TG interface
- ether = src_rxq.recv(2, src_sent_packets)
- if ether is None:
- raise RuntimeError('ICMPv6 echo reply Rx timeout')
+ while True:
+ ether = src_rxq.recv(2, src_sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if not ether.haslayer(IPv6):
if not ether.haslayer(IPv6):
- raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
- ether.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 layer received {0}'.
+ format(ether.__repr__()))
# verify hop limit processing
if ipv6.hlim != (hop_limit - hop_num):
# verify hop limit processing
if ipv6.hlim != (hop_limit - hop_num):
- raise RuntimeError(
- 'Invalid hop limit {0} should be {1}'.format(ipv6.hlim,
- hop_limit - hop_num))
+ raise RuntimeError('Invalid hop limit {0} should be {1}'.
+ format(ipv6.hlim, hop_limit - hop_num))
if not ipv6.haslayer(ICMPv6EchoReply):
if not ipv6.haslayer(ICMPv6EchoReply):
- raise RuntimeError(
- 'Unexpected packet with no IPv6 ICMP received {0}'.format(
- ipv6.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'.
+ format(ipv6.__repr__()))
- icmpv6 = ipv6['ICMPv6 Echo Reply']
+ icmpv6 = ipv6[ICMPv6EchoReply]
# check identifier and sequence number
if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
# check identifier and sequence number
if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
- raise RuntimeError(
- 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be ' \
- 'ID {2} seq {3}'.format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
+ raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} '
+ 'seq {1} should be ID {2} seq {3}'.
+ format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))
# verify checksum
cksum = icmpv6.cksum
del icmpv6.cksum
tmp = ICMPv6EchoReply(str(icmpv6))
if not checksum_equal(tmp.cksum, cksum):
# verify checksum
cksum = icmpv6.cksum
del icmpv6.cksum
tmp = ICMPv6EchoReply(str(icmpv6))
if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError(
- 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
+ raise RuntimeError('Invalid checksum {0} should be {1}'.
+ format(cksum, tmp.cksum))
if __name__ == "__main__":
main()
if __name__ == "__main__":
main()
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-from resources.libraries.python.telemetry.IPFIXUtil import IPFIXHandler, \
- IPFIXData
+from resources.libraries.python.telemetry.IPFIXUtil import IPFIXHandler
+from resources.libraries.python.telemetry.IPFIXUtil import IPFIXData
pkt = rxq.recv(10, ignore=ignore, verbose=verbose)
if pkt is None:
raise RuntimeError("RX timeout")
pkt = rxq.recv(10, ignore=ignore, verbose=verbose)
if pkt is None:
raise RuntimeError("RX timeout")
+
+ if pkt.haslayer("ICMPv6ND_NS"):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+
if pkt.haslayer("IPFIXHeader"):
if pkt.haslayer("IPFIXTemplate"):
# create or update template for IPFIX data packets
if pkt.haslayer("IPFIXHeader"):
if pkt.haslayer("IPFIXTemplate"):
# create or update template for IPFIX data packets
# verify packet count
if data["packetTotalCount"] != count:
# verify packet count
if data["packetTotalCount"] != count:
- raise RuntimeError(
- "IPFIX reported wrong packet count. Count was {0},"
- " but should be {1}".format(data["packetTotalCount"], count))
+ raise RuntimeError("IPFIX reported wrong packet count. Count was {0},"
+ "but should be {1}".format(data["packetTotalCount"],
+ count))
# verify IP addresses
keys = data.keys()
err = "{0} mismatch. Packets used {1}, but were classified as {2}."
# verify IP addresses
keys = data.keys()
err = "{0} mismatch. Packets used {1}, but were classified as {2}."
pkt = rxq.recv(5)
if pkt is None:
raise RuntimeError("RX timeout")
pkt = rxq.recv(5)
if pkt is None:
raise RuntimeError("RX timeout")
+
+ if pkt.haslayer("ICMPv6ND_NS"):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+
if pkt.haslayer("IPFIXHeader"):
if pkt.haslayer("IPFIXTemplate"):
# create or update template for IPFIX data packets
if pkt.haslayer("IPFIXHeader"):
if pkt.haslayer("IPFIXTemplate"):
# create or update template for IPFIX data packets
raise RuntimeError("Received non-IPFIX packet or IPFIX header was"
"not recognized.")
raise RuntimeError("Received non-IPFIX packet or IPFIX header was"
"not recognized.")
-if __name__ == "__main__":
+if __name__ == "__main__":
import sys
import logging
import sys
import logging
+# pylint: disable=no-name-in-module
+# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from scapy.all import Ether, IP, ICMP, IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
+
+from scapy.all import Ether
+from scapy.layers.inet import ICMP, IP
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
from scapy.layers.ipsec import SecurityAssociation, ESP
from ipaddress import ip_address
from scapy.layers.ipsec import SecurityAssociation, ESP
from ipaddress import ip_address
:type dst_tun: str
:type src_ip: str
:type dst_ip: str
:type dst_tun: str
:type src_ip: str
:type dst_ip: str
- :type sa_sa: scapy.layers.ipsec.SecurityAssociation
+ :type sa_in: scapy.layers.ipsec.SecurityAssociation
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(IP):
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(IP):
raise RuntimeError(
'Not an ESP packet received: {0}'.format(pkt_recv.__repr__()))
raise RuntimeError(
'Not an ESP packet received: {0}'.format(pkt_recv.__repr__()))
- ip_pkt = pkt_recv['IP']
d_pkt = sa_in.decrypt(ip_pkt)
d_pkt = sa_in.decrypt(ip_pkt)
- if d_pkt['IP'].dst != dst_ip:
+ if d_pkt[IP].dst != dst_ip:
raise RuntimeError(
'Decrypted packet has invalid destination address: {0} '
'should be: {1}'.format(d_pkt['IP'].dst, dst_ip))
raise RuntimeError(
'Decrypted packet has invalid destination address: {0} '
'should be: {1}'.format(d_pkt['IP'].dst, dst_ip))
- if d_pkt['IP'].src != src_ip:
+ if d_pkt[IP].src != src_ip:
raise RuntimeError(
'Decrypted packet has invalid source address: {0} should be: {1}'
.format(d_pkt['IP'].src, src_ip))
raise RuntimeError(
'Decrypted packet has invalid source address: {0} should be: {1}'
.format(d_pkt['IP'].src, src_ip))
raise RuntimeError(
'Not an IPv6 packet received: {0}'.format(pkt_recv.__repr__()))
raise RuntimeError(
'Not an IPv6 packet received: {0}'.format(pkt_recv.__repr__()))
- if pkt_recv['IPv6'].dst != dst_tun:
+ if pkt_recv[IPv6].dst != dst_tun:
raise RuntimeError(
'Received packet has invalid destination address: {0} '
'should be: {1}'.format(pkt_recv['IPv6'].dst, dst_tun))
raise RuntimeError(
'Received packet has invalid destination address: {0} '
'should be: {1}'.format(pkt_recv['IPv6'].dst, dst_tun))
raise RuntimeError(
'Not an ESP packet received: {0}'.format(pkt_recv.__repr__()))
raise RuntimeError(
'Not an ESP packet received: {0}'.format(pkt_recv.__repr__()))
- ip_pkt = pkt_recv['IPv6']
+ ip_pkt = pkt_recv[IPv6]
d_pkt = sa_in.decrypt(ip_pkt)
d_pkt = sa_in.decrypt(ip_pkt)
- if d_pkt['IPv6'].dst != dst_ip:
+ if d_pkt[IPv6].dst != dst_ip:
raise RuntimeError(
'Decrypted packet has invalid destination address {0}: '
'should be: {1}'.format(d_pkt['IPv6'].dst, dst_ip))
raise RuntimeError(
'Decrypted packet has invalid destination address {0}: '
'should be: {1}'.format(d_pkt['IPv6'].dst, dst_ip))
- if d_pkt['IPv6'].src != src_ip:
+ if d_pkt[IPv6].src != src_ip:
raise RuntimeError(
'Decrypted packet has invalid source address: {0} should be: {1}'
.format(d_pkt['IPv6'].src, src_ip))
raise RuntimeError(
'Decrypted packet has invalid source address: {0} should be: {1}'
.format(d_pkt['IPv6'].src, src_ip))
sent_packets = []
if is_ipv4:
sent_packets = []
if is_ipv4:
- ip_pkt = IP(src=src_ip, dst=dst_ip) / \
- ICMP()
+ ip_pkt = (IP(src=src_ip, dst=dst_ip) /
+ ICMP())
ip_pkt = IP(str(ip_pkt))
else:
ip_pkt = IP(str(ip_pkt))
else:
- ip_pkt = IPv6(src=src_ip, dst=dst_ip) / \
- ICMPv6EchoRequest()
+ ip_pkt = (IPv6(src=src_ip, dst=dst_ip) /
+ ICMPv6EchoRequest())
ip_pkt = IPv6(str(ip_pkt))
e_pkt = sa_out.encrypt(ip_pkt)
ip_pkt = IPv6(str(ip_pkt))
e_pkt = sa_out.encrypt(ip_pkt)
- pkt_send = Ether(src=src_mac, dst=dst_mac) / \
- e_pkt
+ pkt_send = (Ether(src=src_mac, dst=dst_mac) /
+ e_pkt)
sent_packets.append(pkt_send)
txq.send(pkt_send)
sent_packets.append(pkt_send)
txq.send(pkt_send)
- pkt_recv = rxq.recv(2, sent_packets)
+ while True:
+ pkt_recv = rxq.recv(2, sent_packets)
- if pkt_recv is None:
- raise RuntimeError('ESP packet Rx timeout')
+ if pkt_recv is None:
+ raise RuntimeError('ESP packet Rx timeout')
+
+ if pkt_recv.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if is_ipv4:
check_ipv4(pkt_recv, src_tun, dst_ip, src_ip, sa_in)
if is_ipv4:
check_ipv4(pkt_recv, src_tun, dst_ip, src_ip, sa_in)
if __name__ == "__main__":
main()
if __name__ == "__main__":
main()
"""Traffic script that sends DHCPv6 proxy packets."""
from scapy.layers.inet import Ether
"""Traffic script that sends DHCPv6 proxy packets."""
from scapy.layers.inet import Ether
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\
- ICMPv6ND_NS
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
- icmpv6nd_solicit_pkt =\
- Ether(src=src_mac, dst=dst_mac) / \
- IPv6(src=src_ip) / \
- ICMPv6ND_NS(tgt=dst_ip)
+ icmpv6nd_solicit_pkt = (Ether(src=src_mac, dst=dst_mac) /
+ IPv6(src=src_ip) /
+ ICMPv6ND_NS(tgt=dst_ip))
sent_packets.append(icmpv6nd_solicit_pkt)
txq.send(icmpv6nd_solicit_pkt)
ether = None
for _ in range(5):
sent_packets.append(icmpv6nd_solicit_pkt)
txq.send(icmpv6nd_solicit_pkt)
ether = None
for _ in range(5):
- pkt = rxq.recv(3, ignore=sent_packets)
+ while True:
+ pkt = rxq.recv(3, ignore=sent_packets)
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if pkt is not None:
ether = pkt
break
if pkt is not None:
ether = pkt
break
raise RuntimeError('ICMPv6ND Proxy response timeout.')
if ether.src != dst_mac:
raise RuntimeError('ICMPv6ND Proxy response timeout.')
if ether.src != dst_mac:
- raise RuntimeError("Source MAC address error: {} != {}".format(
- ether.src, dst_mac))
+ raise RuntimeError("Source MAC address error: {} != {}".
+ format(ether.src, dst_mac))
print "Source MAC address: OK."
if ether.dst != src_mac:
print "Source MAC address: OK."
if ether.dst != src_mac:
- raise RuntimeError("Destination MAC address error: {} != {}".format(
- ether.dst, src_mac))
+ raise RuntimeError("Destination MAC address error: {} != {}".
+ format(ether.dst, src_mac))
print "Destination MAC address: OK."
print "Destination MAC address: OK."
- if ether['IPv6'].src != dst_ip:
- raise RuntimeError("Source IP address error: {} != {}".format(
- ether['IPv6'].src, dst_ip))
+ if ether[IPv6].src != dst_ip:
+ raise RuntimeError("Source IP address error: {} != {}".
+ format(ether[IPv6].src, dst_ip))
print "Source IP address: OK."
print "Source IP address: OK."
- if ether['IPv6'].dst != src_ip:
- raise RuntimeError("Destination IP address error: {} != {}".format(
- ether['IPv6'].dst, src_ip))
+ if ether[IPv6].dst != src_ip:
+ raise RuntimeError("Destination IP address error: {} != {}".
+ format(ether[IPv6].dst, src_ip))
print "Destination IP address: OK."
try:
print "Destination IP address: OK."
try:
- target_addr = ether['IPv6']\
- ['ICMPv6 Neighbor Discovery - Neighbor Advertisement'].tgt
+ target_addr = ether[IPv6][ICMPv6ND_NA].tgt
except (KeyError, AttributeError):
raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.")
if target_addr != dst_ip:
except (KeyError, AttributeError):
raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.")
if target_addr != dst_ip:
- raise RuntimeError("ICMPv6 field 'Target address' error:"
- " {} != {}".format(target_addr, dst_ip))
+ raise RuntimeError("ICMPv6 field 'Target address' error: {} != {}".
+ format(target_addr, dst_ip))
print "Target address field: OK."
print "Target address field: OK."
rxq = RxQueue(dst_if)
txq = TxQueue(src_if)
rxq = RxQueue(dst_if)
txq = TxQueue(src_if)
- icmpv6_ping_pkt = \
- Ether(src=src_mac, dst=proxy_to_src_mac) / \
- IPv6(src=src_ip, dst=dst_ip) / \
- ICMPv6EchoRequest()
+ icmpv6_ping_pkt = (Ether(src=src_mac, dst=proxy_to_src_mac) /
+ IPv6(src=src_ip, dst=dst_ip) /
+ ICMPv6EchoRequest())
txq.send(icmpv6_ping_pkt)
ether = None
for _ in range(5):
txq.send(icmpv6_ping_pkt)
ether = None
for _ in range(5):
+ while True:
+ pkt = rxq.recv(3)
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if pkt is not None:
ether = pkt
break
if pkt is not None:
ether = pkt
break
if ether is None:
raise RuntimeError('ICMPv6 Echo Request timeout.')
try:
if ether is None:
raise RuntimeError('ICMPv6 Echo Request timeout.')
try:
- ether["IPv6"]["ICMPv6 Echo Request"]
+ ether[IPv6]["ICMPv6 Echo Request"]
except KeyError:
raise RuntimeError("Received packet is not an ICMPv6 Echo Request.")
print "ICMP Echo: OK."
except KeyError:
raise RuntimeError("Received packet is not an ICMPv6 Echo Request.")
print "ICMP Echo: OK."
rxq = RxQueue(src_if)
txq = TxQueue(dst_if)
rxq = RxQueue(src_if)
txq = TxQueue(dst_if)
- icmpv6_ping_pkt = \
- Ether(src=dst_mac, dst=proxy_to_dst_mac) / \
- IPv6(src=dst_ip, dst=src_ip) / \
- ICMPv6EchoReply()
+ icmpv6_ping_pkt = (Ether(src=dst_mac, dst=proxy_to_dst_mac) /
+ IPv6(src=dst_ip, dst=src_ip) /
+ ICMPv6EchoReply())
txq.send(icmpv6_ping_pkt)
ether = None
for _ in range(5):
txq.send(icmpv6_ping_pkt)
ether = None
for _ in range(5):
+ while True:
+ pkt = rxq.recv(3)
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if pkt is not None:
ether = pkt
break
if pkt is not None:
ether = pkt
break
if ether is None:
raise RuntimeError('DHCPv6 SOLICIT timeout')
try:
if ether is None:
raise RuntimeError('DHCPv6 SOLICIT timeout')
try:
- ether["IPv6"]["ICMPv6 Echo Reply"]
+ ether[IPv6]["ICMPv6 Echo Reply"]
except KeyError:
raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.")
except KeyError:
raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.")
imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip)
# Verify route (ICMP echo/reply)
imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip)
# Verify route (ICMP echo/reply)
- ipv6_ping(src_if, dst_if, src_mac, dst_mac,
- proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip)
+ ipv6_ping(src_if, dst_if, src_mac, dst_mac, proxy_to_src_mac,
+ proxy_to_dst_mac, src_ip, dst_ip)
+
if __name__ == "__main__":
main()
if __name__ == "__main__":
main()
import sys
import logging
import sys
import logging
+
+# pylint: disable=no-name-in-module
+# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue,\
- checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
+
+from scapy.all import Ether
from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr, ICMPv6NDOptSrcLLAddr
from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr, ICMPv6NDOptSrcLLAddr
-from scapy.all import Ether
+
+from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
+from resources.libraries.python.PacketVerifier import checksum_equal
+from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
# send ICMPv6 neighbor solicitation message
pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
# send ICMPv6 neighbor solicitation message
pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
- IPv6(src=src_ip, dst='ff02::1:ff00:2') /
- ICMPv6ND_NS(tgt=dst_ip) /
- ICMPv6NDOptSrcLLAddr(lladdr=src_mac))
+ IPv6(src=src_ip, dst='ff02::1:ff00:2') /
+ ICMPv6ND_NS(tgt=dst_ip) /
+ ICMPv6NDOptSrcLLAddr(lladdr=src_mac))
sent_packets.append(pkt_send)
txq.send(pkt_send)
# receive ICMPv6 neighbor advertisement message
sent_packets.append(pkt_send)
txq.send(pkt_send)
# receive ICMPv6 neighbor advertisement message
- ether = rxq.recv(2, sent_packets)
+ while True:
+ ether = rxq.recv(2, sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
+
if ether is None:
raise RuntimeError('ICMPv6 echo reply Rx timeout')
if not ether.haslayer(IPv6):
if ether is None:
raise RuntimeError('ICMPv6 echo reply Rx timeout')
if not ether.haslayer(IPv6):
- raise RuntimeError('Unexpected packet with no IPv6 received {0}'.format(
- ether.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 received {0}'.
+ format(ether.__repr__()))
if not ipv6.haslayer(ICMPv6ND_NA):
if not ipv6.haslayer(ICMPv6ND_NA):
- raise RuntimeError(
- 'Unexpected packet with no ICMPv6 ND-NA received {0}'.format(
- ipv6.__repr__()))
+ raise RuntimeError('Unexpected packet with no ICMPv6 ND-NA received '
+ '{0}'.format(ipv6.__repr__()))
- icmpv6_na = ipv6['ICMPv6 Neighbor Discovery - Neighbor Advertisement']
+ icmpv6_na = ipv6[ICMPv6ND_NA]
# verify target address
if icmpv6_na.tgt != dst_ip:
# verify target address
if icmpv6_na.tgt != dst_ip:
- raise RuntimeError('Invalid target address {0} should be {1}'.format(
- icmpv6_na.tgt, dst_ip))
+ raise RuntimeError('Invalid target address {0} should be {1}'.
+ format(icmpv6_na.tgt, dst_ip))
if not icmpv6_na.haslayer(ICMPv6NDOptDstLLAddr):
if not icmpv6_na.haslayer(ICMPv6NDOptDstLLAddr):
- raise RuntimeError(
- 'Missing Destination Link-Layer Address option in ICMPv6 ' +
- 'Neighbor Advertisement {0}'.format(icmpv6_na.__repr__()))
+ raise RuntimeError('Missing Destination Link-Layer Address option in '
+ 'ICMPv6 Neighbor Advertisement {0}'.
+ format(icmpv6_na.__repr__()))
- option = 'ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address'
- dst_ll_addr = icmpv6_na[option]
+ dst_ll_addr = icmpv6_na[ICMPv6NDOptDstLLAddr]
# verify destination link-layer address field
if dst_ll_addr.lladdr != dst_mac:
# verify destination link-layer address field
if dst_ll_addr.lladdr != dst_mac:
- raise RuntimeError('Invalid lladdr {0} should be {1}'.format(
- dst_ll_addr.lladdr, dst_mac))
+ raise RuntimeError('Invalid lladdr {0} should be {1}'.
+ format(dst_ll_addr.lladdr, dst_mac))
# verify checksum
cksum = icmpv6_na.cksum
del icmpv6_na.cksum
tmp = ICMPv6ND_NA(str(icmpv6_na))
if not checksum_equal(tmp.cksum, cksum):
# verify checksum
cksum = icmpv6_na.cksum
del icmpv6_na.cksum
tmp = ICMPv6ND_NA(str(icmpv6_na))
if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError(
- 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
+ raise RuntimeError('Invalid checksum {0} should be {1}'.
+ format(cksum, tmp.cksum))
if __name__ == "__main__":
main()
if __name__ == "__main__":
main()
+# pylint: disable=no-name-in-module
+# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, \
- checksum_equal
-from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
-from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
-from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
from scapy.all import Ether
from scapy.all import Ether
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
+from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr
+from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+
+from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
+from resources.libraries.python.PacketVerifier import checksum_equal
+from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
sent_packets.append(pkt_send)
txq.send(pkt_send)
sent_packets.append(pkt_send)
txq.send(pkt_send)
- ether = rxq.recv(ignore=sent_packets)
- if ether is None:
- raise RuntimeError(
- 'ICMPv6 echo reply seq {0} Rx timeout'.format(echo_seq))
+ while True:
+ ether = rxq.recv(ignore=sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply seq {0} '
+ 'Rx timeout'.format(echo_seq))
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if not ether.haslayer(IPv6):
if not ether.haslayer(IPv6):
- raise RuntimeError(
- 'Unexpected packet with no IPv6 received {0}'.format(
- ether.__repr__()))
+ raise RuntimeError('Unexpected packet with no IPv6 layer '
+ 'received: {0}'.format(ether.__repr__()))
if not ipv6.haslayer(ICMPv6EchoReply):
if not ipv6.haslayer(ICMPv6EchoReply):
- raise RuntimeError(
- 'Unexpected packet with no IPv6 ICMP received {0}'.format(
- ipv6.__repr__()))
+ raise RuntimeError('Unexpected packet with no ICMPv6 echo reply '
+ 'layer received: {0}'.format(ipv6.__repr__()))
- icmpv6 = ipv6['ICMPv6 Echo Reply']
+ icmpv6 = ipv6[ICMPv6EchoReply]
if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
- raise RuntimeError(
- 'Invalid ICMPv6 echo reply received ID {0} seq {1} should be '
- 'ID {2} seq {3}, {0}'.format(icmpv6.id, icmpv6.seq, echo_id,
- echo_seq))
+ raise RuntimeError('ICMPv6 echo reply with invalid data received: '
+ 'ID {0} seq {1} should be ID {2} seq {3}, {0}'.
+ format(icmpv6.id, icmpv6.seq, echo_id,
+ echo_seq))
cksum = icmpv6.cksum
del icmpv6.cksum
tmp = ICMPv6EchoReply(str(icmpv6))
if not checksum_equal(tmp.cksum, cksum):
cksum = icmpv6.cksum
del icmpv6.cksum
tmp = ICMPv6EchoReply(str(icmpv6))
if not checksum_equal(tmp.cksum, cksum):
- raise RuntimeError(
- 'Invalid checksum {0} should be {1}'.format(cksum, tmp.cksum))
+ raise RuntimeError('Invalid checksum: {0} should be {1}'.
+ format(cksum, tmp.cksum))
sent_packets.remove(pkt_send)
sent_packets.remove(pkt_send)
# pylint: disable=no-name-in-module
# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
# pylint: disable=no-name-in-module
# pylint: disable=import-error
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
-from scapy.all import Ether, IP, IPv6, TCP
+
+from scapy.all import Ether
+from scapy.layers.inet import IP, TCP
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from ipaddress import ip_address
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from ipaddress import ip_address
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(IP):
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(IP):
- raise RuntimeError(
- 'Not an IPv4 packet received: {0}'.format(pkt_recv.__repr__()))
+ raise RuntimeError('Not an IPv4 packet received: {0}'.
+ format(pkt_recv.__repr__()))
- rx_dscp = pkt_recv['IP'].tos >> 2
+ rx_dscp = pkt_recv[IP].tos >> 2
- raise RuntimeError(
- 'Invalid DSCP {0} should be {1}'.format(rx_dscp, dscp))
+ raise RuntimeError('Invalid DSCP {0} should be {1}'.
+ format(rx_dscp, dscp))
if not pkt_recv.haslayer(TCP):
if not pkt_recv.haslayer(TCP):
- raise RuntimeError(
- 'Not a TCP packet received: {0}'.format(pkt_recv.__repr__()))
+ raise RuntimeError('Not a TCP packet received: {0}'.
+ format(pkt_recv.__repr__()))
def check_ipv6(pkt_recv, dscp):
def check_ipv6(pkt_recv, dscp):
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(IPv6):
:raises RuntimeError: If received packet is invalid.
"""
if not pkt_recv.haslayer(IPv6):
- raise RuntimeError(
- 'Not an IPv6 packet received: {0}'.format(pkt_recv.__repr__()))
+ raise RuntimeError('Not an IPv6 packet received: {0}'.
+ format(pkt_recv.__repr__()))
- rx_dscp = pkt_recv['IPv6'].tc >> 2
+ rx_dscp = pkt_recv[IPv6].tc >> 2
- raise RuntimeError(
- 'Invalid DSCP {0} should be {1}'.format(rx_dscp, dscp))
+ raise RuntimeError('Invalid DSCP {0} should be {1}'.
+ format(rx_dscp, dscp))
if not pkt_recv.haslayer(TCP):
if not pkt_recv.haslayer(TCP):
- raise RuntimeError(
- 'Not a TCP packet received: {0}'.format(pkt_recv.__repr__()))
+ raise RuntimeError('Not a TCP packet received: {0}'.
+ format(pkt_recv.__repr__()))
# pylint: disable=too-many-locals
# pylint: disable=too-many-locals
sent_packets = []
if is_ipv4:
sent_packets = []
if is_ipv4:
- ip_pkt = IP(src=src_ip, dst=dst_ip) / \
- TCP()
+ ip_pkt = (IP(src=src_ip, dst=dst_ip) /
+ TCP())
- ip_pkt = IPv6(src=src_ip, dst=dst_ip) / \
- TCP()
+ ip_pkt = (IPv6(src=src_ip, dst=dst_ip) /
+ TCP())
- pkt_send = Ether(src=src_mac, dst=dst_mac) / \
- ip_pkt
+ pkt_send = (Ether(src=src_mac, dst=dst_mac) /
+ ip_pkt)
sent_packets.append(pkt_send)
txq.send(pkt_send)
sent_packets.append(pkt_send)
txq.send(pkt_send)
- pkt_recv = rxq.recv(2, sent_packets)
+ while True:
+ pkt_recv = rxq.recv(2, sent_packets)
+ if pkt_recv is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if pkt_recv.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if pkt_recv is None:
raise RuntimeError('Rx timeout')
if pkt_recv is None:
raise RuntimeError('Rx timeout')
if __name__ == "__main__":
main()
if __name__ == "__main__":
main()
import ipaddress
from robot.api import logger
from scapy.layers.inet import ICMP, IP
import ipaddress
from robot.api import logger
from scapy.layers.inet import ICMP, IP
-from scapy.layers.inet6 import ICMPv6EchoRequest
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from scapy.layers.l2 import Ether, Dot1Q
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from scapy.layers.l2 import Ether, Dot1Q
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- if tx_if == rx_if:
- ether = rxq.recv(2, ignore=sent_packets)
- else:
- ether = rxq.recv(2)
- if ether is None:
- raise RuntimeError("ICMP echo Rx timeout")
+ while True:
+ if tx_if == rx_if:
+ ether = rxq.recv(2, ignore=sent_packets)
+ else:
+ ether = rxq.recv(2)
+ if ether is None:
+ raise RuntimeError('ICMP echo Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
logger.trace("MAC matched")
else:
if rx_dst_mac == ether[Ether].dst and rx_src_mac == ether[Ether].src:
logger.trace("MAC matched")
else:
- raise RuntimeError(
- "Matching packet unsuccessful: {0}".format(ether.__repr__()))
+ raise RuntimeError("Matching packet unsuccessful: {0}".
+ format(ether.__repr__()))
if encaps_rx == 'Dot1q':
if ether[Dot1Q].vlan == int(vlan_rx):
logger.trace("VLAN matched")
else:
raise RuntimeError('Ethernet frame with wrong VLAN tag ({}-'
if encaps_rx == 'Dot1q':
if ether[Dot1Q].vlan == int(vlan_rx):
logger.trace("VLAN matched")
else:
raise RuntimeError('Ethernet frame with wrong VLAN tag ({}-'
- 'received, {}-expected):\n{}'
- .format(ether[Dot1Q].vlan, vlan_rx,
- ether.__repr__()))
+ 'received, {}-expected):\n{}'.
+ format(ether[Dot1Q].vlan, vlan_rx,
+ ether.__repr__()))
ip = ether[Dot1Q].payload
elif encaps_rx == 'Dot1ad':
raise NotImplementedError()
ip = ether[Dot1Q].payload
elif encaps_rx == 'Dot1ad':
raise NotImplementedError()
ip = ether.payload
if not isinstance(ip, ip_format):
ip = ether.payload
if not isinstance(ip, ip_format):
- raise RuntimeError(
- "Not an IP packet received {0}".format(ip.__repr__()))
+ raise RuntimeError("Not an IP packet received {0}".
+ format(ip.__repr__()))
# Compare data from packets
if src_ip == ip.src:
logger.trace("Src IP matched")
else:
# Compare data from packets
if src_ip == ip.src:
logger.trace("Src IP matched")
else:
- raise RuntimeError("Matching Src IP unsuccessful: {} != {}"
- .format(src_ip, ip.src))
+ raise RuntimeError("Matching Src IP unsuccessful: {} != {}".
+ format(src_ip, ip.src))
if dst_ip == ip.dst:
logger.trace("Dst IP matched")
else:
if dst_ip == ip.dst:
logger.trace("Dst IP matched")
else:
- raise RuntimeError("Matching Dst IP unsuccessful: {} != {}"
- .format(dst_ip, ip.dst))
+ raise RuntimeError("Matching Dst IP unsuccessful: {} != {}".
+ format(dst_ip, ip.dst))
import sys
import ipaddress
import sys
import ipaddress
-from scapy.layers.inet import ICMP, IP
-from scapy.layers.inet6 import IPv6
from scapy.all import Ether
from scapy.all import Ether
-from scapy.layers.inet6 import ICMPv6EchoRequest
+from scapy.layers.inet import ICMP, IP
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
+
+ while True:
+ ether = rxq.recv(2)
+ if ether is None:
+ raise RuntimeError('ICMPv6 echo reply Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
if ether is None:
raise RuntimeError("ICMP echo Rx timeout")
if not ether.haslayer(ip_format):
if ether is None:
raise RuntimeError("ICMP echo Rx timeout")
if not ether.haslayer(ip_format):
- raise RuntimeError("Not an IP packet received {0}"
- .format(ether.__repr__()))
+ raise RuntimeError("Not an IP packet received {0}".
+ format(ether.__repr__()))
- if ether['Ethernet'].src != dut_if2_mac:
+ if ether[Ether].src != dut_if2_mac:
raise RuntimeError("Source MAC address error")
raise RuntimeError("Source MAC address error")
- if ether['Ethernet'].dst == path_1_mac:
+ if ether[Ether].dst == path_1_mac:
- elif ether['Ethernet'].dst == path_2_mac:
+ elif ether[Ether].dst == path_2_mac:
path_2_counter += 1
else:
raise RuntimeError("Destination MAC address error")
if (path_1_counter + path_2_counter) != 100:
path_2_counter += 1
else:
raise RuntimeError("Destination MAC address error")
if (path_1_counter + path_2_counter) != 100:
- raise RuntimeError("Packet loss: recevied only {} packets of 100 "
- .format(path_1_counter + path_2_counter))
+ raise RuntimeError("Packet loss: recevied only {} packets of 100 ".
+ format(path_1_counter + path_2_counter))
if path_1_counter == 0:
raise RuntimeError("Path 1 error!")
if path_1_counter == 0:
raise RuntimeError("Path 1 error!")
if __name__ == "__main__":
main()
if __name__ == "__main__":
main()
import sys
import ipaddress
import sys
import ipaddress
-from scapy.layers.inet import ICMP, IP
from scapy.layers.l2 import Ether
from scapy.layers.l2 import Ether
-from scapy.layers.inet6 import ICMPv6EchoRequest
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet import ICMP, IP
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
IP(src=src_ip, dst=dst_ip) /
ICMP(code=icmp_code, type=icmp_type))
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
IP(src=src_ip, dst=dst_ip) /
ICMP(code=icmp_code, type=icmp_type))
- ip_format = 'IP'
- icmp_format = 'ICMP'
+ ip_format = IP
+ icmp_format = ICMP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
IPv6(src=src_ip, dst=dst_ip) /
ICMPv6EchoRequest(code=icmp_code, type=icmp_type))
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
IPv6(src=src_ip, dst=dst_ip) /
ICMPv6EchoRequest(code=icmp_code, type=icmp_type))
icmp_format = 'ICMPv6'
else:
raise ValueError("IP(s) not in correct format")
icmp_format = 'ICMPv6'
else:
raise ValueError("IP(s) not in correct format")
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
+ while True:
+ ether = rxq.recv(2)
+ if ether is None:
+ raise RuntimeError('ICMP echo Rx timeout')
- # Check whether received packet contains layers Ether, IP and ICMP
- if ether is None:
- raise RuntimeError('ICMP echo Rx timeout')
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
+ # Check whether received packet contains layers IP and ICMP
if not ether.haslayer(ip_format):
if not ether.haslayer(ip_format):
- raise RuntimeError('Not an IP packet received {0}'
- .format(ether.__repr__()))
+ raise RuntimeError('Not an IP packet received {0}'.
+ format(ether.__repr__()))
# Cannot use haslayer for ICMPv6, every type of ICMPv6 is a separate layer
# Next header value of 58 means the next header is ICMPv6
if not ether.haslayer(icmp_format) and ether[ip_format].nh != 58:
# Cannot use haslayer for ICMPv6, every type of ICMPv6 is a separate layer
# Next header value of 58 means the next header is ICMPv6
if not ether.haslayer(icmp_format) and ether[ip_format].nh != 58:
- raise RuntimeError('Not an ICMP packet received {0}'
- .format(ether.__repr__()))
+ raise RuntimeError('Not an ICMP packet received {0}'.
+ format(ether.__repr__()))
if __name__ == "__main__":
main()
if __name__ == "__main__":
main()
import sys
import ipaddress
import sys
import ipaddress
-from scapy.layers.inet import ICMP, IP
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
from scapy.all import Ether
from scapy.all import Ether
+from scapy.layers.inet import ICMP, IP
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
:type ipformat: dict
:rtype: bool
"""
:type ipformat: dict
:rtype: bool
"""
- #pylint: disable=bare-except
+ # pylint: disable=bare-except
try:
if pkt[ipformat['IPType']][ipformat['ICMP_rep']].type == \
ipformat['Type']:
return True
else:
return False
try:
if pkt[ipformat['IPType']][ipformat['ICMP_rep']].type == \
ipformat['Type']:
return True
else:
return False
+ except: # pylint: disable=bare-except
:type ipformat: dict
:rtype: bool
"""
:type ipformat: dict
:rtype: bool
"""
- #pylint: disable=bare-except
+ # pylint: disable=bare-except
try:
r_src = reply[ipformat['IPType']].src == request[ipformat['IPType']].dst
r_dst = reply[ipformat['IPType']].dst == request[ipformat['IPType']].src
return r_src and r_dst
try:
r_src = reply[ipformat['IPType']].src == request[ipformat['IPType']].dst
r_dst = reply[ipformat['IPType']].dst == request[ipformat['IPType']].src
return r_src and r_dst
+ except: # pylint: disable=bare-except
txq.send(icmp_request)
for _ in range(1000):
txq.send(icmp_request)
for _ in range(1000):
- icmp_reply = rxq.recv(wait_step, ignore=sent_packets)
- if icmp_reply is None:
- timeout -= wait_step
- if timeout < 0:
- raise RuntimeError("ICMP echo Rx timeout")
- elif is_icmp_reply(icmp_reply, ip_format):
+ while True:
+ icmp_reply = rxq.recv(wait_step, ignore=sent_packets)
+ if icmp_reply is None:
+ timeout -= wait_step
+ if timeout < 0:
+ raise RuntimeError("ICMP echo Rx timeout")
+
+ elif icmp_reply.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue in case of ICMPv6ND_NS packet
+ continue
+ else:
+ # otherwise process the current packet
+ break
+
+ if is_icmp_reply(icmp_reply, ip_format):
if address_check(icmp_request, icmp_reply, ip_format):
break
else:
if address_check(icmp_request, icmp_reply, ip_format):
break
else:
if __name__ == "__main__":
main()
if __name__ == "__main__":
main()
import sys
import ipaddress
import sys
import ipaddress
+from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet import ICMP, IP
from scapy.layers.inet import ICMP, IP
-from scapy.layers.l2 import Ether
-from scapy.layers.l2 import Dot1Q
-from scapy.layers.inet6 import ICMPv6EchoRequest
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
IP(src=src_ip, dst=dst_ip) /
ICMP())
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
IP(src=src_ip, dst=dst_ip) /
ICMP())
- ip_format = 'IP'
- icmp_format = 'ICMP'
+ ip_format = IP
+ icmp_format = ICMP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
if encaps == 'Dot1q':
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
if encaps == 'Dot1q':
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
IPv6(src=src_ip, dst=dst_ip) /
ICMPv6EchoRequest())
pkt_raw = (Ether(src=src_mac, dst=dst_mac) /
IPv6(src=src_ip, dst=dst_ip) /
ICMPv6EchoRequest())
- ip_format = 'IPv6'
- icmp_format = 'ICMPv6EchoRequest'
+ ip_format = IPv6
+ icmp_format = ICMPv6EchoRequest
else:
raise ValueError("IP(s) not in correct format")
else:
raise ValueError("IP(s) not in correct format")
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
+ # Receive ICMP / ICMPv6 echo reply
+ while True:
+ ether = rxq.recv(2,)
+ if ether is None:
+ raise RuntimeError('ICMP echo Rx timeout')
- # Check whether received packet contains layers Ether, IP and ICMP
- if ether is None:
- raise RuntimeError('ICMP echo Rx timeout')
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
+ # Check whether received packet contains layers IP/IPv6 and
+ # ICMP/ICMPv6EchoRequest
if encaps_rx:
if encaps_rx == 'Dot1q':
if not vlan1_rx:
vlan1_rx = vlan1
if not ether.haslayer(Dot1Q):
if encaps_rx:
if encaps_rx == 'Dot1q':
if not vlan1_rx:
vlan1_rx = vlan1
if not ether.haslayer(Dot1Q):
- raise RuntimeError('Not VLAN tagged Eth frame received:\n{0}'
- .format(ether.__repr__()))
+ raise RuntimeError('Not VLAN tagged Eth frame received:\n{0}'.
+ format(ether.__repr__()))
elif ether[Dot1Q].vlan != int(vlan1_rx):
raise RuntimeError('Ethernet frame with wrong VLAN tag ({}) '
elif ether[Dot1Q].vlan != int(vlan1_rx):
raise RuntimeError('Ethernet frame with wrong VLAN tag ({}) '
- 'received ({} expected):\n{}'.format(
- ether[Dot1Q].vlan, vlan1_rx, ether.__repr__()))
+ 'received ({} expected):\n{}'.
+ format(ether[Dot1Q].vlan, vlan1_rx,
+ ether.__repr__()))
elif encaps_rx == 'Dot1ad':
if not vlan1_rx:
vlan1_rx = vlan1
if not vlan2_rx:
vlan2_rx = vlan2
# TODO
elif encaps_rx == 'Dot1ad':
if not vlan1_rx:
vlan1_rx = vlan1
if not vlan2_rx:
vlan2_rx = vlan2
# TODO
- raise RuntimeError('Encapsulation {0} not implemented yet.'
- .format(encaps_rx))
+ raise RuntimeError('Encapsulation {0} not implemented yet.'.
+ format(encaps_rx))
- raise RuntimeError('Unsupported/unknown encapsulation expected: {0}'
- .format(encaps_rx))
+ raise RuntimeError('Unsupported encapsulation expected: {0}'.
+ format(encaps_rx))
if not ether.haslayer(ip_format):
if not ether.haslayer(ip_format):
- raise RuntimeError('Not an IP packet received:\n{0}'
- .format(ether.__repr__()))
+ raise RuntimeError('Not an IP/IPv6 packet received:\n{0}'.
+ format(ether.__repr__()))
if not ether.haslayer(icmp_format):
if not ether.haslayer(icmp_format):
- raise RuntimeError('Not an ICMP packet received:\n{0}'
- .format(ether.__repr__()))
+ raise RuntimeError('Not an ICMP/ICMPv6EchoRequest packet received:\n'
+ '{0}'.format(ether.__repr__()))
if __name__ == "__main__":
main()
if __name__ == "__main__":
main()
import ipaddress
from scapy.layers.l2 import Ether
import ipaddress
from scapy.layers.l2 import Ether
-from scapy.layers.inet6 import IPv6, ICMPv6ND_RS
+from scapy.layers.inet6 import IPv6, ICMPv6ND_RA, ICMPv6ND_RS, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
sent_packets = [pkt_raw]
txq.send(pkt_raw)
sent_packets = [pkt_raw]
txq.send(pkt_raw)
- ether = rxq.recv(8, ignore=sent_packets)
+ while True:
+ ether = rxq.recv(2, sent_packets)
+ if ether is None:
+ raise RuntimeError('ICMP echo Rx timeout')
- # Check whether received packet contains layer RA and check other values
- if ether is None:
- raise RuntimeError('ICMP echo Rx timeout')
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
+ # Check whether received packet contains layer RA and check other values
if ether.src != router_mac:
if ether.src != router_mac:
- raise RuntimeError(
- 'Packet source MAC ({0}) does not match '
- 'router MAC ({1}).'.format(ether.src, router_mac))
+ raise RuntimeError('Packet source MAC ({0}) does not match router MAC '
+ '({1}).'.format(ether.src, router_mac))
- raise RuntimeError(
- 'Packet destination MAC ({0}) does not match '
- 'RS source MAC ({1}).'.format(ether.dst, src_mac))
+ raise RuntimeError('Packet destination MAC ({0}) does not match RS '
+ 'source MAC ({1}).'.format(ether.dst, src_mac))
- if not ether.haslayer('ICMPv6ND_RA'):
- raise RuntimeError('Not an RA packet received {0}'
- .format(ether.__repr__()))
+ if not ether.haslayer(ICMPv6ND_RA):
+ raise RuntimeError('Not an RA packet received {0}'.
+ format(ether.__repr__()))
src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src))
dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst))
src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src))
dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst))
rs_src_address = ipaddress.IPv6Address(unicode(src_ip))
if src_address != router_link_local:
rs_src_address = ipaddress.IPv6Address(unicode(src_ip))
if src_address != router_link_local:
- raise RuntimeError(
- 'Packet source address ({0}) does not match '
- 'link local address({1})'.format(src_address, router_link_local))
+ raise RuntimeError('Packet source address ({0}) does not match link '
+ 'local address({1})'.
+ format(src_address, router_link_local))
if dst_address != rs_src_address:
if dst_address != rs_src_address:
- raise RuntimeError(
- 'Packet destination address ({0}) does not match '
- 'RS source address ({1}).'.format(dst_address, rs_src_address))
+ raise RuntimeError('Packet destination address ({0}) does not match '
+ 'RS source address ({1}).'.
+ format(dst_address, rs_src_address))
if ether['IPv6'].hlim != 255:
if ether['IPv6'].hlim != 255:
- raise RuntimeError('Hop limit not correct: {0}!=255'.format(
- ether['IPv6'].hlim))
+ raise RuntimeError('Hop limit not correct: {0}!=255'.
+ format(ether['IPv6'].hlim))
- ra_code = ether['ICMPv6 Neighbor Discovery - Router Advertisement'].code
+ ra_code = ether[ICMPv6ND_RA].code
if ra_code != 0:
raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code))
sys.exit(0)
if ra_code != 0:
raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code))
sys.exit(0)
if __name__ == "__main__":
main()
if __name__ == "__main__":
main()
from TG to DUT.
"""
import sys
from TG to DUT.
"""
import sys
-from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
from scapy.all import Ether, Packet, Raw
from scapy.all import Ether, Packet, Raw
+from scapy.layers.inet import IP, TCP
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from resources.libraries.python.SFC.VerifyPacket import *
from resources.libraries.python.SFC.VerifyPacket import *
-from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon
+from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from robot.api import logger
def main():
"""Send TCP packet from one traffic generator interface to DUT.
:raises: If the IP address is invalid.
"""
def main():
"""Send TCP packet from one traffic generator interface to DUT.
:raises: If the IP address is invalid.
"""
- args = TrafficScriptArg(
- ['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
- 'timeout', 'framesize', 'testtype'])
+ args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
+ 'timeout', 'framesize', 'testtype'])
src_mac = args.get_arg('src_mac')
dst_mac = args.get_arg('dst_mac')
src_mac = args.get_arg('src_mac')
dst_mac = args.get_arg('dst_mac')
sent_packets = []
protocol = TCP
sent_packets = []
protocol = TCP
- source_port = sfccon.DEF_SRC_PORT
- destination_port = sfccon.DEF_DST_PORT
+ source_port = SfcCon.DEF_SRC_PORT
+ destination_port = SfcCon.DEF_DST_PORT
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
raise ValueError("Invalid IP version!")
pkt_header = (Ether(src=src_mac, dst=dst_mac) /
raise ValueError("Invalid IP version!")
pkt_header = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- protocol(sport=int(source_port), dport=int(destination_port)))
+ ip_version(src=src_ip, dst=dst_ip) /
+ protocol(sport=int(source_port), dport=int(destination_port)))
fsize_no_fcs = frame_size - 4
pad_len = max(0, fsize_no_fcs - len(pkt_header))
fsize_no_fcs = frame_size - 4
pad_len = max(0, fsize_no_fcs - len(pkt_header))
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(timeout)
-
- if ether is None:
- raise RuntimeError("No packet is received!")
+ while True:
+ ether = rxq.recv(timeout)
+ if ether is None:
+ raise RuntimeError('No packet is received!')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
# let us begin to check the NSH SFC loopback packet
VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type)
# let us begin to check the NSH SFC loopback packet
VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type)
import sys
import ipaddress
import sys
import ipaddress
-from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
from scapy.all import Ether
from scapy.all import Ether
+from scapy.layers.inet import IP, UDP, TCP
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
def main():
"""Send TCP or UDP packet from one traffic generator interface to the other.
"""
def main():
"""Send TCP or UDP packet from one traffic generator interface to the other.
"""
- args = TrafficScriptArg(
- ['tx_mac', 'rx_mac', 'src_ip', 'dst_ip', 'protocol',
- 'source_port', 'destination_port'])
+ args = TrafficScriptArg(['tx_mac', 'rx_mac', 'src_ip', 'dst_ip', 'protocol',
+ 'source_port', 'destination_port'])
src_mac = args.get_arg('tx_mac')
dst_mac = args.get_arg('rx_mac')
src_mac = args.get_arg('tx_mac')
dst_mac = args.get_arg('rx_mac')
source_port = args.get_arg('source_port')
destination_port = args.get_arg('destination_port')
source_port = args.get_arg('source_port')
destination_port = args.get_arg('destination_port')
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
elif protocol.upper() == 'UDP':
protocol = UDP
else:
elif protocol.upper() == 'UDP':
protocol = UDP
else:
- raise ValueError("Invalid type of protocol!")
+ raise ValueError("Invalid protocol type!")
rxq = RxQueue(rx_if)
txq = TxQueue(tx_if)
rxq = RxQueue(rx_if)
txq = TxQueue(tx_if)
protocol(sport=int(source_port), dport=int(destination_port)))
txq.send(pkt_raw)
protocol(sport=int(source_port), dport=int(destination_port)))
txq.send(pkt_raw)
- if ether is None:
- raise RuntimeError("TCP/UDP Rx timeout")
+ while True:
+ ether = rxq.recv(2)
+ if ether is None:
+ raise RuntimeError('TCP/UDP Rx timeout')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
print ("TCP packet received.")
print ("TCP packet received.")
print ("UDP packet received.")
else:
print ("UDP packet received.")
else:
- raise RuntimeError("Not an TCP or UDP packet received {0}"
- .format(ether.__repr__()))
+ raise RuntimeError("Not an TCP or UDP packet received {0}".
+ format(ether.__repr__()))
import time
from scapy.layers.inet import IP, UDP, TCP
import time
from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from scapy.all import Ether, Packet, Raw
from resources.libraries.python.SFC.VerifyPacket import *
from scapy.all import Ether, Packet, Raw
from resources.libraries.python.SFC.VerifyPacket import *
-from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon
+from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from robot.api import logger
def main():
"""Send VxLAN packet from TG to DUT.
:raises: If the IP address is invalid.
"""
def main():
"""Send VxLAN packet from TG to DUT.
:raises: If the IP address is invalid.
"""
- args = TrafficScriptArg(
- ['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
- 'timeout', 'framesize', 'testtype'])
+ args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
+ 'timeout', 'framesize', 'testtype'])
src_mac = args.get_arg('src_mac')
dst_mac = args.get_arg('dst_mac')
src_mac = args.get_arg('src_mac')
dst_mac = args.get_arg('dst_mac')
dst_ip = args.get_arg('dst_ip')
tx_if = args.get_arg('tx_if')
rx_if = args.get_arg('rx_if')
dst_ip = args.get_arg('dst_ip')
tx_if = args.get_arg('tx_if')
rx_if = args.get_arg('rx_if')
- timeout = int(args.get_arg('timeout'))
+ timeout = max(2, int(args.get_arg('timeout')))
frame_size = int(args.get_arg('framesize'))
test_type = args.get_arg('testtype')
frame_size = int(args.get_arg('framesize'))
test_type = args.get_arg('testtype')
sent_packets = []
protocol = TCP
sent_packets = []
protocol = TCP
- source_port = sfccon.DEF_SRC_PORT
- destination_port = sfccon.DEF_DST_PORT
+ source_port = SfcCon.DEF_SRC_PORT
+ destination_port = SfcCon.DEF_DST_PORT
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
raise ValueError("Invalid IP version!")
innerpkt = (Ether(src=src_mac, dst=dst_mac) /
raise ValueError("Invalid IP version!")
innerpkt = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- protocol(sport=int(source_port), dport=int(destination_port)))
+ ip_version(src=src_ip, dst=dst_ip) /
+ protocol(sport=int(source_port), dport=int(destination_port)))
vxlan = '\x08\x00\x00\x00\x00\x00\x01\x00'
raw_data = vxlan + str(innerpkt)
pkt_header = (Ether(src=src_mac, dst=dst_mac) /
vxlan = '\x08\x00\x00\x00\x00\x00\x01\x00'
raw_data = vxlan + str(innerpkt)
pkt_header = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- UDP(sport=int(source_port), dport=4789) /
- Raw(load=raw_data))
+ ip_version(src=src_ip, dst=dst_ip) /
+ UDP(sport=int(source_port), dport=4789) /
+ Raw(load=raw_data))
fsize_no_fcs = frame_size - 4
pad_len = max(0, fsize_no_fcs - len(pkt_header))
fsize_no_fcs = frame_size - 4
pad_len = max(0, fsize_no_fcs - len(pkt_header))
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError("No packet is received!")
+ while True:
+ ether = rxq.recv(timeout)
+ if ether is None:
+ raise RuntimeError('No packet is received!')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
# let us begin to check the proxy outbound packet
VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type)
# let us begin to check the proxy outbound packet
VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type)
from TG to DUT.
"""
import sys
from TG to DUT.
"""
import sys
from scapy.layers.inet import IP, UDP, TCP
from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from scapy.all import Ether, Packet, Raw
from resources.libraries.python.SFC.VerifyPacket import *
from scapy.all import Ether, Packet, Raw
from resources.libraries.python.SFC.VerifyPacket import *
-from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon
+from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from robot.api import logger
def main():
"""Send VxLAN-GPE+NSH packet from TG to DUT.
:raises: If the IP address is invalid.
"""
def main():
"""Send VxLAN-GPE+NSH packet from TG to DUT.
:raises: If the IP address is invalid.
"""
- args = TrafficScriptArg(
- ['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
- 'timeout', 'framesize', 'testtype'])
+ args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
+ 'timeout', 'framesize', 'testtype'])
src_mac = args.get_arg('src_mac')
dst_mac = args.get_arg('dst_mac')
src_mac = args.get_arg('src_mac')
dst_mac = args.get_arg('dst_mac')
dst_ip = args.get_arg('dst_ip')
tx_if = args.get_arg('tx_if')
rx_if = args.get_arg('rx_if')
dst_ip = args.get_arg('dst_ip')
tx_if = args.get_arg('tx_if')
rx_if = args.get_arg('rx_if')
- timeout = int(args.get_arg('timeout'))
+ timeout = max(2, int(args.get_arg('timeout')))
frame_size = int(args.get_arg('framesize'))
test_type = args.get_arg('testtype')
frame_size = int(args.get_arg('framesize'))
test_type = args.get_arg('testtype')
sent_packets = []
protocol = TCP
sent_packets = []
protocol = TCP
- source_port = sfccon.DEF_SRC_PORT
- destination_port = sfccon.DEF_DST_PORT
+ source_port = SfcCon.DEF_SRC_PORT
+ destination_port = SfcCon.DEF_DST_PORT
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
raise ValueError("Invalid IP version!")
innerpkt = (Ether(src=src_mac, dst=dst_mac) /
raise ValueError("Invalid IP version!")
innerpkt = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- protocol(sport=int(source_port), dport=int(destination_port)))
+ ip_version(src=src_ip, dst=dst_ip) /
+ protocol(sport=int(source_port), dport=int(destination_port)))
vxlangpe_nsh = '\x0c\x00\x00\x04\x00\x00\x09\x00\x00\x06' \
'\x01\x03\x00\x00\xb9\xff\xC0\xA8\x32\x4B' \
vxlangpe_nsh = '\x0c\x00\x00\x04\x00\x00\x09\x00\x00\x06' \
'\x01\x03\x00\x00\xb9\xff\xC0\xA8\x32\x4B' \
raw_data = vxlangpe_nsh + str(innerpkt)
pkt_header = (Ether(src=src_mac, dst=dst_mac) /
raw_data = vxlangpe_nsh + str(innerpkt)
pkt_header = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- UDP(sport=int(source_port), dport=4790) /
- Raw(load=raw_data))
+ ip_version(src=src_ip, dst=dst_ip) /
+ UDP(sport=int(source_port), dport=4790) /
+ Raw(load=raw_data))
fsize_no_fcs = frame_size - 4
pad_len = max(0, fsize_no_fcs - len(pkt_header))
fsize_no_fcs = frame_size - 4
pad_len = max(0, fsize_no_fcs - len(pkt_header))
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError("No packet is received!")
+ while True:
+ ether = rxq.recv(timeout)
+ if ether is None:
+ raise RuntimeError('No packet is received!')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
# let us begin to check the proxy inbound packet
VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type)
# let us begin to check the proxy inbound packet
VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type)
from TG to DUT.
"""
import sys
from TG to DUT.
"""
import sys
from scapy.layers.inet import IP, UDP, TCP
from scapy.layers.inet import IP, UDP, TCP
-from scapy.layers.inet6 import IPv6
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS
from scapy.all import Ether, Packet, Raw
from resources.libraries.python.SFC.VerifyPacket import *
from scapy.all import Ether, Packet, Raw
from resources.libraries.python.SFC.VerifyPacket import *
-from resources.libraries.python.SFC.SFCConstants import SFCConstants as sfccon
+from resources.libraries.python.SFC.SFCConstants import SFCConstants as SfcCon
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
-from robot.api import logger
def main():
"""Send VxLAN-GPE+NSH packet from TG to DUT.
:raises: If the IP address is invalid.
"""
def main():
"""Send VxLAN-GPE+NSH packet from TG to DUT.
:raises: If the IP address is invalid.
"""
- args = TrafficScriptArg(
- ['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
- 'timeout', 'framesize', 'testtype'])
+ args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip',
+ 'timeout', 'framesize', 'testtype'])
src_mac = args.get_arg('src_mac')
dst_mac = args.get_arg('dst_mac')
src_mac = args.get_arg('src_mac')
dst_mac = args.get_arg('dst_mac')
dst_ip = args.get_arg('dst_ip')
tx_if = args.get_arg('tx_if')
rx_if = args.get_arg('rx_if')
dst_ip = args.get_arg('dst_ip')
tx_if = args.get_arg('tx_if')
rx_if = args.get_arg('rx_if')
- timeout = int(args.get_arg('timeout'))
+ timeout = max(2, int(args.get_arg('timeout')))
frame_size = int(args.get_arg('framesize'))
test_type = args.get_arg('testtype')
frame_size = int(args.get_arg('framesize'))
test_type = args.get_arg('testtype')
sent_packets = []
protocol = TCP
sent_packets = []
protocol = TCP
- source_port = sfccon.DEF_SRC_PORT
- destination_port = sfccon.DEF_DST_PORT
+ source_port = SfcCon.DEF_SRC_PORT
+ destination_port = SfcCon.DEF_DST_PORT
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
if valid_ipv4(src_ip) and valid_ipv4(dst_ip):
ip_version = IP
elif valid_ipv6(src_ip) and valid_ipv6(dst_ip):
raise ValueError("Invalid IP version!")
innerpkt = (Ether(src=src_mac, dst=dst_mac) /
raise ValueError("Invalid IP version!")
innerpkt = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- protocol(sport=int(source_port), dport=int(destination_port)))
+ ip_version(src=src_ip, dst=dst_ip) /
+ protocol(sport=int(source_port), dport=int(destination_port)))
vxlangpe_nsh = '\x0c\x00\x00\x04\x00\x00\x0a\x00\x00\x06' \
'\x01\x03\x00\x00\xb9\xff\xC0\xA8\x32\x4B' \
vxlangpe_nsh = '\x0c\x00\x00\x04\x00\x00\x0a\x00\x00\x06' \
'\x01\x03\x00\x00\xb9\xff\xC0\xA8\x32\x4B' \
raw_data = vxlangpe_nsh + str(innerpkt)
pkt_header = (Ether(src=src_mac, dst=dst_mac) /
raw_data = vxlangpe_nsh + str(innerpkt)
pkt_header = (Ether(src=src_mac, dst=dst_mac) /
- ip_version(src=src_ip, dst=dst_ip) /
- UDP(sport=int(source_port), dport=4790) /
- Raw(load=raw_data))
+ ip_version(src=src_ip, dst=dst_ip) /
+ UDP(sport=int(source_port), dport=4790) /
+ Raw(load=raw_data))
fsize_no_fcs = frame_size - 4
pad_len = max(0, fsize_no_fcs - len(pkt_header))
fsize_no_fcs = frame_size - 4
pad_len = max(0, fsize_no_fcs - len(pkt_header))
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
sent_packets.append(pkt_raw)
txq.send(pkt_raw)
- ether = rxq.recv(2)
-
- if ether is None:
- raise RuntimeError("No packet is received!")
+ while True:
+ ether = rxq.recv(timeout)
+ if ether is None:
+ raise RuntimeError('No packet is received!')
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
# let us begin to check the sfc sff packet
VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type)
# let us begin to check the sfc sff packet
VerifyPacket.check_the_nsh_sfc_packet(ether, frame_size, test_type)
import ipaddress
from scapy.layers.inet import IP, ICMP, ARP
import ipaddress
from scapy.layers.inet import IP, ICMP, ARP
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6ND_NS
from scapy.layers.l2 import Ether
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad
from scapy.layers.l2 import Ether
from resources.libraries.python.PacketVerifier import RxQueue, TxQueue, auto_pad
"""Send a simple L2 or ICMP packet from one TG interface to DUT, then
receive a copy of the packet on the second TG interface, and a copy of
the ICMP reply."""
"""Send a simple L2 or ICMP packet from one TG interface to DUT, then
receive a copy of the packet on the second TG interface, and a copy of
the ICMP reply."""
- args = TrafficScriptArg(
- ['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac', 'ptype'])
+ args = TrafficScriptArg(['tg_src_mac', 'src_ip', 'dst_ip', 'dut_if1_mac',
+ 'ptype'])
src_mac = args.get_arg('tg_src_mac')
dst_mac = args.get_arg('dut_if1_mac')
src_mac = args.get_arg('tg_src_mac')
dst_mac = args.get_arg('dut_if1_mac')
txq.send(pkt_raw)
sent.append(auto_pad(pkt_raw))
txq.send(pkt_raw)
sent.append(auto_pad(pkt_raw))
- ether = rxq_mirrored.recv(2)
# Receive copy of Rx packet.
# Receive copy of Rx packet.
- if ether is None:
- raise RuntimeError("Rx timeout of mirrored Rx packet")
+ while True:
+ ether = rxq_mirrored.recv(2)
+ if ether is None:
+ raise RuntimeError("Rx timeout of mirrored Rx packet")
+
+ if ether.haslayer(ICMPv6ND_NS):
+ # read another packet in the queue if the current one is ICMPv6ND_NS
+ continue
+ else:
+ # otherwise process the current packet
+ break
+
pkt = auto_pad(pkt_raw)
if str(ether) != str(pkt):
print("Mirrored Rx packet doesn't match the original Rx packet.")
pkt = auto_pad(pkt_raw)
if str(ether) != str(pkt):
print("Mirrored Rx packet doesn't match the original Rx packet.")
# Receive reply on TG Tx port.
ether_repl = rxq_tx.recv(2, sent)
# Receive reply on TG Tx port.
ether_repl = rxq_tx.recv(2, sent)
if ether_repl is None:
raise RuntimeError("Reply not received on TG Tx port.")
if ether_repl is None:
raise RuntimeError("Reply not received on TG Tx port.")
- else:
- print("Reply received on TG Tx port.\n")
+
+ print("Reply received on TG Tx port.\n")
# Receive copy of Tx packet.
ether = rxq_mirrored.recv(2)
if ether is None:
raise RuntimeError("Rx timeout of mirrored Tx packet")
# Receive copy of Tx packet.
ether = rxq_mirrored.recv(2)
if ether is None:
raise RuntimeError("Rx timeout of mirrored Tx packet")
if str(ether) != str(ether_repl):
print("Mirrored Tx packet doesn't match the received Tx packet.")
if ether.src != ether_repl.src or ether.dst != ether_repl.dst:
if str(ether) != str(ether_repl):
print("Mirrored Tx packet doesn't match the received Tx packet.")
if ether.src != ether_repl.src or ether.dst != ether_repl.dst: