1 from config import config
2 from framework import VppTestCase
3 from asfframework import VppTestRunner
5 from scapy.layers.l2 import Ether
6 from scapy.layers.inet import UDP
7 from scapy.layers.inet6 import IPv6
8 from random import randint
11 # TODO: get an actual output from "show ip6 connection-tracker"
12 @unittest.skipIf("ct6" in config.excluded_plugins, "Exclude CT6 plugin tests")
13 class TestCt6(VppTestCase):
14 """CT6 plugin tests"""
18 super(TestCt6, cls).setUpClass()
20 cls.create_pg_interfaces(range(2))
21 for i in cls.pg_interfaces:
22 i.config_ip6().resolve_ndp()
29 def tearDownClass(cls):
30 super(TestCt6, cls).tearDownClass()
32 def create_stream(self, src_if, dst_if, count):
34 for i in range(count):
36 Ether(src=src_if.remote_mac, dst=src_if.local_mac)
37 / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
38 / UDP(sport=randint(49152, 65535), dport=5678, chksum=0)
43 def test_ct6_vapi(self):
44 self.vapi.ct6_enable_disable(enable_disable=True, is_inside=True, sw_if_index=1)
45 self.vapi.ct6_enable_disable(
46 enable_disable=True, is_inside=False, sw_if_index=1
49 packets = self.create_stream(self.pg0, self.pg1, 5)
50 self.pg0.add_stream(packets)
53 self.vapi.ct6_enable_disable(1234, 567, True, True, "pg0", "disable")
55 def test_ct6_cli(self):
56 self.vapi.cli("set ct6 outside pg1")
57 self.vapi.cli("set ct6 inside pg1")
59 packets = self.create_stream(self.pg0, self.pg1, 5)
60 self.pg0.add_stream(packets)
63 reply = self.vapi.cli("show ip6 connection-tracker")
64 self.assertIn("Thread 0", reply)
65 reply = self.vapi.cli("test ip6 connection-tracker")
66 self.assertIn("End state", reply)
68 self.vapi.cli("set ct6 inside pg1 disable")
71 if __name__ == "__main__":
72 unittest.main(testRunner=VppTestRunner)