tests: replace pycodestyle with black
[vpp.git] / test / test_geneve.py
1 #!/usr/bin/env python3
2
3 import socket
4 from util import ip4_range
5 import unittest
6 from framework import VppTestCase, VppTestRunner
7 from template_bd import BridgeDomain
8
9 from scapy.layers.l2 import Ether, ARP
10 from scapy.layers.inet import IP, UDP, ICMP
11 from scapy.contrib.geneve import GENEVE
12
13 import util
14 from vpp_ip_route import VppIpRoute, VppRoutePath
15 from vpp_ip import INVALID_INDEX
16
17
18 class TestGeneve(BridgeDomain, VppTestCase):
19     """GENEVE Test Case"""
20
21     def __init__(self, *args):
22         BridgeDomain.__init__(self)
23         VppTestCase.__init__(self, *args)
24
25     def encapsulate(self, pkt, vni):
26
27         """
28         Encapsulate the original payload frame by adding GENEVE header with its
29         UDP, IP and Ethernet fields
30         """
31         return (
32             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
33             / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
34             / UDP(sport=self.dport, dport=self.dport, chksum=0)
35             / GENEVE(vni=vni)
36             / pkt
37         )
38
39     def ip_range(self, start, end):
40         """range of remote ip's"""
41         return ip4_range(self.pg0.remote_ip4, start, end)
42
43     def encap_mcast(self, pkt, src_ip, src_mac, vni):
44         """
45         Encapsulate the original payload frame by adding GENEVE header with its
46         UDP, IP and Ethernet fields
47         """
48         return (
49             Ether(src=src_mac, dst=self.mcast_mac)
50             / IP(src=src_ip, dst=self.mcast_ip4)
51             / UDP(sport=self.dport, dport=self.dport, chksum=0)
52             / GENEVE(vni=vni)
53             / pkt
54         )
55
56     def decapsulate(self, pkt):
57         """
58         Decapsulate the original payload frame by removing GENEVE header
59         """
60         # check if is set I flag
61         # self.assertEqual(pkt[GENEVE].flags, int('0x8', 16))
62         return pkt[GENEVE].payload
63
64     # Method for checking GENEVE encapsulation.
65     #
66     def check_encapsulation(self, pkt, vni, local_only=False, mcast_pkt=False):
67         # TODO: add error messages
68         # Verify source MAC is VPP_MAC and destination MAC is MY_MAC resolved
69         #  by VPP using ARP.
70         self.assertEqual(pkt[Ether].src, self.pg0.local_mac)
71         if not local_only:
72             if not mcast_pkt:
73                 self.assertEqual(pkt[Ether].dst, self.pg0.remote_mac)
74             else:
75                 self.assertEqual(pkt[Ether].dst, type(self).mcast_mac)
76         # Verify GENEVE tunnel source IP is VPP_IP and destination IP is MY_IP.
77         self.assertEqual(pkt[IP].src, self.pg0.local_ip4)
78         if not local_only:
79             if not mcast_pkt:
80                 self.assertEqual(pkt[IP].dst, self.pg0.remote_ip4)
81             else:
82                 self.assertEqual(pkt[IP].dst, type(self).mcast_ip4)
83         # Verify UDP destination port is GENEVE 4789, source UDP port could be
84         #  arbitrary.
85         self.assertEqual(pkt[UDP].dport, type(self).dport)
86         # TODO: checksum check
87         # Verify VNI
88         self.assertEqual(pkt[GENEVE].vni, vni)
89
90     @classmethod
91     def create_geneve_flood_test_bd(cls, vni, n_ucast_tunnels):
92         # Create 10 ucast geneve tunnels under bd
93         ip_range_start = 10
94         ip_range_end = ip_range_start + n_ucast_tunnels
95         next_hop_address = cls.pg0.remote_ip4
96         for dest_ip4 in ip4_range(next_hop_address, ip_range_start, ip_range_end):
97             # add host route so dest_ip4 will not be resolved
98             rip = VppIpRoute(
99                 cls,
100                 dest_ip4,
101                 32,
102                 [VppRoutePath(next_hop_address, INVALID_INDEX)],
103                 register=False,
104             )
105             rip.add_vpp_config()
106             r = cls.vapi.geneve_add_del_tunnel(
107                 local_address=cls.pg0.local_ip4, remote_address=dest_ip4, vni=vni
108             )
109             cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index, bd_id=vni)
110
111     @classmethod
112     def add_del_shared_mcast_dst_load(cls, is_add):
113         """
114         add or del tunnels sharing the same mcast dst
115         to test geneve ref_count mechanism
116         """
117         n_shared_dst_tunnels = 10
118         vni_start = 10000
119         vni_end = vni_start + n_shared_dst_tunnels
120         for vni in range(vni_start, vni_end):
121             r = cls.vapi.geneve_add_del_tunnel(
122                 local_address=cls.pg0.local_ip4,
123                 remote_address=cls.mcast_ip4,
124                 mcast_sw_if_index=1,
125                 is_add=is_add,
126                 vni=vni,
127             )
128             if r.sw_if_index == 0xFFFFFFFF:
129                 raise ValueError("bad sw_if_index: ~0")
130
131     @classmethod
132     def add_shared_mcast_dst_load(cls):
133         cls.add_del_shared_mcast_dst_load(is_add=1)
134
135     @classmethod
136     def del_shared_mcast_dst_load(cls):
137         cls.add_del_shared_mcast_dst_load(is_add=0)
138
139     @classmethod
140     def add_del_mcast_tunnels_load(cls, is_add):
141         """
142         add or del tunnels to test geneve stability
143         """
144         n_distinct_dst_tunnels = 10
145         ip_range_start = 10
146         ip_range_end = ip_range_start + n_distinct_dst_tunnels
147         for dest_ip4 in ip4_range(cls.mcast_ip4, ip_range_start, ip_range_end):
148             vni = int(dest_ip4.split(".")[3])
149             cls.vapi.geneve_add_del_tunnel(
150                 local_address=cls.pg0.local_ip4,
151                 remote_address=dest_ip4,
152                 mcast_sw_if_index=1,
153                 is_add=is_add,
154                 vni=vni,
155             )
156
157     @classmethod
158     def add_mcast_tunnels_load(cls):
159         cls.add_del_mcast_tunnels_load(is_add=1)
160
161     @classmethod
162     def del_mcast_tunnels_load(cls):
163         cls.add_del_mcast_tunnels_load(is_add=0)
164
165     # Class method to start the GENEVE test case.
166     #  Overrides setUpClass method in VppTestCase class.
167     #  Python try..except statement is used to ensure that the tear down of
168     #  the class will be executed even if exception is raised.
169     #  @param cls The class pointer.
170     @classmethod
171     def setUpClass(cls):
172         super(TestGeneve, cls).setUpClass()
173
174         try:
175             cls.dport = 6081
176
177             # Create 2 pg interfaces.
178             cls.create_pg_interfaces(range(4))
179             for pg in cls.pg_interfaces:
180                 pg.admin_up()
181
182             # Configure IPv4 addresses on VPP pg0.
183             cls.pg0.config_ip4()
184
185             # Resolve MAC address for VPP's IP address on pg0.
186             cls.pg0.resolve_arp()
187
188             # Our Multicast address
189             cls.mcast_ip4 = "239.1.1.1"
190             cls.mcast_mac = util.mcast_ip_to_mac(cls.mcast_ip4)
191
192             # Create GENEVE VTEP on VPP pg0, and put geneve_tunnel0 and pg1
193             #  into BD.
194             cls.single_tunnel_vni = 0xABCDE
195             cls.single_tunnel_bd = 1
196             r = cls.vapi.geneve_add_del_tunnel(
197                 local_address=cls.pg0.local_ip4,
198                 remote_address=cls.pg0.remote_ip4,
199                 vni=cls.single_tunnel_vni,
200             )
201             cls.vapi.sw_interface_set_l2_bridge(
202                 rx_sw_if_index=r.sw_if_index, bd_id=cls.single_tunnel_bd
203             )
204             cls.vapi.sw_interface_set_l2_bridge(
205                 rx_sw_if_index=cls.pg1.sw_if_index, bd_id=cls.single_tunnel_bd
206             )
207
208             # Setup vni 2 to test multicast flooding
209             cls.n_ucast_tunnels = 10
210             cls.mcast_flood_bd = 2
211             cls.create_geneve_flood_test_bd(cls.mcast_flood_bd, cls.n_ucast_tunnels)
212             r = cls.vapi.geneve_add_del_tunnel(
213                 local_address=cls.pg0.local_ip4,
214                 remote_address=cls.mcast_ip4,
215                 mcast_sw_if_index=1,
216                 vni=cls.mcast_flood_bd,
217             )
218             cls.vapi.sw_interface_set_l2_bridge(
219                 rx_sw_if_index=r.sw_if_index, bd_id=cls.mcast_flood_bd
220             )
221             cls.vapi.sw_interface_set_l2_bridge(
222                 rx_sw_if_index=cls.pg2.sw_if_index, bd_id=cls.mcast_flood_bd
223             )
224
225             # Add and delete mcast tunnels to check stability
226             cls.add_shared_mcast_dst_load()
227             cls.add_mcast_tunnels_load()
228             cls.del_shared_mcast_dst_load()
229             cls.del_mcast_tunnels_load()
230
231             # Setup vni 3 to test unicast flooding
232             cls.ucast_flood_bd = 3
233             cls.create_geneve_flood_test_bd(cls.ucast_flood_bd, cls.n_ucast_tunnels)
234             cls.vapi.sw_interface_set_l2_bridge(
235                 rx_sw_if_index=cls.pg3.sw_if_index, bd_id=cls.ucast_flood_bd
236             )
237         except Exception:
238             super(TestGeneve, cls).tearDownClass()
239             raise
240
241     # Method to define VPP actions before tear down of the test case.
242     #  Overrides tearDown method in VppTestCase class.
243     #  @param self The object pointer.
244     def tearDown(self):
245         super(TestGeneve, self).tearDown()
246
247     def show_commands_at_teardown(self):
248         self.logger.info(self.vapi.cli("show bridge-domain 1 detail"))
249         self.logger.info(self.vapi.cli("show bridge-domain 2 detail"))
250         self.logger.info(self.vapi.cli("show bridge-domain 3 detail"))
251         self.logger.info(self.vapi.cli("show geneve tunnel"))
252
253
254 class TestGeneveL3(VppTestCase):
255     """GENEVE L3 Test Case"""
256
257     @classmethod
258     def setUpClass(cls):
259         super(TestGeneveL3, cls).setUpClass()
260         try:
261             cls.create_pg_interfaces(range(2))
262             cls.interfaces = list(cls.pg_interfaces)
263
264             for i in cls.interfaces:
265                 i.admin_up()
266                 i.config_ip4()
267                 i.resolve_arp()
268         except Exception:
269             super(TestGeneveL3, cls).tearDownClass()
270             raise
271
272     @classmethod
273     def tearDownClass(cls):
274         super(TestGeneveL3, cls).tearDownClass()
275
276     def tearDown(self):
277         super(TestGeneveL3, self).tearDown()
278
279     def show_commands_at_teardown(self):
280         self.logger.info(self.vapi.cli("show geneve tunnel"))
281         self.logger.info(self.vapi.cli("show ip neighbor"))
282
283     def test_l3_packet(self):
284         vni = 1234
285         r = self.vapi.add_node_next(
286             node_name="geneve4-input", next_name="ethernet-input"
287         )
288         r = self.vapi.geneve_add_del_tunnel2(
289             is_add=1,
290             local_address=self.pg0.local_ip4,
291             remote_address=self.pg0.remote_ip4,
292             vni=vni,
293             l3_mode=1,
294             decap_next_index=r.next_index,
295         )
296
297         self.vapi.sw_interface_add_del_address(
298             sw_if_index=r.sw_if_index, prefix="10.0.0.1/24"
299         )
300
301         pkt = (
302             Ether(src=self.pg0.remote_mac, dst="d0:0b:ee:d0:00:00")
303             / IP(src="10.0.0.2", dst="10.0.0.1")
304             / ICMP()
305         )
306
307         encap = (
308             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
309             / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
310             / UDP(sport=6081, dport=6081, chksum=0)
311             / GENEVE(vni=vni)
312         )
313
314         arp = Ether(src=self.pg0.remote_mac, dst="d0:0b:ee:d0:00:00") / ARP(
315             op="is-at",
316             hwsrc=self.pg0.remote_mac,
317             hwdst="d0:0b:ee:d0:00:00",
318             psrc="10.0.0.2",
319             pdst="10.0.0.1",
320         )
321
322         rx = self.send_and_expect(self.pg0, encap / pkt * 1, self.pg0)
323         rx = self.send_and_assert_no_replies(self.pg0, encap / arp * 1, self.pg0)
324         rx = self.send_and_expect(self.pg0, encap / pkt * 1, self.pg0)
325         self.assertEqual(rx[0][ICMP].type, 0)  # echo reply
326
327         r = self.vapi.geneve_add_del_tunnel2(
328             is_add=0,
329             local_address=self.pg0.local_ip4,
330             remote_address=self.pg0.remote_ip4,
331             vni=vni,
332         )
333
334
335 if __name__ == "__main__":
336     unittest.main(testRunner=VppTestRunner)