762e0709f136c020ec8952931ed2b04afb373c5a
[vpp.git] / test / test_vxlan_gpe.py
1 #!/usr/bin/env python
2
3 import socket
4 from util import ip4n_range
5 import unittest
6 from framework import VppTestCase, VppTestRunner, running_extended_tests
7 from template_bd import BridgeDomain
8
9 from scapy.layers.l2 import Ether, Raw
10 from scapy.layers.inet import IP, UDP
11 from scapy.layers.vxlan import VXLAN
12 from scapy.utils import atol
13
14
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
class TestVxlanGpe(BridgeDomain, VppTestCase):
    """ VXLAN-GPE Test Case """

    def __init__(self, *args):
        """Initialise both base classes explicitly.

        BridgeDomain.__init__ takes no extra arguments, while the
        positional arguments are forwarded to VppTestCase.__init__.
        """
        BridgeDomain.__init__(self)
        VppTestCase.__init__(self, *args)

    def encapsulate(self, pkt, vni):
        """
        Encapsulate the original payload frame by adding VXLAN-GPE header
        with its UDP, IP and Ethernet fields

        The outer headers are addressed from the pg0 remote side to the
        pg0 local side; the UDP source and destination ports are both
        self.dport and the UDP checksum is forced to 0.

        :param pkt: inner frame to be carried as the VXLAN payload
        :param vni: VXLAN network identifier put into the VXLAN header
        :returns: the complete encapsulated scapy packet
        """
        return (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
                UDP(sport=self.dport, dport=self.dport, chksum=0) /
                VXLAN(vni=vni, flags=self.flags) /
                pkt)

    def encap_mcast(self, pkt, src_ip, src_mac, vni):
        """
        Encapsulate the original payload frame by adding VXLAN-GPE header
        with its UDP, IP and Ethernet fields, addressed to the multicast
        group (self.mcast_mac / self.mcast_ip4)

        :param pkt: inner frame to be carried as the VXLAN payload
        :param src_ip: outer IPv4 source address
        :param src_mac: outer Ethernet source address
        :param vni: VXLAN network identifier put into the VXLAN header
        :returns: the complete encapsulated scapy packet
        """
        return (Ether(src=src_mac, dst=self.mcast_mac) /
                IP(src=src_ip, dst=self.mcast_ip4) /
                UDP(sport=self.dport, dport=self.dport, chksum=0) /
                VXLAN(vni=vni, flags=self.flags) /
                pkt)

    def decapsulate(self, pkt):
        """
        Decapsulate the original payload frame by removing VXLAN-GPE header

        :param pkt: received packet containing a VXLAN layer
        :returns: the payload that follows the VXLAN header
        """
        # Check that the I and P flags are set, i.e. the header carries
        # the same flags value (0xc) that this test case encapsulates with.
        self.assertEqual(pkt[VXLAN].flags, self.flags)
        return pkt[VXLAN].payload

    def check_encapsulation(self, pkt, vni, local_only=False, mcast_pkt=False):
        """
        Check the outer headers of a VXLAN-GPE encapsulated packet

        :param pkt: packet captured on pg0
        :param vni: VNI expected in the VXLAN header
        :param local_only: when True only the source (VPP side) Ethernet
            and IP addresses are verified, destinations are not checked
        :param mcast_pkt: when True the destinations are expected to be
            the multicast MAC/IP instead of the pg0 remote MAC/IP
        """
        expect_mcast = mcast_pkt
        if expect_mcast:
            expected_dst_mac = type(self).mcast_mac
            expected_dst_ip = type(self).mcast_ip4
        else:
            expected_dst_mac = self.pg0.remote_mac
            expected_dst_ip = self.pg0.remote_ip4

        # Verify source MAC is VPP_MAC and destination MAC is MY_MAC
        # resolved by VPP using ARP (or the multicast MAC).
        self.assertEqual(pkt[Ether].src, self.pg0.local_mac)
        if not local_only:
            self.assertEqual(pkt[Ether].dst, expected_dst_mac)

        # Verify VXLAN-GPE tunnel src IP is VPP_IP and dst IP is MY_IP
        # (or the multicast group address).
        self.assertEqual(pkt[IP].src, self.pg0.local_ip4)
        if not local_only:
            self.assertEqual(pkt[IP].dst, expected_dst_ip)

        # Verify UDP destination port is VXLAN-GPE 4790, source UDP port
        # could be arbitrary.
        self.assertEqual(pkt[UDP].dport, type(self).dport)

        # Verify VNI
        self.assertEqual(pkt[VXLAN].vni, vni)

    def test_decap(self):
        """ Decapsulation test
        Send encapsulated frames from pg0
        Verify receipt of decapsulated frames on pg1
        """

        encapsulated_pkt = self.encapsulate(self.frame_request,
                                            self.single_tunnel_bd)

        self.pg0.add_stream([encapsulated_pkt, ])

        self.pg1.enable_capture()

        self.pg_start()

        # Pick first received frame and check if it's the non-encapsulated
        # frame
        out = self.pg1.get_capture(1)
        pkt = out[0]
        self.assert_eq_pkts(pkt, self.frame_request)

    def test_encap(self):
        """ Encapsulation test
        Send frames from pg1
        Verify receipt of encapsulated frames on pg0
        """
        self.pg1.add_stream([self.frame_reply])

        self.pg0.enable_capture()

        self.pg_start()

        # Pick first received frame and check if it's correctly
        # encapsulated.
        out = self.pg0.get_capture(1)
        pkt = out[0]
        self.check_encapsulation(pkt, self.single_tunnel_bd)

        # NOTE(review): the inner payload is not decapsulated and compared
        # with self.frame_reply here - TODO confirm whether that check can
        # be enabled.

    def test_ucast_flood(self):
        """ Unicast flood test
        Send frames from pg3
        Verify receipt of encapsulated frames on pg0
        """
        self.pg3.add_stream([self.frame_reply])

        self.pg0.enable_capture()

        self.pg_start()

        # Get packet from each tunnel and assert it's correctly
        # encapsulated. Only the local (source) addresses are checked.
        out = self.pg0.get_capture(self.n_ucast_tunnels)
        for pkt in out:
            self.check_encapsulation(pkt, self.ucast_flood_bd,
                                     local_only=True)
            # NOTE(review): the inner payload is not decapsulated and
            # compared with self.frame_reply here - TODO confirm whether
            # that check can be enabled.

    def test_mcast_flood(self):
        """ Multicast flood test
        Send frames from pg2
        Verify receipt of encapsulated frames on pg0
        """
        self.pg2.add_stream([self.frame_reply])

        self.pg0.enable_capture()

        self.pg_start()

        # Pick first received frame and check if it's correctly
        # encapsulated.
        out = self.pg0.get_capture(1)
        pkt = out[0]
        self.check_encapsulation(pkt, self.mcast_flood_bd,
                                 local_only=False, mcast_pkt=True)

        # NOTE(review): the inner payload is not decapsulated and compared
        # with self.frame_reply here - TODO confirm whether that check can
        # be enabled.

    @classmethod
    def create_vxlan_gpe_flood_test_bd(cls, vni, n_ucast_tunnels):
        """
        Create n_ucast_tunnels unicast vxlan_gpe tunnels and put each of
        them into the bridge domain whose id equals vni

        :param vni: VNI of the tunnels, also used as the bridge domain id
        :param n_ucast_tunnels: number of tunnels (destinations) to create
        """
        ip_range_start = 10
        ip_range_end = ip_range_start + n_ucast_tunnels
        next_hop_address = cls.pg0.remote_ip4n
        for dest_ip4n in ip4n_range(next_hop_address, ip_range_start,
                                    ip_range_end):
            # add host route so dest_ip4n will not be resolved
            cls.vapi.ip_add_del_route(dest_ip4n, 32, next_hop_address)
            r = cls.vapi.vxlan_gpe_add_del_tunnel(
                src_addr=cls.pg0.local_ip4n,
                dst_addr=dest_ip4n,
                vni=vni)
            cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=vni)

    @classmethod
    def add_del_shared_mcast_dst_load(cls, is_add):
        """
        add or del tunnels sharing the same mcast dst
        to test vxlan_gpe ref_count mechanism

        :param is_add: passed through to the API; 1 adds, 0 deletes
        :raises ValueError: if the API reply carries sw_if_index
            0xffffffff
        """
        n_shared_dst_tunnels = 20
        vni_start = 1000
        vni_end = vni_start + n_shared_dst_tunnels
        for vni in range(vni_start, vni_end):
            # NOTE(review): mcast_sw_if_index is hard-coded to 1,
            # presumably the sw_if_index of pg0 - TODO confirm.
            r = cls.vapi.vxlan_gpe_add_del_tunnel(
                src_addr=cls.pg0.local_ip4n,
                dst_addr=cls.mcast_ip4n,
                mcast_sw_if_index=1,
                vni=vni,
                is_add=is_add)
            # 0xffffffff is presumably the "invalid index" value (~0).
            if r.sw_if_index == 0xffffffff:
                # A real exception instance must be raised; raising a
                # plain string is itself a TypeError.
                raise ValueError("bad sw_if_index")

    @classmethod
    def add_shared_mcast_dst_load(cls):
        """Add the tunnels that share one multicast destination."""
        cls.add_del_shared_mcast_dst_load(is_add=1)

    @classmethod
    def del_shared_mcast_dst_load(cls):
        """Delete the tunnels that share one multicast destination."""
        cls.add_del_shared_mcast_dst_load(is_add=0)

    @classmethod
    def add_del_mcast_tunnels_load(cls, is_add):
        """
        add or del tunnels to test vxlan_gpe stability

        Each tunnel gets a distinct multicast destination taken from
        ip4n_range() based on cls.mcast_ip4n.

        :param is_add: passed through to the API; 1 adds, 0 deletes
        """
        n_distinct_dst_tunnels = 20
        ip_range_start = 10
        ip_range_end = ip_range_start + n_distinct_dst_tunnels
        for dest_ip4n in ip4n_range(cls.mcast_ip4n, ip_range_start,
                                    ip_range_end):
            # Use the last octet of the destination address as the VNI.
            vni = bytearray(dest_ip4n)[3]
            cls.vapi.vxlan_gpe_add_del_tunnel(
                src_addr=cls.pg0.local_ip4n,
                dst_addr=dest_ip4n,
                mcast_sw_if_index=1,
                vni=vni,
                is_add=is_add)

    @classmethod
    def add_mcast_tunnels_load(cls):
        """Add the tunnels with distinct multicast destinations."""
        cls.add_del_mcast_tunnels_load(is_add=1)

    @classmethod
    def del_mcast_tunnels_load(cls):
        """Delete the tunnels with distinct multicast destinations."""
        cls.add_del_mcast_tunnels_load(is_add=0)

    # Class method to start the VXLAN-GPE test case.
    #  Overrides setUpClass method in VppTestCase class.
    #  Python try..except statement is used to ensure that the tear down of
    #  the class will be executed even if exception is raised.
    #  @param cls The class pointer.
    @classmethod
    def setUpClass(cls):
        super(TestVxlanGpe, cls).setUpClass()

        try:
            # UDP port and VXLAN header flags used by every test.
            cls.dport = 4790
            cls.flags = 0xc

            # Create 4 pg interfaces (pg0..pg3) and bring them up.
            cls.create_pg_interfaces(range(4))
            for pg in cls.pg_interfaces:
                pg.admin_up()

            # Configure IPv4 addresses on VPP pg0.
            cls.pg0.config_ip4()

            # Resolve MAC address for VPP's IP address on pg0.
            cls.pg0.resolve_arp()

            # Our Multicast address. The MAC is built from the fixed
            # 01:00:5e prefix followed by the low 23 bits of the IPv4
            # address.
            cls.mcast_ip4 = '239.1.1.1'
            cls.mcast_ip4n = socket.inet_pton(socket.AF_INET, cls.mcast_ip4)
            iplong = atol(cls.mcast_ip4)
            cls.mcast_mac = "01:00:5e:%02x:%02x:%02x" % (
                (iplong >> 16) & 0x7F, (iplong >> 8) & 0xFF, iplong & 0xFF)

            # Create VXLAN-GPE VTEP on VPP pg0, and put vxlan_gpe_tunnel0
            # and pg1 into BD.
            cls.single_tunnel_bd = 11
            r = cls.vapi.vxlan_gpe_add_del_tunnel(
                src_addr=cls.pg0.local_ip4n,
                dst_addr=cls.pg0.remote_ip4n,
                vni=cls.single_tunnel_bd)
            cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index,
                                                bd_id=cls.single_tunnel_bd)
            cls.vapi.sw_interface_set_l2_bridge(cls.pg1.sw_if_index,
                                                bd_id=cls.single_tunnel_bd)

            # Setup BD/VNI 12 to test multicast flooding: the unicast
            # tunnels, one multicast tunnel and pg2 share the bridge
            # domain.
            cls.n_ucast_tunnels = 10
            cls.mcast_flood_bd = 12
            cls.create_vxlan_gpe_flood_test_bd(cls.mcast_flood_bd,
                                               cls.n_ucast_tunnels)
            r = cls.vapi.vxlan_gpe_add_del_tunnel(
                src_addr=cls.pg0.local_ip4n,
                dst_addr=cls.mcast_ip4n,
                mcast_sw_if_index=1,
                vni=cls.mcast_flood_bd)
            cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index,
                                                bd_id=cls.mcast_flood_bd)
            cls.vapi.sw_interface_set_l2_bridge(cls.pg2.sw_if_index,
                                                bd_id=cls.mcast_flood_bd)

            # Add and delete mcast tunnels to check stability
            cls.add_shared_mcast_dst_load()
            cls.add_mcast_tunnels_load()
            cls.del_shared_mcast_dst_load()
            cls.del_mcast_tunnels_load()

            # Setup BD/VNI 13 to test unicast flooding: the unicast
            # tunnels and pg3 share the bridge domain.
            cls.ucast_flood_bd = 13
            cls.create_vxlan_gpe_flood_test_bd(cls.ucast_flood_bd,
                                               cls.n_ucast_tunnels)
            cls.vapi.sw_interface_set_l2_bridge(cls.pg3.sw_if_index,
                                                bd_id=cls.ucast_flood_bd)
        except Exception:
            # Make sure the class is torn down before the error
            # propagates.
            super(TestVxlanGpe, cls).tearDownClass()
            raise

    # Method to define VPP actions before tear down of the test case.
    #  Overrides tearDown method in VppTestCase class.
    #  @param self The object pointer.
    def tearDown(self):
        super(TestVxlanGpe, self).tearDown()
        if not self.vpp_dead:
            # Log the state of every bridge domain configured in
            # setUpClass (ids 11, 12 and 13), then interfaces, tunnels
            # and the packet trace.
            for bd_id in (self.single_tunnel_bd,
                          self.mcast_flood_bd,
                          self.ucast_flood_bd):
                self.logger.info(
                    self.vapi.cli("show bridge-domain %d detail" % bd_id))
            self.logger.info(self.vapi.cli("show int"))
            self.logger.info(self.vapi.cli("show vxlan-gpe tunnel"))
            self.logger.info(self.vapi.cli("show trace"))
310
311
# Run the tests of this module with the VPP test runner when the file is
# executed directly.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)