X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_snat.py;h=ace17237b38fc0e89af6139f83ec981b50c31ec5;hb=refs%2Fchanges%2F98%2F6098%2F5;hp=42b071931784d72378a448af9ab1bde767f59bed;hpb=d367768270a3d19447af1a7059068e1f20fd15a6;p=vpp.git diff --git a/test/test_snat.py b/test/test_snat.py index 42b07193178..ace17237b38 100644 --- a/test/test_snat.py +++ b/test/test_snat.py @@ -1338,7 +1338,7 @@ class TestDeterministicNAT(MethodHolder): cls.icmp_id_in = 6305 cls.snat_addr = '10.0.0.3' - cls.create_pg_interfaces(range(2)) + cls.create_pg_interfaces(range(3)) cls.interfaces = list(cls.pg_interfaces) for i in cls.interfaces: @@ -1483,6 +1483,21 @@ class TestDeterministicNAT(MethodHolder): self.logger.error("TCP 3 way handshake failed") raise + def verify_ipfix_max_entries_per_user(self, data): + """ + Verify IPFIX maximum entries per user exceeded event + + :param data: Decoded IPFIX data records + """ + self.assertEqual(1, len(data)) + record = data[0] + # natEvent + self.assertEqual(ord(record[230]), 13) + # natQuotaExceededEvent + self.assertEqual('\x03\x00\x00\x00', record[466]) + # sourceIPv4Address + self.assertEqual(self.pg0.remote_ip4n, record[8]) + def test_deterministic_mode(self): """ S-NAT run deterministic mode """ in_addr = '172.16.255.0' @@ -1827,6 +1842,11 @@ class TestDeterministicNAT(MethodHolder): self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, is_inside=0) + self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n, + src_address=self.pg2.local_ip4n, + path_mtu=512, + template_interval=10) + self.vapi.snat_ipfix() pkts = [] for port in range(1025, 2025): @@ -1852,10 +1872,26 @@ class TestDeterministicNAT(MethodHolder): self.assertEqual(1000, dms[0].ses_num) + # verify IPFIX logging + self.vapi.cli("ipfix flush") # FIXME this should be an API call + capture = self.pg2.get_capture(2) + ipfix = IPFIXDecoder() + # first load template + for p in capture: + self.assertTrue(p.haslayer(IPFIX)) + if p.haslayer(Template): + ipfix.add_template(p.getlayer(Template)) + # verify events in data set + for p in capture: + if p.haslayer(Data): + data = ipfix.decode_data_set(p.getlayer(Set)) + self.verify_ipfix_max_entries_per_user(data) + def clear_snat(self): """ Clear SNAT configuration. """ + self.vapi.snat_ipfix(enable=0) self.vapi.snat_det_set_timeouts() deterministic_mappings = self.vapi.snat_det_map_dump() for dsm in deterministic_mappings: