vcl: allow more rx events on peek
[vpp.git] / test / test_vxlan6.py
1 #!/usr/bin/env python3
2
3 import unittest
4 from framework import VppTestCase
5 from asfframework import VppTestRunner
6 from template_bd import BridgeDomain
7
8 from scapy.layers.l2 import Ether
9 from scapy.packet import Raw, bind_layers
10 from scapy.layers.inet6 import IP, IPv6, UDP
11 from scapy.layers.vxlan import VXLAN
12
13 import util
14 from vpp_ip_route import VppIpRoute, VppRoutePath
15 from vpp_vxlan_tunnel import VppVxlanTunnel
16 from vpp_ip import INVALID_INDEX
17
18
class TestVxlan6(BridgeDomain, VppTestCase):
    """VXLAN over IPv6 Test Case"""

    def __init__(self, *args):
        # Both bases are initialised explicitly; BridgeDomain takes no
        # arguments while VppTestCase receives the unittest arguments.
        BridgeDomain.__init__(self)
        VppTestCase.__init__(self, *args)

    def encapsulate(self, pkt, vni):
        """
        Encapsulate the original payload frame by adding VXLAN header with its
        UDP, IPv6 and Ethernet fields.

        The outer headers go from pg0's remote addresses to pg0's local
        addresses, the UDP source and destination ports are both
        ``self.dport`` and the UDP checksum is forced to 0.

        :param pkt: payload frame to wrap.
        :param vni: VXLAN network identifier placed in the VXLAN header.
        :returns: the encapsulated scapy packet.
        """
        return (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
            / UDP(sport=self.dport, dport=self.dport, chksum=0)
            / VXLAN(vni=vni, flags=self.flags)
            / pkt
        )

    @classmethod
    def ip_range(cls, s, e):
        """
        Range of remote IPv6 addresses.

        Takes pg0's remote IPv6 address, drops everything after the last
        ``:`` and appends each value of ``range(s, e)`` in hexadecimal.

        :param s: first suffix value (inclusive).
        :param e: last suffix value (exclusive).
        :returns: a generator of IPv6 address strings.
        """
        tmp = cls.pg0.remote_ip6.rsplit(":", 1)[0]
        return ("%s:%x" % (tmp, i) for i in range(s, e))

    def encap_mcast(self, pkt, src_ip, src_mac, vni):
        """
        Encapsulate the original payload frame by adding VXLAN header with its
        UDP, IPv6 and Ethernet fields, addressed to the multicast group.

        :param pkt: payload frame to wrap.
        :param src_ip: outer IPv6 source address.
        :param src_mac: outer Ethernet source address.
        :param vni: VXLAN network identifier placed in the VXLAN header.
        :returns: the encapsulated scapy packet.
        """
        return (
            Ether(src=src_mac, dst=self.mcast_mac)
            / IPv6(src=src_ip, dst=self.mcast_ip6)
            / UDP(sport=self.dport, dport=self.dport, chksum=0)
            / VXLAN(vni=vni, flags=self.flags)
            / pkt
        )

    def decapsulate(self, pkt):
        """
        Decapsulate the original payload frame by removing VXLAN header.

        Asserts that the VXLAN flags equal 0x8 before returning the payload.
        """
        # check that the I flag (and only the I flag) is set
        self.assertEqual(pkt[VXLAN].flags, 0x8)
        return pkt[VXLAN].payload

    # Method for checking VXLAN encapsulation.
    #
    def check_encapsulation(self, pkt, vni, local_only=False, mcast_pkt=False):
        """
        Verify the outer headers of a VXLAN-encapsulated packet.

        :param pkt: captured packet to check.
        :param vni: expected VXLAN network identifier.
        :param local_only: when True, only the source (local) addresses are
            checked and the destination MAC/IP checks are skipped.
        :param mcast_pkt: when True, the destination MAC/IP are compared with
            the class multicast addresses instead of pg0's remote addresses.
        """
        # TODO: add error messages
        # Verify source MAC is VPP_MAC and destination MAC is MY_MAC resolved
        #  by VPP using NDP.
        self.assertEqual(pkt[Ether].src, self.pg0.local_mac)
        if not local_only:
            if not mcast_pkt:
                self.assertEqual(pkt[Ether].dst, self.pg0.remote_mac)
            else:
                self.assertEqual(pkt[Ether].dst, type(self).mcast_mac)
        # Verify VXLAN tunnel source IP is VPP_IP and destination IP is MY_IP.
        self.assertEqual(pkt[IPv6].src, self.pg0.local_ip6)
        if not local_only:
            if not mcast_pkt:
                self.assertEqual(pkt[IPv6].dst, self.pg0.remote_ip6)
            else:
                self.assertEqual(pkt[IPv6].dst, type(self).mcast_ip6)
        # Verify UDP destination port is the configured VXLAN port
        #  (self.dport); source UDP port could be arbitrary.
        self.assertEqual(pkt[UDP].dport, self.dport)
        # Verify UDP checksum
        self.assert_udp_checksum_valid(pkt, ignore_zero_checksum=False)
        # Verify VNI
        self.assertEqual(pkt[VXLAN].vni, vni)

    @classmethod
    def create_vxlan_flood_test_bd(cls, vni, n_ucast_tunnels, port):
        """
        Create ``n_ucast_tunnels`` unicast VXLAN tunnels and bridge them.

        :param vni: used both as the tunnels' VNI and as the bridge domain id.
        :param n_ucast_tunnels: number of tunnels to create.
        :param port: UDP port used as both source and destination port.
        """
        # Create n_ucast_tunnels ucast vxlan tunnels under bd
        start = 10
        end = start + n_ucast_tunnels
        for dest_ip6 in cls.ip_range(start, end):
            # add host route so dest ip will not be resolved
            rip = VppIpRoute(
                cls,
                dest_ip6,
                128,
                [VppRoutePath(cls.pg0.remote_ip6, INVALID_INDEX)],
                register=False,
            )
            rip.add_vpp_config()
            r = VppVxlanTunnel(
                cls,
                src=cls.pg0.local_ip6,
                src_port=port,
                dst_port=port,
                dst=dest_ip6,
                vni=vni,
            )
            r.add_vpp_config()
            cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=vni)

    # NOTE(review): add_del_mcast_tunnels_load is not defined in this class;
    #  presumably it is expected from a base class - confirm before use.
    @classmethod
    def add_mcast_tunnels_load(cls):
        cls.add_del_mcast_tunnels_load(is_add=1)

    @classmethod
    def del_mcast_tunnels_load(cls):
        cls.add_del_mcast_tunnels_load(is_add=0)

    # Class method to start the VXLAN test case.
    #  Overrides setUpClass method in VppTestCase class.
    #  Python try..except statement is used to ensure that the tear down of
    #  the class will be executed even if exception is raised.
    #  @param cls The class pointer.
    @classmethod
    def setUpClass(cls):
        super(TestVxlan6, cls).setUpClass()

        try:
            cls.flags = 0x8

            # Create 4 pg interfaces (pg0..pg3).
            cls.create_pg_interfaces(range(4))
            for pg in cls.pg_interfaces:
                pg.admin_up()

            # Configure IPv6 addresses on VPP pg0.
            cls.pg0.config_ip6()

            # Resolve MAC address for VPP's IP address on pg0.
            cls.pg0.resolve_ndp()

            # Our Multicast address
            cls.mcast_ip6 = "ff0e::1"
            cls.mcast_mac = util.mcast_ip_to_mac(cls.mcast_ip6)
        except Exception:
            super(TestVxlan6, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(TestVxlan6, cls).tearDownClass()

    def setUp(self):
        super(TestVxlan6, self).setUp()

    def createVxLANInterfaces(self, port=4789):
        """
        Create the VXLAN tunnels and bridge domains used by the tests.

        :param port: UDP port used for all tunnels (default 4789); it is
            stored in ``self.dport`` and bound to VXLAN in scapy.
        """
        # Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1
        #  into BD.
        self.dport = port

        self.single_tunnel_vni = 0x12345
        self.single_tunnel_bd = 1
        r = VppVxlanTunnel(
            self,
            src=self.pg0.local_ip6,
            dst=self.pg0.remote_ip6,
            src_port=self.dport,
            dst_port=self.dport,
            vni=self.single_tunnel_vni,
        )
        r.add_vpp_config()
        self.vapi.sw_interface_set_l2_bridge(
            rx_sw_if_index=r.sw_if_index, bd_id=self.single_tunnel_bd
        )
        self.vapi.sw_interface_set_l2_bridge(
            rx_sw_if_index=self.pg1.sw_if_index, bd_id=self.single_tunnel_bd
        )

        # Setup vni 2 to test multicast flooding
        self.n_ucast_tunnels = 10
        self.mcast_flood_bd = 2
        self.create_vxlan_flood_test_bd(
            self.mcast_flood_bd, self.n_ucast_tunnels, self.dport
        )
        r = VppVxlanTunnel(
            self,
            src=self.pg0.local_ip6,
            dst=self.mcast_ip6,
            src_port=self.dport,
            dst_port=self.dport,
            mcast_sw_if_index=1,
            vni=self.mcast_flood_bd,
        )
        r.add_vpp_config()
        self.vapi.sw_interface_set_l2_bridge(
            rx_sw_if_index=r.sw_if_index, bd_id=self.mcast_flood_bd
        )
        self.vapi.sw_interface_set_l2_bridge(
            rx_sw_if_index=self.pg2.sw_if_index, bd_id=self.mcast_flood_bd
        )

        # Setup vni 3 to test unicast flooding
        self.ucast_flood_bd = 3
        self.create_vxlan_flood_test_bd(
            self.ucast_flood_bd, self.n_ucast_tunnels, self.dport
        )
        self.vapi.sw_interface_set_l2_bridge(
            rx_sw_if_index=self.pg3.sw_if_index, bd_id=self.ucast_flood_bd
        )

        # Set scapy listen custom port for VxLAN
        bind_layers(UDP, VXLAN, dport=self.dport)

    # Method to define VPP actions before tear down of the test case.
    #  Overrides tearDown method in VppTestCase class.
    #  @param self The object pointer.
    def tearDown(self):
        super(TestVxlan6, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the state of the three bridge domains and the VXLAN tunnels."""
        self.logger.info(self.vapi.cli("show bridge-domain 1 detail"))
        self.logger.info(self.vapi.cli("show bridge-domain 2 detail"))
        self.logger.info(self.vapi.cli("show bridge-domain 3 detail"))
        self.logger.info(self.vapi.cli("show vxlan tunnel"))

    def encap_fragmented_packet(self):
        """
        Send IPv4 fragments into pg1 and verify their encapsulation on pg0.

        A UDP frame is fragmented, each captured packet on pg0 is checked
        for correct VXLAN encapsulation, and the decapsulated fragments are
        reassembled and compared with the original frame.
        """
        frame = (
            Ether(src="00:00:00:00:00:02", dst="00:00:00:00:00:01")
            / IP(src="4.3.2.1", dst="1.2.3.4")
            / UDP(sport=20000, dport=10000)
            / Raw(b"\xa5" * 1000)
        )

        frags = util.fragment_rfc791(frame, 400)

        self.pg1.add_stream(frags)

        self.pg0.enable_capture()

        self.pg_start()

        out = self.pg0.get_capture(3)

        payload = []
        for pkt in out:
            payload.append(self.decapsulate(pkt))
            self.check_encapsulation(pkt, self.single_tunnel_vni)

        reassembled = util.reassemble4(payload)

        # Rebuild the frame from its bytes so computed fields (lengths,
        #  checksums) are filled in before comparing. bytes() is used
        #  because scapy's raw() helper is not imported in this module.
        self.assertEqual(Ether(bytes(frame))[IP], reassembled[IP])

    #
    # Tests with default port (4789)
    #

    def test_decap(self):
        """Decapsulation test
        from BridgeDomain
        """
        self.createVxLANInterfaces()
        super(TestVxlan6, self).test_decap()

    def test_encap(self):
        """Encapsulation test
        from BridgeDomain
        """
        self.createVxLANInterfaces()
        super(TestVxlan6, self).test_encap()

    def test_encap_fragmented_packet(self):
        """Encapsulation test send fragments from pg1
        Verify receipt of encapsulated frames on pg0
        """
        self.createVxLANInterfaces()
        self.encap_fragmented_packet()

    def test_ucast_flood(self):
        """Unicast flood test
        from BridgeDomain
        """
        self.createVxLANInterfaces()
        super(TestVxlan6, self).test_ucast_flood()

    def test_mcast_flood(self):
        """Multicast flood test
        from BridgeDomain
        """
        self.createVxLANInterfaces()
        super(TestVxlan6, self).test_mcast_flood()

    def test_mcast_rcv(self):
        """Multicast receive test
        from BridgeDomain
        """
        self.createVxLANInterfaces()
        super(TestVxlan6, self).test_mcast_rcv()

    #
    # Tests with custom port
    #

    def test_decap_custom_port(self):
        """Decapsulation test custom port
        from BridgeDomain
        """
        self.createVxLANInterfaces(1111)
        super(TestVxlan6, self).test_decap()

    def test_encap_custom_port(self):
        """Encapsulation test custom port
        from BridgeDomain
        """
        self.createVxLANInterfaces(1111)
        super(TestVxlan6, self).test_encap()

    def test_ucast_flood_custom_port(self):
        """Unicast flood test custom port
        from BridgeDomain
        """
        self.createVxLANInterfaces(1111)
        super(TestVxlan6, self).test_ucast_flood()

    def test_mcast_flood_custom_port(self):
        """Multicast flood test custom port
        from BridgeDomain
        """
        self.createVxLANInterfaces(1111)
        super(TestVxlan6, self).test_mcast_flood()

    def test_mcast_rcv_custom_port(self):
        """Multicast receive test custom port
        from BridgeDomain
        """
        self.createVxLANInterfaces(1111)
        super(TestVxlan6, self).test_mcast_rcv()
345
346
# When executed directly as a script, run the tests in this module through
# unittest using VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)