X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_srv6_as.py;h=2be7865d5bdb9201667997112d8542b6b3519b06;hb=097fa66b986f06281f603767d321ab13ab6c88c3;hp=0e94bd97ce673829bdc4a4f994bf3eb5341f8fa0;hpb=0928da903f6be7c95189ac692e13d8edee4ebcd5;p=vpp.git diff --git a/test/test_srv6_as.py b/test/test_srv6_as.py old mode 100644 new mode 100755 index 0e94bd97ce6..2be7865d5bd --- a/test/test_srv6_as.py +++ b/test/test_srv6_as.py @@ -5,10 +5,11 @@ import binascii from socket import AF_INET6 from framework import VppTestCase, VppTestRunner -from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable +from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \ SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes +import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether, Dot1Q from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting @@ -26,6 +27,10 @@ class TestSRv6(VppTestCase): def setUpClass(self): super(TestSRv6, self).setUpClass() + @classmethod + def tearDownClass(cls): + super(TestSRv6, cls).tearDownClass() + def setUp(self): """ Perform test setup before each test case. """ @@ -162,6 +167,91 @@ class TestSRv6(VppTestCase): test_sid_index=0, rewrite_src_addr='a1::') + def test_SRv6_End_AS_L2_noSRH(self): + """ Test SRv6 End.AS behavior with L2 traffic and no SRH rewrite. + """ + self.run_SRv6_End_AS_L2( + sid_list=['a1::', 'a2::a6', 'a3::'], + test_sid_index=1, + rewrite_src_addr='a2::') + + def test_SRv6_End_AS_L2_SRH(self): + """ Test SRv6 End.AS behavior with L2 traffic and SRH rewrite. + """ + self.run_SRv6_End_AS_L2( + sid_list=['a1::a6', 'a2::', 'a3::'], + test_sid_index=0, + rewrite_src_addr='a1::') + + def run_SRv6_End_AS_L2(self, sid_list, test_sid_index, rewrite_src_addr): + """ Run SRv6 End.AS test with L2 traffic. + """ + self.rewrite_src_addr = rewrite_src_addr + self.rewrite_sid_list = sid_list[test_sid_index + 1::] + + # send traffic to one destination interface + # source and destination interfaces are IPv6 only + self.setup_interfaces(ipv6=[True, False]) + + # configure route to next segment + route = VppIpRoute(self, sid_list[test_sid_index + 1], 128, + [VppRoutePath(self.pg0.remote_ip6, + self.pg0.sw_if_index)]) + route.add_vpp_config() + + # configure SRv6 localSID behavior + cli_str = "sr localsid address " + sid_list[test_sid_index] \ + + " behavior end.as" \ + + " oif " + self.pg1.name \ + + " iif " + self.pg1.name \ + + " src " + self.rewrite_src_addr + for s in self.rewrite_sid_list: + cli_str += " next " + s + self.vapi.cli(cli_str) + + # log the localsids + self.logger.debug(self.vapi.cli("show sr localsid")) + + # send one packet per packet size + count = len(self.pg_packet_sizes) + + # prepare L2 in SRv6 headers + packet_header1 = self.create_packet_header_IPv6_SRH_L2( + sidlist=sid_list[::-1], + segleft=len(sid_list) - test_sid_index - 1, + vlan=0) + + # generate packets (pg0->pg1) + pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1, + self.pg_packet_sizes, count) + + # send packets and verify received packets + self.send_and_verify_pkts(self.pg0, pkts1, self.pg1, + self.compare_rx_tx_packet_End_AS_L2_out) + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # prepare L2 header for returning packets + packet_header2 = self.create_packet_header_L2() + + # generate returning packets (pg1->pg0) + pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2, + self.pg_packet_sizes, count) + + # send packets and verify received packets + self.send_and_verify_pkts(self.pg1, pkts2, self.pg0, + self.compare_rx_tx_packet_End_AS_L2_in) + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # remove SRv6 localSIDs + self.vapi.cli("sr localsid del address " + sid_list[test_sid_index]) + + # cleanup interfaces + self.teardown_interfaces() + def run_SRv6_End_AS_IPv6(self, sid_list, test_sid_index, rewrite_src_addr): """ Run SRv6 End.AS test with IPv6 traffic. """ @@ -175,9 +265,7 @@ class TestSRv6(VppTestCase): # configure route to next segment route = VppIpRoute(self, sid_list[test_sid_index + 1], 128, [VppRoutePath(self.pg0.remote_ip6, - self.pg0.sw_if_index, - proto=DpoProto.DPO_PROTO_IP6)], - is_ip6=1) + self.pg0.sw_if_index)]) route.add_vpp_config() # configure SRv6 localSID behavior @@ -246,9 +334,7 @@ class TestSRv6(VppTestCase): # configure route to next segment route = VppIpRoute(self, sid_list[test_sid_index + 1], 128, [VppRoutePath(self.pg0.remote_ip6, - self.pg0.sw_if_index, - proto=DpoProto.DPO_PROTO_IP6)], - is_ip6=1) + self.pg0.sw_if_index)]) route.add_vpp_config() # configure SRv6 localSID behavior @@ -401,12 +487,59 @@ class TestSRv6(VppTestCase): tx_ip.chksum = None # read back the pkt (with str()) to force computing these fields # probably other ways to accomplish this are possible - tx_ip = IP(str(tx_ip)) + tx_ip = IP(scapy.compat.raw(tx_ip)) self.assertEqual(payload, tx_ip) self.logger.debug("packet verification: SUCCESS") + def compare_rx_tx_packet_End_AS_L2_in(self, tx_pkt, rx_pkt): + """ Compare input and output packet after passing End.AS + + :param tx_pkt: transmitted packet + :param rx_pkt: received packet + """ + + # get first (outer) IPv6 header of rx'ed packet + rx_ip = rx_pkt.getlayer(IPv6) + rx_srh = None + + tx_ether = tx_pkt.getlayer(Ether) + + # expected segment-list (SRH order) + tx_seglist = self.rewrite_sid_list[::-1] + + # received ip.src should be equal to SR Policy source + self.assertEqual(rx_ip.src, self.rewrite_src_addr) + # received ip.dst should be equal to expected sidlist[lastentry] + self.assertEqual(rx_ip.dst, tx_seglist[-1]) + + if len(tx_seglist) > 1: + # rx'ed packet should have SRH + self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + # get SRH + rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting) + # rx'ed seglist should be equal to seglist + self.assertEqual(rx_srh.addresses, tx_seglist) + # segleft should be equal to size seglist-1 + self.assertEqual(rx_srh.segleft, len(tx_seglist)-1) + # segleft should be equal to lastentry + self.assertEqual(rx_srh.segleft, rx_srh.lastentry) + # nh should be "No Next Header" (59) + self.assertEqual(rx_srh.nh, 59) + # get payload + payload = rx_srh.payload + else: + # rx'ed packet should NOT have SRH + self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + # get payload + payload = rx_ip.payload + + # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt + self.assertEqual(Ether(scapy.compat.raw(payload)), tx_ether) + + self.logger.debug("packet verification: SUCCESS") + def compare_rx_tx_packet_End_AS_IPv6_out(self, tx_pkt, rx_pkt): """ Compare input and output packet after passing End.AS with IPv6 @@ -456,12 +589,35 @@ class TestSRv6(VppTestCase): tx_ip2.chksum = None # read back the pkt (with str()) to force computing these fields # probably other ways to accomplish this are possible - tx_ip2 = IP(str(tx_ip2)) + tx_ip2 = IP(scapy.compat.raw(tx_ip2)) self.assertEqual(rx_ip, tx_ip2) self.logger.debug("packet verification: SUCCESS") + def compare_rx_tx_packet_End_AS_L2_out(self, tx_pkt, rx_pkt): + """ Compare input and output packet after passing End.AS with L2 + + :param tx_pkt: transmitted packet + :param rx_pkt: received packet + """ + + # get IPv4 header of rx'ed packet + rx_eth = rx_pkt.getlayer(Ether) + + tx_ip = tx_pkt.getlayer(IPv6) + # we can't just get the 2nd Ether layer + # get the Raw content and dissect it as Ether + tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw])) + + # verify if rx'ed packet has no SRH + self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + + # the whole rx_eth pkt should be equal to tx_eth1 + self.assertEqual(rx_eth, tx_eth1) + + self.logger.debug("packet verification: SUCCESS") + def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count): """Create SRv6 input packet stream for defined interface. @@ -498,7 +654,7 @@ class TestSRv6(VppTestCase): # read back the dumped packet (with str()) # to force computing these fields # probably other ways are possible - p = Ether(str(p)) + p = Ether(scapy.compat.raw(p)) payload_info.data = p.copy() self.logger.debug(ppp("Created packet:", p)) pkts.append(p) @@ -604,6 +760,48 @@ class TestSRv6(VppTestCase): UDP(sport=1234, dport=1234)) return p + def create_packet_header_L2(self, vlan=0): + """Create packet header: L2 header + + :param vlan: if vlan!=0 then add 802.1q header + """ + # Note: the dst addr ('00:55:44:33:22:11') is used in + # the compare function compare_rx_tx_packet_T_Encaps_L2 + # to detect presence of L2 in SRH payload + p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11') + etype = 0x8137 # IPX + if vlan: + # add 802.1q layer + p /= Dot1Q(vlan=vlan, type=etype) + else: + p.type = etype + return p + + def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0): + """Create packet header: L2 encapsulated in SRv6: + IPv6 header with SRH, L2 + + :param list sidlist: segment list of outer IPv6 SRH + :param int segleft: segments-left field of outer IPv6 SRH + :param vlan: L2 vlan; if vlan!=0 then add 802.1q header + + Outer IPv6 destination address is set to sidlist[segleft] + IPv6 source address is 1234::1 + """ + eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11') + etype = 0x8137 # IPX + if vlan: + # add 802.1q layer + eth /= Dot1Q(vlan=vlan, type=etype) + else: + eth.type = etype + + p = (IPv6(src='1234::1', dst=sidlist[segleft]) / + IPv6ExtHdrSegmentRouting(addresses=sidlist, + segleft=segleft, nh=59) / + eth) + return p + def get_payload_info(self, packet): """ Extract the payload_info from the packet """ @@ -611,14 +809,14 @@ class TestSRv6(VppTestCase): # but packet[Raw] gives the complete payload # (incl L2 header) for the T.Encaps L2 case try: - payload_info = self.payload_to_info(str(packet[Raw])) + payload_info = self.payload_to_info(packet[Raw]) except: # remote L2 header from packet[Raw]: # take packet[Raw], convert it to an Ether layer # and then extract Raw from it payload_info = self.payload_to_info( - str(Ether(str(packet[Raw]))[Raw])) + Ether(scapy.compat.raw(packet[Raw]))[Raw]) return payload_info @@ -632,7 +830,7 @@ class TestSRv6(VppTestCase): :param compare_func: function to compare in and out packet """ self.logger.info("Verifying capture on interface %s using function %s" - % (dst_if.name, compare_func.func_name)) + % (dst_if.name, compare_func.__name__)) last_info = dict() for i in self.pg_interfaces: @@ -675,7 +873,6 @@ class TestSRv6(VppTestCase): compare_func(txed_packet, packet) except: - print packet.command() self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise