vlib: fix use of RTLD_DEEPBIND for musl
[vpp.git] / test / test_span.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from scapy.packet import Raw
6 from scapy.layers.l2 import Ether, GRE, ERSPAN
7 from scapy.layers.inet import IP, UDP
8 from scapy.layers.vxlan import VXLAN
9
10 from framework import VppTestCase
11 from asfframework import VppTestRunner
12 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint, VppDot1ADSubint
13 from vpp_gre_interface import VppGreInterface
14 from vpp_vxlan_tunnel import VppVxlanTunnel
15 from collections import namedtuple
16 from vpp_papi import VppEnum
17
18
19 Tag = namedtuple("Tag", ["dot1", "vlan"])
20 DOT1AD = 0x88A8
21 DOT1Q = 0x8100
22
23
24 class TestSpan(VppTestCase):
25     """SPAN Test Case"""
26
27     @classmethod
28     def setUpClass(cls):
29         super(TestSpan, cls).setUpClass()
30         # Test variables
31         cls.pkts_per_burst = 257  # Number of packets per burst
32         # create 3 pg interfaces
33         cls.create_pg_interfaces(range(3))
34
35         cls.bd_id = 55
36         cls.sub_if = VppDot1QSubint(cls, cls.pg0, 100)
37         cls.vlan_sub_if = VppDot1QSubint(cls, cls.pg2, 300)
38         cls.vlan_sub_if.set_vtr(L2_VTR_OP.L2_POP_1, tag=300)
39
40         cls.qinq_sub_if = VppDot1ADSubint(cls, cls.pg2, 33, 400, 500)
41         cls.qinq_sub_if.set_vtr(L2_VTR_OP.L2_POP_2, outer=500, inner=400)
42
43         # packet flows mapping pg0 -> pg1, pg2 -> pg3, etc.
44         cls.flows = dict()
45         cls.flows[cls.pg0] = [cls.pg1]
46         cls.flows[cls.pg1] = [cls.pg0]
47
48         # packet sizes
49         cls.pg_if_packet_sizes = [64, 512, 1518]  # , 9018]
50
51         # setup all interfaces
52         for i in cls.pg_interfaces:
53             i.admin_up()
54             i.config_ip4()
55             i.resolve_arp()
56
57     def setUp(self):
58         super(TestSpan, self).setUp()
59         self.vxlan = VppVxlanTunnel(
60             self, src=self.pg2.local_ip4, dst=self.pg2.remote_ip4, vni=1111
61         )
62         self.vxlan.add_vpp_config()
63         self.reset_packet_infos()
64
65     def tearDown(self):
66         super(TestSpan, self).tearDown()
67
68     def show_commands_at_teardown(self):
69         self.logger.info(self.vapi.ppcli("show interface span"))
70
71     def xconnect(self, a, b, is_add=1):
72         self.vapi.sw_interface_set_l2_xconnect(a, b, enable=is_add)
73         self.vapi.sw_interface_set_l2_xconnect(b, a, enable=is_add)
74
75     def bridge(self, sw_if_index, is_add=1):
76         self.vapi.sw_interface_set_l2_bridge(
77             rx_sw_if_index=sw_if_index, bd_id=self.bd_id, enable=is_add
78         )
79
80     def _remove_tag(self, packet, vlan, tag_type):
81         self.assertEqual(packet.type, tag_type)
82         payload = packet.payload
83         self.assertEqual(payload.vlan, vlan)
84         inner_type = payload.type
85         payload = payload.payload
86         packet.remove_payload()
87         packet.add_payload(payload)
88         packet.type = inner_type
89
90     def remove_tags(self, packet, tags):
91         for t in tags:
92             self._remove_tag(packet, t.vlan, t.dot1)
93         return packet
94
95     def decap_gre(self, pkt):
96         """
97         Decapsulate the original payload frame by removing GRE header
98         """
99         self.assertEqual(pkt[Ether].src, self.pg2.local_mac)
100         self.assertEqual(pkt[Ether].dst, self.pg2.remote_mac)
101
102         self.assertEqual(pkt[IP].src, self.pg2.local_ip4)
103         self.assertEqual(pkt[IP].dst, self.pg2.remote_ip4)
104
105         return pkt[GRE].payload
106
107     def decap_erspan(self, pkt, session):
108         """
109         Decapsulate the original payload frame by removing ERSPAN header
110         """
111         self.assertEqual(pkt[Ether].src, self.pg2.local_mac)
112         self.assertEqual(pkt[Ether].dst, self.pg2.remote_mac)
113
114         self.assertEqual(pkt[IP].src, self.pg2.local_ip4)
115         self.assertEqual(pkt[IP].dst, self.pg2.remote_ip4)
116
117         self.assertEqual(pkt[ERSPAN].ver, 1)
118         self.assertEqual(pkt[ERSPAN].vlan, 0)
119         self.assertEqual(pkt[ERSPAN].cos, 0)
120         self.assertEqual(pkt[ERSPAN].en, 3)
121         self.assertEqual(pkt[ERSPAN].t, 0)
122         self.assertEqual(pkt[ERSPAN].session_id, session)
123         self.assertEqual(pkt[ERSPAN].reserved, 0)
124         self.assertEqual(pkt[ERSPAN].index, 0)
125
126         return pkt[ERSPAN].payload
127
128     def decap_vxlan(self, pkt):
129         """
130         Decapsulate the original payload frame by removing VXLAN header
131         """
132         self.assertEqual(pkt[Ether].src, self.pg2.local_mac)
133         self.assertEqual(pkt[Ether].dst, self.pg2.remote_mac)
134
135         self.assertEqual(pkt[IP].src, self.pg2.local_ip4)
136         self.assertEqual(pkt[IP].dst, self.pg2.remote_ip4)
137
138         return pkt[VXLAN].payload
139
140     def create_stream(self, src_if, packet_sizes, do_dot1=False, bcast=False):
141         pkts = []
142         dst_if = self.flows[src_if][0]
143         dst_mac = src_if.remote_mac
144         if bcast:
145             dst_mac = "ff:ff:ff:ff:ff:ff"
146
147         for i in range(0, self.pkts_per_burst):
148             payload = "span test"
149             size = packet_sizes[int((i / 2) % len(packet_sizes))]
150             p = (
151                 Ether(src=src_if.local_mac, dst=dst_mac)
152                 / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
153                 / UDP(sport=10000 + src_if.sw_if_index * 1000 + i, dport=1234)
154                 / Raw(payload)
155             )
156             if do_dot1:
157                 p = self.sub_if.add_dot1_layer(p)
158             self.extend_packet(p, size)
159             pkts.append(p)
160         return pkts
161
162     def verify_capture(self, cap1, cap2):
163         self.assertEqual(
164             len(cap1),
165             len(cap2),
166             "Different number of sent and mirrored packets :"
167             "%u != %u" % (len(cap1), len(cap2)),
168         )
169
170         pkts1 = [(pkt[Ether] / pkt[IP] / pkt[UDP]) for pkt in cap1]
171         pkts2 = [(pkt[Ether] / pkt[IP] / pkt[UDP]) for pkt in cap2]
172
173         self.assertEqual(pkts1.sort(), pkts2.sort())
174
175     def test_device_span(self):
176         """SPAN device rx mirror"""
177
178         # Create bi-directional cross-connects between pg0 and pg1
179         self.xconnect(self.pg0.sw_if_index, self.pg1.sw_if_index)
180         # Create incoming packet streams for packet-generator interfaces
181         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes)
182         self.pg0.add_stream(pkts)
183
184         # Enable SPAN on pg0 (mirrored to pg2)
185         self.vapi.sw_interface_span_enable_disable(
186             self.pg0.sw_if_index, self.pg2.sw_if_index
187         )
188
189         self.logger.info(self.vapi.ppcli("show interface span"))
190         # Enable packet capturing and start packet sending
191         self.pg_enable_capture(self.pg_interfaces)
192         self.pg_start()
193
194         # Verify packets outgoing packet streams on mirrored interface (pg2)
195         n_pkts = len(pkts)
196         pg1_pkts = self.pg1.get_capture(n_pkts)
197         pg2_pkts = self.pg2.get_capture(n_pkts)
198
199         # Disable SPAN on pg0 (mirrored to pg2)
200         self.vapi.sw_interface_span_enable_disable(
201             self.pg0.sw_if_index, self.pg2.sw_if_index, state=0
202         )
203         self.xconnect(self.pg0.sw_if_index, self.pg1.sw_if_index, is_add=0)
204
205         self.verify_capture(pg1_pkts, pg2_pkts)
206
207     def test_span_l2_rx(self):
208         """SPAN l2 rx mirror"""
209
210         self.sub_if.admin_up()
211
212         self.bridge(self.pg2.sw_if_index)
213         # Create bi-directional cross-connects between pg0 subif and pg1
214         self.xconnect(self.sub_if.sw_if_index, self.pg1.sw_if_index)
215         # Create incoming packet streams for packet-generator interfaces
216         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes, do_dot1=True)
217         self.pg0.add_stream(pkts)
218
219         # Enable SPAN on pg0 (mirrored to pg2)
220         self.vapi.sw_interface_span_enable_disable(
221             self.sub_if.sw_if_index, self.pg2.sw_if_index, is_l2=1
222         )
223
224         self.logger.info(self.vapi.ppcli("show interface span"))
225         # Enable packet capturing and start packet sending
226         self.pg_enable_capture(self.pg_interfaces)
227         self.pg_start()
228
229         # Verify packets outgoing packet streams on mirrored interface (pg2)
230         pg2_expected = len(pkts)
231         pg1_pkts = self.pg1.get_capture(pg2_expected)
232         pg2_pkts = self.pg2.get_capture(pg2_expected)
233         self.bridge(self.pg2.sw_if_index, is_add=0)
234
235         # Disable SPAN on pg0 (mirrored to pg2)
236         self.vapi.sw_interface_span_enable_disable(
237             self.sub_if.sw_if_index, self.pg2.sw_if_index, state=0, is_l2=1
238         )
239         self.xconnect(self.sub_if.sw_if_index, self.pg1.sw_if_index, is_add=0)
240
241         self.verify_capture(pg1_pkts, pg2_pkts)
242
243     def test_span_l2_rx_dst_vxlan(self):
244         """SPAN l2 rx mirror into vxlan"""
245
246         self.sub_if.admin_up()
247         self.vapi.sw_interface_set_flags(self.vxlan.sw_if_index, flags=1)
248
249         self.bridge(self.vxlan.sw_if_index, is_add=1)
250         # Create bi-directional cross-connects between pg0 subif and pg1
251         self.xconnect(self.sub_if.sw_if_index, self.pg1.sw_if_index)
252         # Create incoming packet streams for packet-generator interfaces
253         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes, do_dot1=True)
254         self.pg0.add_stream(pkts)
255
256         # Enable SPAN on pg0 sub if (mirrored to vxlan)
257         self.vapi.sw_interface_span_enable_disable(
258             self.sub_if.sw_if_index, self.vxlan.sw_if_index, is_l2=1
259         )
260
261         self.logger.info(self.vapi.ppcli("show interface span"))
262         # Enable packet capturing and start packet sending
263         self.pg_enable_capture(self.pg_interfaces)
264         self.pg_start()
265
266         # Verify packets outgoing packet streams on mirrored interface (pg2)
267         n_pkts = len(pkts)
268         pg1_pkts = self.pg1.get_capture(n_pkts)
269         pg2_pkts = [self.decap_vxlan(p) for p in self.pg2.get_capture(n_pkts)]
270
271         self.bridge(self.vxlan.sw_if_index, is_add=0)
272         # Disable SPAN on pg0 sub if (mirrored to vxlan)
273         self.vapi.sw_interface_span_enable_disable(
274             self.sub_if.sw_if_index, self.vxlan.sw_if_index, state=0, is_l2=1
275         )
276         self.xconnect(self.sub_if.sw_if_index, self.pg1.sw_if_index, is_add=0)
277         self.verify_capture(pg1_pkts, pg2_pkts)
278
279     def test_span_l2_rx_dst_gre_erspan(self):
280         """SPAN l2 rx mirror into gre-erspan"""
281
282         self.sub_if.admin_up()
283
284         gre_if = VppGreInterface(
285             self,
286             self.pg2.local_ip4,
287             self.pg2.remote_ip4,
288             session=543,
289             type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_ERSPAN),
290         )
291
292         gre_if.add_vpp_config()
293         gre_if.admin_up()
294
295         self.bridge(gre_if.sw_if_index)
296         # Create bi-directional cross-connects between pg0 and pg1
297         self.xconnect(self.sub_if.sw_if_index, self.pg1.sw_if_index, is_add=1)
298
299         # Create incoming packet streams for packet-generator interfaces
300         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes, do_dot1=True)
301         self.pg0.add_stream(pkts)
302
303         # Enable SPAN on pg0 sub if (mirrored to gre-erspan)
304         self.vapi.sw_interface_span_enable_disable(
305             self.sub_if.sw_if_index, gre_if.sw_if_index, is_l2=1
306         )
307
308         # Enable packet capturing and start packet sending
309         self.pg_enable_capture(self.pg_interfaces)
310         self.pg_start()
311
312         # Verify packets outgoing packet streams on mirrored interface (pg2)
313         n_pkts = len(pkts)
314         pg1_pkts = self.pg1.get_capture(n_pkts)
315         pg2_pkts = self.pg2.get_capture(n_pkts)
316
317         def decap(p):
318             return self.decap_erspan(p, session=543)
319
320         pg2_decaped = [decap(p) for p in pg2_pkts]
321
322         self.bridge(gre_if.sw_if_index, is_add=0)
323
324         # Disable SPAN on pg0 sub if
325         self.vapi.sw_interface_span_enable_disable(
326             self.sub_if.sw_if_index, gre_if.sw_if_index, state=0, is_l2=1
327         )
328         gre_if.remove_vpp_config()
329         self.xconnect(self.sub_if.sw_if_index, self.pg1.sw_if_index, is_add=0)
330
331         self.verify_capture(pg1_pkts, pg2_decaped)
332
333     def test_span_l2_rx_dst_gre_subif_vtr(self):
334         """SPAN l2 rx mirror into gre-subif+vtr"""
335
336         self.sub_if.admin_up()
337
338         gre_if = VppGreInterface(
339             self,
340             self.pg2.local_ip4,
341             self.pg2.remote_ip4,
342             type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
343         )
344
345         gre_if.add_vpp_config()
346         gre_if.admin_up()
347
348         gre_sub_if = VppDot1QSubint(self, gre_if, 500)
349         gre_sub_if.set_vtr(L2_VTR_OP.L2_POP_1, tag=500)
350         gre_sub_if.admin_up()
351
352         self.bridge(gre_sub_if.sw_if_index)
353         # Create bi-directional cross-connects between pg0 and pg1
354         self.xconnect(self.sub_if.sw_if_index, self.pg1.sw_if_index, is_add=1)
355
356         # Create incoming packet streams for packet-generator interfaces
357         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes, do_dot1=True)
358         self.pg0.add_stream(pkts)
359
360         # Enable SPAN on pg0 sub if (mirrored to gre sub if)
361         self.vapi.sw_interface_span_enable_disable(
362             self.sub_if.sw_if_index, gre_sub_if.sw_if_index, is_l2=1
363         )
364
365         # Enable packet capturing and start packet sending
366         self.pg_enable_capture(self.pg_interfaces)
367         self.pg_start()
368
369         # Verify packets outgoing packet streams on mirrored interface (pg2)
370         n_pkts = len(pkts)
371         pg1_pkts = self.pg1.get_capture(n_pkts)
372         pg2_pkts = self.pg2.get_capture(n_pkts)
373
374         def decap(p):
375             return self.remove_tags(self.decap_gre(p), [Tag(dot1=DOT1Q, vlan=500)])
376
377         pg2_decaped = [decap(p) for p in pg2_pkts]
378
379         self.bridge(gre_sub_if.sw_if_index, is_add=0)
380
381         # Disable SPAN on pg0 sub if
382         self.vapi.sw_interface_span_enable_disable(
383             self.sub_if.sw_if_index, gre_sub_if.sw_if_index, state=0, is_l2=1
384         )
385         gre_if.remove_vpp_config()
386         self.xconnect(self.sub_if.sw_if_index, self.pg1.sw_if_index, is_add=0)
387
388         self.verify_capture(pg1_pkts, pg2_decaped)
389
390     def test_span_l2_rx_dst_1q_vtr(self):
391         """SPAN l2 rx mirror into 1q subif+vtr"""
392
393         self.sub_if.admin_up()
394         self.vlan_sub_if.admin_up()
395
396         self.bridge(self.vlan_sub_if.sw_if_index)
397         # Create bi-directional cross-connects between pg0 and pg1
398         self.xconnect(self.sub_if.sw_if_index, self.pg1.sw_if_index, is_add=1)
399
400         # Create incoming packet streams for packet-generator interfaces
401         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes, do_dot1=True)
402         self.pg0.add_stream(pkts)
403
404         self.vapi.sw_interface_span_enable_disable(
405             self.sub_if.sw_if_index, self.vlan_sub_if.sw_if_index, is_l2=1
406         )
407
408         # Enable packet capturing and start packet sending
409         self.pg_enable_capture(self.pg_interfaces)
410         self.pg_start()
411
412         # Verify packets outgoing packet streams on mirrored interface (pg2)
413         n_pkts = len(pkts)
414         pg1_pkts = self.pg1.get_capture(n_pkts)
415         pg2_pkts = self.pg2.get_capture(n_pkts)
416         pg2_untagged = [
417             self.remove_tags(p, [Tag(dot1=DOT1Q, vlan=300)]) for p in pg2_pkts
418         ]
419
420         self.bridge(self.vlan_sub_if.sw_if_index, is_add=0)
421         # Disable SPAN on pg0 sub if (mirrored to vxlan)
422         self.vapi.sw_interface_span_enable_disable(
423             self.sub_if.sw_if_index, self.vlan_sub_if.sw_if_index, state=0, is_l2=1
424         )
425         self.xconnect(self.sub_if.sw_if_index, self.pg1.sw_if_index, is_add=0)
426
427         self.verify_capture(pg1_pkts, pg2_untagged)
428
429     def test_span_l2_rx_dst_1ad_vtr(self):
430         """SPAN l2 rx mirror into 1ad subif+vtr"""
431
432         self.sub_if.admin_up()
433         self.qinq_sub_if.admin_up()
434
435         self.bridge(self.qinq_sub_if.sw_if_index)
436         # Create bi-directional cross-connects between pg0 and pg1
437         self.xconnect(self.sub_if.sw_if_index, self.pg1.sw_if_index, is_add=1)
438
439         # Create incoming packet streams for packet-generator interfaces
440         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes, do_dot1=True)
441         self.pg0.add_stream(pkts)
442
443         self.vapi.sw_interface_span_enable_disable(
444             self.sub_if.sw_if_index, self.qinq_sub_if.sw_if_index, is_l2=1
445         )
446
447         # Enable packet capturing and start packet sending
448         self.pg_enable_capture(self.pg_interfaces)
449         self.pg_start()
450
451         # Verify packets outgoing packet streams on mirrored interface (pg2)
452         n_pkts = len(pkts)
453         pg1_pkts = self.pg1.get_capture(n_pkts)
454         pg2_pkts = self.pg2.get_capture(n_pkts)
455         pg2_untagged = [
456             self.remove_tags(p, [Tag(dot1=DOT1AD, vlan=400), Tag(dot1=DOT1Q, vlan=500)])
457             for p in pg2_pkts
458         ]
459
460         self.bridge(self.qinq_sub_if.sw_if_index, is_add=0)
461         # Disable SPAN on pg0 sub if (mirrored to vxlan)
462         self.vapi.sw_interface_span_enable_disable(
463             self.sub_if.sw_if_index, self.qinq_sub_if.sw_if_index, state=0, is_l2=1
464         )
465         self.xconnect(self.sub_if.sw_if_index, self.pg1.sw_if_index, is_add=0)
466
467         self.verify_capture(pg1_pkts, pg2_untagged)
468
469     def test_l2_tx_span(self):
470         """SPAN l2 tx mirror"""
471
472         self.sub_if.admin_up()
473         self.bridge(self.pg2.sw_if_index)
474         # Create bi-directional cross-connects between pg0 and pg1
475         self.xconnect(self.sub_if.sw_if_index, self.pg1.sw_if_index)
476         # Create incoming packet streams for packet-generator interfaces
477         pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes, do_dot1=True)
478         self.pg0.add_stream(pkts)
479
480         # Enable SPAN on pg1 (mirrored to pg2)
481         self.vapi.sw_interface_span_enable_disable(
482             self.pg1.sw_if_index, self.pg2.sw_if_index, is_l2=1, state=2
483         )
484
485         self.logger.info(self.vapi.ppcli("show interface span"))
486         # Enable packet capturing and start packet sending
487         self.pg_enable_capture(self.pg_interfaces)
488         self.pg_start()
489
490         # Verify packets outgoing packet streams on mirrored interface (pg2)
491         n_pkts = len(pkts)
492         pg1_pkts = self.pg1.get_capture(n_pkts)
493         pg2_pkts = self.pg2.get_capture(n_pkts)
494         self.bridge(self.pg2.sw_if_index, is_add=0)
495         # Disable SPAN on pg0 (mirrored to pg2)
496         self.vapi.sw_interface_span_enable_disable(
497             self.pg1.sw_if_index, self.pg2.sw_if_index, state=0, is_l2=1
498         )
499         self.xconnect(self.sub_if.sw_if_index, self.pg1.sw_if_index, is_add=0)
500
501         self.verify_capture(pg1_pkts, pg2_pkts)
502
503     def test_l2_rx_tx_span(self):
504         """SPAN l2 rx tx mirror"""
505
506         self.sub_if.admin_up()
507         self.bridge(self.pg2.sw_if_index)
508         # Create bi-directional cross-connects between pg0 and pg1
509         self.xconnect(self.sub_if.sw_if_index, self.pg1.sw_if_index)
510
511         # Create incoming packet streams for packet-generator interfaces
512         pg0_pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes, do_dot1=True)
513         self.pg0.add_stream(pg0_pkts)
514         pg1_pkts = self.create_stream(self.pg1, self.pg_if_packet_sizes, do_dot1=False)
515         self.pg1.add_stream(pg1_pkts)
516
517         # Enable SPAN on pg0 (mirrored to pg2)
518         self.vapi.sw_interface_span_enable_disable(
519             self.sub_if.sw_if_index, self.pg2.sw_if_index, is_l2=1, state=3
520         )
521         self.logger.info(self.vapi.ppcli("show interface span"))
522
523         # Enable packet capturing and start packet sending
524         self.pg_enable_capture(self.pg_interfaces)
525         self.pg_start()
526
527         # Verify packets outgoing packet streams on mirrored interface (pg2)
528         pg0_expected = len(pg1_pkts)
529         pg1_expected = len(pg0_pkts)
530         pg2_expected = pg0_expected + pg1_expected
531
532         pg0_pkts = self.pg0.get_capture(pg0_expected)
533         pg1_pkts = self.pg1.get_capture(pg1_expected)
534         pg2_pkts = self.pg2.get_capture(pg2_expected)
535
536         self.bridge(self.pg2.sw_if_index, is_add=0)
537         # Disable SPAN on pg0 (mirrored to pg2)
538         self.vapi.sw_interface_span_enable_disable(
539             self.sub_if.sw_if_index, self.pg2.sw_if_index, state=0, is_l2=1
540         )
541         self.xconnect(self.sub_if.sw_if_index, self.pg1.sw_if_index, is_add=0)
542
543         self.verify_capture(pg0_pkts + pg1_pkts, pg2_pkts)
544
545     def test_l2_bcast_mirror(self):
546         """SPAN l2 broadcast mirror"""
547
548         self.sub_if.admin_up()
549         self.bridge(self.pg2.sw_if_index)
550
551         # Create bi-directional cross-connects between pg0 and pg1
552         self.vapi.sw_interface_set_l2_bridge(
553             rx_sw_if_index=self.sub_if.sw_if_index, bd_id=99, enable=1
554         )
555         self.vapi.sw_interface_set_l2_bridge(
556             rx_sw_if_index=self.pg1.sw_if_index, bd_id=99, enable=1
557         )
558
559         # Create incoming packet streams for packet-generator interfaces
560         pg0_pkts = self.create_stream(
561             self.pg0, self.pg_if_packet_sizes, do_dot1=True, bcast=True
562         )
563         self.pg0.add_stream(pg0_pkts)
564         pg1_pkts = self.create_stream(
565             self.pg1, self.pg_if_packet_sizes, do_dot1=False, bcast=True
566         )
567         self.pg1.add_stream(pg1_pkts)
568
569         # Enable SPAN on pg0 (mirrored to pg2)
570         self.vapi.sw_interface_span_enable_disable(
571             self.sub_if.sw_if_index, self.pg2.sw_if_index, is_l2=1, state=3
572         )
573         self.logger.info(self.vapi.ppcli("show interface span"))
574
575         # Enable packet capturing and start packet sending
576         self.pg_enable_capture(self.pg_interfaces)
577         self.pg_start()
578
579         # Verify packets outgoing packet streams on mirrored interface (pg2)
580         pg0_expected = len(pg1_pkts)
581         pg1_expected = len(pg0_pkts)
582         pg2_expected = pg0_expected + pg1_expected
583
584         pg0_pkts = self.pg0.get_capture(pg0_expected)
585         pg1_pkts = self.pg1.get_capture(pg1_expected)
586         pg2_pkts = self.pg2.get_capture(pg2_expected)
587
588         self.bridge(self.pg2.sw_if_index, is_add=0)
589         self.vapi.sw_interface_set_l2_bridge(
590             rx_sw_if_index=self.sub_if.sw_if_index, bd_id=99, enable=0
591         )
592         self.vapi.sw_interface_set_l2_bridge(
593             rx_sw_if_index=self.pg1.sw_if_index, bd_id=99, enable=0
594         )
595         # Disable SPAN on pg0 (mirrored to pg2)
596         self.vapi.sw_interface_span_enable_disable(
597             self.sub_if.sw_if_index, self.pg2.sw_if_index, state=0, is_l2=1
598         )
599
600         self.verify_capture(pg0_pkts + pg1_pkts, pg2_pkts)
601
602
603 if __name__ == "__main__":
604     unittest.main(testRunner=VppTestRunner)