X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_srv6_ad.py;h=a788f1e4974699da9289c21f098988487d907e46;hb=7f9b7f9f492d1748d8ba025b3a713058fdb1943d;hp=13979ba13a25f6fb30fe5f8a4fe2af9810a71261;hpb=bcd4c4a2c48c9977e481dc189cc71874d15c8998;p=vpp.git diff --git a/test/test_srv6_ad.py b/test/test_srv6_ad.py index 13979ba13a2..a788f1e4974 100644 --- a/test/test_srv6_ad.py +++ b/test/test_srv6_ad.py @@ -5,10 +5,12 @@ import binascii from socket import AF_INET6 from framework import VppTestCase, VppTestRunner -from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable +from vpp_ip import DpoProto +from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \ SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes +import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether, Dot1Q from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting @@ -26,6 +28,10 @@ class TestSRv6(VppTestCase): def setUpClass(self): super(TestSRv6, self).setUpClass() + @classmethod + def tearDownClass(cls): + super(TestSRv6, cls).tearDownClass() + def setUp(self): """ Perform test setup before each test case. """ @@ -362,7 +368,7 @@ class TestSRv6(VppTestCase): tx_ip2.chksum = None # read back the pkt (with str()) to force computing these fields # probably other ways to accomplish this are possible - tx_ip2 = IP(str(tx_ip2)) + tx_ip2 = IP(scapy.compat.raw(tx_ip2)) self.assertEqual(rx_ip, tx_ip2) @@ -404,7 +410,137 @@ class TestSRv6(VppTestCase): tx_ip.chksum = None # -> read back the pkt (with str()) to force computing these fields # probably other ways to accomplish this are possible - self.assertEqual(rx_srh.payload, IP(str(tx_ip))) + self.assertEqual(rx_srh.payload, IP(scapy.compat.raw(tx_ip))) + + self.logger.debug("packet verification: SUCCESS") + + def test_SRv6_End_AD_L2(self): + """ Test SRv6 End.AD behavior with L2 traffic. + """ + self.src_addr = 'a0::' + self.sid_list = ['a1::', 'a2::a4', 'a3::'] + self.test_sid_index = 1 + + # send traffic to one destination interface + # source and destination interfaces are IPv6 only + self.setup_interfaces(ipv6=[True, False]) + + # configure route to next segment + route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128, + [VppRoutePath(self.pg0.remote_ip6, + self.pg0.sw_if_index, + proto=DpoProto.DPO_PROTO_IP6)], + is_ip6=1) + route.add_vpp_config() + + # configure SRv6 localSID behavior + cli_str = "sr localsid address " + \ + self.sid_list[self.test_sid_index] + \ + " behavior end.ad" + \ + " oif " + self.pg1.name + \ + " iif " + self.pg1.name + self.vapi.cli(cli_str) + + # log the localsids + self.logger.debug(self.vapi.cli("show sr localsid")) + + # send one packet per packet size + count = len(self.pg_packet_sizes) + + # prepare L2 in SRv6 headers + packet_header1 = self.create_packet_header_IPv6_SRH_L2( + srcaddr=self.src_addr, + sidlist=self.sid_list[::-1], + segleft=len(self.sid_list) - self.test_sid_index - 1, + vlan=0) + + # generate packets (pg0->pg1) + pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1, + self.pg_packet_sizes, count) + + # send packets and verify received packets + self.send_and_verify_pkts(self.pg0, pkts1, self.pg1, + self.compare_rx_tx_packet_End_AD_L2_out) + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # prepare L2 header for returning packets + packet_header2 = self.create_packet_header_L2() + + # generate returning packets (pg1->pg0) + pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2, + self.pg_packet_sizes, count) + + # send packets and verify received packets + self.send_and_verify_pkts(self.pg1, pkts2, self.pg0, + self.compare_rx_tx_packet_End_AD_L2_in) + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # remove SRv6 localSIDs + cli_str = "sr localsid del address " + \ + self.sid_list[self.test_sid_index] + self.vapi.cli(cli_str) + + # cleanup interfaces + self.teardown_interfaces() + + def compare_rx_tx_packet_End_AD_L2_out(self, tx_pkt, rx_pkt): + """ Compare input and output packet after passing End.AD with L2 + + :param tx_pkt: transmitted packet + :param rx_pkt: received packet + """ + + # get IPv4 header of rx'ed packet + rx_eth = rx_pkt.getlayer(Ether) + + tx_ip = tx_pkt.getlayer(IPv6) + # we can't just get the 2nd Ether layer + # get the Raw content and dissect it as Ether + tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw])) + + # verify if rx'ed packet has no SRH + self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + + # the whole rx_eth pkt should be equal to tx_eth1 + self.assertEqual(rx_eth, tx_eth1) + + self.logger.debug("packet verification: SUCCESS") + + def compare_rx_tx_packet_End_AD_L2_in(self, tx_pkt, rx_pkt): + """ Compare input and output packet after passing End.AD + + :param tx_pkt: transmitted packet + :param rx_pkt: received packet + """ + + #### + # get first (outer) IPv6 header of rx'ed packet + rx_ip = rx_pkt.getlayer(IPv6) + # received ip.src should be equal to SR Policy source + self.assertEqual(rx_ip.src, self.src_addr) + # received ip.dst should be equal to expected sidlist next segment + self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1]) + + # rx'ed packet should have SRH + self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + + # get SRH + rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting) + # rx'ed seglist should be equal to SID-list in reversed order + self.assertEqual(rx_srh.addresses, self.sid_list[::-1]) + # segleft should be equal to previous segleft value minus 1 + self.assertEqual(rx_srh.segleft, + len(self.sid_list) - self.test_sid_index - 2) + # lastentry should be equal to the SID-list length minus 1 + self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1) + + # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt + tx_ether = tx_pkt.getlayer(Ether) + self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether) self.logger.debug("packet verification: SUCCESS") @@ -442,7 +578,7 @@ class TestSRv6(VppTestCase): # read back the dumped packet (with str()) # to force computing these fields # probably other ways are possible - p = Ether(str(p)) + p = Ether(scapy.compat.raw(p)) payload_info.data = p.copy() self.logger.debug(ppp("Created packet:", p)) pkts.append(p) @@ -547,6 +683,50 @@ class TestSRv6(VppTestCase): UDP(sport=1234, dport=1234) return p + def create_packet_header_L2(self, vlan=0): + """Create packet header: L2 header + + :param vlan: if vlan!=0 then add 802.1q header + """ + # Note: the dst addr ('00:55:44:33:22:11') is used in + # the compare function compare_rx_tx_packet_T_Encaps_L2 + # to detect presence of L2 in SRH payload + p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11') + etype = 0x8137 # IPX + if vlan: + # add 802.1q layer + p /= Dot1Q(vlan=vlan, type=etype) + else: + p.type = etype + return p + + def create_packet_header_IPv6_SRH_L2(self, srcaddr, sidlist, segleft, + vlan=0): + """Create packet header: L2 encapsulated in SRv6: + IPv6 header with SRH, L2 + + :param int srcaddr: IPv6 source address + :param list sidlist: segment list of outer IPv6 SRH + :param int segleft: segments-left field of outer IPv6 SRH + :param vlan: L2 vlan; if vlan!=0 then add 802.1q header + + IPv6 source address is set to srcaddr + IPv6 destination address is set to sidlist[segleft] + """ + eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11') + etype = 0x8137 # IPX + if vlan: + # add 802.1q layer + eth /= Dot1Q(vlan=vlan, type=etype) + else: + eth.type = etype + + p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \ + IPv6ExtHdrSegmentRouting(addresses=sidlist, + segleft=segleft, nh=59) / \ + eth + return p + def get_payload_info(self, packet): """ Extract the payload_info from the packet """ @@ -554,14 +734,14 @@ class TestSRv6(VppTestCase): # but packet[Raw] gives the complete payload # (incl L2 header) for the T.Encaps L2 case try: - payload_info = self.payload_to_info(str(packet[Raw])) + payload_info = self.payload_to_info(packet[Raw]) except: # remote L2 header from packet[Raw]: # take packet[Raw], convert it to an Ether layer # and then extract Raw from it payload_info = self.payload_to_info( - str(Ether(str(packet[Raw]))[Raw])) + Ether(scapy.compat.raw(packet[Raw]))[Raw]) return payload_info @@ -575,7 +755,7 @@ class TestSRv6(VppTestCase): :param compare_func: function to compare in and out packet """ self.logger.info("Verifying capture on interface %s using function %s" - % (dst_if.name, compare_func.func_name)) + % (dst_if.name, compare_func.__name__)) last_info = dict() for i in self.pg_interfaces: @@ -618,7 +798,6 @@ class TestSRv6(VppTestCase): compare_func(txed_packet, packet) except: - print packet.command() self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise