rules = []
for r in self.rules:
rules.append(r.encode())
- self._test.vapi.gbp_contract_add_del(
+ r = self._test.vapi.gbp_contract_add_del(
1,
self.sclass,
self.dclass,
self.acl_index,
rules,
self.allowed_ethertypes)
+ self.stats_index = r.stats_index
self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
return True
return False
+ def get_drop_stats(self):
+ c = self._test.statistics.get_counter("/net/gbp/contract/drop")
+ return c[0][self.stats_index]
+
+ def get_permit_stats(self):
+ c = self._test.statistics.get_counter("/net/gbp/contract/permit")
+ return c[0][self.stats_index]
+
class VppGbpVxlanTunnel(VppInterface):
"""
pkt_inter_epg_221_to_220 * 65,
eps[0].itf)
+ ds = c2.get_drop_stats()
+ self.assertEqual(ds['packets'], 0)
+ ps = c2.get_permit_stats()
+ self.assertEqual(ps['packets'], 65)
+
#
# the contract does not allow non-IP
#
def test_gbp_learn_l2(self):
""" GBP L2 Endpoint Learning """
+ self.vapi.cli("clear errors")
+
ep_flags = VppEnum.vl_api_gbp_endpoint_flags_t
learnt = [{'mac': '00:00:11:11:11:01',
'ip': '10.0.0.1',
self.send_and_assert_no_replies(self.pg2, p)
+ self.logger.info(self.vapi.cli("sh error"))
+ # self.assert_packet_counter_equal(
+ # '/err/gbp-policy-port/drop-no-contract', 1)
+
#
# we should not have learnt a new tunnel endpoint, since
# the EPG was not learnt.
vx_tun_l2_1.sw_if_index,
ip=l['ip']))
+ # self.assert_packet_counter_equal(
+ # '/err/gbp-policy-port/allow-intra-sclass', 2)
+
self.logger.info(self.vapi.cli("show gbp endpoint"))
self.logger.info(self.vapi.cli("show gbp vxlan"))
self.logger.info(self.vapi.cli("show ip mfib"))