+ self.assertEqual(rx_srh.payload, IP(scapy.compat.raw(tx_ip)))
+
+ self.logger.debug("packet verification: SUCCESS")
+
+ def test_SRv6_End_AD_L2(self):
+ """Test SRv6 End.AD behavior with L2 traffic."""
+ self.src_addr = "a0::"
+ self.sid_list = ["a1::", "a2::a4", "a3::"]
+ self.test_sid_index = 1
+
+ # send traffic to one destination interface
+ # source and destination interfaces are IPv6 only
+ self.setup_interfaces(ipv6=[True, False])
+
+ # configure route to next segment
+ route = VppIpRoute(
+ self,
+ self.sid_list[self.test_sid_index + 1],
+ 128,
+ [
+ VppRoutePath(
+ self.pg0.remote_ip6,
+ self.pg0.sw_if_index,
+ proto=DpoProto.DPO_PROTO_IP6,
+ )
+ ],
+ )
+ route.add_vpp_config()
+
+ # configure SRv6 localSID behavior
+ cli_str = (
+ "sr localsid address "
+ + self.sid_list[self.test_sid_index]
+ + " behavior end.ad"
+ + " oif "
+ + self.pg1.name
+ + " iif "
+ + self.pg1.name
+ )
+ self.vapi.cli(cli_str)
+
+ # log the localsids
+ self.logger.debug(self.vapi.cli("show sr localsid"))
+
+ # send one packet per packet size
+ count = len(self.pg_packet_sizes)
+
+ # prepare L2 in SRv6 headers
+ packet_header1 = self.create_packet_header_IPv6_SRH_L2(
+ srcaddr=self.src_addr,
+ sidlist=self.sid_list[::-1],
+ segleft=len(self.sid_list) - self.test_sid_index - 1,
+ vlan=0,
+ )
+
+ # generate packets (pg0->pg1)
+ pkts1 = self.create_stream(
+ self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
+ )
+
+ # send packets and verify received packets
+ self.send_and_verify_pkts(
+ self.pg0, pkts1, self.pg1, self.compare_rx_tx_packet_End_AD_L2_out
+ )
+
+ # log the localsid counters
+ self.logger.info(self.vapi.cli("show sr localsid"))
+
+ # prepare L2 header for returning packets
+ packet_header2 = self.create_packet_header_L2()
+
+ # generate returning packets (pg1->pg0)
+ pkts2 = self.create_stream(
+ self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
+ )
+
+ # send packets and verify received packets
+ self.send_and_verify_pkts(
+ self.pg1, pkts2, self.pg0, self.compare_rx_tx_packet_End_AD_L2_in
+ )
+
+ # log the localsid counters
+ self.logger.info(self.vapi.cli("show sr localsid"))
+
+ # remove SRv6 localSIDs
+ cli_str = "sr localsid del address " + self.sid_list[self.test_sid_index]
+ self.vapi.cli(cli_str)
+
+ # cleanup interfaces
+ self.teardown_interfaces()
+
+ def compare_rx_tx_packet_End_AD_L2_out(self, tx_pkt, rx_pkt):
+ """Compare input and output packet after passing End.AD with L2
+
+ :param tx_pkt: transmitted packet
+ :param rx_pkt: received packet
+ """
+
+ # get IPv4 header of rx'ed packet
+ rx_eth = rx_pkt.getlayer(Ether)
+
+ tx_ip = tx_pkt.getlayer(IPv6)
+ # we can't just get the 2nd Ether layer
+ # get the Raw content and dissect it as Ether
+ tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
+
+ # verify if rx'ed packet has no SRH
+ self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
+
+ # the whole rx_eth pkt should be equal to tx_eth1
+ self.assertEqual(rx_eth, tx_eth1)
+
+ self.logger.debug("packet verification: SUCCESS")
+
+ def compare_rx_tx_packet_End_AD_L2_in(self, tx_pkt, rx_pkt):
+ """Compare input and output packet after passing End.AD
+
+ :param tx_pkt: transmitted packet
+ :param rx_pkt: received packet
+ """
+
+ ####
+ # get first (outer) IPv6 header of rx'ed packet
+ rx_ip = rx_pkt.getlayer(IPv6)
+ # received ip.src should be equal to SR Policy source
+ self.assertEqual(rx_ip.src, self.src_addr)
+ # received ip.dst should be equal to expected sidlist next segment
+ self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
+
+ # rx'ed packet should have SRH
+ self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
+
+ # get SRH
+ rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
+ # rx'ed seglist should be equal to SID-list in reversed order
+ self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
+ # segleft should be equal to previous segleft value minus 1
+ self.assertEqual(rx_srh.segleft, len(self.sid_list) - self.test_sid_index - 2)
+ # lastentry should be equal to the SID-list length minus 1
+ self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
+
+ # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
+ tx_ether = tx_pkt.getlayer(Ether)
+ self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)