MAP: Convert from DPO to input feature.
[vpp.git] / test / test_container.py
1 #!/usr/bin/env python
2 """ Container integration tests """
3
4 import unittest
5 from framework import VppTestCase, VppTestRunner, running_extended_tests
6 from scapy.layers.l2 import Ether
7 from scapy.packet import Raw
8 from scapy.layers.inet import IP, UDP, TCP
9 from scapy.packet import Packet
10 from socket import inet_pton, AF_INET, AF_INET6
11 from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
12 from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting
13 from scapy.layers.inet6 import IPv6ExtHdrFragment
14 from pprint import pprint
15 from random import randint
16 from util import L4_Conn
17
18
19 class Conn(L4_Conn):
20     # for now same as L4_Conn
21     pass
22
23
24 @unittest.skipUnless(running_extended_tests, "part of extended tests")
25 class ContainerIntegrationTestCase(VppTestCase):
26     """ Container integration extended testcases """
27
28     @classmethod
29     def setUpClass(cls):
30         super(ContainerIntegrationTestCase, cls).setUpClass()
31         # create pg0 and pg1
32         cls.create_pg_interfaces(range(2))
33         for i in cls.pg_interfaces:
34             i.admin_up()
35             i.config_ip4()
36             i.config_ip6()
37             i.resolve_arp()
38             i.resolve_ndp()
39
40     def tearDown(self):
41         """Run standard test teardown and log various show commands
42         """
43         super(ContainerIntegrationTestCase, self).tearDown()
44         if not self.vpp_dead:
45             self.logger.info(self.vapi.cli("show ip arp"))
46             self.logger.info(self.vapi.cli("show ip6 neighbors"))
47
48     def run_basic_conn_test(self, af, acl_side):
49         """ Basic connectivity test """
50         conn1 = Conn(self, self.pg0, self.pg1, af, UDP, 42001, 4242)
51         conn1.send_through(0)
52         # the return packets should pass
53         conn1.send_through(1)
54
55     def run_negative_conn_test(self, af, acl_side):
56         """ Packets with local spoofed address """
57         conn1 = Conn(self, self.pg0, self.pg1, af, UDP, 42001, 4242)
58         try:
59             p2 = conn1.send_through(0).command()
60         except:
61             # If we asserted while waiting, it's good.
62             # the conn should have timed out.
63             p2 = None
64         self.assert_equal(p2, None, ": packet should have been dropped")
65
66     def test_0010_basic_conn_test(self):
67         """ IPv4 basic connectivity test """
68         self.run_basic_conn_test(AF_INET, 0)
69
70     def test_0011_basic_conn_test(self):
71         """ IPv6 basic connectivity test """
72         self.run_basic_conn_test(AF_INET6, 0)
73
74     def test_0050_loopback_prepare_test(self):
75         """ Create loopbacks overlapping with remote addresses """
76         self.create_loopback_interfaces(2)
77         for i in range(2):
78             intf = self.lo_interfaces[i]
79             intf.admin_up()
80             intf._local_ip4 = self.pg_interfaces[i].remote_ip4
81             intf._local_ip4_prefix_len = 32
82             intf.config_ip4()
83             intf._local_ip6 = self.pg_interfaces[i].remote_ip6
84             intf._local_ip6_prefix_len = 128
85             intf.config_ip6()
86
87     def test_0110_basic_conn_test(self):
88         """ IPv4 local-spoof connectivity test """
89         self.run_negative_conn_test(AF_INET, 0)
90
91     def test_0111_basic_conn_test(self):
92         """ IPv6 local-spoof connectivity test """
93         self.run_negative_conn_test(AF_INET, 1)
94
95     def test_0200_basic_conn_test(self):
96         """ Configure container commands """
97         for i in range(2):
98             for addr in [self.pg_interfaces[i].remote_ip4,
99                          self.pg_interfaces[i].remote_ip6]:
100                 self.vapi.ppcli("ip container " + addr + " " +
101                                 self.pg_interfaces[i].name)
102                 self.vapi.ppcli("stn rule address " + addr +
103                                 " interface " + self.pg_interfaces[i].name)
104
105     def test_0210_basic_conn_test(self):
106         """ IPv4 test after configuring container """
107         self.run_basic_conn_test(AF_INET, 0)
108
109     def test_0211_basic_conn_test(self):
110         """ IPv6 test after configuring container """
111         self.run_basic_conn_test(AF_INET, 1)
112
113     def test_0300_unconfigure_commands(self):
114         """ Unconfigure container commands """
115         for i in range(2):
116             for addr in [self.pg_interfaces[i].remote_ip4,
117                          self.pg_interfaces[i].remote_ip6]:
118                 self.vapi.ppcli("ip container " + addr + " " +
119                                 self.pg_interfaces[i].name +
120                                 " del")
121                 self.vapi.ppcli("stn rule address " + addr +
122                                 " interface " + self.pg_interfaces[i].name +
123                                 " del")
124
125     def test_0410_spoof_test(self):
126         """ IPv4 local-spoof after unconfig test """
127         self.run_negative_conn_test(AF_INET, 0)
128
129     def test_0411_spoof_test(self):
130         """ IPv6 local-spoof after unconfig test """
131         self.run_negative_conn_test(AF_INET, 1)