+ def test_SRv6_End_AS_L2_noSRH(self):
+ """ Test SRv6 End.AS behavior with L2 traffic and no SRH rewrite.
+ """
+ self.run_SRv6_End_AS_L2(
+ sid_list=['a1::', 'a2::a6', 'a3::'],
+ test_sid_index=1,
+ rewrite_src_addr='a2::')
+
+ def test_SRv6_End_AS_L2_SRH(self):
+ """ Test SRv6 End.AS behavior with L2 traffic and SRH rewrite.
+ """
+ self.run_SRv6_End_AS_L2(
+ sid_list=['a1::a6', 'a2::', 'a3::'],
+ test_sid_index=0,
+ rewrite_src_addr='a1::')
+
+ def run_SRv6_End_AS_L2(self, sid_list, test_sid_index, rewrite_src_addr):
+ """ Run SRv6 End.AS test with L2 traffic.
+ """
+ self.rewrite_src_addr = rewrite_src_addr
+ self.rewrite_sid_list = sid_list[test_sid_index + 1::]
+
+ # send traffic to one destination interface
+ # source and destination interfaces are IPv6 only
+ self.setup_interfaces(ipv6=[True, False])
+
+ # configure route to next segment
+ route = VppIpRoute(self, sid_list[test_sid_index + 1], 128,
+ [VppRoutePath(self.pg0.remote_ip6,
+ self.pg0.sw_if_index,
+ proto=DpoProto.DPO_PROTO_IP6)],
+ is_ip6=1)
+ route.add_vpp_config()
+
+ # configure SRv6 localSID behavior
+ cli_str = "sr localsid address " + sid_list[test_sid_index] \
+ + " behavior end.as" \
+ + " oif " + self.pg1.name \
+ + " iif " + self.pg1.name \
+ + " src " + self.rewrite_src_addr
+ for s in self.rewrite_sid_list:
+ cli_str += " next " + s
+ self.vapi.cli(cli_str)
+
+ # log the localsids
+ self.logger.debug(self.vapi.cli("show sr localsid"))
+
+ # send one packet per packet size
+ count = len(self.pg_packet_sizes)
+
+ # prepare L2 in SRv6 headers
+ packet_header1 = self.create_packet_header_IPv6_SRH_L2(
+ sidlist=sid_list[::-1],
+ segleft=len(sid_list) - test_sid_index - 1,
+ vlan=0)
+
+ # generate packets (pg0->pg1)
+ pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
+ self.pg_packet_sizes, count)
+
+ # send packets and verify received packets
+ self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
+ self.compare_rx_tx_packet_End_AS_L2_out)
+
+ # log the localsid counters
+ self.logger.info(self.vapi.cli("show sr localsid"))
+
+ # prepare L2 header for returning packets
+ packet_header2 = self.create_packet_header_L2()
+
+ # generate returning packets (pg1->pg0)
+ pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
+ self.pg_packet_sizes, count)
+
+ # send packets and verify received packets
+ self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
+ self.compare_rx_tx_packet_End_AS_L2_in)
+
+ # log the localsid counters
+ self.logger.info(self.vapi.cli("show sr localsid"))
+
+ # remove SRv6 localSIDs
+ self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
+
+ # cleanup interfaces
+ self.teardown_interfaces()
+