MAP: Convert from DPO to input feature.
[vpp.git] / test / test_nat.py
index 81aae4b..c64359a 100644 (file)
@@ -1015,6 +1015,10 @@ class MethodHolder(VppTestCase):
         """ Verify that there is no NAT44 user """
         users = self.vapi.nat44_user_dump()
         self.assertEqual(len(users), 0)
+        users = self.statistics.get_counter('/nat44/total-users')
+        self.assertEqual(users[0][0], 0)
+        sessions = self.statistics.get_counter('/nat44/total-sessions')
+        self.assertEqual(sessions[0][0], 0)
 
     def verify_ipfix_max_entries_per_user(self, data, limit, src_addr):
         """
@@ -1566,6 +1570,11 @@ class TestNAT44(MethodHolder):
             '/err/nat44-out2in/good out2in packets processed')
         self.assertEqual(err - totaln, 3)
 
+        users = self.statistics.get_counter('/nat44/total-users')
+        self.assertEqual(users[0][0], 1)
+        sessions = self.statistics.get_counter('/nat44/total-sessions')
+        self.assertEqual(sessions[0][0], 3)
+
     def test_dynamic_icmp_errors_in2out_ttl_1(self):
         """ NAT44 handling of client packets with TTL=1 """
 
@@ -4105,6 +4114,11 @@ class TestNAT44EndpointDependent(MethodHolder):
             '/err/nat44-ed-out2in/good out2in packets processed')
         self.assertEqual(err - totaln, 2)
 
+        users = self.statistics.get_counter('/nat44/total-users')
+        self.assertEqual(users[0][0], 1)
+        sessions = self.statistics.get_counter('/nat44/total-sessions')
+        self.assertEqual(sessions[0][0], 3)
+
     def test_forwarding(self):
         """ NAT44 forwarding test """
 
@@ -6092,8 +6106,7 @@ class TestNAT44Out2InDPO(MethodHolder):
         self.src_ip6_pfx_len = 96
         self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
                                  self.src_ip6_pfx_n, self.src_ip6_pfx_len,
-                                 '\x00\x00\x00\x00', 0, is_translation=1,
-                                 is_rfc6052=1)
+                                 '\x00\x00\x00\x00', 0)
 
     @unittest.skip('Temporary disabled')
     def test_464xlat_ce(self):
@@ -6936,6 +6949,8 @@ class TestNAT64(MethodHolder):
                 self.assertEqual(bibe.i_port, in_port)
                 self.assertEqual(bibe.o_port, out_port)
         self.assertEqual(static_bib_num, 1)
+        bibs = self.statistics.get_counter('/nat64/total-bibs')
+        self.assertEqual(bibs[0][0], 1)
 
         self.vapi.nat64_add_del_static_bib(in_addr,
                                            out_addr,
@@ -6949,6 +6964,8 @@ class TestNAT64(MethodHolder):
             if bibe.is_static:
                 static_bib_num += 1
         self.assertEqual(static_bib_num, 0)
+        bibs = self.statistics.get_counter('/nat64/total-bibs')
+        self.assertEqual(bibs[0][0], 0)
 
     def test_set_timeouts(self):
         """ Set NAT64 timeouts """
@@ -7031,6 +7048,11 @@ class TestNAT64(MethodHolder):
             '/err/nat64-out2in/good out2in packets processed')
         self.assertEqual(err - totaln, 3)
 
+        bibs = self.statistics.get_counter('/nat64/total-bibs')
+        self.assertEqual(bibs[0][0], 3)
+        sessions = self.statistics.get_counter('/nat64/total-sessions')
+        self.assertEqual(sessions[0][0], 3)
+
         # in2out
         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
         self.pg0.add_stream(pkts)
@@ -8094,6 +8116,11 @@ class TestNAT64(MethodHolder):
                                            vrf_id=prefix.vrf_id,
                                            is_add=0)
 
+        bibs = self.statistics.get_counter('/nat64/total-bibs')
+        self.assertEqual(bibs[0][0], 0)
+        sessions = self.statistics.get_counter('/nat64/total-sessions')
+        self.assertEqual(sessions[0][0], 0)
+
     def tearDown(self):
         super(TestNAT64, self).tearDown()
         if not self.vpp_dead:
@@ -8286,6 +8313,11 @@ class TestDSlite(MethodHolder):
         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
         self.assertTrue(capture.haslayer(ICMPv6EchoReply))
 
+        b4s = self.statistics.get_counter('/dslite/total-b4s')
+        self.assertEqual(b4s[0][0], 2)
+        sessions = self.statistics.get_counter('/dslite/total-sessions')
+        self.assertEqual(sessions[0][0], 3)
+
     def tearDown(self):
         super(TestDSlite, self).tearDown()
         if not self.vpp_dead: