tests: replace pycodestyle with black
[vpp.git] / test / test_l2_fib.py
1 #!/usr/bin/env python3
2 """L2 FIB Test Case HLD:
3
4 **config 1**
5     - add 4 pg-l2 interfaces
6     - configure them into l2bd
7     - configure 100 MAC entries in L2 fib - 25 MACs per interface
8     - L2 MAC learning and unknown unicast flooding disabled in l2bd
9     - configure 100 MAC entries in L2 fib - 25 MACs per interface
10
11 **test 1**
12     - send L2 MAC frames between all 4 pg-l2 interfaces for all of 100 MAC \
13     entries in the FIB
14
15 **verify 1**
16     - all packets received correctly
17
18 **config 2**
19     - delete 12 MAC entries - 3 MACs per interface
20
21 **test 2a**
22     - send L2 MAC frames between all 4 pg-l2 interfaces for non-deleted MAC \
23     entries
24
25 **verify 2a**
26     - all packets received correctly
27
28 **test 2b**
29     - send L2 MAC frames between all 4 pg-l2 interfaces for all of 12 deleted \
30     MAC entries
31
32 **verify 2b**
33     - no packet received on all 4 pg-l2 interfaces
34
35 **config 3**
36     - configure new 100 MAC entries in L2 fib - 25 MACs per interface
37
38 **test 3**
39     - send L2 MAC frames between all 4 pg-l2 interfaces for all of 188 MAC \
40     entries in the FIB
41
42 **verify 3**
43     - all packets received correctly
44
45 **config 4**
46     - delete 160 MAC entries, 40 MACs per interface
47
48 **test 4a**
49     - send L2 MAC frames between all 4 pg-l2 interfaces for all of 28 \
50     non-deleted MAC entries
51
52 **verify 4a**
53     - all packets received correctly
54
55 **test 4b**
56     - try send L2 MAC frames between all 4 pg-l2 interfaces for all of 172 \
57     deleted MAC entries
58
59 **verify 4b**
60     - no packet received on all 4 pg-l2 interfaces
61 """
62
63 import unittest
64 import random
65
66 from scapy.packet import Raw
67 from scapy.layers.l2 import Ether
68 from scapy.layers.inet import IP, UDP
69
70 from framework import VppTestCase, VppTestRunner
71 from util import Host, ppp
72 from vpp_papi import mac_pton, VppEnum
73
74
75 class TestL2fib(VppTestCase):
76     """L2 FIB Test Case"""
77
78     @classmethod
79     def bd_ifs(cls, bd_id):
80         return range((bd_id - 1) * cls.n_ifs_per_bd, bd_id * cls.n_ifs_per_bd - 1)
81
82     @classmethod
83     def setUpClass(cls):
84         """
85         Perform standard class setup (defined by class method setUpClass in
86         class VppTestCase) before running the test case, set test case related
87         variables and configure VPP.
88
89         :var int bd_id: Bridge domain ID.
90         """
91         super(TestL2fib, cls).setUpClass()
92
93         try:
94             n_brs = cls.n_brs = range(1, 3)
95             cls.n_ifs_per_bd = 4
96             n_ifs = range(cls.n_ifs_per_bd * len(cls.n_brs))
97             # Create pg interfaces
98             cls.create_pg_interfaces(n_ifs)
99
100             cls.flows = dict()
101             for bd_id in n_brs:
102                 # Packet flows mapping pg0 -> pg1, pg2, pg3 etc.
103                 ifs = cls.bd_ifs(bd_id)
104                 for j in ifs:
105                     cls.flows[cls.pg_interfaces[j]] = [
106                         cls.pg_interfaces[x] for x in ifs if x != j
107                     ]
108
109             # Packet sizes
110             cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
111
112             for bd_id in n_brs:
113                 # Create BD with MAC learning and unknown unicast flooding
114                 # disabled and put interfaces to this BD
115                 cls.vapi.bridge_domain_add_del(bd_id=bd_id, uu_flood=0, learn=0)
116                 ifs = [cls.pg_interfaces[i] for i in cls.bd_ifs(bd_id)]
117                 for pg_if in ifs:
118                     cls.vapi.sw_interface_set_l2_bridge(
119                         rx_sw_if_index=pg_if.sw_if_index, bd_id=bd_id
120                     )
121
122             # Set up all interfaces
123             for i in cls.pg_interfaces:
124                 i.admin_up()
125         except Exception:
126             super(TestL2fib, cls).tearDownClass()
127             raise
128
129     @classmethod
130     def tearDownClass(cls):
131         super(TestL2fib, cls).tearDownClass()
132
133     def setUp(self):
134         super(TestL2fib, self).setUp()
135         self.reset_packet_infos()
136
137     def tearDown(self):
138         """
139         Show various debug prints after each test.
140         """
141         super(TestL2fib, self).tearDown()
142         if not self.vpp_dead:
143             for bd_id in self.n_brs:
144                 self.logger.info(
145                     self.vapi.ppcli("show bridge-domain %s detail" % bd_id)
146                 )
147
148     def show_commands_at_teardown(self):
149         self.logger.info(self.vapi.ppcli("show l2fib verbose"))
150
151     def create_hosts(self, n_hosts_per_if, subnet):
152         """
153         Create required number of host MAC addresses and distribute them among
154         interfaces. Create host IPv4 address for every host MAC address.
155
156         :param int n_hosts_per_if: Number of per interface hosts to
157             create MAC/IPv4 addresses for.
158         """
159
160         hosts = dict()
161         for pg_if in self.pg_interfaces:
162             swif = pg_if.sw_if_index
163
164             def mac(j):
165                 return "00:00:%02x:ff:%02x:%02x" % (subnet, swif, j)
166
167             def ip(j):
168                 return "172.%02u.1%02x.%u" % (subnet, swif, j)
169
170             def h(j):
171                 return Host(mac(j), ip(j))
172
173             hosts[swif] = [h(j) for j in range(n_hosts_per_if)]
174         return hosts
175
176     def split_hosts(self, hosts, n):
177         splits = dict()
178         for pg_if in self.pg_interfaces:
179             swif = pg_if.sw_if_index
180             splits[swif] = hosts[swif][:n]
181             hosts[swif] = hosts[swif][n:]
182         return splits
183
184     def learn_hosts(self, bd_id, hosts):
185         """
186         Create and send per interface L2 MAC broadcast packet stream to
187         let the bridge domain learn these MAC addresses.
188
189         :param int bd_id: BD to teach
190         :param dict hosts: dict of hosts per interface
191         """
192         self.vapi.bridge_flags(bd_id=bd_id, is_set=1, flags=1)
193         ifs = [self.pg_interfaces[i] for i in self.bd_ifs(bd_id)]
194         for pg_if in ifs:
195             swif = pg_if.sw_if_index
196             packets = [
197                 Ether(dst="ff:ff:ff:ff:ff:ff", src=host.mac) for host in hosts[swif]
198             ]
199             pg_if.add_stream(packets)
200         self.logger.info("Sending broadcast eth frames for MAC learning")
201         self.pg_start()
202
203     def config_l2_fib_entries(self, bd_id, hosts):
204         """
205         Config required number of L2 FIB entries.
206
207         :param int bd_id: BD's id
208         :param int count: Number of L2 FIB entries to be created.
209         :param int start: Starting index of the host list. (Default value = 0)
210         """
211         ifs = [self.pg_interfaces[i] for i in self.bd_ifs(bd_id)]
212         for pg_if in ifs:
213             swif = pg_if.sw_if_index
214             for host in hosts[swif]:
215                 self.vapi.l2fib_add_del(mac_pton(host.mac), bd_id, swif, static_mac=1)
216
217     def delete_l2_fib_entry(self, bd_id, hosts):
218         """
219         Delete required number of L2 FIB entries.
220
221         :param int count: Number of L2 FIB entries to be created.
222         """
223         ifs = [self.pg_interfaces[i] for i in self.bd_ifs(bd_id)]
224         for pg_if in ifs:
225             swif = pg_if.sw_if_index
226             for host in hosts[swif]:
227                 self.vapi.l2fib_add_del(mac_pton(host.mac), bd_id, swif, is_add=0)
228
229     def flush_int(self, swif, learned_hosts):
230         """
231         Flush swif L2 FIB entries.
232
233         :param int swif: sw if index.
234         """
235         flushed = dict()
236         self.vapi.l2fib_flush_int(swif)
237         flushed[swif] = learned_hosts[swif]
238         learned_hosts[swif] = []
239         return flushed
240
241     def flush_bd(self, bd_id, learned_hosts):
242         """
243         Flush bd_id L2 FIB entries.
244
245         :param int bd_id: Bridge Domain id.
246         """
247         self.vapi.l2fib_flush_bd(bd_id)
248         flushed = dict()
249         ifs = [self.pg_interfaces[i] for i in self.bd_ifs(bd_id)]
250         for pg_if in ifs:
251             swif = pg_if.sw_if_index
252             flushed[swif] = learned_hosts[swif]
253             learned_hosts[swif] = []
254         return flushed
255
256     def flush_all(self):
257         """
258         Flush All L2 FIB entries.
259         """
260         self.vapi.l2fib_flush_all()
261
262     def create_stream(self, src_if, packet_sizes, if_src_hosts, if_dst_hosts):
263         """
264         Create input packet stream for defined interface using hosts or
265         deleted_hosts list.
266
267         :param object src_if: Interface to create packet stream for.
268         :param list packet_sizes: List of required packet sizes.
269         :param boolean deleted: Set to True if deleted_hosts list required.
270         :return: Stream of packets.
271         """
272         src_hosts = if_src_hosts[src_if.sw_if_index]
273         if not src_hosts:
274             return []
275         pkts = []
276         for dst_if in self.flows[src_if]:
277             dst_swif = dst_if.sw_if_index
278             if dst_swif not in if_dst_hosts:
279                 continue
280             dst_hosts = if_dst_hosts[dst_swif]
281             for dst_host in dst_hosts:
282                 src_host = random.choice(src_hosts)
283                 pkt_info = self.create_packet_info(src_if, dst_if)
284                 payload = self.info_to_payload(pkt_info)
285                 p = (
286                     Ether(dst=dst_host.mac, src=src_host.mac)
287                     / IP(src=src_host.ip4, dst=dst_host.ip4)
288                     / UDP(sport=1234, dport=1234)
289                     / Raw(payload)
290                 )
291                 pkt_info.data = p.copy()
292                 size = random.choice(packet_sizes)
293                 self.extend_packet(p, size)
294                 pkts.append(p)
295         return pkts
296
297     def verify_capture(self, pg_if, capture):
298         """
299         Verify captured input packet stream for defined interface.
300
301         :param object pg_if: Interface to verify captured packet stream for.
302         :param list capture: Captured packet stream.
303         """
304         last_info = dict()
305         for i in self.pg_interfaces:
306             last_info[i.sw_if_index] = None
307         dst_sw_if_index = pg_if.sw_if_index
308         for packet in capture:
309             payload_info = self.payload_to_info(packet[Raw])
310             try:
311                 ip = packet[IP]
312                 udp = packet[UDP]
313                 packet_index = payload_info.index
314                 self.assertEqual(payload_info.dst, dst_sw_if_index)
315                 self.logger.debug(
316                     "Got packet on port %s: src=%u (id=%u)"
317                     % (pg_if.name, payload_info.src, packet_index)
318                 )
319                 next_info = self.get_next_packet_info_for_interface2(
320                     payload_info.src, dst_sw_if_index, last_info[payload_info.src]
321                 )
322                 last_info[payload_info.src] = next_info
323                 self.assertTrue(next_info is not None)
324                 self.assertEqual(packet_index, next_info.index)
325                 saved_packet = next_info.data
326                 # Check standard fields
327                 self.assertEqual(ip.src, saved_packet[IP].src)
328                 self.assertEqual(ip.dst, saved_packet[IP].dst)
329                 self.assertEqual(udp.sport, saved_packet[UDP].sport)
330                 self.assertEqual(udp.dport, saved_packet[UDP].dport)
331             except BaseException:
332                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
333                 raise
334         for i in self.pg_interfaces:
335             remaining_packet = self.get_next_packet_info_for_interface2(
336                 i, dst_sw_if_index, last_info[i.sw_if_index]
337             )
338             self.assertTrue(
339                 remaining_packet is None,
340                 "Port %u: Packet expected from source %u didn't arrive"
341                 % (dst_sw_if_index, i.sw_if_index),
342             )
343
344     def run_verify_test(self, bd_id, src_hosts, dst_hosts):
345         # Test
346         # Create incoming packet streams for packet-generator interfaces
347         self.reset_packet_infos()
348         ifs = [self.pg_interfaces[i] for i in self.bd_ifs(bd_id)]
349         for i in ifs:
350             pkts = self.create_stream(
351                 i,
352                 self.pg_if_packet_sizes,
353                 if_src_hosts=src_hosts,
354                 if_dst_hosts=dst_hosts,
355             )
356             if pkts:
357                 i.add_stream(pkts)
358
359         self.vapi.bridge_flags(bd_id=bd_id, is_set=0, flags=1)
360         # Enable packet capture and start packet sending
361         self.pg_enable_capture(ifs)
362         self.pg_start()
363
364         # Verify
365         # Verify outgoing packet streams per packet-generator interface
366         for i in ifs:
367             if not dst_hosts[i.sw_if_index]:
368                 continue
369             capture = i.get_capture()
370             self.logger.info("Verifying capture on interface %s" % i.name)
371             self.verify_capture(i, capture)
372
373     def run_verify_negat_test(self, bd_id, src_hosts, dst_hosts):
374         # Test
375         # Create incoming packet streams for packet-generator interfaces for
376         # deleted MAC addresses
377         self.reset_packet_infos()
378         ifs = [self.pg_interfaces[i] for i in self.bd_ifs(bd_id)]
379         for i in ifs:
380             pkts = self.create_stream(
381                 i,
382                 self.pg_if_packet_sizes,
383                 if_src_hosts=src_hosts,
384                 if_dst_hosts=dst_hosts,
385             )
386             if pkts:
387                 i.add_stream(pkts)
388
389         self.vapi.bridge_flags(bd_id=bd_id, is_set=0, flags=1)
390         # Enable packet capture and start packet sending
391         self.pg_enable_capture(ifs)
392         self.pg_start()
393
394         # Verify
395         # Verify outgoing packet streams per packet-generator interface
396         timeout = 1
397         for i in ifs:
398             i.get_capture(0, timeout=timeout)
399             i.assert_nothing_captured(remark="outgoing interface")
400             timeout = 0.1
401
402     def test_l2_fib_program100(self):
403         """L2 FIB - program 100 MACs"""
404         bd_id = 1
405         hosts = self.create_hosts(100, subnet=17)
406         self.config_l2_fib_entries(bd_id, hosts)
407         self.run_verify_test(bd_id, hosts, hosts)
408
409     def test_l2_fib_program100_delete12(self):
410         """L2 FIB - program 100, delete 12 MACs"""
411         bd_id = 1
412         hosts = self.create_hosts(100, subnet=17)
413         self.config_l2_fib_entries(bd_id, hosts)
414         del_hosts = self.split_hosts(hosts, 12)
415         self.delete_l2_fib_entry(bd_id, del_hosts)
416
417         self.run_verify_test(bd_id, hosts, hosts)
418         self.run_verify_negat_test(bd_id, hosts, del_hosts)
419
420     def test_l2_fib_program100_add100(self):
421         """L2 FIB - program 100, add 100 MACs"""
422         bd_id = 1
423         hosts = self.create_hosts(100, subnet=17)
424         self.config_l2_fib_entries(bd_id, hosts)
425         hosts2 = self.create_hosts(100, subnet=22)
426         self.config_l2_fib_entries(bd_id, hosts2)
427         self.run_verify_test(bd_id, hosts, hosts2)
428
429     def test_l2_fib_program10_learn10(self):
430         """L2 FIB - program 10 MACs, learn 10"""
431         hosts = self.create_hosts(20, subnet=35)
432         lhosts = self.split_hosts(hosts, 10)
433
434         bd1 = 1
435         bd2 = 2
436         self.learn_hosts(bd1, lhosts)
437         self.learn_hosts(bd2, lhosts)
438         self.config_l2_fib_entries(bd1, hosts)
439         self.config_l2_fib_entries(bd2, hosts)
440         self.run_verify_test(bd1, lhosts, hosts)
441         self.run_verify_test(bd2, lhosts, hosts)
442
443     def test_l2_fib_flush_int(self):
444         """L2 FIB - flush interface"""
445         hosts = self.create_hosts(20, subnet=36)
446         lhosts = self.split_hosts(hosts, 10)
447
448         bd1 = 1
449         self.learn_hosts(bd1, lhosts)
450         self.config_l2_fib_entries(bd1, hosts)
451         self.run_verify_test(bd1, lhosts, hosts)
452         flushed = self.flush_int(self.pg_interfaces[0].sw_if_index, lhosts)
453         self.run_verify_test(bd1, hosts, lhosts)
454         self.run_verify_negat_test(bd1, hosts, flushed)
455
456     def test_l2_fib_flush_bd(self):
457         """L2 FIB - flush BD"""
458         hosts = self.create_hosts(20, subnet=37)
459         lhosts = self.split_hosts(hosts, 10)
460
461         bd1 = 1
462         self.learn_hosts(bd1, lhosts)
463         self.config_l2_fib_entries(bd1, hosts)
464         self.run_verify_test(bd1, lhosts, hosts)
465         flushed = self.flush_bd(bd1, lhosts)
466         self.run_verify_negat_test(bd1, hosts, flushed)
467
468     def test_l2_fib_flush_all(self):
469         """L2 FIB - flush all"""
470         hosts = self.create_hosts(20, subnet=38)
471         lhosts = self.split_hosts(hosts, 10)
472
473         bd1 = 1
474         bd2 = 2
475         self.learn_hosts(bd1, lhosts)
476         self.learn_hosts(bd2, lhosts)
477         self.config_l2_fib_entries(bd1, hosts)
478         self.config_l2_fib_entries(bd2, hosts)
479         self.run_verify_test(bd1, hosts, lhosts)
480         self.run_verify_test(bd2, hosts, lhosts)
481
482         self.flush_all()
483
484         self.run_verify_negat_test(bd1, hosts, lhosts)
485         self.run_verify_negat_test(bd2, hosts, lhosts)
486
487     def test_l2_fib_mac_learn_evs(self):
488         """L2 FIB - mac learning events"""
489         bd1 = 1
490         hosts = self.create_hosts(10, subnet=39)
491
492         self.vapi.want_l2_macs_events()
493         self.learn_hosts(bd1, hosts)
494
495         self.virtual_sleep(1)
496         self.logger.info(self.vapi.ppcli("show l2fib"))
497         evs = self.vapi.collect_events()
498         action = VppEnum.vl_api_mac_event_action_t.MAC_EVENT_ACTION_API_ADD
499         learned_macs = {
500             e.mac[i].mac_addr.packed
501             for e in evs
502             for i in range(e.n_macs)
503             if e.mac[i].action == action
504         }
505         macs = {
506             h.bin_mac
507             for swif in self.bd_ifs(bd1)
508             for h in hosts[self.pg_interfaces[swif].sw_if_index]
509         }
510         self.vapi.want_l2_macs_events(enable_disable=0)
511         self.assertEqual(len(learned_macs ^ macs), 0)
512
513     def test_l2_fib_mac_learn_evs2(self):
514         """L2 FIB - mac learning events using want_l2_macs_events2"""
515         bd1 = 1
516         hosts = self.create_hosts(10, subnet=39)
517
518         self.vapi.l2fib_set_scan_delay(scan_delay=10)
519         self.vapi.want_l2_macs_events2()
520         self.sleep(1)
521         self.learn_hosts(bd1, hosts)
522
523         self.virtual_sleep(1)
524         self.logger.info(self.vapi.ppcli("show l2fib"))
525         evs = self.vapi.collect_events()
526         action = VppEnum.vl_api_mac_event_action_t.MAC_EVENT_ACTION_API_ADD
527         learned_macs = {
528             e.mac[i].mac_addr.packed
529             for e in evs
530             for i in range(e.n_macs)
531             if e.mac[i].action == action
532         }
533         macs = {
534             h.bin_mac
535             for swif in self.bd_ifs(bd1)
536             for h in hosts[self.pg_interfaces[swif].sw_if_index]
537         }
538         self.vapi.want_l2_macs_events2(enable_disable=0)
539         self.assertEqual(len(learned_macs ^ macs), 0)
540
541     def test_l2_fib_macs_learn_max(self):
542         """L2 FIB - mac learning max macs in event"""
543         bd1 = 1
544         hosts = self.create_hosts(10, subnet=40)
545
546         ev_macs = 1
547         self.vapi.want_l2_macs_events(max_macs_in_event=ev_macs)
548         self.learn_hosts(bd1, hosts)
549
550         self.sleep(1)
551         self.logger.info(self.vapi.ppcli("show l2fib"))
552         evs = self.vapi.collect_events()
553         self.vapi.want_l2_macs_events(enable_disable=0)
554
555         self.assertGreater(len(evs), 0)
556         action = VppEnum.vl_api_mac_event_action_t.MAC_EVENT_ACTION_API_ADD
557         learned_macs = {
558             e.mac[i].mac_addr.packed
559             for e in evs
560             for i in range(e.n_macs)
561             if e.mac[i].action == action
562         }
563         macs = {
564             h.bin_mac
565             for swif in self.bd_ifs(bd1)
566             for h in hosts[self.pg_interfaces[swif].sw_if_index]
567         }
568
569         for e in evs:
570             self.assertLess(len(e), ev_macs * 10)
571         self.assertEqual(len(learned_macs ^ macs), 0)
572
573     def test_l2_fib_macs_learn_max2(self):
574         """L2 FIB - mac learning max macs in event using want_l2_macs_events2"""
575         bd1 = 1
576         hosts = self.create_hosts(10, subnet=40)
577
578         ev_macs = 1
579         self.vapi.l2fib_set_scan_delay(scan_delay=10)
580         self.vapi.want_l2_macs_events2(max_macs_in_event=ev_macs)
581         self.sleep(1)
582         self.learn_hosts(bd1, hosts)
583
584         self.virtual_sleep(1)
585         self.logger.info(self.vapi.ppcli("show l2fib"))
586         evs = self.vapi.collect_events()
587         self.vapi.want_l2_macs_events2(enable_disable=0)
588
589         self.assertGreater(len(evs), 0)
590         action = VppEnum.vl_api_mac_event_action_t.MAC_EVENT_ACTION_API_ADD
591         learned_macs = {
592             e.mac[i].mac_addr.packed
593             for e in evs
594             for i in range(e.n_macs)
595             if e.mac[i].action == action
596         }
597         macs = {
598             h.bin_mac
599             for swif in self.bd_ifs(bd1)
600             for h in hosts[self.pg_interfaces[swif].sw_if_index]
601         }
602
603         for e in evs:
604             self.assertLess(len(e), ev_macs * 10)
605         self.assertEqual(len(learned_macs ^ macs), 0)
606
607
608 if __name__ == "__main__":
609     unittest.main(testRunner=VppTestRunner)