4 from util import ip4n_range, ip4_range
6 from framework import VppTestCase, VppTestRunner
7 from template_bd import BridgeDomain
9 from scapy.layers.l2 import Ether, Raw
10 from scapy.layers.inet import IP, UDP
11 from scapy.layers.vxlan import VXLAN
12 from scapy.utils import atol
17 def reassemble(listoffragments):
18 buffer = StringIO.StringIO()
19 first = listoffragments[0]
21 for pkt in listoffragments:
22 buffer.seek(pkt[IP].frag*8)
23 buffer.write(pkt[IP].payload)
24 first.len = len(buffer.getvalue()) + 20
27 header = str(first[Ether])[:34]
28 return first[Ether].__class__(header + buffer.getvalue())
31 class TestVxlanGbp(VppTestCase):
32 """ VXLAN GBP Test Case """
35 def frame_request(self):
36 """ Ethernet frame modeling a generic request """
37 return (Ether(src='00:00:00:00:00:01', dst='00:00:00:00:00:02') /
38 IP(src='1.2.3.4', dst='4.3.2.1') /
39 UDP(sport=10000, dport=20000) /
43 def frame_reply(self):
44 """ Ethernet frame modeling a generic reply """
45 return (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
46 IP(src='4.3.2.1', dst='1.2.3.4') /
47 UDP(sport=20000, dport=10000) /
50 def encapsulate(self, pkt, vni):
52 Encapsulate the original payload frame by adding VXLAN GBP header with
53 its UDP, IP and Ethernet fields
55 return (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
56 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
57 UDP(sport=self.dport, dport=self.dport, chksum=0) /
58 VXLAN(vni=vni, flags=self.flags, gpflags=self.gpflags,
59 gpid=self.sclass) / pkt)
61 def ip_range(self, start, end):
62 """ range of remote ip's """
63 return ip4_range(self.pg0.remote_ip4, start, end)
65 def decapsulate(self, pkt):
67 Decapsulate the original payload frame by removing VXLAN header
69 # check if is set G and I flag
70 self.assertEqual(pkt[VXLAN].flags, int('0x88', 16))
71 return pkt[VXLAN].payload
73 # Method for checking VXLAN GBP encapsulation.
75 def check_encapsulation(self, pkt, vni, local_only=False, mcast_pkt=False):
76 # TODO: add error messages
77 # Verify source MAC is VPP_MAC and destination MAC is MY_MAC resolved
79 self.assertEqual(pkt[Ether].src, self.pg0.local_mac)
82 self.assertEqual(pkt[Ether].dst, self.pg0.remote_mac)
84 self.assertEqual(pkt[Ether].dst, type(self).mcast_mac)
85 # Verify VXLAN GBP tunnel source IP is VPP_IP and destination IP is
87 self.assertEqual(pkt[IP].src, self.pg0.local_ip4)
90 self.assertEqual(pkt[IP].dst, self.pg0.remote_ip4)
92 self.assertEqual(pkt[IP].dst, type(self).mcast_ip4)
93 # Verify UDP destination port is VXLAN GBP 48879, source UDP port could
95 self.assertEqual(pkt[UDP].dport, type(self).dport)
96 # TODO: checksum check
99 self.assertEqual(pkt[VXLAN].vni, vni)
100 # Verify Source Class
101 self.assertEqual(pkt[VXLAN].gpid, 0)
104 def create_vxlan_gbp_flood_test_bd(cls, vni, n_ucast_tunnels):
105 # Create 2 ucast vxlan tunnels under bd
107 ip_range_end = ip_range_start + n_ucast_tunnels
108 next_hop_address = cls.pg0.remote_ip4n
109 for dest_ip4n in ip4n_range(next_hop_address, ip_range_start,
111 # add host route so dest_ip4n will not be resolved
112 cls.vapi.ip_add_del_route(dest_ip4n, 32, next_hop_address)
113 r = cls.vapi.vxlan_gbp_add_del_tunnel(
114 src_addr=cls.pg0.local_ip4n,
117 cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=vni)
119 # Class method to start the VXLAN GBP test case.
120 # Overrides setUpClass method in VppTestCase class.
121 # Python try..except statement is used to ensure that the tear down of
122 # the class will be executed even if exception is raised.
123 # @param cls The class pointer.
126 super(TestVxlanGbp, cls).setUpClass()
134 # Create 2 pg interfaces.
135 cls.create_pg_interfaces(range(4))
136 for pg in cls.pg_interfaces:
139 # Configure IPv4 addresses on VPP pg0.
142 # Resolve MAC address for VPP's IP address on pg0.
143 cls.pg0.resolve_arp()
145 # Create VXLAN GBP VTEP on VPP pg0, and put vxlan_gbp_tunnel0 and
147 cls.single_tunnel_bd = 1
148 r = cls.vapi.vxlan_gbp_add_del_tunnel(
149 src_addr=cls.pg0.local_ip4n,
150 dst_addr=cls.pg0.remote_ip4n,
151 vni=cls.single_tunnel_bd)
152 cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index,
153 bd_id=cls.single_tunnel_bd)
154 cls.vapi.sw_interface_set_l2_bridge(cls.pg1.sw_if_index,
155 bd_id=cls.single_tunnel_bd)
157 # Setup vni 2 to test multicast flooding
158 cls.n_ucast_tunnels = 2
159 # Setup vni 3 to test unicast flooding
160 cls.ucast_flood_bd = 3
161 cls.create_vxlan_gbp_flood_test_bd(cls.ucast_flood_bd,
163 cls.vapi.sw_interface_set_l2_bridge(cls.pg3.sw_if_index,
164 bd_id=cls.ucast_flood_bd)
166 super(TestVxlanGbp, cls).tearDownClass()
169 def assert_eq_pkts(self, pkt1, pkt2):
170 """ Verify the Ether, IP, UDP, payload are equal in both
173 self.assertEqual(pkt1[Ether].src, pkt2[Ether].src)
174 self.assertEqual(pkt1[Ether].dst, pkt2[Ether].dst)
175 self.assertEqual(pkt1[IP].src, pkt2[IP].src)
176 self.assertEqual(pkt1[IP].dst, pkt2[IP].dst)
177 self.assertEqual(pkt1[UDP].sport, pkt2[UDP].sport)
178 self.assertEqual(pkt1[UDP].dport, pkt2[UDP].dport)
179 self.assertEqual(pkt1[Raw], pkt2[Raw])
181 def test_decap(self):
182 """ Decapsulation test
183 Send encapsulated frames from pg0
184 Verify receipt of decapsulated frames on pg1
186 encapsulated_pkt = self.encapsulate(self.frame_request,
187 self.single_tunnel_bd)
189 self.pg0.add_stream([encapsulated_pkt, ])
191 self.pg1.enable_capture()
195 # Pick first received frame and check if it's the non-encapsulated
197 out = self.pg1.get_capture(1)
199 self.assert_eq_pkts(pkt, self.frame_request)
201 def test_encap(self):
202 """ Encapsulation test
204 Verify receipt of encapsulated frames on pg0
206 self.pg1.add_stream([self.frame_reply])
208 self.pg0.enable_capture()
212 # Pick first received frame and check if it's corectly encapsulated.
213 out = self.pg0.get_capture(1)
215 self.check_encapsulation(pkt, self.single_tunnel_bd)
217 payload = self.decapsulate(pkt)
218 self.assert_eq_pkts(payload, self.frame_reply)
220 def test_ucast_flood(self):
221 """ Unicast flood test
223 Verify receipt of encapsulated frames on pg0
225 self.pg3.add_stream([self.frame_reply])
227 self.pg0.enable_capture()
231 # Get packet from each tunnel and assert it's corectly encapsulated.
232 out = self.pg0.get_capture(self.n_ucast_tunnels)
234 self.check_encapsulation(pkt, self.ucast_flood_bd, True)
235 payload = self.decapsulate(pkt)
236 self.assert_eq_pkts(payload, self.frame_reply)
238 def test_encap_big_packet(self):
239 """ Encapsulation test send big frame from pg1
240 Verify receipt of encapsulated frames on pg0
243 self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [1500, 0, 0, 0])
245 frame = (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
246 IP(src='4.3.2.1', dst='1.2.3.4') /
247 UDP(sport=20000, dport=10000) /
250 self.pg1.add_stream([frame])
252 self.pg0.enable_capture()
256 # Pick first received frame and check if it's correctly encapsulated.
257 out = self.pg0.get_capture(2)
258 pkt = reassemble(out)
259 self.check_encapsulation(pkt, self.single_tunnel_bd)
261 payload = self.decapsulate(pkt)
262 self.assert_eq_pkts(payload, frame)
264 # Method to define VPP actions before tear down of the test case.
265 # Overrides tearDown method in VppTestCase class.
266 # @param self The object pointer.
268 super(TestVxlanGbp, self).tearDown()
269 if not self.vpp_dead:
270 self.logger.info(self.vapi.cli("show bridge-domain 1 detail"))
271 self.logger.info(self.vapi.cli("show bridge-domain 3 detail"))
272 self.logger.info(self.vapi.cli("show vxlan-gbp tunnel"))
273 self.logger.info(self.vapi.cli("show error"))
276 if __name__ == '__main__':
277 unittest.main(testRunner=VppTestRunner)