from vpp_interface import VppInterface
-from vpp_ip import VppIpAddress
from vpp_papi import VppEnum
def find_vxlan_gbp_tunnel(test, src, dst, vni):
- vsrc = VppIpAddress(src)
- vdst = VppIpAddress(dst)
-
ts = test.vapi.vxlan_gbp_tunnel_dump(INDEX_INVALID)
for t in ts:
- if vsrc == t.tunnel.src and \
- vdst == t.tunnel.dst and \
+ if src == str(t.tunnel.src) and \
+ dst == str(t.tunnel.dst) and \
t.tunnel.vni == vni:
return t.tunnel.sw_if_index
return INDEX_INVALID
is_ipv6=None, encap_table_id=None, instance=0xffffffff):
""" Create VXLAN-GBP Tunnel interface """
super(VppVxlanGbpTunnel, self).__init__(test)
- self.src = VppIpAddress(src)
- self.dst = VppIpAddress(dst)
+ self.src = src
+ self.dst = dst
self.vni = vni
self.mcast_itf = mcast_itf
self.ipv6 = is_ipv6
else:
self.mode = mode
+ def encode(self):
+ return {
+ 'src': self.src,
+ 'dst': self.dst,
+ 'mode': self.mode,
+ 'vni': self.vni,
+ 'mcast_sw_if_index': self.mcast_itf.sw_if_index
+ if self.mcast_itf else INDEX_INVALID,
+ 'encap_table_id': self.encap_table_id,
+ 'instance': self.instance,
+ }
+
def add_vpp_config(self):
- mcast_sw_if_index = INDEX_INVALID
- if (self.mcast_itf):
- mcast_sw_if_index = self.mcast_itf.sw_if_index
reply = self.test.vapi.vxlan_gbp_tunnel_add_del(
is_add=1,
- tunnel={
- 'src': self.src.encode(),
- 'dst': self.dst.encode(),
- 'mode': self.mode,
- 'vni': self.vni,
- 'mcast_sw_if_index': mcast_sw_if_index,
- 'encap_table_id': self.encap_table_id,
- 'instance': self.instance
- })
+ tunnel=self.encode(),
+ )
self.set_sw_if_index(reply.sw_if_index)
self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
- mcast_sw_if_index = INDEX_INVALID
- if (self.mcast_itf):
- mcast_sw_if_index = self.mcast_itf.sw_if_index
self.test.vapi.vxlan_gbp_tunnel_add_del(
is_add=0,
- tunnel={
- 'src': self.src.encode(),
- 'dst': self.dst.encode(),
- 'mode': self.mode,
- 'vni': self.vni,
- 'mcast_sw_if_index': mcast_sw_if_index,
- 'encap_table_id': self.encap_table_id,
- 'instance': self.instance,
- })
+ tunnel=self.encode(),
+ )
def query_vpp_config(self):
return (INDEX_INVALID != find_vxlan_gbp_tunnel(self._test,