CGN: IPFIX logging
[vpp.git] / test / test_snat.py
index 42b0719..ace1723 100644 (file)
@@ -1338,7 +1338,7 @@ class TestDeterministicNAT(MethodHolder):
             cls.icmp_id_in = 6305
             cls.snat_addr = '10.0.0.3'
 
-            cls.create_pg_interfaces(range(2))
+            cls.create_pg_interfaces(range(3))
             cls.interfaces = list(cls.pg_interfaces)
 
             for i in cls.interfaces:
@@ -1483,6 +1483,21 @@ class TestDeterministicNAT(MethodHolder):
             self.logger.error("TCP 3 way handshake failed")
             raise
 
+    def verify_ipfix_max_entries_per_user(self, data):
+        """
+        Verify IPFIX maximum entries per user exceeded event
+
+        :param data: Decoded IPFIX data records
+        """
+        self.assertEqual(1, len(data))
+        record = data[0]
+        # natEvent
+        self.assertEqual(ord(record[230]), 13)
+        # natQuotaExceededEvent
+        self.assertEqual('\x03\x00\x00\x00', record[466])
+        # sourceIPv4Address
+        self.assertEqual(self.pg0.remote_ip4n, record[8])
+
     def test_deterministic_mode(self):
         """ S-NAT run deterministic mode """
         in_addr = '172.16.255.0'
@@ -1827,6 +1842,11 @@ class TestDeterministicNAT(MethodHolder):
         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                  is_inside=0)
+        self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
+                                     src_address=self.pg2.local_ip4n,
+                                     path_mtu=512,
+                                     template_interval=10)
+        self.vapi.snat_ipfix()
 
         pkts = []
         for port in range(1025, 2025):
@@ -1852,10 +1872,26 @@ class TestDeterministicNAT(MethodHolder):
 
         self.assertEqual(1000, dms[0].ses_num)
 
+        # verify IPFIX logging
+        self.vapi.cli("ipfix flush")  # FIXME this should be an API call
+        capture = self.pg2.get_capture(2)
+        ipfix = IPFIXDecoder()
+        # first load template
+        for p in capture:
+            self.assertTrue(p.haslayer(IPFIX))
+            if p.haslayer(Template):
+                ipfix.add_template(p.getlayer(Template))
+        # verify events in data set
+        for p in capture:
+            if p.haslayer(Data):
+                data = ipfix.decode_data_set(p.getlayer(Set))
+                self.verify_ipfix_max_entries_per_user(data)
+
     def clear_snat(self):
         """
         Clear SNAT configuration.
         """
+        self.vapi.snat_ipfix(enable=0)
         self.vapi.snat_det_set_timeouts()
         deterministic_mappings = self.vapi.snat_det_map_dump()
         for dsm in deterministic_mappings: