-#!/usr/bin/env python
+#!/usr/bin/env python3
import unittest
import binascii
from socket import AF_INET6
from framework import VppTestCase, VppTestRunner
-from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
+from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable
from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
+import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
from scapy.layers.inet import IP, UDP
-from scapy.utils import inet_pton, inet_ntop
-
from util import ppp
""" SRv6 Test Case """
@classmethod
- def setUpClass(self):
- super(TestSRv6, self).setUpClass()
+ def setUpClass(cls):
+ super(TestSRv6, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestSRv6, cls).tearDownClass()
def setUp(self):
""" Perform test setup before each test case.
if any(ipv6):
self.logger.debug(self.vapi.cli("show ip6 neighbors"))
if any(ipv4):
- self.logger.debug(self.vapi.cli("show ip arp"))
+ self.logger.debug(self.vapi.cli("show ip4 neighbors"))
self.logger.debug(self.vapi.cli("show interface"))
self.logger.debug(self.vapi.cli("show hardware"))
self.logger.debug("Tear down interface %s" % (i.name))
i.admin_down()
i.unconfig()
+ i.set_table_ip4(0)
+ i.set_table_ip6(0)
@unittest.skipUnless(0, "PC to fix")
def test_SRv6_T_Encaps(self):
# configure FIB entries
route = VppIpRoute(self, "a4::", 64,
[VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ self.pg1.sw_if_index)])
route.add_vpp_config()
# configure encaps IPv6 source address
# configure FIB entries
route = VppIpRoute(self, "a4::", 64,
[VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ self.pg1.sw_if_index)])
route.add_vpp_config()
# configure encaps IPv6 source address
# configure FIB entries
route = VppIpRoute(self, "a4::", 64,
[VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ self.pg1.sw_if_index)])
route.add_vpp_config()
# configure encaps IPv6 source address
# configure FIB entries
route = VppIpRoute(self, "a4::", 64,
[VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ self.pg1.sw_if_index)])
route.add_vpp_config()
# configure encaps IPv6 source address
# configure FIB entries
route = VppIpRoute(self, "a4::", 64,
[VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ self.pg1.sw_if_index)])
route.add_vpp_config()
# configure SRv6 localSID End without PSP behavior
localsid = VppSRv6LocalSID(
- self, localsid_addr='A3::0',
+ self, localsid='A3::0',
behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
- nh_addr='::',
+ nh_addr=0,
end_psp=0,
sw_if_index=0,
vlan_index=0,
# TODO: test behavior with SL=0 packet (needs 2*SRH?)
+ expected_count = len(pkts)
+
+ # packets without SRH (should not crash)
+ packet_header = self.create_packet_header_IPv6('a3::')
+ # create traffic stream pg0->pg1
+ pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
+ self.pg_packet_sizes, count))
+
# send packets and verify received packets
self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
- self.compare_rx_tx_packet_End)
+ self.compare_rx_tx_packet_End,
+ expected_count=expected_count)
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
# configure FIB entries
route = VppIpRoute(self, "a4::", 64,
[VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ self.pg1.sw_if_index)])
route.add_vpp_config()
# configure SRv6 localSID End with PSP behavior
localsid = VppSRv6LocalSID(
- self, localsid_addr='A3::0',
+ self, localsid='A3::0',
behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
- nh_addr='::',
+ nh_addr=0,
end_psp=1,
sw_if_index=0,
vlan_index=0,
# a4::/64 via pg1 and pg2
route = VppIpRoute(self, "a4::", 64,
[VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6),
+ self.pg1.sw_if_index),
VppRoutePath(self.pg2.remote_ip6,
- self.pg2.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ self.pg2.sw_if_index)])
route.add_vpp_config()
self.logger.debug(self.vapi.cli("show ip6 fib"))
# configure SRv6 localSID End.X without PSP behavior
# End.X points to interface pg1
localsid = VppSRv6LocalSID(
- self, localsid_addr='A3::C4',
+ self, localsid='A3::C4',
behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
nh_addr=self.pg1.remote_ip6,
end_psp=0,
self.compare_rx_tx_packet_End)
# assert nothing was received on the other interface (pg2)
- self.pg2.assert_nothing_captured("mis-directed packet(s)")
+ self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
# configure FIB entries
# a4::/64 via pg1 and pg2
route = VppIpRoute(self, "a4::", 64,
- [VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6),
+ [VppRoutePath(
+ self.pg1.remote_ip6,
+ self.pg1.sw_if_index),
VppRoutePath(self.pg2.remote_ip6,
- self.pg2.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ self.pg2.sw_if_index)])
route.add_vpp_config()
# configure SRv6 localSID End with PSP behavior
localsid = VppSRv6LocalSID(
- self, localsid_addr='A3::C4',
+ self, localsid='A3::C4',
behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
nh_addr=self.pg1.remote_ip6,
end_psp=1,
self.compare_rx_tx_packet_End_PSP)
# assert nothing was received on the other interface (pg2)
- self.pg2.assert_nothing_captured("mis-directed packet(s)")
+ self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
# configure SRv6 localSID End.DX6 behavior
localsid = VppSRv6LocalSID(
- self, localsid_addr='a3::c4',
+ self, localsid='A3::C4',
behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
nh_addr=self.pg1.remote_ip6,
end_psp=0,
# source interface in global FIB (0)
# destination interfaces in global and vrf
vrf_1 = 1
+ ipt = VppIpTable(self, vrf_1, is_ip6=True)
+ ipt.add_vpp_config()
self.setup_interfaces(ipv6=[True, True, True],
ipv6_table_id=[0, 0, vrf_1])
route0 = VppIpRoute(self, "a4::", 64,
[VppRoutePath(self.pg1.remote_ip6,
self.pg1.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6,
nh_table_id=0)],
- table_id=0,
- is_ip6=1)
+ table_id=0)
route0.add_vpp_config()
route1 = VppIpRoute(self, "a4::", 64,
[VppRoutePath(self.pg2.remote_ip6,
self.pg2.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6,
nh_table_id=vrf_1)],
- table_id=vrf_1,
- is_ip6=1)
+ table_id=vrf_1)
route1.add_vpp_config()
self.logger.debug(self.vapi.cli("show ip6 fib"))
# fib_table: where the localsid is installed
# sw_if_index: in T-variants of localsid this is the vrf table_id
localsid = VppSRv6LocalSID(
- self, localsid_addr='a3::c4',
+ self, localsid='A3::C4',
behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
- nh_addr='::',
+ nh_addr=0,
end_psp=0,
sw_if_index=vrf_1,
vlan_index=0,
self.compare_rx_tx_packet_End_DX6)
# assert nothing was received on the other interface (pg2)
- self.pg1.assert_nothing_captured("mis-directed packet(s)")
+ self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
# configure SRv6 localSID End.DX4 behavior
localsid = VppSRv6LocalSID(
- self, localsid_addr='a3::c4',
+ self, localsid='A3::C4',
behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
nh_addr=self.pg1.remote_ip4,
end_psp=0,
# source interface in global FIB (0)
# destination interfaces in global and vrf
vrf_1 = 1
+ ipt = VppIpTable(self, vrf_1)
+ ipt.add_vpp_config()
self.setup_interfaces(ipv6=[True, False, False],
ipv4=[False, True, True],
ipv6_table_id=[0, 0, 0],
[VppRoutePath(self.pg1.remote_ip4,
self.pg1.sw_if_index,
nh_table_id=0)],
- table_id=0,
- is_ip6=0)
+ table_id=0)
route0.add_vpp_config()
route1 = VppIpRoute(self, "4.1.1.0", 24,
[VppRoutePath(self.pg2.remote_ip4,
self.pg2.sw_if_index,
nh_table_id=vrf_1)],
- table_id=vrf_1,
- is_ip6=0)
+ table_id=vrf_1)
route1.add_vpp_config()
self.logger.debug(self.vapi.cli("show ip fib"))
# fib_table: where the localsid is installed
# sw_if_index: in T-variants of localsid: vrf table_id
localsid = VppSRv6LocalSID(
- self, localsid_addr='a3::c4',
+ self, localsid='A3::C4',
behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
- nh_addr='::',
+ nh_addr=0,
end_psp=0,
sw_if_index=vrf_1,
vlan_index=0,
self.compare_rx_tx_packet_End_DX4)
# assert nothing was received on the other interface (pg2)
- self.pg1.assert_nothing_captured("mis-directed packet(s)")
+ self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
# configure SRv6 localSID End.DX2 behavior
localsid = VppSRv6LocalSID(
- self, localsid_addr='a3::c4',
+ self, localsid='A3::C4',
behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
- nh_addr='::',
+ nh_addr=0,
end_psp=0,
sw_if_index=self.pg1.sw_if_index,
vlan_index=0,
# configure FIB entries
route = VppIpRoute(self, "a4::", 64,
- [VppRoutePath(self.pg4.remote_ip6,
- self.pg4.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ [VppRoutePath(
+ self.pg4.remote_ip6,
+ self.pg4.sw_if_index)])
route.add_vpp_config()
# configure encaps IPv6 source address
# add classify table
# mask on dst ip address prefix a7::/8
- mask = '{:0<16}'.format('ff')
+ mask = '{!s:0<16}'.format('ff')
r = self.vapi.classify_add_del_table(
1,
binascii.unhexlify(mask),
match_n_vectors=(len(mask) - 1) // 32 + 1,
current_data_flag=1,
skip_n_vectors=2) # data offset
- self.assertIsNotNone(r, msg='No response msg for add_del_table')
+ self.assertIsNotNone(r, 'No response msg for add_del_table')
table_index = r.new_table_index
- # add the source routign node as a ip6 inacl netxt node
+ # add the source routing node as a ip6 inacl netxt node
r = self.vapi.add_node_next('ip6-inacl',
'sr-pl-rewrite-insert')
inacl_next_node_index = r.node_index
- match = '{:0<16}'.format('a7')
+ match = '{!s:0<16}'.format('a7')
r = self.vapi.classify_add_del_session(
1,
table_index,
hit_next_index=inacl_next_node_index,
action=3,
metadata=0) # sr policy index
- self.assertIsNotNone(r, msg='No response msg for add_del_session')
+ self.assertIsNotNone(r, 'No response msg for add_del_session')
# log the classify table used in the steering policy
self.logger.info(self.vapi.cli("show classify table"))
sw_if_index=self.pg3.sw_if_index,
ip6_table_index=table_index)
self.assertIsNotNone(r,
- msg='No response msg for input_acl_set_interface')
+ 'No response msg for input_acl_set_interface')
# log the ip6 inacl
self.logger.info(self.vapi.cli("show inacl type ip6"))
sw_if_index=self.pg3.sw_if_index,
ip6_table_index=table_index)
self.assertIsNotNone(r,
- msg='No response msg for input_acl_set_interface')
+ 'No response msg for input_acl_set_interface')
# log the ip6 inacl after cleaning
self.logger.info(self.vapi.cli("show inacl type ip6"))
0,
table_index,
binascii.unhexlify(match))
- self.assertIsNotNone(r, msg='No response msg for add_del_session')
+ self.assertIsNotNone(r, 'No response msg for add_del_session')
r = self.vapi.classify_add_del_table(
0,
binascii.unhexlify(mask),
table_index=table_index)
- self.assertIsNotNone(r, msg='No response msg for add_del_table')
+ self.assertIsNotNone(r, 'No response msg for add_del_table')
self.logger.info(self.vapi.cli("show classify table"))
tx_ip.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- tx_ip = IP(str(tx_ip))
+ tx_ip = IP(scapy.compat.raw(tx_ip))
self.assertEqual(rx_srh.payload, tx_ip)
self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
# segleft should be equal to lastentry
self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
- # nh should be "No Next Header" (59)
- self.assertEqual(rx_srh.nh, 59)
+ # nh should be "No Next Header" (143)
+ self.assertEqual(rx_srh.nh, 143)
# the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
- self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
+ self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
self.logger.debug("packet verification: SUCCESS")
tx_ip2.chksum = None
# read back the pkt (with str()) to force computing these fields
# probably other ways to accomplish this are possible
- tx_ip2 = IP(str(tx_ip2))
+ tx_ip2 = IP(scapy.compat.raw(tx_ip2))
self.assertEqual(rx_ip, tx_ip2)
tx_ip = tx_pkt.getlayer(IPv6)
# we can't just get the 2nd Ether layer
# get the Raw content and dissect it as Ether
- tx_eth1 = Ether(str(tx_pkt[Raw]))
+ tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
# verify if rx'ed packet has no SRH
self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
# read back the dumped packet (with str())
# to force computing these fields
# probably other ways are possible
- p = Ether(str(p))
+ p = Ether(scapy.compat.raw(p))
payload_info.data = p.copy()
self.logger.debug(ppp("Created packet:", p))
pkts.append(p)
self.logger.info("Done creating packets")
return pkts
- def send_and_verify_pkts(self, input, pkts, output, compare_func):
+ def send_and_verify_pkts(self, input, pkts, output, compare_func,
+ expected_count=None):
"""Send packets and verify received packets using compare_func
:param input: ingress interface of DUT
:param pkts: list of packets to transmit
:param output: egress interface of DUT
:param compare_func: function to compare in and out packets
+ :param expected_count: expected number of captured packets (if
+ different than len(pkts))
"""
# add traffic stream to input interface
input.add_stream(pkts)
# get output capture
self.logger.info("Getting packet capture")
- capture = output.get_capture()
+ capture = output.get_capture(expected_count=expected_count)
# assert nothing was captured on input interface
input.assert_nothing_captured()
p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
IPv6ExtHdrSegmentRouting(addresses=sidlist,
- segleft=segleft, nh=59) /
+ segleft=segleft, nh=143) /
eth)
return p
else:
eth.type = etype
- p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth)
+ p = (IPv6(src='1234::1', dst=dst_outer, nh=143) / eth)
return p
def get_payload_info(self, packet):
# but packet[Raw] gives the complete payload
# (incl L2 header) for the T.Encaps L2 case
try:
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
except:
# remote L2 header from packet[Raw]:
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
payload_info = self.payload_to_info(
- str(Ether(str(packet[Raw]))[Raw]))
+ Ether(scapy.compat.r(packet[Raw]))[Raw])
return payload_info
:param compare_func: function to compare in and out packet
"""
self.logger.info("Verifying capture on interface %s using function %s"
- % (dst_if.name, compare_func.func_name))
+ % (dst_if.name, compare_func.__name__))
last_info = dict()
for i in self.pg_interfaces:
compare_func(txed_packet, packet)
except:
- print packet.command()
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
+ # FIXME: there is no need to check manually that all the packets
+ # arrived (already done so by get_capture); checking here
+ # prevents testing packets that are expected to be dropped, so
+ # commenting this out for now
+
# have all expected packets arrived?
- for i in self.pg_interfaces:
- remaining_packet = self.get_next_packet_info_for_interface2(
- i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
- self.assertTrue(remaining_packet is None,
- "Interface %s: Packet expected from interface %s "
- "didn't arrive" % (dst_if.name, i.name))
+ # for i in self.pg_interfaces:
+ # remaining_packet = self.get_next_packet_info_for_interface2(
+ # i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
+ # self.assertTrue(remaining_packet is None,
+ # "Interface %s: Packet expected from interface %s "
+ # "didn't arrive" % (dst_if.name, i.name))
if __name__ == '__main__':