tests docs: update python3 venv packages
[vpp.git] / test / test_gtpu.py
1 #!/usr/bin/env python3
2
3 import socket
4 from util import ip4_range
5 import unittest
6 from framework import tag_fixme_vpp_workers
7 from framework import VppTestCase, VppTestRunner
8 from template_bd import BridgeDomain
9
10 from scapy.layers.l2 import Ether
11 from scapy.packet import Raw
12 from scapy.layers.inet import IP, UDP
13 from scapy.layers.inet6 import IPv6
14 from scapy.contrib.gtp import GTP_U_Header
15 from scapy.utils import atol
16
17 import util
18 from vpp_ip_route import VppIpRoute, VppRoutePath
19 from vpp_ip import INVALID_INDEX
20
21
@tag_fixme_vpp_workers
class TestGtpuUDP(VppTestCase):
    """GTPU UDP ports Test Case"""

    # Verifies, through the ip4/ip6 "udp-lookup no_listener" error counters,
    # that UDP port 2152 has no listener until a GTPU tunnel of the matching
    # address family is configured.

    def setUp(self):
        """Create pg0, bring it up and give it IPv4 and IPv6 addresses."""
        super(TestGtpuUDP, self).setUp()

        # UDP port probed by the test (used as both source and destination).
        self.dport = 2152

        # Last observed value of the ip4 / ip6 no_listener error counters.
        self.ip4_err = 0
        self.ip6_err = 0

        self.create_pg_interfaces(range(1))
        for pg in self.pg_interfaces:
            pg.admin_up()
        self.pg0.config_ip4()
        self.pg0.config_ip6()

    def _probe_no_listener(self, l3_header, counter_name, baseline, enabled):
        """Send one UDP probe on pg0 and verify the no_listener counter.

        :param l3_header: scapy IP or IPv6 header to place under the UDP layer.
        :param counter_name: statistics path of the error counter to read.
        :param baseline: counter value observed by the previous probe.
        :param enabled: True when a listener is expected (counter must stay
            at ``baseline``), False when none is expected (counter must be
            ``baseline + 1``).
        :returns: the counter value read after the probe.
        """
        # NOTE(review): src is pg0.local_mac and dst is pg0.remote_mac, the
        # reverse of what TestGtpu.encapsulate uses for frames injected on
        # pg0 - kept exactly as the original; confirm this is intentional.
        pkt = (
            Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
            / l3_header
            / UDP(sport=self.dport, dport=self.dport, chksum=0)
        )

        self.pg0.add_stream(pkt)
        self.pg_start()

        # Element 0 of the value returned for this counter path.
        err = self.statistics.get_counter(counter_name)[0]
        self.assertEqual(err, baseline if enabled else baseline + 1)
        return err

    def _check_udp_port_ip4(self, enabled=True):
        """Probe the GTPU port over IPv4.

        :param enabled: whether a listener is expected on the port.
        """
        # self.ip4_err is only updated when the assertion in the helper holds.
        self.ip4_err = self._probe_no_listener(
            IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4),
            "/err/ip4-udp-lookup/no_listener",
            self.ip4_err,
            enabled,
        )

    def _check_udp_port_ip6(self, enabled=True):
        """Probe the GTPU port over IPv6.

        :param enabled: whether a listener is expected on the port.
        """
        # self.ip6_err is only updated when the assertion in the helper holds.
        self.ip6_err = self._probe_no_listener(
            IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6),
            "/err/ip6-udp-lookup/no_listener",
            self.ip6_err,
            enabled,
        )

    def _set_tunnel(self, is_add, src_address, dst_address):
        """Add or delete a GTPU tunnel between the two given addresses.

        :param is_add: True to create the tunnel, False to remove it.
        :param src_address: tunnel source address (IPv4 or IPv6).
        :param dst_address: tunnel destination address (same family).
        """
        # The API reply is not needed by this test case, so it is discarded.
        self.vapi.gtpu_add_del_tunnel(
            is_add=is_add,
            mcast_sw_if_index=0xFFFFFFFF,
            decap_next_index=0xFFFFFFFF,
            src_address=src_address,
            dst_address=dst_address,
        )

    def test_udp_port(self):
        """test UDP ports
        Check if there are no udp listeners before gtpu is enabled
        """
        # UDP ports should be disabled unless a tunnel is configured
        self._check_udp_port_ip4(False)
        self._check_udp_port_ip6(False)

        self._set_tunnel(True, self.pg0.local_ip4, self.pg0.remote_ip4)

        # UDP port 2152 enabled for ip4
        self._check_udp_port_ip4()

        self._set_tunnel(True, self.pg0.local_ip6, self.pg0.remote_ip6)

        # UDP port 2152 enabled for ip6
        self._check_udp_port_ip6()

        # Remove both tunnels again, IPv4 first, then IPv6.
        self._set_tunnel(False, self.pg0.local_ip4, self.pg0.remote_ip4)
        self._set_tunnel(False, self.pg0.local_ip6, self.pg0.remote_ip6)
