nat: add ipfix rate-limiter for nat44-ed, nat44-ei and nat64
[vpp.git] / test / test_nat64.py
index 05a2031..4ae9f66 100644 (file)
@@ -462,9 +462,10 @@ class TestNAT64(VppTestCase):
         # natEvent
         self.assertEqual(scapy.compat.orb(record[230]), 13)
         # natQuotaExceededEvent
-        self.assertEqual(struct.pack("I", 2), record[466])
+        self.assertEqual(struct.pack("!I", 2), record[466])
         # maxBIBEntries
-        self.assertEqual(struct.pack("I", limit), record[472])
+        self.assertEqual(struct.pack("!I", limit), record[472])
+        return len(data)
 
     def verify_ipfix_bib(self, data, is_create, src_addr):
         """
@@ -622,9 +623,10 @@ class TestNAT64(VppTestCase):
         # natEvent
         self.assertEqual(scapy.compat.orb(record[230]), 13)
         # natQuotaExceededEvent
-        self.assertEqual(struct.pack("I", 1), record[466])
+        self.assertEqual(struct.pack("!I", 1), record[466])
         # maxSessionEntries
-        self.assertEqual(struct.pack("I", limit), record[471])
+        self.assertEqual(struct.pack("!I", limit), record[471])
+        return len(data)
 
     def test_nat64_inside_interface_handles_neighbor_advertisement(self):
         """NAT64 inside interface handles Neighbor Advertisement"""
@@ -1859,7 +1861,7 @@ class TestNAT64(VppTestCase):
             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
             / IPv6(src=src, dst=remote_host_ip6)
             / TCP(sport=12345, dport=25)
-        )
+        ) * 3
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
@@ -1878,16 +1880,18 @@ class TestNAT64(VppTestCase):
             if p.haslayer(Template):
                 ipfix.add_template(p.getlayer(Template))
         # verify events in data set
+        event_count = 0
         for p in capture:
             if p.haslayer(Data):
                 data = ipfix.decode_data_set(p.getlayer(Set))
-                self.verify_ipfix_max_sessions(data, max_sessions)
+                event_count += self.verify_ipfix_max_sessions(data, max_sessions)
+        self.assertEqual(event_count, 1)
 
         p = (
             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
             / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
             / TCP(sport=12345, dport=80)
-        )
+        ) * 3
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
@@ -1895,6 +1899,7 @@ class TestNAT64(VppTestCase):
         self.vapi.ipfix_flush()
         capture = self.pg3.get_capture(1)
         # verify events in data set
+        event_count = 0
         for p in capture:
             self.assertTrue(p.haslayer(IPFIX))
             self.assertEqual(p[IP].src, self.pg3.local_ip4)
@@ -1904,7 +1909,8 @@ class TestNAT64(VppTestCase):
             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
             if p.haslayer(Data):
                 data = ipfix.decode_data_set(p.getlayer(Set))
-                self.verify_ipfix_max_bibs(data, max_bibs)
+                event_count += self.verify_ipfix_max_bibs(data, max_bibs)
+        self.assertEqual(event_count, 1)
 
     def test_ipfix_bib_ses(self):
         """IPFIX logging NAT64 BIB/session create and delete events"""