from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
def setUpClass(self):
super(TestSRv6, self).setUpClass()
+ @classmethod
+ def tearDownClass(cls):
+ super(TestSRv6, cls).tearDownClass()
+
def setUp(self):
""" Perform test setup before each test case.
"""
route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
[VppRoutePath(self.pg0.remote_ip6,
self.pg0.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ proto=DpoProto.DPO_PROTO_IP6)])
route.add_vpp_config()
# configure SRv6 localSID behavior
route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
[VppRoutePath(self.pg0.remote_ip6,
self.pg0.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ proto=DpoProto.DPO_PROTO_IP6)])
route.add_vpp_config()
# configure SRv6 localSID behavior
tx_ip2.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- tx_ip2 = IP(str(tx_ip2))
+ tx_ip2 = IP(scapy.compat.raw(tx_ip2))
self.assertEqual(rx_ip, tx_ip2)
tx_ip.chksum = None
# -> read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- self.assertEqual(rx_srh.payload, IP(str(tx_ip)))
+ self.assertEqual(rx_srh.payload, IP(scapy.compat.raw(tx_ip)))
self.logger.debug("packet verification: SUCCESS")
route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
[VppRoutePath(self.pg0.remote_ip6,
self.pg0.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ proto=DpoProto.DPO_PROTO_IP6)])
route.add_vpp_config()
# configure SRv6 localSID behavior
tx_ip = tx_pkt.getlayer(IPv6)
# we can't just get the 2nd Ether layer
# get the Raw content and dissect it as Ether
- tx_eth1 = Ether(str(tx_pkt[Raw]))
+ tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
# verify if rx'ed packet has no SRH
self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
# the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
tx_ether = tx_pkt.getlayer(Ether)
- self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
+ self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
self.logger.debug("packet verification: SUCCESS")
# read back the dumped packet (with str())
# to force computing these fields
# probably other ways are possible
- p = Ether(str(p))
+ p = Ether(scapy.compat.raw(p))
payload_info.data = p.copy()
self.logger.debug(ppp("Created packet:", p))
pkts.append(p)
# but packet[Raw] gives the complete payload
# (incl L2 header) for the T.Encaps L2 case
try:
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
except:
# remote L2 header from packet[Raw]:
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
payload_info = self.payload_to_info(
- str(Ether(str(packet[Raw]))[Raw]))
+ Ether(scapy.compat.raw(packet[Raw]))[Raw])
return payload_info
:param compare_func: function to compare in and out packet
"""
self.logger.info("Verifying capture on interface %s using function %s"
- % (dst_if.name, compare_func.func_name))
+ % (dst_if.name, compare_func.__name__))
last_info = dict()
for i in self.pg_interfaces:
compare_func(txed_packet, packet)
except:
- print packet.command()
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise