1 from vpp_interface import VppInterface
2 from vpp_papi import VppEnum
# Sentinel index: passed to the tunnel dump below and returned by
# find_vxlan_tunnel() when no tunnel matches.
INDEX_INVALID = 0xffffffff


def find_vxlan_tunnel(test, src, dst, vni):
    """Look up a VXLAN tunnel in the dump returned by the test's VPP API.

    Calls ``test.vapi.vxlan_tunnel_dump(INDEX_INVALID)`` and scans the
    returned entries for one whose source address, destination address and
    VNI all match the arguments.

    :param test: object exposing ``vapi.vxlan_tunnel_dump``.
    :param src: source address, compared against ``str(entry.src_address)``.
    :param dst: destination address, compared against
        ``str(entry.dst_address)``.
    :param vni: VXLAN network identifier, compared against ``entry.vni``.
    :returns: ``sw_if_index`` of the first matching entry, or
        ``INDEX_INVALID`` when nothing matches.

    NOTE(review): assumes each dump entry exposes ``vni`` and
    ``sw_if_index`` attributes -- confirm against the
    ``vxlan_tunnel_details`` API message.
    """
    for t in test.vapi.vxlan_tunnel_dump(INDEX_INVALID):
        # Addresses are compared in string form, so callers pass strings.
        if (src == str(t.src_address)
                and dst == str(t.dst_address)
                and vni == t.vni):
            return t.sw_if_index
    return INDEX_INVALID
class VppVxlanTunnel(VppInterface):
    """VXLAN tunnel interface configured through the test's VPP API.

    The tunnel is created and deleted with ``vapi.vxlan_add_del_tunnel``
    and looked up with ``find_vxlan_tunnel``.
    """

    def __init__(self, test, src, dst, vni, mcast_itf=None,
                 mcast_sw_if_index=INDEX_INVALID,
                 decap_next_index=INDEX_INVALID,
                 encap_vrf_id=None, instance=0xffffffff):
        """ Create VXLAN Tunnel interface

        :param test: test object, handed to the ``VppInterface`` base class.
        :param src: tunnel source address (sent as ``src_address``).
        :param dst: tunnel destination address (sent as ``dst_address``).
        :param vni: VXLAN network identifier (sent as ``vni``).
        :param mcast_itf: optional interface object; when given, its
            ``sw_if_index`` replaces ``mcast_sw_if_index``.
        :param mcast_sw_if_index: value sent as ``mcast_sw_if_index`` when
            ``mcast_itf`` is not given.
        :param decap_next_index: value sent as ``decap_next_index``.
        :param encap_vrf_id: value sent as ``encap_vrf_id``.
        :param instance: value sent as ``instance``.
        """
        super(VppVxlanTunnel, self).__init__(test)
        # The add/remove/query methods read these attributes, so the
        # constructor arguments must be stored on the instance.
        self.src = src
        self.dst = dst
        self.vni = vni
        self.mcast_itf = mcast_itf
        self.mcast_sw_if_index = mcast_sw_if_index
        self.encap_vrf_id = encap_vrf_id
        self.decap_next_index = decap_next_index
        self.instance = instance

        # mcast_itf defaults to None; only dereference it when supplied so
        # that an explicit mcast_sw_if_index is otherwise preserved.
        if self.mcast_itf is not None:
            self.mcast_sw_if_index = self.mcast_itf.sw_if_index

    def add_vpp_config(self):
        """Create the tunnel, record its sw_if_index and register it.

        Sends ``vxlan_add_del_tunnel`` with ``is_add=1``, stores the
        ``sw_if_index`` from the reply via ``set_sw_if_index`` and registers
        this object with the test's registry.
        """
        reply = self.test.vapi.vxlan_add_del_tunnel(
            is_add=1, src_address=self.src, dst_address=self.dst,
            vni=self.vni,
            mcast_sw_if_index=self.mcast_sw_if_index,
            encap_vrf_id=self.encap_vrf_id,
            instance=self.instance, decap_next_index=self.decap_next_index)
        self.set_sw_if_index(reply.sw_if_index)
        self._test.registry.register(self, self._test.logger)

    def remove_vpp_config(self):
        """Delete the tunnel.

        Sends ``vxlan_add_del_tunnel`` with ``is_add=0`` and the same
        parameters that were used to create the tunnel.
        """
        self.test.vapi.vxlan_add_del_tunnel(
            is_add=0, src_address=self.src, dst_address=self.dst,
            vni=self.vni,
            mcast_sw_if_index=self.mcast_sw_if_index,
            encap_vrf_id=self.encap_vrf_id, instance=self.instance,
            decap_next_index=self.decap_next_index)

    def query_vpp_config(self):
        """Return True when ``find_vxlan_tunnel`` finds this tunnel.

        The lookup uses this object's ``src``, ``dst`` and ``vni``; any
        result other than ``INDEX_INVALID`` counts as found.
        """
        return (INDEX_INVALID != find_vxlan_tunnel(self._test,
                                                   self.src,
                                                   self.dst,
                                                   self.vni))
64 return "vxlan-%d-%d-%s-%s" % (self.sw_if_index, self.vni,