misc: binary api fuzz test fixes
[vpp.git] / test / test_vxlan6.py
1 #!/usr/bin/env python3
2
3 import socket
4 import unittest
5 from framework import VppTestCase, VppTestRunner
6 from template_bd import BridgeDomain
7
8 from scapy.layers.l2 import Ether
9 from scapy.layers.inet6 import IPv6, UDP
10 from scapy.layers.vxlan import VXLAN
11
12 import util
13 from vpp_ip_route import VppIpRoute, VppRoutePath
14 from vpp_vxlan_tunnel import VppVxlanTunnel
15 from vpp_ip import INVALID_INDEX
16
17
class TestVxlan6(BridgeDomain, VppTestCase):
    """ VXLAN over IPv6 Test Case """

    # NOTE(review): no test_* methods are defined in this class; they are
    #  presumably inherited from the BridgeDomain template, which would use
    #  the encapsulate / decapsulate / check_encapsulation hooks below -
    #  confirm against template_bd.

    def __init__(self, *args):
        """
        Initialise both base classes explicitly: BridgeDomain is called
        without arguments, VppTestCase receives the positional arguments.
        """
        BridgeDomain.__init__(self)
        VppTestCase.__init__(self, *args)

    def _build_vxlan6_frame(self, pkt, vni, src_mac, dst_mac, src_ip, dst_ip):
        """
        Build Ether / IPv6 / UDP / VXLAN / pkt with the given outer addresses.

        Both UDP ports are set to the class VXLAN port and the UDP checksum
        is forced to 0; the VXLAN flags come from the class `flags` value.
        """
        return (Ether(src=src_mac, dst=dst_mac) /
                IPv6(src=src_ip, dst=dst_ip) /
                UDP(sport=self.dport, dport=self.dport, chksum=0) /
                VXLAN(vni=vni, flags=self.flags) /
                pkt)

    def encapsulate(self, pkt, vni):
        """
        Encapsulate the original payload frame by adding VXLAN header with its
        UDP, IPv6 and Ethernet fields, addressed from the pg0 remote host to
        the pg0 local (VPP) addresses.

        :param pkt: payload frame to be carried inside the tunnel
        :param vni: VNI to place in the VXLAN header
        :returns: the encapsulated packet
        """
        return self._build_vxlan6_frame(
            pkt, vni,
            src_mac=self.pg0.remote_mac, dst_mac=self.pg0.local_mac,
            src_ip=self.pg0.remote_ip6, dst_ip=self.pg0.local_ip6)

    @classmethod
    def ip_range(cls, s, e):
        """
        Range of remote ip's.

        Replaces the last ':'-separated group of pg0's remote IPv6 address
        with each value in range(s, e), formatted as lower-case hex.

        :returns: a generator of address strings (evaluated lazily)
        """
        prefix = cls.pg0.remote_ip6.rsplit(':', 1)[0]
        return ("%s:%x" % (prefix, host) for host in range(s, e))

    def encap_mcast(self, pkt, src_ip, src_mac, vni):
        """
        Encapsulate the original payload frame by adding VXLAN header with its
        UDP, IPv6 and Ethernet fields, addressed to the test multicast group.

        :param pkt: payload frame to be carried inside the tunnel
        :param src_ip: outer IPv6 source address
        :param src_mac: outer Ethernet source address
        :param vni: VNI to place in the VXLAN header
        :returns: the encapsulated packet
        """
        return self._build_vxlan6_frame(
            pkt, vni,
            src_mac=src_mac, dst_mac=self.mcast_mac,
            src_ip=src_ip, dst_ip=self.mcast_ip6)

    def decapsulate(self, pkt):
        """
        Decapsulate the original payload frame by removing VXLAN header

        Fails the test if the VXLAN flags field is not exactly 0x8.

        :returns: the payload found behind the VXLAN header
        """
        # check that the I flag (0x8) is set, and nothing else
        self.assertEqual(pkt[VXLAN].flags, 0x8,
                         "unexpected VXLAN flags, only the I flag should "
                         "be set")
        return pkt[VXLAN].payload

    def check_encapsulation(self, pkt, vni, local_only=False, mcast_pkt=False):
        """
        Check the outer headers of a VXLAN packet captured on pg0.

        :param pkt: captured packet
        :param vni: VNI expected in the VXLAN header
        :param local_only: when True only the source (VPP side) addresses
            are checked and the destination MAC/IP checks are skipped
        :param mcast_pkt: when True the destination MAC/IP are expected to
            be the test multicast addresses instead of pg0's remote ones
        """
        cls = type(self)

        # Verify source MAC is VPP_MAC and destination MAC is MY_MAC (the
        #  address resolved over NDP in setUpClass) or the multicast MAC.
        self.assertEqual(pkt[Ether].src, self.pg0.local_mac,
                         "outer source MAC is not pg0's local MAC")
        if not local_only:
            expected_mac = cls.mcast_mac if mcast_pkt else self.pg0.remote_mac
            self.assertEqual(pkt[Ether].dst, expected_mac,
                             "unexpected outer destination MAC")

        # Verify VXLAN tunnel source IP is VPP_IP and destination IP is MY_IP
        #  or the multicast group.
        self.assertEqual(pkt[IPv6].src, self.pg0.local_ip6,
                         "outer source IP is not pg0's local IPv6 address")
        if not local_only:
            expected_ip = cls.mcast_ip6 if mcast_pkt else self.pg0.remote_ip6
            self.assertEqual(pkt[IPv6].dst, expected_ip,
                             "unexpected outer destination IP")

        # Verify UDP destination port is VXLAN 4789, source UDP port could be
        #  arbitrary.
        self.assertEqual(pkt[UDP].dport, cls.dport,
                         "UDP destination port is not the VXLAN port")
        # TODO: checksum check
        # Verify VNI
        self.assertEqual(pkt[VXLAN].vni, vni, "unexpected VNI")

    @classmethod
    def create_vxlan_flood_test_bd(cls, vni, n_ucast_tunnels):
        """
        Create n_ucast_tunnels unicast VXLAN tunnels and add each of them to
        a bridge domain.

        :param vni: used both as the VNI of every tunnel and as the id of
            the bridge domain the tunnels are put in
        :param n_ucast_tunnels: number of tunnels to create; destinations
            are taken from ip_range() starting at host value 10
        """
        first_host = 10
        last_host = first_host + n_ucast_tunnels
        for dest_ip6 in cls.ip_range(first_host, last_host):
            # add host route so dest ip will not be resolved
            host_route = VppIpRoute(
                cls, dest_ip6, 128,
                [VppRoutePath(cls.pg0.remote_ip6, INVALID_INDEX)],
                register=False)
            host_route.add_vpp_config()

            tunnel = VppVxlanTunnel(cls, src=cls.pg0.local_ip6,
                                    dst=dest_ip6, vni=vni)
            tunnel.add_vpp_config()
            # keyword form, consistent with the other bridge calls in setUp
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=tunnel.sw_if_index, bd_id=vni)

    # NOTE(review): add_del_mcast_tunnels_load is not defined in this class.
    #  Unless a base class provides it, the two wrappers below raise
    #  AttributeError when called - confirm, or drop them.
    @classmethod
    def add_mcast_tunnels_load(cls):
        """ Call add_del_mcast_tunnels_load with is_add=1 """
        cls.add_del_mcast_tunnels_load(is_add=1)

    @classmethod
    def del_mcast_tunnels_load(cls):
        """ Call add_del_mcast_tunnels_load with is_add=0 """
        cls.add_del_mcast_tunnels_load(is_add=0)

    # Class method to start the VXLAN test case.
    #  Overrides setUpClass method in VppTestCase class.
    #  Python try..except statement is used to ensure that the tear down of
    #  the class will be executed even if exception is raised.
    #  @param cls The class pointer.
    @classmethod
    def setUpClass(cls):
        super().setUpClass()

        try:
            # VXLAN UDP port and the flags value used when building packets
            cls.dport = 4789
            cls.flags = 0x8

            # Create 4 pg interfaces (pg0..pg3) and bring them up.
            cls.create_pg_interfaces(range(4))
            for pg in cls.pg_interfaces:
                pg.admin_up()

            # Configure IPv6 addresses on VPP pg0.
            cls.pg0.config_ip6()

            # Resolve MAC address for VPP's IP address on pg0.
            cls.pg0.resolve_ndp()

            # Our Multicast address
            cls.mcast_ip6 = 'ff0e::1'
            cls.mcast_mac = util.mcast_ip_to_mac(cls.mcast_ip6)
        except Exception:
            # run the class tear down explicitly before propagating the
            # error
            super().tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ Delegate to the base class tear down; nothing extra to undo """
        super().tearDownClass()

    def setUp(self):
        """
        Build the three bridge domains used by the tests:
        bd 1 - one unicast tunnel plus pg1,
        bd 2 - unicast tunnels, one multicast tunnel plus pg2,
        bd 3 - unicast tunnels plus pg3.
        """
        super().setUp()
        # Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1
        #  into BD.
        self.single_tunnel_vni = 0x12345
        self.single_tunnel_bd = 1
        single_tunnel = VppVxlanTunnel(self, src=self.pg0.local_ip6,
                                       dst=self.pg0.remote_ip6,
                                       vni=self.single_tunnel_vni)
        single_tunnel.add_vpp_config()
        self.vapi.sw_interface_set_l2_bridge(
            rx_sw_if_index=single_tunnel.sw_if_index,
            bd_id=self.single_tunnel_bd)
        self.vapi.sw_interface_set_l2_bridge(
            rx_sw_if_index=self.pg1.sw_if_index, bd_id=self.single_tunnel_bd)

        # Setup vni 2 to test multicast flooding
        self.n_ucast_tunnels = 10
        self.mcast_flood_bd = 2
        self.create_vxlan_flood_test_bd(self.mcast_flood_bd,
                                        self.n_ucast_tunnels)
        # NOTE(review): mcast_sw_if_index=1 is hard-coded; presumably this is
        #  pg0's sw_if_index - confirm, self.pg0.sw_if_index may be intended.
        mcast_tunnel = VppVxlanTunnel(self, src=self.pg0.local_ip6,
                                      dst=self.mcast_ip6,
                                      mcast_sw_if_index=1,
                                      vni=self.mcast_flood_bd)
        mcast_tunnel.add_vpp_config()
        self.vapi.sw_interface_set_l2_bridge(
            rx_sw_if_index=mcast_tunnel.sw_if_index,
            bd_id=self.mcast_flood_bd)
        self.vapi.sw_interface_set_l2_bridge(
            rx_sw_if_index=self.pg2.sw_if_index, bd_id=self.mcast_flood_bd)

        # Setup vni 3 to test unicast flooding
        self.ucast_flood_bd = 3
        self.create_vxlan_flood_test_bd(self.ucast_flood_bd,
                                        self.n_ucast_tunnels)
        self.vapi.sw_interface_set_l2_bridge(
            rx_sw_if_index=self.pg3.sw_if_index, bd_id=self.ucast_flood_bd)

    # Method to define VPP actions before tear down of the test case.
    #  Overrides tearDown method in VppTestCase class.
    #  @param self The object pointer.
    def tearDown(self):
        super().tearDown()

    def show_commands_at_teardown(self):
        """ Log the state of the three bridge domains and of the tunnels """
        for bd_id in (1, 2, 3):
            self.logger.info(
                self.vapi.cli("show bridge-domain %d detail" % bd_id))
        self.logger.info(self.vapi.cli("show vxlan tunnel"))
192
193
if __name__ == '__main__':
    # When this file is executed directly, run its tests through
    # VppTestRunner instead of unittest's default runner.
    unittest.main(testRunner=VppTestRunner)