MAP: Convert from DPO to input feature.
[vpp.git] / test / test_vxlan_gbp.py
1 #!/usr/bin/env python
2
3 import socket
4 from util import ip4_range, reassemble4_ether
5 import unittest
6 from framework import VppTestCase, VppTestRunner
7 from template_bd import BridgeDomain
8 from vpp_ip import VppIpAddress
9
10 from scapy.layers.l2 import Ether, Raw
11 from scapy.layers.inet import IP, UDP
12 from scapy.layers.vxlan import VXLAN
13 from scapy.utils import atol
14
15
class TestVxlanGbp(VppTestCase):
    """ VXLAN GBP Test Case """

    @property
    def frame_request(self):
        """ Ethernet frame modeling a generic request """
        # The payload is a bytes literal so that it is exactly 100 octets on
        # both Python 2 and Python 3 (a text literal would have to be encoded
        # by scapy first on Python 3).
        return (Ether(src='00:00:00:00:00:01', dst='00:00:00:00:00:02') /
                IP(src='1.2.3.4', dst='4.3.2.1') /
                UDP(sport=10000, dport=20000) /
                Raw(b'\xa5' * 100))

    @property
    def frame_reply(self):
        """ Ethernet frame modeling a generic reply """
        # Mirror image of frame_request: addresses and ports are swapped.
        return (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
                IP(src='4.3.2.1', dst='1.2.3.4') /
                UDP(sport=20000, dport=10000) /
                Raw(b'\xa5' * 100))

    def encapsulate(self, pkt, vni):
        """
        Encapsulate the original payload frame by adding VXLAN GBP header with
        its UDP, IP and Ethernet fields

        :param pkt: inner frame to be carried as the VXLAN payload.
        :param vni: VXLAN network identifier written into the VXLAN header.
        :returns: Ether / IP / UDP / VXLAN / pkt, addressed from the remote
            side of pg0 to the local side of pg0.
        """
        # chksum=0 pins the outer UDP checksum to zero instead of letting
        # scapy compute one.
        return (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
                UDP(sport=self.dport, dport=self.dport, chksum=0) /
                VXLAN(vni=vni, flags=self.flags, gpflags=self.gpflags,
                      gpid=self.sclass) / pkt)

    def ip_range(self, start, end):
        """ range of remote ip's """
        return ip4_range(self.pg0.remote_ip4, start, end)

    def decapsulate(self, pkt):
        """
        Decapsulate the original payload frame by removing VXLAN header

        :param pkt: captured packet containing a VXLAN layer.
        :returns: the payload carried by the VXLAN layer.
        """
        # The G and I flags must be set; compare against the same flags value
        # the test case uses when it builds encapsulated frames.
        self.assertEqual(pkt[VXLAN].flags, self.flags)
        return pkt[VXLAN].payload

    def check_encapsulation(self, pkt, vni, local_only=False, mcast_pkt=False):
        """
        Check the outer headers of a VXLAN GBP encapsulated packet.

        :param pkt: captured packet to verify.
        :param vni: VNI expected in the VXLAN header.
        :param local_only: when True only the local (source) MAC and IP are
            verified and the destination MAC and IP are not checked.
        :param mcast_pkt: when True the destination MAC and IP are compared
            with the class attributes mcast_mac and mcast_ip4 instead of
            pg0's remote addresses.
        """
        # NOTE(review): mcast_mac and mcast_ip4 are not assigned anywhere in
        # this class; confirm they are provided elsewhere before calling this
        # method with mcast_pkt=True.

        # Verify source MAC is VPP_MAC and destination MAC is MY_MAC resolved
        #  by VPP using ARP.
        self.assertEqual(pkt[Ether].src, self.pg0.local_mac,
                         "unexpected outer source MAC")
        if not local_only:
            if not mcast_pkt:
                self.assertEqual(pkt[Ether].dst, self.pg0.remote_mac,
                                 "unexpected outer destination MAC")
            else:
                self.assertEqual(pkt[Ether].dst, type(self).mcast_mac,
                                 "unexpected outer multicast MAC")
        # Verify VXLAN GBP tunnel source IP is VPP_IP and destination IP is
        # MY_IP.
        self.assertEqual(pkt[IP].src, self.pg0.local_ip4,
                         "unexpected tunnel source IP")
        if not local_only:
            if not mcast_pkt:
                self.assertEqual(pkt[IP].dst, self.pg0.remote_ip4,
                                 "unexpected tunnel destination IP")
            else:
                self.assertEqual(pkt[IP].dst, type(self).mcast_ip4,
                                 "unexpected tunnel multicast IP")
        # Verify UDP destination port is VXLAN GBP 48879, source UDP port could
        # be arbitrary.
        self.assertEqual(pkt[UDP].dport, type(self).dport,
                         "unexpected UDP destination port")
        # TODO: checksum check
        # Verify VNI
        self.assertEqual(pkt[VXLAN].vni, vni, "unexpected VNI")
        # Verify Source Class
        self.assertEqual(pkt[VXLAN].gpid, self.sclass,
                         "unexpected source class")

    @classmethod
    def create_vxlan_gbp_flood_test_bd(cls, vni, n_ucast_tunnels):
        """
        Create n_ucast_tunnels unicast VXLAN GBP tunnels and put them in a
        bridge domain.

        :param vni: used both as the VNI of every tunnel and as the id of the
            bridge domain the tunnels are added to.
        :param n_ucast_tunnels: number of tunnels (and destination addresses)
            to create.
        """
        ip_range_start = 10
        ip_range_end = ip_range_start + n_ucast_tunnels
        next_hop_address = cls.pg0.remote_ip4n
        for dest_ip4 in ip4_range(cls.pg0.remote_ip4,
                                  ip_range_start,
                                  ip_range_end):
            # add host route so dest_ip4n will not be resolved
            vip = VppIpAddress(dest_ip4)
            cls.vapi.ip_add_del_route(vip.bytes, 32, next_hop_address)
            r = cls.vapi.vxlan_gbp_tunnel_add_del(
                VppIpAddress(cls.pg0.local_ip4).encode(),
                vip.encode(),
                vni=vni)
            cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=vni)

    @classmethod
    def setUpClass(cls):
        """
        Class method to start the VXLAN GBP test case.

        Overrides setUpClass method in VppTestCase class.
        Python try..except statement is used to ensure that the tear down of
        the class will be executed even if exception is raised.
        """
        super(TestVxlanGbp, cls).setUpClass()

        try:
            # Values used when building and checking VXLAN GBP headers.
            cls.dport = 48879
            cls.flags = 0x88
            cls.gpflags = 0x0
            cls.sclass = 0

            # Create 4 pg interfaces (pg0..pg3).
            cls.create_pg_interfaces(range(4))
            for pg in cls.pg_interfaces:
                pg.admin_up()

            # Configure IPv4 addresses on VPP pg0.
            cls.pg0.config_ip4()

            # Resolve MAC address for VPP's IP address on pg0.
            cls.pg0.resolve_arp()

            # Create VXLAN GBP VTEP on VPP pg0, and put vxlan_gbp_tunnel0 and
            # pg1 into BD.
            cls.single_tunnel_bd = 1
            r = cls.vapi.vxlan_gbp_tunnel_add_del(
                VppIpAddress(cls.pg0.local_ip4).encode(),
                VppIpAddress(cls.pg0.remote_ip4).encode(),
                vni=cls.single_tunnel_bd)
            cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index,
                                                bd_id=cls.single_tunnel_bd)
            cls.vapi.sw_interface_set_l2_bridge(cls.pg1.sw_if_index,
                                                bd_id=cls.single_tunnel_bd)

            # Number of unicast tunnels created for the flood test.
            cls.n_ucast_tunnels = 2
            # Setup vni 3 to test unicast flooding
            cls.ucast_flood_bd = 3
            cls.create_vxlan_gbp_flood_test_bd(cls.ucast_flood_bd,
                                               cls.n_ucast_tunnels)
            cls.vapi.sw_interface_set_l2_bridge(cls.pg3.sw_if_index,
                                                bd_id=cls.ucast_flood_bd)
        except Exception:
            # Run the class tear down explicitly, then let the original
            # error propagate.
            super(TestVxlanGbp, cls).tearDownClass()
            raise

    def assert_eq_pkts(self, pkt1, pkt2):
        """ Verify the Ether, IP, UDP, payload are equal in both
        packets
        """
        self.assertEqual(pkt1[Ether].src, pkt2[Ether].src)
        self.assertEqual(pkt1[Ether].dst, pkt2[Ether].dst)
        self.assertEqual(pkt1[IP].src, pkt2[IP].src)
        self.assertEqual(pkt1[IP].dst, pkt2[IP].dst)
        self.assertEqual(pkt1[UDP].sport, pkt2[UDP].sport)
        self.assertEqual(pkt1[UDP].dport, pkt2[UDP].dport)
        self.assertEqual(pkt1[Raw], pkt2[Raw])

    def test_decap(self):
        """ Decapsulation test
        Send encapsulated frames from pg0
        Verify receipt of decapsulated frames on pg1
        """
        encapsulated_pkt = self.encapsulate(self.frame_request,
                                            self.single_tunnel_bd)

        self.pg0.add_stream([encapsulated_pkt, ])

        self.pg1.enable_capture()

        self.pg_start()

        # Pick first received frame and check if it's the non-encapsulated
        # frame
        out = self.pg1.get_capture(1)
        pkt = out[0]
        self.assert_eq_pkts(pkt, self.frame_request)

    def test_encap(self):
        """ Encapsulation test
        Send frames from pg1
        Verify receipt of encapsulated frames on pg0
        """
        self.pg1.add_stream([self.frame_reply])

        self.pg0.enable_capture()

        self.pg_start()

        # Pick first received frame and check if it's correctly encapsulated.
        out = self.pg0.get_capture(1)
        pkt = out[0]
        self.check_encapsulation(pkt, self.single_tunnel_bd)

        payload = self.decapsulate(pkt)
        self.assert_eq_pkts(payload, self.frame_reply)

    def test_ucast_flood(self):
        """ Unicast flood test
        Send frames from pg3
        Verify receipt of encapsulated frames on pg0
        """
        self.pg3.add_stream([self.frame_reply])

        self.pg0.enable_capture()

        self.pg_start()

        # Get packet from each tunnel and assert it's correctly encapsulated.
        # local_only=True: only the local MAC/IP of the outer headers are
        # checked, the per-tunnel destination addresses are not.
        out = self.pg0.get_capture(self.n_ucast_tunnels)
        for pkt in out:
            self.check_encapsulation(pkt, self.ucast_flood_bd, True)
            payload = self.decapsulate(pkt)
            self.assert_eq_pkts(payload, self.frame_reply)

    def test_encap_big_packet(self):
        """ Encapsulation test send big frame from pg1
        Verify receipt of encapsulated frames on pg0
        """

        self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [1500, 0, 0, 0])

        # Same addressing as frame_reply, but with a 1450 octet payload.  A
        # bytes literal keeps the payload length identical on Python 2 and
        # Python 3.
        frame = (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
                 IP(src='4.3.2.1', dst='1.2.3.4') /
                 UDP(sport=20000, dport=10000) /
                 Raw(b'\xa5' * 1450))

        self.pg1.add_stream([frame])

        self.pg0.enable_capture()

        self.pg_start()

        # Two packets are expected on pg0; reassemble them and check if the
        # result is correctly encapsulated.
        out = self.pg0.get_capture(2)
        pkt = reassemble4_ether(out)
        self.check_encapsulation(pkt, self.single_tunnel_bd)

        payload = self.decapsulate(pkt)
        self.assert_eq_pkts(payload, frame)

    def tearDown(self):
        """
        Method to define VPP actions before tear down of the test case.

        Overrides tearDown method in VppTestCase class.
        """
        super(TestVxlanGbp, self).tearDown()
        if not self.vpp_dead:
            # Log the state of the bridge domains and tunnels used by the
            # tests, plus the error counters.
            self.logger.info(self.vapi.cli("show bridge-domain 1 detail"))
            self.logger.info(self.vapi.cli("show bridge-domain 3 detail"))
            self.logger.info(self.vapi.cli("show vxlan-gbp tunnel"))
            self.logger.info(self.vapi.cli("show error"))
261
262
if __name__ == '__main__':
    # Executed directly as a script: run this module's tests with the VPP
    # test runner instead of the default unittest runner.
    unittest.main(testRunner=VppTestRunner)