1 from vpp_interface import VppInterface
2 from vpp_papi import VppEnum
5 INDEX_INVALID = 0xFFFFFFFF
10 def find_vxlan_tunnel(test, src, dst, s_port, d_port, vni):
11 ts = test.vapi.vxlan_tunnel_v2_dump(INDEX_INVALID)
13 src_port = DEFAULT_PORT
14 if s_port != UNDEFINED_PORT:
17 dst_port = DEFAULT_PORT
18 if d_port != UNDEFINED_PORT:
23 src == str(t.src_address)
24 and dst == str(t.dst_address)
25 and src_port == t.src_port
26 and dst_port == t.dst_port
33 class VppVxlanTunnel(VppInterface):
44 src_port=UNDEFINED_PORT,
45 dst_port=UNDEFINED_PORT,
47 mcast_sw_if_index=INDEX_INVALID,
48 decap_next_index=INDEX_INVALID,
53 """Create VXLAN Tunnel interface"""
54 super(VppVxlanTunnel, self).__init__(test)
58 self.src_port = src_port
59 self.dst_port = dst_port
60 self.mcast_itf = mcast_itf
61 self.mcast_sw_if_index = mcast_sw_if_index
62 self.encap_vrf_id = encap_vrf_id
63 self.decap_next_index = decap_next_index
64 self.instance = instance
68 self.mcast_sw_if_index = self.mcast_itf.sw_if_index
70 def add_vpp_config(self):
71 reply = self.test.vapi.vxlan_add_del_tunnel_v3(
76 src_port=self.src_port,
77 dst_port=self.dst_port,
78 mcast_sw_if_index=self.mcast_sw_if_index,
79 encap_vrf_id=self.encap_vrf_id,
81 instance=self.instance,
82 decap_next_index=self.decap_next_index,
84 self.set_sw_if_index(reply.sw_if_index)
85 self._test.registry.register(self, self._test.logger)
87 def remove_vpp_config(self):
88 self.test.vapi.vxlan_add_del_tunnel_v2(
93 src_port=self.src_port,
94 dst_port=self.dst_port,
95 mcast_sw_if_index=self.mcast_sw_if_index,
96 encap_vrf_id=self.encap_vrf_id,
97 instance=self.instance,
98 decap_next_index=self.decap_next_index,
101 def query_vpp_config(self):
102 return INDEX_INVALID != find_vxlan_tunnel(
103 self._test, self.src, self.dst, self.src_port, self.dst_port, self.vni
107 return "vxlan-%d-%d-%s-%s" % (self.sw_if_index, self.vni, self.src, self.dst)