X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_srv6_ad.py;h=75d9d12da4802eaf341c6bec55566ab96877b236;hb=429542476578e947399b2f09f9e8dd5aa1ac0951;hp=13979ba13a25f6fb30fe5f8a4fe2af9810a71261;hpb=bcd4c4a2c48c9977e481dc189cc71874d15c8998;p=vpp.git diff --git a/test/test_srv6_ad.py b/test/test_srv6_ad.py index 13979ba13a2..75d9d12da48 100644 --- a/test/test_srv6_ad.py +++ b/test/test_srv6_ad.py @@ -408,6 +408,136 @@ class TestSRv6(VppTestCase): self.logger.debug("packet verification: SUCCESS") + def test_SRv6_End_AD_L2(self): + """ Test SRv6 End.AD behavior with L2 traffic. + """ + self.src_addr = 'a0::' + self.sid_list = ['a1::', 'a2::a4', 'a3::'] + self.test_sid_index = 1 + + # send traffic to one destination interface + # source and destination interfaces are IPv6 only + self.setup_interfaces(ipv6=[True, False]) + + # configure route to next segment + route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128, + [VppRoutePath(self.pg0.remote_ip6, + self.pg0.sw_if_index, + proto=DpoProto.DPO_PROTO_IP6)], + is_ip6=1) + route.add_vpp_config() + + # configure SRv6 localSID behavior + cli_str = "sr localsid address " + \ + self.sid_list[self.test_sid_index] + \ + " behavior end.ad" + \ + " oif " + self.pg1.name + \ + " iif " + self.pg1.name + self.vapi.cli(cli_str) + + # log the localsids + self.logger.debug(self.vapi.cli("show sr localsid")) + + # send one packet per packet size + count = len(self.pg_packet_sizes) + + # prepare L2 in SRv6 headers + packet_header1 = self.create_packet_header_IPv6_SRH_L2( + srcaddr=self.src_addr, + sidlist=self.sid_list[::-1], + segleft=len(self.sid_list) - self.test_sid_index - 1, + vlan=0) + + # generate packets (pg0->pg1) + pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1, + self.pg_packet_sizes, count) + + # send packets and verify received packets + self.send_and_verify_pkts(self.pg0, pkts1, self.pg1, + self.compare_rx_tx_packet_End_AD_L2_out) + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # prepare L2 header for returning packets + packet_header2 = self.create_packet_header_L2() + + # generate returning packets (pg1->pg0) + pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2, + self.pg_packet_sizes, count) + + # send packets and verify received packets + self.send_and_verify_pkts(self.pg1, pkts2, self.pg0, + self.compare_rx_tx_packet_End_AD_L2_in) + + # log the localsid counters + self.logger.info(self.vapi.cli("show sr localsid")) + + # remove SRv6 localSIDs + cli_str = "sr localsid del address " + \ + self.sid_list[self.test_sid_index] + self.vapi.cli(cli_str) + + # cleanup interfaces + self.teardown_interfaces() + + def compare_rx_tx_packet_End_AD_L2_out(self, tx_pkt, rx_pkt): + """ Compare input and output packet after passing End.AD with L2 + + :param tx_pkt: transmitted packet + :param rx_pkt: received packet + """ + + # get IPv4 header of rx'ed packet + rx_eth = rx_pkt.getlayer(Ether) + + tx_ip = tx_pkt.getlayer(IPv6) + # we can't just get the 2nd Ether layer + # get the Raw content and dissect it as Ether + tx_eth1 = Ether(str(tx_pkt[Raw])) + + # verify if rx'ed packet has no SRH + self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + + # the whole rx_eth pkt should be equal to tx_eth1 + self.assertEqual(rx_eth, tx_eth1) + + self.logger.debug("packet verification: SUCCESS") + + def compare_rx_tx_packet_End_AD_L2_in(self, tx_pkt, rx_pkt): + """ Compare input and output packet after passing End.AD + + :param tx_pkt: transmitted packet + :param rx_pkt: received packet + """ + + #### + # get first (outer) IPv6 header of rx'ed packet + rx_ip = rx_pkt.getlayer(IPv6) + # received ip.src should be equal to SR Policy source + self.assertEqual(rx_ip.src, self.src_addr) + # received ip.dst should be equal to expected sidlist next segment + self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1]) + + # rx'ed packet should have SRH + self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) + + # get SRH + rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting) + # rx'ed seglist should be equal to SID-list in reversed order + self.assertEqual(rx_srh.addresses, self.sid_list[::-1]) + # segleft should be equal to previous segleft value minus 1 + self.assertEqual(rx_srh.segleft, + len(self.sid_list) - self.test_sid_index - 2) + # lastentry should be equal to the SID-list length minus 1 + self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1) + + # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt + tx_ether = tx_pkt.getlayer(Ether) + self.assertEqual(Ether(str(rx_srh.payload)), tx_ether) + + self.logger.debug("packet verification: SUCCESS") + def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count): """Create SRv6 input packet stream for defined interface. @@ -547,6 +677,50 @@ class TestSRv6(VppTestCase): UDP(sport=1234, dport=1234) return p + def create_packet_header_L2(self, vlan=0): + """Create packet header: L2 header + + :param vlan: if vlan!=0 then add 802.1q header + """ + # Note: the dst addr ('00:55:44:33:22:11') is used in + # the compare function compare_rx_tx_packet_T_Encaps_L2 + # to detect presence of L2 in SRH payload + p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11') + etype = 0x8137 # IPX + if vlan: + # add 802.1q layer + p /= Dot1Q(vlan=vlan, type=etype) + else: + p.type = etype + return p + + def create_packet_header_IPv6_SRH_L2(self, srcaddr, sidlist, segleft, + vlan=0): + """Create packet header: L2 encapsulated in SRv6: + IPv6 header with SRH, L2 + + :param int srcaddr: IPv6 source address + :param list sidlist: segment list of outer IPv6 SRH + :param int segleft: segments-left field of outer IPv6 SRH + :param vlan: L2 vlan; if vlan!=0 then add 802.1q header + + IPv6 source address is set to srcaddr + IPv6 destination address is set to sidlist[segleft] + """ + eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11') + etype = 0x8137 # IPX + if vlan: + # add 802.1q layer + eth /= Dot1Q(vlan=vlan, type=etype) + else: + eth.type = etype + + p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \ + IPv6ExtHdrSegmentRouting(addresses=sidlist, + segleft=segleft, nh=59) / \ + eth + return p + def get_payload_info(self, packet): """ Extract the payload_info from the packet """