VXLAN-GBP: use common types on the API
[vpp.git] / test / test_vxlan_gbp.py
1 #!/usr/bin/env python
2
3 import socket
4 from util import ip4_range
5 import unittest
6 from framework import VppTestCase, VppTestRunner
7 from template_bd import BridgeDomain
8 from vpp_ip import VppIpAddress
9
10 from scapy.layers.l2 import Ether, Raw
11 from scapy.layers.inet import IP, UDP
12 from scapy.layers.vxlan import VXLAN
13 from scapy.utils import atol
14
15 import StringIO
16
17
18 def reassemble(listoffragments):
19     buffer = StringIO.StringIO()
20     first = listoffragments[0]
21     buffer.seek(20)
22     for pkt in listoffragments:
23         buffer.seek(pkt[IP].frag*8)
24         buffer.write(pkt[IP].payload)
25     first.len = len(buffer.getvalue()) + 20
26     first.flags = 0
27     del(first.chksum)
28     header = str(first[Ether])[:34]
29     return first[Ether].__class__(header + buffer.getvalue())
30
31
32 class TestVxlanGbp(VppTestCase):
33     """ VXLAN GBP Test Case """
34
35     @property
36     def frame_request(self):
37         """ Ethernet frame modeling a generic request """
38         return (Ether(src='00:00:00:00:00:01', dst='00:00:00:00:00:02') /
39                 IP(src='1.2.3.4', dst='4.3.2.1') /
40                 UDP(sport=10000, dport=20000) /
41                 Raw('\xa5' * 100))
42
43     @property
44     def frame_reply(self):
45         """ Ethernet frame modeling a generic reply """
46         return (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
47                 IP(src='4.3.2.1', dst='1.2.3.4') /
48                 UDP(sport=20000, dport=10000) /
49                 Raw('\xa5' * 100))
50
51     def encapsulate(self, pkt, vni):
52         """
53         Encapsulate the original payload frame by adding VXLAN GBP header with
54         its UDP, IP and Ethernet fields
55         """
56         return (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
57                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
58                 UDP(sport=self.dport, dport=self.dport, chksum=0) /
59                 VXLAN(vni=vni, flags=self.flags, gpflags=self.gpflags,
60                 gpid=self.sclass) / pkt)
61
62     def ip_range(self, start, end):
63         """ range of remote ip's """
64         return ip4_range(self.pg0.remote_ip4, start, end)
65
66     def decapsulate(self, pkt):
67         """
68         Decapsulate the original payload frame by removing VXLAN header
69         """
70         # check if is set G and I flag
71         self.assertEqual(pkt[VXLAN].flags, int('0x88', 16))
72         return pkt[VXLAN].payload
73
74     # Method for checking VXLAN GBP encapsulation.
75     #
76     def check_encapsulation(self, pkt, vni, local_only=False, mcast_pkt=False):
77         # TODO: add error messages
78         # Verify source MAC is VPP_MAC and destination MAC is MY_MAC resolved
79         #  by VPP using ARP.
80         self.assertEqual(pkt[Ether].src, self.pg0.local_mac)
81         if not local_only:
82             if not mcast_pkt:
83                 self.assertEqual(pkt[Ether].dst, self.pg0.remote_mac)
84             else:
85                 self.assertEqual(pkt[Ether].dst, type(self).mcast_mac)
86         # Verify VXLAN GBP tunnel source IP is VPP_IP and destination IP is
87         # MY_IP.
88         self.assertEqual(pkt[IP].src, self.pg0.local_ip4)
89         if not local_only:
90             if not mcast_pkt:
91                 self.assertEqual(pkt[IP].dst, self.pg0.remote_ip4)
92             else:
93                 self.assertEqual(pkt[IP].dst, type(self).mcast_ip4)
94         # Verify UDP destination port is VXLAN GBP 48879, source UDP port could
95         # be arbitrary.
96         self.assertEqual(pkt[UDP].dport, type(self).dport)
97         # TODO: checksum check
98         # Verify VNI
99         # pkt.show()
100         self.assertEqual(pkt[VXLAN].vni, vni)
101         # Verify Source Class
102         self.assertEqual(pkt[VXLAN].gpid, 0)
103
104     @classmethod
105     def create_vxlan_gbp_flood_test_bd(cls, vni, n_ucast_tunnels):
106         # Create 2 ucast vxlan tunnels under bd
107         ip_range_start = 10
108         ip_range_end = ip_range_start + n_ucast_tunnels
109         next_hop_address = cls.pg0.remote_ip4n
110         for dest_ip4 in ip4_range(cls.pg0.remote_ip4,
111                                   ip_range_start,
112                                   ip_range_end):
113             # add host route so dest_ip4n will not be resolved
114             vip = VppIpAddress(dest_ip4)
115             cls.vapi.ip_add_del_route(vip.bytes, 32, next_hop_address)
116             r = cls.vapi.vxlan_gbp_tunnel_add_del(
117                 VppIpAddress(cls.pg0.local_ip4).encode(),
118                 vip.encode(),
119                 vni=vni)
120             cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=vni)
121
122     # Class method to start the VXLAN GBP test case.
123     #  Overrides setUpClass method in VppTestCase class.
124     #  Python try..except statement is used to ensure that the tear down of
125     #  the class will be executed even if exception is raised.
126     #  @param cls The class pointer.
127     @classmethod
128     def setUpClass(cls):
129         super(TestVxlanGbp, cls).setUpClass()
130
131         try:
132             cls.dport = 48879
133             cls.flags = 0x88
134             cls.gpflags = 0x0
135             cls.sclass = 0
136
137             # Create 2 pg interfaces.
138             cls.create_pg_interfaces(range(4))
139             for pg in cls.pg_interfaces:
140                 pg.admin_up()
141
142             # Configure IPv4 addresses on VPP pg0.
143             cls.pg0.config_ip4()
144
145             # Resolve MAC address for VPP's IP address on pg0.
146             cls.pg0.resolve_arp()
147
148             # Create VXLAN GBP VTEP on VPP pg0, and put vxlan_gbp_tunnel0 and
149             # pg1 into BD.
150             cls.single_tunnel_bd = 1
151             r = cls.vapi.vxlan_gbp_tunnel_add_del(
152                 VppIpAddress(cls.pg0.local_ip4).encode(),
153                 VppIpAddress(cls.pg0.remote_ip4).encode(),
154                 vni=cls.single_tunnel_bd)
155             cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index,
156                                                 bd_id=cls.single_tunnel_bd)
157             cls.vapi.sw_interface_set_l2_bridge(cls.pg1.sw_if_index,
158                                                 bd_id=cls.single_tunnel_bd)
159
160             # Setup vni 2 to test multicast flooding
161             cls.n_ucast_tunnels = 2
162             # Setup vni 3 to test unicast flooding
163             cls.ucast_flood_bd = 3
164             cls.create_vxlan_gbp_flood_test_bd(cls.ucast_flood_bd,
165                                                cls.n_ucast_tunnels)
166             cls.vapi.sw_interface_set_l2_bridge(cls.pg3.sw_if_index,
167                                                 bd_id=cls.ucast_flood_bd)
168         except Exception:
169             super(TestVxlanGbp, cls).tearDownClass()
170             raise
171
172     def assert_eq_pkts(self, pkt1, pkt2):
173         """ Verify the Ether, IP, UDP, payload are equal in both
174         packets
175         """
176         self.assertEqual(pkt1[Ether].src, pkt2[Ether].src)
177         self.assertEqual(pkt1[Ether].dst, pkt2[Ether].dst)
178         self.assertEqual(pkt1[IP].src, pkt2[IP].src)
179         self.assertEqual(pkt1[IP].dst, pkt2[IP].dst)
180         self.assertEqual(pkt1[UDP].sport, pkt2[UDP].sport)
181         self.assertEqual(pkt1[UDP].dport, pkt2[UDP].dport)
182         self.assertEqual(pkt1[Raw], pkt2[Raw])
183
184     def test_decap(self):
185         """ Decapsulation test
186         Send encapsulated frames from pg0
187         Verify receipt of decapsulated frames on pg1
188         """
189         encapsulated_pkt = self.encapsulate(self.frame_request,
190                                             self.single_tunnel_bd)
191
192         self.pg0.add_stream([encapsulated_pkt, ])
193
194         self.pg1.enable_capture()
195
196         self.pg_start()
197
198         # Pick first received frame and check if it's the non-encapsulated
199         # frame
200         out = self.pg1.get_capture(1)
201         pkt = out[0]
202         self.assert_eq_pkts(pkt, self.frame_request)
203
204     def test_encap(self):
205         """ Encapsulation test
206         Send frames from pg1
207         Verify receipt of encapsulated frames on pg0
208         """
209         self.pg1.add_stream([self.frame_reply])
210
211         self.pg0.enable_capture()
212
213         self.pg_start()
214
215         # Pick first received frame and check if it's corectly encapsulated.
216         out = self.pg0.get_capture(1)
217         pkt = out[0]
218         self.check_encapsulation(pkt, self.single_tunnel_bd)
219
220         payload = self.decapsulate(pkt)
221         self.assert_eq_pkts(payload, self.frame_reply)
222
223     def test_ucast_flood(self):
224         """ Unicast flood test
225         Send frames from pg3
226         Verify receipt of encapsulated frames on pg0
227         """
228         self.pg3.add_stream([self.frame_reply])
229
230         self.pg0.enable_capture()
231
232         self.pg_start()
233
234         # Get packet from each tunnel and assert it's corectly encapsulated.
235         out = self.pg0.get_capture(self.n_ucast_tunnels)
236         for pkt in out:
237             self.check_encapsulation(pkt, self.ucast_flood_bd, True)
238             payload = self.decapsulate(pkt)
239             self.assert_eq_pkts(payload, self.frame_reply)
240
241     def test_encap_big_packet(self):
242         """ Encapsulation test send big frame from pg1
243         Verify receipt of encapsulated frames on pg0
244         """
245
246         self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [1500, 0, 0, 0])
247
248         frame = (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
249                  IP(src='4.3.2.1', dst='1.2.3.4') /
250                  UDP(sport=20000, dport=10000) /
251                  Raw('\xa5' * 1450))
252
253         self.pg1.add_stream([frame])
254
255         self.pg0.enable_capture()
256
257         self.pg_start()
258
259         # Pick first received frame and check if it's correctly encapsulated.
260         out = self.pg0.get_capture(2)
261         pkt = reassemble(out)
262         self.check_encapsulation(pkt, self.single_tunnel_bd)
263
264         payload = self.decapsulate(pkt)
265         self.assert_eq_pkts(payload, frame)
266
267 # Method to define VPP actions before tear down of the test case.
268 #  Overrides tearDown method in VppTestCase class.
269 #  @param self The object pointer.
270     def tearDown(self):
271         super(TestVxlanGbp, self).tearDown()
272         if not self.vpp_dead:
273             self.logger.info(self.vapi.cli("show bridge-domain 1 detail"))
274             self.logger.info(self.vapi.cli("show bridge-domain 3 detail"))
275             self.logger.info(self.vapi.cli("show vxlan-gbp tunnel"))
276             self.logger.info(self.vapi.cli("show error"))
277
278
279 if __name__ == '__main__':
280     unittest.main(testRunner=VppTestRunner)