+ def test_vlib_main_unittest(self):
+ """Private Binary API Segment Test (takes 70 seconds)"""
+
+ vat_path = config.vpp + "_api_test"
+ vat = pexpect.spawn(vat_path, ["socket-name", self.get_api_sock_path()])
+ vat.expect("vat# ", timeout=10)
+ vat.sendline("sock_init_shm")
+ vat.expect("vat# ", timeout=10)
+ vat.sendline("sh api cli")
+ vat.kill(signal.SIGKILL)
+ vat.wait()
+ self.logger.info("vat terminated, 70 second wait for the Reaper")
+ time.sleep(70)
+ self.logger.info("Reaper should be complete...")
+
+ def test_pool(self):
+ """Fixed-size Pool Test"""
+
+ cmds = [
+ "test pool",
+ ]
+
+ for cmd in cmds:
+ r = self.vapi.cli_return_response(cmd)
+ if r.retval != 0:
+ if hasattr(r, "reply"):
+ self.logger.info(cmd + " FAIL reply " + r.reply)
+ else:
+ self.logger.info(cmd + " FAIL retval " + str(r.retval))
+
+
+class TestVlibFrameLeak(VppTestCase):
+ """Vlib Frame Leak Test Cases"""
+
+ vpp_worker_count = 1
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestVlibFrameLeak, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestVlibFrameLeak, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestVlibFrameLeak, self).setUp()
+ # create 1 pg interface
+ self.create_pg_interfaces(range(1))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ def tearDown(self):
+ super(TestVlibFrameLeak, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.admin_down()
+
+ def test_vlib_mw_refork_frame_leak(self):
+ """Vlib worker thread refork leak test case"""
+ icmp_id = 0xB
+ icmp_seq = 5
+ icmp_load = b"\x0a" * 18
+ pkt = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
+ / ICMP(id=icmp_id, seq=icmp_seq)
+ / Raw(load=icmp_load)
+ )
+
+ # Send a packet
+ self.pg0.add_stream(pkt)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg0.get_capture(1)
+
+ self.assertEquals(len(rx), 1)
+ rx = rx[0]
+ ether = rx[Ether]
+ ipv4 = rx[IP]
+
+ self.assertEqual(ether.src, self.pg0.local_mac)
+ self.assertEqual(ether.dst, self.pg0.remote_mac)
+
+ self.assertEqual(ipv4.src, self.pg0.local_ip4)
+ self.assertEqual(ipv4.dst, self.pg0.remote_ip4)
+
+ # Save allocated frame count
+ frame_allocated = {}
+ for fs in self.vapi.cli("show vlib frame-allocation").splitlines()[1:]:
+ spl = fs.split()
+ thread = int(spl[0])
+ size = int(spl[1])
+ alloc = int(spl[2])
+ key = (thread, size)
+ frame_allocated[key] = alloc
+
+ # cause reforks
+ _ = self.create_loopback_interfaces(1)
+
+ # send the same packet
+ self.pg0.add_stream(pkt)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg0.get_capture(1)
+
+ self.assertEquals(len(rx), 1)
+ rx = rx[0]
+ ether = rx[Ether]
+ ipv4 = rx[IP]
+
+ self.assertEqual(ether.src, self.pg0.local_mac)
+ self.assertEqual(ether.dst, self.pg0.remote_mac)
+
+ self.assertEqual(ipv4.src, self.pg0.local_ip4)
+ self.assertEqual(ipv4.dst, self.pg0.remote_ip4)
+
+ # Check that no frame were leaked during refork
+ for fs in self.vapi.cli("show vlib frame-allocation").splitlines()[1:]:
+ spl = fs.split()
+ thread = int(spl[0])
+ size = int(spl[1])
+ alloc = int(spl[2])
+ key = (thread, size)
+ self.assertEqual(frame_allocated[key], alloc)
+
+
+if __name__ == "__main__":