123
124
class TestGtpu(BridgeDomain, VppTestCase):
    """GTPU Test Case"""

    # Bridge-domain tests for GTPU tunnels: a single unicast tunnel (BD 11),
    # multicast flooding (BD 12) and unicast flooding (BD 13). Attributes such
    # as frame_reply are not defined in this class; presumably they come from
    # the BridgeDomain template - confirm against template_bd.

    def __init__(self, *args):
        """Initialise both base classes explicitly, BridgeDomain first."""
        BridgeDomain.__init__(self)
        VppTestCase.__init__(self, *args)

    def _gtpu_frame(self, l2_header, l3_header, inner, teid):
        """Stack UDP and GTPU headers between the given L2/L3 headers and
        the inner payload, using the class-wide port and GTP type.
        """
        udp_header = UDP(sport=self.dport, dport=self.dport, chksum=0)
        gtpu_header = GTP_U_Header(teid=teid, gtp_type=self.gtp_type, length=150)
        return l2_header / l3_header / udp_header / gtpu_header / inner

    def encapsulate(self, pkt, vni):
        """
        Wrap the payload frame in Ethernet / IP / UDP / GTPU headers addressed
        from pg0's remote side to pg0's local side, with ``vni`` as the teid.
        """
        outer_l2 = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        outer_l3 = IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
        return self._gtpu_frame(outer_l2, outer_l3, pkt, vni)

    def ip_range(self, start, end):
        """Return ip4_range() of pg0's remote IPv4 address for start..end."""
        base_address = self.pg0.remote_ip4
        return ip4_range(base_address, start, end)

    def encap_mcast(self, pkt, src_ip, src_mac, vni):
        """
        Wrap the payload frame in Ethernet / IP / UDP / GTPU headers addressed
        from the given source to the multicast MAC and IPv4 group, with
        ``vni`` as the teid.
        """
        outer_l2 = Ether(src=src_mac, dst=self.mcast_mac)
        outer_l3 = IP(src=src_ip, dst=self.mcast_ip4)
        return self._gtpu_frame(outer_l2, outer_l3, pkt, vni)

    def decapsulate(self, pkt):
        """
        Return whatever follows the GTPU header of ``pkt``.
        """
        gtpu_layer = pkt[GTP_U_Header]
        return gtpu_layer.payload

    def check_encapsulation(self, pkt, vni, local_only=False, mcast_pkt=False):
        """Assert that ``pkt`` carries the expected outer GTPU headers.

        :param pkt: captured frame to inspect.
        :param vni: teid the GTPU header must carry.
        :param local_only: when true, only the local (source) addresses are
            checked and the destination MAC/IP are ignored.
        :param mcast_pkt: when true, the destination MAC/IP are compared with
            the multicast values instead of pg0's remote ones.
        """
        test_cls = type(self)
        verify_destination = not local_only

        # Outer Ethernet: source is always pg0's local MAC.
        self.assertEqual(pkt[Ether].src, self.pg0.local_mac)
        if verify_destination:
            wanted_mac = test_cls.mcast_mac if mcast_pkt else self.pg0.remote_mac
            self.assertEqual(pkt[Ether].dst, wanted_mac)

        # Outer IP: source is always pg0's local IPv4 address.
        self.assertEqual(pkt[IP].src, self.pg0.local_ip4)
        if verify_destination:
            wanted_ip = test_cls.mcast_ip4 if mcast_pkt else self.pg0.remote_ip4
            self.assertEqual(pkt[IP].dst, wanted_ip)

        # Only the UDP destination port is checked; the source port is not.
        self.assertEqual(pkt[UDP].dport, test_cls.dport)

        # Tunnel endpoint id.
        self.assertEqual(pkt[GTP_U_Header].teid, vni)

    def test_encap(self):
        """Encapsulation test
        Send frames from pg1
        Verify receipt of encapsulated frames on pg0
        """
        self.pg1.add_stream([self.frame_reply])
        self.pg0.enable_capture()
        self.pg_start()

        # Exactly one frame is expected; only its outer headers are checked,
        # the decapsulated payload is not compared with frame_reply.
        captured = self.pg0.get_capture(1)
        self.check_encapsulation(captured[0], self.single_tunnel_vni)

    def test_ucast_flood(self):
        """Unicast flood test
        Send frames from pg3
        Verify receipt of encapsulated frames on pg0
        """
        self.pg3.add_stream([self.frame_reply])
        self.pg0.enable_capture()
        self.pg_start()

        # One frame per unicast tunnel is expected. Destinations differ per
        # tunnel, so only the local side of the outer headers is checked.
        captured = self.pg0.get_capture(self.n_ucast_tunnels)
        for frame in captured:
            self.check_encapsulation(frame, self.ucast_flood_bd, local_only=True)

    def test_mcast_flood(self):
        """Multicast flood test
        Send frames from pg2
        Verify receipt of encapsulated frames on pg0
        """
        self.pg2.add_stream([self.frame_reply])
        self.pg0.enable_capture()
        self.pg_start()

        # Exactly one frame is expected, addressed to the multicast group.
        captured = self.pg0.get_capture(1)
        self.check_encapsulation(
            captured[0], self.mcast_flood_bd, local_only=False, mcast_pkt=True
        )

    @classmethod
    def create_gtpu_flood_test_bd(cls, teid, n_ucast_tunnels):
        """Create ``n_ucast_tunnels`` unicast GTPU tunnels and bridge them.

        :param teid: used both as the tunnels' teid and as the bridge-domain
            id the tunnel interfaces are attached to.
        :param n_ucast_tunnels: number of tunnels (and destinations) to make.
        """
        first_host = 10
        gateway = cls.pg0.remote_ip4
        destinations = ip4_range(gateway, first_host, first_host + n_ucast_tunnels)
        for destination in destinations:
            # add host route so dest_ip4 will not be resolved
            host_route = VppIpRoute(
                cls,
                destination,
                32,
                [VppRoutePath(gateway, INVALID_INDEX)],
                register=False,
            )
            host_route.add_vpp_config()

            tunnel = cls.vapi.gtpu_add_del_tunnel(
                is_add=True,
                mcast_sw_if_index=0xFFFFFFFF,
                decap_next_index=0xFFFFFFFF,
                src_address=cls.pg0.local_ip4,
                dst_address=destination,
                teid=teid,
            )
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=tunnel.sw_if_index, bd_id=teid
            )

    @classmethod
    def add_del_shared_mcast_dst_load(cls, is_add):
        """
        Add or delete 20 tunnels (teid 1000..1019) that all share the same
        multicast destination, to test the gtpu ref_count mechanism.

        Raises ValueError when a reply carries sw_if_index ~0.
        """
        tunnel_count = 20
        first_teid = 1000
        for teid in range(first_teid, first_teid + tunnel_count):
            # NOTE(review): mcast_sw_if_index is hard-coded to 1 - presumably
            # pg0's sw_if_index; confirm.
            reply = cls.vapi.gtpu_add_del_tunnel(
                decap_next_index=0xFFFFFFFF,
                src_address=cls.pg0.local_ip4,
                dst_address=cls.mcast_ip4,
                mcast_sw_if_index=1,
                teid=teid,
                is_add=is_add,
            )
            if reply.sw_if_index == 0xFFFFFFFF:
                raise ValueError("bad sw_if_index: ~0")

    @classmethod
    def add_shared_mcast_dst_load(cls):
        """Add the shared-destination multicast tunnels."""
        cls.add_del_shared_mcast_dst_load(is_add=1)

    @classmethod
    def del_shared_mcast_dst_load(cls):
        """Delete the shared-destination multicast tunnels."""
        cls.add_del_shared_mcast_dst_load(is_add=0)

    @classmethod
    def add_del_mcast_tunnels_load(cls, is_add):
        """
        Add or delete 20 tunnels with distinct multicast destinations, to
        test gtpu stability. Each tunnel's teid is the last octet of its
        destination address.
        """
        tunnel_count = 20
        first_host = 10
        groups = ip4_range(cls.mcast_ip4, first_host, first_host + tunnel_count)
        for group in groups:
            last_octet = int(group.split(".")[3])
            # NOTE(review): mcast_sw_if_index is hard-coded to 1 - presumably
            # pg0's sw_if_index; confirm.
            cls.vapi.gtpu_add_del_tunnel(
                decap_next_index=0xFFFFFFFF,
                src_address=cls.pg0.local_ip4,
                dst_address=group,
                mcast_sw_if_index=1,
                teid=last_octet,
                is_add=is_add,
            )

    @classmethod
    def add_mcast_tunnels_load(cls):
        """Add the distinct-destination multicast tunnels."""
        cls.add_del_mcast_tunnels_load(is_add=1)

    @classmethod
    def del_mcast_tunnels_load(cls):
        """Delete the distinct-destination multicast tunnels."""
        cls.add_del_mcast_tunnels_load(is_add=0)

    # Class method to start the GTPU test case.
    #  Overrides setUpClass method in VppTestCase class.
    #  The configuration runs inside try..except so that the class tear down
    #  is executed, and the exception re-raised, if any step fails.
    #  @param cls The class pointer.
    @classmethod
    def setUpClass(cls):
        super(TestGtpu, cls).setUpClass()

        try:
            cls.dport = 2152
            cls.gtp_type = 0xFF

            # Create 4 pg interfaces (pg0..pg3) and bring them all up.
            cls.create_pg_interfaces(range(4))
            for interface in cls.pg_interfaces:
                interface.admin_up()

            # Only pg0 gets an IPv4 address; ARP is then resolved on pg0.
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()

            # Multicast group used by the tests and its matching MAC.
            cls.mcast_ip4 = "239.1.1.1"
            cls.mcast_mac = util.mcast_ip_to_mac(cls.mcast_ip4)

            # BD 11: one unicast GTPU tunnel to pg0's remote address,
            #  bridged together with pg1.
            cls.single_tunnel_bd = 11
            cls.single_tunnel_vni = 11
            single_tunnel = cls.vapi.gtpu_add_del_tunnel(
                is_add=True,
                mcast_sw_if_index=0xFFFFFFFF,
                decap_next_index=0xFFFFFFFF,
                src_address=cls.pg0.local_ip4,
                dst_address=cls.pg0.remote_ip4,
                teid=cls.single_tunnel_vni,
            )
            for member in (single_tunnel.sw_if_index, cls.pg1.sw_if_index):
                cls.vapi.sw_interface_set_l2_bridge(
                    rx_sw_if_index=member, bd_id=cls.single_tunnel_bd
                )

            # BD 12: multicast flooding - unicast tunnels plus one tunnel to
            #  the multicast group, bridged together with pg2.
            cls.n_ucast_tunnels = 10
            cls.mcast_flood_bd = 12
            cls.create_gtpu_flood_test_bd(cls.mcast_flood_bd, cls.n_ucast_tunnels)
            # NOTE(review): mcast_sw_if_index is hard-coded to 1 - presumably
            #  pg0's sw_if_index; confirm.
            mcast_tunnel = cls.vapi.gtpu_add_del_tunnel(
                is_add=True,
                src_address=cls.pg0.local_ip4,
                dst_address=cls.mcast_ip4,
                mcast_sw_if_index=1,
                decap_next_index=0xFFFFFFFF,
                teid=cls.mcast_flood_bd,
            )
            for member in (mcast_tunnel.sw_if_index, cls.pg2.sw_if_index):
                cls.vapi.sw_interface_set_l2_bridge(
                    rx_sw_if_index=member, bd_id=cls.mcast_flood_bd
                )

            # Add and delete mcast tunnels to check stability
            cls.add_shared_mcast_dst_load()
            cls.add_mcast_tunnels_load()
            cls.del_shared_mcast_dst_load()
            cls.del_mcast_tunnels_load()

            # BD 13: unicast flooding - unicast tunnels bridged with pg3.
            cls.ucast_flood_bd = 13
            cls.create_gtpu_flood_test_bd(cls.ucast_flood_bd, cls.n_ucast_tunnels)
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=cls.pg3.sw_if_index, bd_id=cls.ucast_flood_bd
            )
        except Exception:
            super(TestGtpu, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class tear down to the base classes."""
        super(TestGtpu, cls).tearDownClass()

    # Method to define VPP actions before tear down of the test case.
    #  Overrides tearDown method in VppTestCase class.
    #  @param self The object pointer.
    def tearDown(self):
        super(TestGtpu, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the bridge-domain, interface, tunnel and trace CLI output."""
        cli_commands = (
            "show bridge-domain 11 detail",
            "show bridge-domain 12 detail",
            "show bridge-domain 13 detail",
            "show int",
            "show gtpu tunnel",
            "show trace",
        )
        for command in cli_commands:
            self.logger.info(self.vapi.cli(command))
433
434
# When this file is executed directly, run its test cases through unittest
# using VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)