6 from vpp_object import VppObject
7 from vpp_ip import VppIpAddress
8 from vpp_lo_interface import VppLoInterface
9 from vpp_papi import MACAddress
10 from vpp_sub_interface import L2_VTR_OP
28 def find_bridge_domain(test, bd_id):
29 bds = test.vapi.bridge_domain_dump(bd_id)
33 def find_bridge_domain_port(test, bd_id, sw_if_index):
34 bds = test.vapi.bridge_domain_dump(bd_id)
36 for p in bd.sw_if_details:
37 if p.sw_if_index == sw_if_index:
42 def find_bridge_domain_arp_entry(test, bd_id, mac, ip):
43 vmac = MACAddress(mac)
44 vip = VppIpAddress(ip)
51 arps = test.vapi.bd_ip_mac_dump(bd_id)
53 # do IP addr comparison too once .api is fixed...
54 if vmac.packed == arp.mac_address and \
55 vip.bytes == arp.ip_address[:n]:
60 def find_l2_fib_entry(test, bd_id, mac, sw_if_index):
61 vmac = MACAddress(mac)
62 lfs = test.vapi.l2_fib_table_dump(bd_id)
64 if vmac.packed == lf.mac and sw_if_index == lf.sw_if_index:
69 class VppBridgeDomain(VppObject):
71 def __init__(self, test, bd_id,
72 flood=1, uu_flood=1, forward=1,
77 self.uu_flood = uu_flood
78 self.forward = forward
80 self.arp_term = arp_term
82 def add_vpp_config(self):
83 self._test.vapi.bridge_domain_add_del(bd_id=self.bd_id,
85 uu_flood=self.uu_flood,
88 arp_term=self.arp_term, is_add=1)
89 self._test.registry.register(self, self._test.logger)
91 def remove_vpp_config(self):
92 self._test.vapi.bridge_domain_add_del(bd_id=self.bd_id, is_add=0)
94 def query_vpp_config(self):
95 return find_bridge_domain(self._test, self.bd_id)
98 return "bridge-domain-%d" % (self.bd_id)
101 class VppBridgeDomainPort(VppObject):
103 def __init__(self, test, bd, itf,
104 port_type=L2_PORT_TYPE.NORMAL):
108 self.port_type = port_type
110 def add_vpp_config(self):
111 self._test.vapi.sw_interface_set_l2_bridge(
112 rx_sw_if_index=self.itf.sw_if_index, bd_id=self.bd.bd_id,
113 port_type=self.port_type, enable=1)
114 self._test.registry.register(self, self._test.logger)
116 def remove_vpp_config(self):
117 self._test.vapi.sw_interface_set_l2_bridge(
118 rx_sw_if_index=self.itf.sw_if_index, bd_id=self.bd.bd_id,
119 port_type=self.port_type, enable=0)
121 def query_vpp_config(self):
122 return find_bridge_domain_port(self._test,
124 self.itf.sw_if_index)
127 return "BD-Port-%s-%s" % (self.bd, self.itf)
130 class VppBridgeDomainArpEntry(VppObject):
132 def __init__(self, test, bd, mac, ip):
135 self.mac = MACAddress(mac)
136 self.ip = VppIpAddress(ip)
138 def add_vpp_config(self):
139 self._test.vapi.bd_ip_mac_add_del(bd_id=self.bd.bd_id, is_add=1,
142 self._test.registry.register(self, self._test.logger)
144 def remove_vpp_config(self):
145 self._test.vapi.bd_ip_mac_add_del(bd_id=self.bd.bd_id, is_add=0,
149 def query_vpp_config(self):
150 return find_bridge_domain_arp_entry(self._test,
156 return "BD-Arp-Entry-%s-%s-%s" % (self.bd, self.mac, self.ip.address)
159 class VppL2FibEntry(VppObject):
161 def __init__(self, test, bd, mac, itf,
162 static_mac=0, filter_mac=0, bvi_mac=-1):
165 self.mac = MACAddress(mac)
167 self.static_mac = static_mac
168 self.filter_mac = filter_mac
170 self.bvi_mac = isinstance(self.itf, VppLoInterface)
172 self.bvi_mac = bvi_mac
174 def add_vpp_config(self):
175 self._test.vapi.l2fib_add_del(
178 self.itf.sw_if_index,
180 static_mac=self.static_mac,
181 filter_mac=self.filter_mac,
182 bvi_mac=self.bvi_mac)
183 self._test.registry.register(self, self._test.logger)
185 def remove_vpp_config(self):
186 self._test.vapi.l2fib_add_del(
189 self.itf.sw_if_index,
192 def query_vpp_config(self):
193 return find_l2_fib_entry(self._test,
196 self.itf.sw_if_index)
199 return "L2-Fib-Entry-%s-%s-%s" % (self.bd, self.mac, self.itf)
202 class VppL2Vtr(VppObject):
204 def __init__(self, test, itf, op):
209 def add_vpp_config(self):
210 self.itf.set_vtr(self.op)
211 self._test.registry.register(self, self._test.logger)
213 def remove_vpp_config(self):
214 self.itf.set_vtr(L2_VTR_OP.L2_DISABLED)
216 def query_vpp_config(self):
217 ds = self._test.vapi.sw_interface_dump()
218 d = self.itf.get_interface_config_from_dump(ds)
221 return (d.vtr_op == self.op)
225 return "L2-vtr-%s-%d" % (str(self.itf), self.op)