tests: add enhanced packet counter verification
[vpp.git] / test / test_nat44_ed.py
index 9bb803e..764693d 100644 (file)
@@ -2,7 +2,7 @@
 
 import unittest
 from io import BytesIO
-from random import randint, shuffle, choice
+from random import randint, choice
 
 import scapy.compat
 from framework import VppTestCase, VppTestRunner
@@ -17,6 +17,7 @@ from util import ppp, ip4_range
 from vpp_acl import AclRule, VppAcl, VppAclInterface
 from vpp_ip_route import VppIpRoute, VppRoutePath
 from vpp_papi import VppEnum
+from util import StatsDiff
 
 
 class TestNAT44ED(VppTestCase):
@@ -213,6 +214,28 @@ class TestNAT44ED(VppTestCase):
         for r in rl:
             r.add_vpp_config()
 
+        cls.no_diff = StatsDiff({
+            pg.sw_if_index: {
+                '/nat44-ed/in2out/fastpath/tcp': 0,
+                '/nat44-ed/in2out/fastpath/udp': 0,
+                '/nat44-ed/in2out/fastpath/icmp': 0,
+                '/nat44-ed/in2out/fastpath/drops': 0,
+                '/nat44-ed/in2out/slowpath/tcp': 0,
+                '/nat44-ed/in2out/slowpath/udp': 0,
+                '/nat44-ed/in2out/slowpath/icmp': 0,
+                '/nat44-ed/in2out/slowpath/drops': 0,
+                '/nat44-ed/in2out/fastpath/tcp': 0,
+                '/nat44-ed/in2out/fastpath/udp': 0,
+                '/nat44-ed/in2out/fastpath/icmp': 0,
+                '/nat44-ed/in2out/fastpath/drops': 0,
+                '/nat44-ed/in2out/slowpath/tcp': 0,
+                '/nat44-ed/in2out/slowpath/udp': 0,
+                '/nat44-ed/in2out/slowpath/icmp': 0,
+                '/nat44-ed/in2out/slowpath/drops': 0,
+            }
+            for pg in cls.pg_interfaces
+        })
+
     def get_err_counter(self, path):
         return self.statistics.get_err_counter(path)
 
@@ -2622,37 +2645,41 @@ class TestNAT44EDMW(TestNAT44ED):
         self.nat_add_outside_interface(self.pg1)
 
         # in2out and no NAT addresses added
-        err_old = self.statistics.get_err_counter(
-            '/err/nat44-ed-in2out-slowpath/out of ports')
-
         pkts = self.create_stream_in(self.pg0, self.pg1)
-        self.pg0.add_stream(pkts)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-        self.pg1.get_capture(0, timeout=1)
 
-        err_new = self.statistics.get_err_counter(
-            '/err/nat44-ed-in2out-slowpath/out of ports')
-
-        self.assertEqual(err_new - err_old, len(pkts))
+        self.send_and_assert_no_replies(
+            self.pg0, pkts, msg="i2o pkts",
+            stats_diff=self.no_diff | {
+                "err": {
+                    '/err/nat44-ed-in2out-slowpath/out of ports': len(pkts),
+                },
+                self.pg0.sw_if_index: {
+                    '/nat44-ed/in2out/slowpath/drops': len(pkts),
+                },
+            }
+        )
 
         # in2out after NAT addresses added
         self.nat_add_address(self.nat_addr)
 
-        err_old = self.statistics.get_err_counter(
-            '/err/nat44-ed-in2out-slowpath/out of ports')
-
-        pkts = self.create_stream_in(self.pg0, self.pg1)
-        self.pg0.add_stream(pkts)
-        self.pg_enable_capture(self.pg_interfaces)
-        self.pg_start()
-        capture = self.pg1.get_capture(len(pkts))
-        self.verify_capture_out(capture, ignore_port=True)
-
-        err_new = self.statistics.get_err_counter(
-            '/err/nat44-ed-in2out-slowpath/out of ports')
-
-        self.assertEqual(err_new, err_old)
+        tcpn, udpn, icmpn = (sum(x) for x in
+                             zip(*((TCP in p, UDP in p, ICMP in p)
+                                 for p in pkts)))
+
+        self.send_and_expect(
+            self.pg0, pkts, self.pg1, msg="i2o pkts",
+            stats_diff=self.no_diff | {
+                "err": {
+                    '/err/nat44-ed-in2out-slowpath/out of ports': 0,
+                },
+                self.pg0.sw_if_index: {
+                    '/nat44-ed/in2out/slowpath/drops': 0,
+                    '/nat44-ed/in2out/slowpath/tcp': tcpn,
+                    '/nat44-ed/in2out/slowpath/udp': udpn,
+                    '/nat44-ed/in2out/slowpath/icmp': icmpn,
+                },
+            }
+        )
 
     def test_unknown_proto(self):
         """ NAT44ED translate packet with unknown protocol """