+ def test_SRv6_End_AD_L2(self):
+ """ Test SRv6 End.AD behavior with L2 traffic.
+ """
+ self.src_addr = 'a0::'
+ self.sid_list = ['a1::', 'a2::a4', 'a3::']
+ self.test_sid_index = 1
+
+ # send traffic to one destination interface
+ # source and destination interfaces are IPv6 only
+ self.setup_interfaces(ipv6=[True, False])
+
+ # configure route to next segment
+ route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
+ [VppRoutePath(self.pg0.remote_ip6,
+ self.pg0.sw_if_index,
+ proto=DpoProto.DPO_PROTO_IP6)],
+ is_ip6=1)
+ route.add_vpp_config()
+
+ # configure SRv6 localSID behavior
+ cli_str = "sr localsid address " + \
+ self.sid_list[self.test_sid_index] + \
+ " behavior end.ad" + \
+ " oif " + self.pg1.name + \
+ " iif " + self.pg1.name
+ self.vapi.cli(cli_str)
+
+ # log the localsids
+ self.logger.debug(self.vapi.cli("show sr localsid"))
+
+ # send one packet per packet size
+ count = len(self.pg_packet_sizes)
+
+ # prepare L2 in SRv6 headers
+ packet_header1 = self.create_packet_header_IPv6_SRH_L2(
+ srcaddr=self.src_addr,
+ sidlist=self.sid_list[::-1],
+ segleft=len(self.sid_list) - self.test_sid_index - 1,
+ vlan=0)
+
+ # generate packets (pg0->pg1)
+ pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
+ self.pg_packet_sizes, count)
+
+ # send packets and verify received packets
+ self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
+ self.compare_rx_tx_packet_End_AD_L2_out)
+
+ # log the localsid counters
+ self.logger.info(self.vapi.cli("show sr localsid"))
+
+ # prepare L2 header for returning packets
+ packet_header2 = self.create_packet_header_L2()
+
+ # generate returning packets (pg1->pg0)
+ pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
+ self.pg_packet_sizes, count)
+
+ # send packets and verify received packets
+ self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
+ self.compare_rx_tx_packet_End_AD_L2_in)
+
+ # log the localsid counters
+ self.logger.info(self.vapi.cli("show sr localsid"))
+
+ # remove SRv6 localSIDs
+ cli_str = "sr localsid del address " + \
+ self.sid_list[self.test_sid_index]
+ self.vapi.cli(cli_str)
+
+ # cleanup interfaces
+ self.teardown_interfaces()
+
+ def compare_rx_tx_packet_End_AD_L2_out(self, tx_pkt, rx_pkt):
+ """ Compare input and output packet after passing End.AD with L2
+
+ :param tx_pkt: transmitted packet
+ :param rx_pkt: received packet
+ """
+
+ # get IPv4 header of rx'ed packet
+ rx_eth = rx_pkt.getlayer(Ether)
+
+ tx_ip = tx_pkt.getlayer(IPv6)
+ # we can't just get the 2nd Ether layer
+ # get the Raw content and dissect it as Ether
+ tx_eth1 = Ether(str(tx_pkt[Raw]))
+
+ # verify if rx'ed packet has no SRH
+ self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
+
+ # the whole rx_eth pkt should be equal to tx_eth1
+ self.assertEqual(rx_eth, tx_eth1)
+
+ self.logger.debug("packet verification: SUCCESS")
+
+ def compare_rx_tx_packet_End_AD_L2_in(self, tx_pkt, rx_pkt):
+ """ Compare input and output packet after passing End.AD
+
+ :param tx_pkt: transmitted packet
+ :param rx_pkt: received packet
+ """
+
+ ####
+ # get first (outer) IPv6 header of rx'ed packet
+ rx_ip = rx_pkt.getlayer(IPv6)
+ # received ip.src should be equal to SR Policy source
+ self.assertEqual(rx_ip.src, self.src_addr)
+ # received ip.dst should be equal to expected sidlist next segment
+ self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
+
+ # rx'ed packet should have SRH
+ self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
+
+ # get SRH
+ rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
+ # rx'ed seglist should be equal to SID-list in reversed order
+ self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
+ # segleft should be equal to previous segleft value minus 1
+ self.assertEqual(rx_srh.segleft,
+ len(self.sid_list) - self.test_sid_index - 2)
+ # lastentry should be equal to the SID-list length minus 1
+ self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
+
+ # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
+ tx_ether = tx_pkt.getlayer(Ether)
+ self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
+
+ self.logger.debug("packet verification: SUCCESS")
+