Typos. A bunch of typos I've been collecting.
[vpp.git] / test / test_vxlan_gbp.py
1 #!/usr/bin/env python
2
3 import socket
4 from util import ip4_range, reassemble4_ether
5 import unittest
6 from framework import VppTestCase, VppTestRunner
7 from template_bd import BridgeDomain
8 from vpp_ip import VppIpAddress
9
10 from scapy.layers.l2 import Ether, Raw
11 from scapy.layers.inet import IP, UDP
12 from scapy.layers.vxlan import VXLAN
13 from scapy.utils import atol
14
15
16 class TestVxlanGbp(VppTestCase):
17     """ VXLAN GBP Test Case """
18
19     @property
20     def frame_request(self):
21         """ Ethernet frame modeling a generic request """
22         return (Ether(src='00:00:00:00:00:01', dst='00:00:00:00:00:02') /
23                 IP(src='1.2.3.4', dst='4.3.2.1') /
24                 UDP(sport=10000, dport=20000) /
25                 Raw('\xa5' * 100))
26
27     @property
28     def frame_reply(self):
29         """ Ethernet frame modeling a generic reply """
30         return (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
31                 IP(src='4.3.2.1', dst='1.2.3.4') /
32                 UDP(sport=20000, dport=10000) /
33                 Raw('\xa5' * 100))
34
35     def encapsulate(self, pkt, vni):
36         """
37         Encapsulate the original payload frame by adding VXLAN GBP header with
38         its UDP, IP and Ethernet fields
39         """
40         return (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
41                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
42                 UDP(sport=self.dport, dport=self.dport, chksum=0) /
43                 VXLAN(vni=vni, flags=self.flags, gpflags=self.gpflags,
44                 gpid=self.sclass) / pkt)
45
46     def ip_range(self, start, end):
47         """ range of remote ip's """
48         return ip4_range(self.pg0.remote_ip4, start, end)
49
50     def decapsulate(self, pkt):
51         """
52         Decapsulate the original payload frame by removing VXLAN header
53         """
54         # check if is set G and I flag
55         self.assertEqual(pkt[VXLAN].flags, int('0x88', 16))
56         return pkt[VXLAN].payload
57
58     # Method for checking VXLAN GBP encapsulation.
59     #
60     def check_encapsulation(self, pkt, vni, local_only=False, mcast_pkt=False):
61         # TODO: add error messages
62         # Verify source MAC is VPP_MAC and destination MAC is MY_MAC resolved
63         #  by VPP using ARP.
64         self.assertEqual(pkt[Ether].src, self.pg0.local_mac)
65         if not local_only:
66             if not mcast_pkt:
67                 self.assertEqual(pkt[Ether].dst, self.pg0.remote_mac)
68             else:
69                 self.assertEqual(pkt[Ether].dst, type(self).mcast_mac)
70         # Verify VXLAN GBP tunnel source IP is VPP_IP and destination IP is
71         # MY_IP.
72         self.assertEqual(pkt[IP].src, self.pg0.local_ip4)
73         if not local_only:
74             if not mcast_pkt:
75                 self.assertEqual(pkt[IP].dst, self.pg0.remote_ip4)
76             else:
77                 self.assertEqual(pkt[IP].dst, type(self).mcast_ip4)
78         # Verify UDP destination port is VXLAN GBP 48879, source UDP port could
79         # be arbitrary.
80         self.assertEqual(pkt[UDP].dport, type(self).dport)
81         # TODO: checksum check
82         # Verify VNI
83         # pkt.show()
84         self.assertEqual(pkt[VXLAN].vni, vni)
85         # Verify Source Class
86         self.assertEqual(pkt[VXLAN].gpid, 0)
87
88     @classmethod
89     def create_vxlan_gbp_flood_test_bd(cls, vni, n_ucast_tunnels):
90         # Create 2 ucast vxlan tunnels under bd
91         ip_range_start = 10
92         ip_range_end = ip_range_start + n_ucast_tunnels
93         next_hop_address = cls.pg0.remote_ip4n
94         for dest_ip4 in ip4_range(cls.pg0.remote_ip4,
95                                   ip_range_start,
96                                   ip_range_end):
97             # add host route so dest_ip4n will not be resolved
98             vip = VppIpAddress(dest_ip4)
99             cls.vapi.ip_add_del_route(dst_address=vip.bytes,
100                                       dst_address_length=32,
101                                       next_hop_address=next_hop_address)
102             r = cls.vapi.vxlan_gbp_tunnel_add_del(
103                 VppIpAddress(cls.pg0.local_ip4).encode(),
104                 vip.encode(),
105                 vni=vni)
106             cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index,
107                                                 bd_id=vni)
108
109     # Class method to start the VXLAN GBP test case.
110     #  Overrides setUpClass method in VppTestCase class.
111     #  Python try..except statement is used to ensure that the tear down of
112     #  the class will be executed even if exception is raised.
113     #  @param cls The class pointer.
114     @classmethod
115     def setUpClass(cls):
116         super(TestVxlanGbp, cls).setUpClass()
117
118         try:
119             cls.dport = 48879
120             cls.flags = 0x88
121             cls.gpflags = 0x0
122             cls.sclass = 0
123
124             # Create 2 pg interfaces.
125             cls.create_pg_interfaces(range(4))
126             for pg in cls.pg_interfaces:
127                 pg.admin_up()
128
129             # Configure IPv4 addresses on VPP pg0.
130             cls.pg0.config_ip4()
131
132             # Resolve MAC address for VPP's IP address on pg0.
133             cls.pg0.resolve_arp()
134
135             # Create VXLAN GBP VTEP on VPP pg0, and put vxlan_gbp_tunnel0 and
136             # pg1 into BD.
137             cls.single_tunnel_bd = 1
138             r = cls.vapi.vxlan_gbp_tunnel_add_del(
139                 VppIpAddress(cls.pg0.local_ip4).encode(),
140                 VppIpAddress(cls.pg0.remote_ip4).encode(),
141                 vni=cls.single_tunnel_bd)
142             cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index,
143                                                 bd_id=cls.single_tunnel_bd)
144             cls.vapi.sw_interface_set_l2_bridge(
145                 rx_sw_if_index=cls.pg1.sw_if_index, bd_id=cls.single_tunnel_bd)
146
147             # Setup vni 2 to test multicast flooding
148             cls.n_ucast_tunnels = 2
149             # Setup vni 3 to test unicast flooding
150             cls.ucast_flood_bd = 3
151             cls.create_vxlan_gbp_flood_test_bd(cls.ucast_flood_bd,
152                                                cls.n_ucast_tunnels)
153             cls.vapi.sw_interface_set_l2_bridge(
154                 rx_sw_if_index=cls.pg3.sw_if_index, bd_id=cls.ucast_flood_bd)
155         except Exception:
156             super(TestVxlanGbp, cls).tearDownClass()
157             raise
158
159     def assert_eq_pkts(self, pkt1, pkt2):
160         """ Verify the Ether, IP, UDP, payload are equal in both
161         packets
162         """
163         self.assertEqual(pkt1[Ether].src, pkt2[Ether].src)
164         self.assertEqual(pkt1[Ether].dst, pkt2[Ether].dst)
165         self.assertEqual(pkt1[IP].src, pkt2[IP].src)
166         self.assertEqual(pkt1[IP].dst, pkt2[IP].dst)
167         self.assertEqual(pkt1[UDP].sport, pkt2[UDP].sport)
168         self.assertEqual(pkt1[UDP].dport, pkt2[UDP].dport)
169         self.assertEqual(pkt1[Raw], pkt2[Raw])
170
171     def test_decap(self):
172         """ Decapsulation test
173         Send encapsulated frames from pg0
174         Verify receipt of decapsulated frames on pg1
175         """
176         encapsulated_pkt = self.encapsulate(self.frame_request,
177                                             self.single_tunnel_bd)
178
179         self.pg0.add_stream([encapsulated_pkt, ])
180
181         self.pg1.enable_capture()
182
183         self.pg_start()
184
185         # Pick first received frame and check if it's the non-encapsulated
186         # frame
187         out = self.pg1.get_capture(1)
188         pkt = out[0]
189         self.assert_eq_pkts(pkt, self.frame_request)
190
191     def test_encap(self):
192         """ Encapsulation test
193         Send frames from pg1
194         Verify receipt of encapsulated frames on pg0
195         """
196         self.pg1.add_stream([self.frame_reply])
197
198         self.pg0.enable_capture()
199
200         self.pg_start()
201
202         # Pick first received frame and check if it's correctly encapsulated.
203         out = self.pg0.get_capture(1)
204         pkt = out[0]
205         self.check_encapsulation(pkt, self.single_tunnel_bd)
206
207         payload = self.decapsulate(pkt)
208         self.assert_eq_pkts(payload, self.frame_reply)
209
210     def test_ucast_flood(self):
211         """ Unicast flood test
212         Send frames from pg3
213         Verify receipt of encapsulated frames on pg0
214         """
215         self.pg3.add_stream([self.frame_reply])
216
217         self.pg0.enable_capture()
218
219         self.pg_start()
220
221         # Get packet from each tunnel and assert it's correctly encapsulated.
222         out = self.pg0.get_capture(self.n_ucast_tunnels)
223         for pkt in out:
224             self.check_encapsulation(pkt, self.ucast_flood_bd, True)
225             payload = self.decapsulate(pkt)
226             self.assert_eq_pkts(payload, self.frame_reply)
227
228     def test_encap_big_packet(self):
229         """ Encapsulation test send big frame from pg1
230         Verify receipt of encapsulated frames on pg0
231         """
232
233         self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [1500, 0, 0, 0])
234
235         frame = (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
236                  IP(src='4.3.2.1', dst='1.2.3.4') /
237                  UDP(sport=20000, dport=10000) /
238                  Raw('\xa5' * 1450))
239
240         self.pg1.add_stream([frame])
241
242         self.pg0.enable_capture()
243
244         self.pg_start()
245
246         # Pick first received frame and check if it's correctly encapsulated.
247         out = self.pg0.get_capture(2)
248         pkt = reassemble4_ether(out)
249         self.check_encapsulation(pkt, self.single_tunnel_bd)
250
251         payload = self.decapsulate(pkt)
252         self.assert_eq_pkts(payload, frame)
253
254 # Method to define VPP actions before tear down of the test case.
255 #  Overrides tearDown method in VppTestCase class.
256 #  @param self The object pointer.
257     def tearDown(self):
258         super(TestVxlanGbp, self).tearDown()
259         if not self.vpp_dead:
260             self.logger.info(self.vapi.cli("show bridge-domain 1 detail"))
261             self.logger.info(self.vapi.cli("show bridge-domain 3 detail"))
262             self.logger.info(self.vapi.cli("show vxlan-gbp tunnel"))
263             self.logger.info(self.vapi.cli("show error"))
264
265
266 if __name__ == '__main__':
267     unittest.main(testRunner=VppTestRunner)