vxlan-gbp: Add support for vxlan gbp
[vpp.git] / test / test_vxlan_gbp.py
1 #!/usr/bin/env python
2
3 import socket
4 from util import ip4n_range, ip4_range
5 import unittest
6 from framework import VppTestCase, VppTestRunner
7 from template_bd import BridgeDomain
8
9 from scapy.layers.l2 import Ether, Raw
10 from scapy.layers.inet import IP, UDP
11 from scapy.layers.vxlan import VXLAN
12 from scapy.utils import atol
13
14 import StringIO
15
16
17 def reassemble(listoffragments):
18     buffer = StringIO.StringIO()
19     first = listoffragments[0]
20     buffer.seek(20)
21     for pkt in listoffragments:
22         buffer.seek(pkt[IP].frag*8)
23         buffer.write(pkt[IP].payload)
24     first.len = len(buffer.getvalue()) + 20
25     first.flags = 0
26     del(first.chksum)
27     header = str(first[Ether])[:34]
28     return first[Ether].__class__(header + buffer.getvalue())
29
30
31 class TestVxlanGbp(VppTestCase):
32     """ VXLAN GBP Test Case """
33
34     @property
35     def frame_request(self):
36         """ Ethernet frame modeling a generic request """
37         return (Ether(src='00:00:00:00:00:01', dst='00:00:00:00:00:02') /
38                 IP(src='1.2.3.4', dst='4.3.2.1') /
39                 UDP(sport=10000, dport=20000) /
40                 Raw('\xa5' * 100))
41
42     @property
43     def frame_reply(self):
44         """ Ethernet frame modeling a generic reply """
45         return (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
46                 IP(src='4.3.2.1', dst='1.2.3.4') /
47                 UDP(sport=20000, dport=10000) /
48                 Raw('\xa5' * 100))
49
50     def encapsulate(self, pkt, vni):
51         """
52         Encapsulate the original payload frame by adding VXLAN GBP header with
53         its UDP, IP and Ethernet fields
54         """
55         return (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
56                 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
57                 UDP(sport=self.dport, dport=self.dport, chksum=0) /
58                 VXLAN(vni=vni, flags=self.flags, gpflags=self.gpflags,
59                 gpid=self.sclass) / pkt)
60
61     def ip_range(self, start, end):
62         """ range of remote ip's """
63         return ip4_range(self.pg0.remote_ip4, start, end)
64
65     def decapsulate(self, pkt):
66         """
67         Decapsulate the original payload frame by removing VXLAN header
68         """
69         # check if is set G and I flag
70         self.assertEqual(pkt[VXLAN].flags, int('0x88', 16))
71         return pkt[VXLAN].payload
72
73     # Method for checking VXLAN GBP encapsulation.
74     #
75     def check_encapsulation(self, pkt, vni, local_only=False, mcast_pkt=False):
76         # TODO: add error messages
77         # Verify source MAC is VPP_MAC and destination MAC is MY_MAC resolved
78         #  by VPP using ARP.
79         self.assertEqual(pkt[Ether].src, self.pg0.local_mac)
80         if not local_only:
81             if not mcast_pkt:
82                 self.assertEqual(pkt[Ether].dst, self.pg0.remote_mac)
83             else:
84                 self.assertEqual(pkt[Ether].dst, type(self).mcast_mac)
85         # Verify VXLAN GBP tunnel source IP is VPP_IP and destination IP is
86         # MY_IP.
87         self.assertEqual(pkt[IP].src, self.pg0.local_ip4)
88         if not local_only:
89             if not mcast_pkt:
90                 self.assertEqual(pkt[IP].dst, self.pg0.remote_ip4)
91             else:
92                 self.assertEqual(pkt[IP].dst, type(self).mcast_ip4)
93         # Verify UDP destination port is VXLAN GBP 48879, source UDP port could
94         # be arbitrary.
95         self.assertEqual(pkt[UDP].dport, type(self).dport)
96         # TODO: checksum check
97         # Verify VNI
98         # pkt.show()
99         self.assertEqual(pkt[VXLAN].vni, vni)
100         # Verify Source Class
101         self.assertEqual(pkt[VXLAN].gpid, 0)
102
103     @classmethod
104     def create_vxlan_gbp_flood_test_bd(cls, vni, n_ucast_tunnels):
105         # Create 2 ucast vxlan tunnels under bd
106         ip_range_start = 10
107         ip_range_end = ip_range_start + n_ucast_tunnels
108         next_hop_address = cls.pg0.remote_ip4n
109         for dest_ip4n in ip4n_range(next_hop_address, ip_range_start,
110                                     ip_range_end):
111             # add host route so dest_ip4n will not be resolved
112             cls.vapi.ip_add_del_route(dest_ip4n, 32, next_hop_address)
113             r = cls.vapi.vxlan_gbp_add_del_tunnel(
114                 src_addr=cls.pg0.local_ip4n,
115                 dst_addr=dest_ip4n,
116                 vni=vni)
117             cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=vni)
118
119     # Class method to start the VXLAN GBP test case.
120     #  Overrides setUpClass method in VppTestCase class.
121     #  Python try..except statement is used to ensure that the tear down of
122     #  the class will be executed even if exception is raised.
123     #  @param cls The class pointer.
124     @classmethod
125     def setUpClass(cls):
126         super(TestVxlanGbp, cls).setUpClass()
127
128         try:
129             cls.dport = 48879
130             cls.flags = 0x88
131             cls.gpflags = 0x0
132             cls.sclass = 0
133
134             # Create 2 pg interfaces.
135             cls.create_pg_interfaces(range(4))
136             for pg in cls.pg_interfaces:
137                 pg.admin_up()
138
139             # Configure IPv4 addresses on VPP pg0.
140             cls.pg0.config_ip4()
141
142             # Resolve MAC address for VPP's IP address on pg0.
143             cls.pg0.resolve_arp()
144
145             # Create VXLAN GBP VTEP on VPP pg0, and put vxlan_gbp_tunnel0 and
146             # pg1 into BD.
147             cls.single_tunnel_bd = 1
148             r = cls.vapi.vxlan_gbp_add_del_tunnel(
149                 src_addr=cls.pg0.local_ip4n,
150                 dst_addr=cls.pg0.remote_ip4n,
151                 vni=cls.single_tunnel_bd)
152             cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index,
153                                                 bd_id=cls.single_tunnel_bd)
154             cls.vapi.sw_interface_set_l2_bridge(cls.pg1.sw_if_index,
155                                                 bd_id=cls.single_tunnel_bd)
156
157             # Setup vni 2 to test multicast flooding
158             cls.n_ucast_tunnels = 2
159             # Setup vni 3 to test unicast flooding
160             cls.ucast_flood_bd = 3
161             cls.create_vxlan_gbp_flood_test_bd(cls.ucast_flood_bd,
162                                                cls.n_ucast_tunnels)
163             cls.vapi.sw_interface_set_l2_bridge(cls.pg3.sw_if_index,
164                                                 bd_id=cls.ucast_flood_bd)
165         except Exception:
166             super(TestVxlanGbp, cls).tearDownClass()
167             raise
168
169     def assert_eq_pkts(self, pkt1, pkt2):
170         """ Verify the Ether, IP, UDP, payload are equal in both
171         packets
172         """
173         self.assertEqual(pkt1[Ether].src, pkt2[Ether].src)
174         self.assertEqual(pkt1[Ether].dst, pkt2[Ether].dst)
175         self.assertEqual(pkt1[IP].src, pkt2[IP].src)
176         self.assertEqual(pkt1[IP].dst, pkt2[IP].dst)
177         self.assertEqual(pkt1[UDP].sport, pkt2[UDP].sport)
178         self.assertEqual(pkt1[UDP].dport, pkt2[UDP].dport)
179         self.assertEqual(pkt1[Raw], pkt2[Raw])
180
181     def test_decap(self):
182         """ Decapsulation test
183         Send encapsulated frames from pg0
184         Verify receipt of decapsulated frames on pg1
185         """
186         encapsulated_pkt = self.encapsulate(self.frame_request,
187                                             self.single_tunnel_bd)
188
189         self.pg0.add_stream([encapsulated_pkt, ])
190
191         self.pg1.enable_capture()
192
193         self.pg_start()
194
195         # Pick first received frame and check if it's the non-encapsulated
196         # frame
197         out = self.pg1.get_capture(1)
198         pkt = out[0]
199         self.assert_eq_pkts(pkt, self.frame_request)
200
201     def test_encap(self):
202         """ Encapsulation test
203         Send frames from pg1
204         Verify receipt of encapsulated frames on pg0
205         """
206         self.pg1.add_stream([self.frame_reply])
207
208         self.pg0.enable_capture()
209
210         self.pg_start()
211
212         # Pick first received frame and check if it's corectly encapsulated.
213         out = self.pg0.get_capture(1)
214         pkt = out[0]
215         self.check_encapsulation(pkt, self.single_tunnel_bd)
216
217         payload = self.decapsulate(pkt)
218         self.assert_eq_pkts(payload, self.frame_reply)
219
220     def test_ucast_flood(self):
221         """ Unicast flood test
222         Send frames from pg3
223         Verify receipt of encapsulated frames on pg0
224         """
225         self.pg3.add_stream([self.frame_reply])
226
227         self.pg0.enable_capture()
228
229         self.pg_start()
230
231         # Get packet from each tunnel and assert it's corectly encapsulated.
232         out = self.pg0.get_capture(self.n_ucast_tunnels)
233         for pkt in out:
234             self.check_encapsulation(pkt, self.ucast_flood_bd, True)
235             payload = self.decapsulate(pkt)
236             self.assert_eq_pkts(payload, self.frame_reply)
237
238     def test_encap_big_packet(self):
239         """ Encapsulation test send big frame from pg1
240         Verify receipt of encapsulated frames on pg0
241         """
242
243         self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [1500, 0, 0, 0])
244
245         frame = (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
246                  IP(src='4.3.2.1', dst='1.2.3.4') /
247                  UDP(sport=20000, dport=10000) /
248                  Raw('\xa5' * 1450))
249
250         self.pg1.add_stream([frame])
251
252         self.pg0.enable_capture()
253
254         self.pg_start()
255
256         # Pick first received frame and check if it's correctly encapsulated.
257         out = self.pg0.get_capture(2)
258         pkt = reassemble(out)
259         self.check_encapsulation(pkt, self.single_tunnel_bd)
260
261         payload = self.decapsulate(pkt)
262         self.assert_eq_pkts(payload, frame)
263
264 # Method to define VPP actions before tear down of the test case.
265 #  Overrides tearDown method in VppTestCase class.
266 #  @param self The object pointer.
267     def tearDown(self):
268         super(TestVxlanGbp, self).tearDown()
269         if not self.vpp_dead:
270             self.logger.info(self.vapi.cli("show bridge-domain 1 detail"))
271             self.logger.info(self.vapi.cli("show bridge-domain 3 detail"))
272             self.logger.info(self.vapi.cli("show vxlan-gbp tunnel"))
273             self.logger.info(self.vapi.cli("show error"))
274
275
276 if __name__ == '__main__':
277     unittest.main(testRunner=VppTestRunner)