1 from vpp_interface import VppInterface
2 from vpp_papi import VppEnum
# Sentinel with all 32 bits set. In this file it is passed as the argument to
# the tunnel dump call, used as the default mcast_sw_if_index, and compared
# against the lookup result in query_vpp_config.
5 INDEX_INVALID = 0xFFFFFFFF
# Look up a VXLAN-GPE tunnel among the entries returned by
# vxlan_gpe_tunnel_v2_dump, matching on addresses and ports.
# NOTE(review): only part of this function is visible here; the comments below
# describe the visible lines only. The loop header, the remaining match
# conditions and the return statements should be confirmed against the full file.
10 def find_vxlan_gpe_tunnel(test, src, dst, s_port, d_port, vni):
# Fetch the tunnel details, passing INDEX_INVALID as the dump argument.
11 ts = test.vapi.vxlan_gpe_tunnel_v2_dump(INDEX_INVALID)
# The local port to match starts at DEFAULT_PORT. The branch below runs when the
# caller supplied something other than UNDEFINED_PORT (presumably it then uses
# s_port instead -- branch body not visible, confirm).
13 src_port = DEFAULT_PORT
14 if s_port != UNDEFINED_PORT:
# Same defaulting pattern for the remote port.
17 dst_port = DEFAULT_PORT
18 if d_port != UNDEFINED_PORT:
# Visible match conditions: the remote address is compared in its string form,
# and both ports are compared against the values resolved above.
24 and dst == str(t.remote)
25 and src_port == t.local_port
26 and dst_port == t.remote_port
# Test-side object for a VXLAN-GPE tunnel interface: it stores the tunnel
# parameters and adds, removes and queries the tunnel through
# vxlan_gpe_add_del_tunnel_v2 and find_vxlan_gpe_tunnel.
# NOTE(review): several lines of this class (method headers, some arguments,
# closing brackets) are not visible here; comments cover the visible lines only.
33 class VppVxlanGpeTunnel(VppInterface):
35 VPP VXLAN GPE interface
# Constructor defaults visible here: both ports default to UNDEFINED_PORT and
# the multicast interface index defaults to INDEX_INVALID.
44 src_port=UNDEFINED_PORT,
45 dst_port=UNDEFINED_PORT,
46 mcast_sw_if_index=INDEX_INVALID,
51 """Create VXLAN GPE Tunnel interface"""
# Initialise the VppInterface base with the test object.
52 super(VppVxlanGpeTunnel, self).__init__(test)
# Keep the constructor arguments on the instance; add_vpp_config and
# remove_vpp_config pass them to the API call, and query_vpp_config passes the
# ports to the lookup.
56 self.src_port = src_port
57 self.dst_port = dst_port
58 self.mcast_sw_if_index = mcast_sw_if_index
59 self.encap_vrf_id = encap_vrf_id
60 self.decap_vrf_id = decap_vrf_id
# Create the tunnel: call vxlan_gpe_add_del_tunnel_v2 with the stored
# parameters, record the returned sw_if_index and register this object.
63 def add_vpp_config(self):
# NOTE(review): this call goes through self.test while the registry call below
# uses self._test -- presumably the same object; confirm against VppInterface.
64 reply = self.test.vapi.vxlan_gpe_add_del_tunnel_v2(
# src_port/dst_port are sent as the API's local_port/remote_port.
69 local_port=self.src_port,
70 remote_port=self.dst_port,
71 mcast_sw_if_index=self.mcast_sw_if_index,
72 encap_vrf_id=self.encap_vrf_id,
73 decap_vrf_id=self.decap_vrf_id,
74 protocol=self.protocol,
# Store the interface index taken from the API reply.
76 self.set_sw_if_index(reply.sw_if_index)
# Register this object with the test's registry, passing the test logger
# (presumably so the framework can clean it up later -- confirm).
77 self._test.registry.register(self, self._test.logger)
# Issue the same vxlan_gpe_add_del_tunnel_v2 call with the same visible
# arguments as add_vpp_config. The argument that distinguishes add from delete
# is not visible in this excerpt. The reply is not used.
79 def remove_vpp_config(self):
80 self.test.vapi.vxlan_gpe_add_del_tunnel_v2(
85 local_port=self.src_port,
86 remote_port=self.dst_port,
87 mcast_sw_if_index=self.mcast_sw_if_index,
88 encap_vrf_id=self.encap_vrf_id,
89 decap_vrf_id=self.decap_vrf_id,
90 protocol=self.protocol,
# Evaluates to True when find_vxlan_gpe_tunnel returns something other than
# INDEX_INVALID for this tunnel's addresses, ports and VNI.
93 def query_vpp_config(self):
94 return INDEX_INVALID != find_vxlan_gpe_tunnel(
95 self._test, self.src, self.dst, self.src_port, self.dst_port, self.vni
# Identifier string built from sw_if_index, vni, src and dst (the header of the
# enclosing method is not visible here).
99 return "vxlan-%d-%d-%s-%s" % (self.sw_if_index, self.vni, self.src, self.dst)