tests: Use errno value rather than a specific int
[vpp.git] / test / test_srv6_as.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase
6 from asfframework import VppTestRunner
7 from vpp_ip_route import VppIpRoute, VppRoutePath
8
9 import scapy.compat
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, Dot1Q
12 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
13 from scapy.layers.inet import IP, UDP
14
15 from util import ppp
16
17
18 class TestSRv6As(VppTestCase):
19     """SRv6 Static Proxy plugin Test Case"""
20
21     @classmethod
22     def setUpClass(self):
23         super(TestSRv6As, self).setUpClass()
24
25     @classmethod
26     def tearDownClass(cls):
27         super(TestSRv6As, cls).tearDownClass()
28
29     def setUp(self):
30         """Perform test setup before each test case."""
31         super(TestSRv6As, self).setUp()
32
33         # packet sizes, inclusive L2 overhead
34         self.pg_packet_sizes = [64, 512, 1518, 9018]
35
36         # reset packet_infos
37         self.reset_packet_infos()
38
39     def tearDown(self):
40         """Clean up test setup after each test case."""
41         self.teardown_interfaces()
42
43         super(TestSRv6As, self).tearDown()
44
45     def configure_interface(
46         self, interface, ipv6=False, ipv4=False, ipv6_table_id=0, ipv4_table_id=0
47     ):
48         """Configure interface.
49         :param ipv6: configure IPv6 on interface
50         :param ipv4: configure IPv4 on interface
51         :param ipv6_table_id: FIB table_id for IPv6
52         :param ipv4_table_id: FIB table_id for IPv4
53         """
54         self.logger.debug("Configuring interface %s" % (interface.name))
55         if ipv6:
56             self.logger.debug("Configuring IPv6")
57             interface.set_table_ip6(ipv6_table_id)
58             interface.config_ip6()
59             interface.resolve_ndp(timeout=5)
60         if ipv4:
61             self.logger.debug("Configuring IPv4")
62             interface.set_table_ip4(ipv4_table_id)
63             interface.config_ip4()
64             interface.resolve_arp()
65         interface.admin_up()
66
67     def setup_interfaces(self, ipv6=[], ipv4=[], ipv6_table_id=[], ipv4_table_id=[]):
68         """Create and configure interfaces.
69
70         :param ipv6: list of interface IPv6 capabilities
71         :param ipv4: list of interface IPv4 capabilities
72         :param ipv6_table_id: list of intf IPv6 FIB table_ids
73         :param ipv4_table_id: list of intf IPv4 FIB table_ids
74         :returns: List of created interfaces.
75         """
76         # how many interfaces?
77         if len(ipv6):
78             count = len(ipv6)
79         else:
80             count = len(ipv4)
81         self.logger.debug("Creating and configuring %d interfaces" % (count))
82
83         # fill up ipv6 and ipv4 lists if needed
84         # not enabled (False) is the default
85         if len(ipv6) < count:
86             ipv6 += (count - len(ipv6)) * [False]
87         if len(ipv4) < count:
88             ipv4 += (count - len(ipv4)) * [False]
89
90         # fill up table_id lists if needed
91         # table_id 0 (global) is the default
92         if len(ipv6_table_id) < count:
93             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
94         if len(ipv4_table_id) < count:
95             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
96
97         # create 'count' pg interfaces
98         self.create_pg_interfaces(range(count))
99
100         # setup all interfaces
101         for i in range(count):
102             intf = self.pg_interfaces[i]
103             self.configure_interface(
104                 intf, ipv6[i], ipv4[i], ipv6_table_id[i], ipv4_table_id[i]
105             )
106
107         if any(ipv6):
108             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
109         if any(ipv4):
110             self.logger.debug(self.vapi.cli("show ip4 neighbors"))
111         self.logger.debug(self.vapi.cli("show interface"))
112         self.logger.debug(self.vapi.cli("show hardware"))
113
114         return self.pg_interfaces
115
116     def teardown_interfaces(self):
117         """Unconfigure and bring down interface."""
118         self.logger.debug("Tearing down interfaces")
119         # tear down all interfaces
120         # AFAIK they cannot be deleted
121         for i in self.pg_interfaces:
122             self.logger.debug("Tear down interface %s" % (i.name))
123             i.admin_down()
124             i.unconfig()
125             i.set_table_ip4(0)
126             i.set_table_ip6(0)
127
128     def test_SRv6_End_AS_IPv6_noSRH(self):
129         """Test SRv6 End.AS behavior with IPv6 traffic and no SRH rewrite."""
130         self.run_SRv6_End_AS_IPv6(
131             sid_list=["a1::", "a2::a6", "a3::"],
132             test_sid_index=1,
133             rewrite_src_addr="a2::",
134         )
135
136     def test_SRv6_End_AS_IPv6_SRH(self):
137         """Test SRv6 End.AS behavior with IPv6 traffic and SRH rewrite."""
138         self.run_SRv6_End_AS_IPv6(
139             sid_list=["a1::a6", "a2::", "a3::"],
140             test_sid_index=0,
141             rewrite_src_addr="a1::",
142         )
143
144     def test_SRv6_End_AS_IPv4_noSRH(self):
145         """Test SRv6 End.AS behavior with IPv4 traffic and no SRH rewrite."""
146         self.run_SRv6_End_AS_IPv4(
147             sid_list=["a1::", "a2::a6", "a3::"],
148             test_sid_index=1,
149             rewrite_src_addr="a2::",
150         )
151
152     def test_SRv6_End_AS_IPv4_SRH(self):
153         """Test SRv6 End.AS behavior with IPv4 traffic and SRH rewrite."""
154         self.run_SRv6_End_AS_IPv4(
155             sid_list=["a1::a6", "a2::", "a3::"],
156             test_sid_index=0,
157             rewrite_src_addr="a1::",
158         )
159
160     def test_SRv6_End_AS_L2_noSRH(self):
161         """Test SRv6 End.AS behavior with L2 traffic and no SRH rewrite."""
162         self.run_SRv6_End_AS_L2(
163             sid_list=["a1::", "a2::a6", "a3::"],
164             test_sid_index=1,
165             rewrite_src_addr="a2::",
166         )
167
168     def test_SRv6_End_AS_L2_SRH(self):
169         """Test SRv6 End.AS behavior with L2 traffic and SRH rewrite."""
170         self.run_SRv6_End_AS_L2(
171             sid_list=["a1::a6", "a2::", "a3::"],
172             test_sid_index=0,
173             rewrite_src_addr="a1::",
174         )
175
176     def run_SRv6_End_AS_L2(self, sid_list, test_sid_index, rewrite_src_addr):
177         """Run SRv6 End.AS test with L2 traffic."""
178         self.rewrite_src_addr = rewrite_src_addr
179         self.rewrite_sid_list = sid_list[test_sid_index + 1 : :]
180
181         # send traffic to one destination interface
182         # source and destination interfaces are IPv6 only
183         self.setup_interfaces(ipv6=[True, False])
184
185         # configure route to next segment
186         route = VppIpRoute(
187             self,
188             sid_list[test_sid_index + 1],
189             128,
190             [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
191         )
192         route.add_vpp_config()
193
194         # configure SRv6 localSID behavior
195         cli_str = (
196             "sr localsid address "
197             + sid_list[test_sid_index]
198             + " behavior end.as"
199             + " oif "
200             + self.pg1.name
201             + " iif "
202             + self.pg1.name
203             + " src "
204             + self.rewrite_src_addr
205         )
206         for s in self.rewrite_sid_list:
207             cli_str += " next " + s
208         self.vapi.cli(cli_str)
209
210         # log the localsids
211         self.logger.debug(self.vapi.cli("show sr localsid"))
212
213         # send one packet per packet size
214         count = len(self.pg_packet_sizes)
215
216         # prepare L2 in SRv6 headers
217         packet_header1 = self.create_packet_header_IPv6_SRH_L2(
218             sidlist=sid_list[::-1], segleft=len(sid_list) - test_sid_index - 1, vlan=0
219         )
220
221         # generate packets (pg0->pg1)
222         pkts1 = self.create_stream(
223             self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
224         )
225
226         # send packets and verify received packets
227         self.send_and_verify_pkts(
228             self.pg0, pkts1, self.pg1, self.compare_rx_tx_packet_End_AS_L2_out
229         )
230
231         # log the localsid counters
232         self.logger.info(self.vapi.cli("show sr localsid"))
233
234         # prepare L2 header for returning packets
235         packet_header2 = self.create_packet_header_L2()
236
237         # generate returning packets (pg1->pg0)
238         pkts2 = self.create_stream(
239             self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
240         )
241
242         # send packets and verify received packets
243         self.send_and_verify_pkts(
244             self.pg1, pkts2, self.pg0, self.compare_rx_tx_packet_End_AS_L2_in
245         )
246
247         # log the localsid counters
248         self.logger.info(self.vapi.cli("show sr localsid"))
249
250         # remove SRv6 localSIDs
251         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
252
253         # cleanup interfaces
254         self.teardown_interfaces()
255
256     def run_SRv6_End_AS_IPv6(self, sid_list, test_sid_index, rewrite_src_addr):
257         """Run SRv6 End.AS test with IPv6 traffic."""
258         self.rewrite_src_addr = rewrite_src_addr
259         self.rewrite_sid_list = sid_list[test_sid_index + 1 : :]
260
261         # send traffic to one destination interface
262         # source and destination interfaces are IPv6 only
263         self.setup_interfaces(ipv6=[True, True])
264
265         # configure route to next segment
266         route = VppIpRoute(
267             self,
268             sid_list[test_sid_index + 1],
269             128,
270             [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
271         )
272         route.add_vpp_config()
273
274         # configure SRv6 localSID behavior
275         cli_str = (
276             "sr localsid address "
277             + sid_list[test_sid_index]
278             + " behavior end.as"
279             + " nh "
280             + self.pg1.remote_ip6
281             + " oif "
282             + self.pg1.name
283             + " iif "
284             + self.pg1.name
285             + " src "
286             + self.rewrite_src_addr
287         )
288         for s in self.rewrite_sid_list:
289             cli_str += " next " + s
290         self.vapi.cli(cli_str)
291
292         # log the localsids
293         self.logger.debug(self.vapi.cli("show sr localsid"))
294
295         # send one packet per packet size
296         count = len(self.pg_packet_sizes)
297
298         # prepare IPv6 in SRv6 headers
299         packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
300             sidlist=sid_list[::-1], segleft=len(sid_list) - test_sid_index - 1
301         )
302
303         # generate packets (pg0->pg1)
304         pkts1 = self.create_stream(
305             self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
306         )
307
308         # send packets and verify received packets
309         self.send_and_verify_pkts(
310             self.pg0, pkts1, self.pg1, self.compare_rx_tx_packet_End_AS_IPv6_out
311         )
312
313         # log the localsid counters
314         self.logger.info(self.vapi.cli("show sr localsid"))
315
316         # prepare IPv6 header for returning packets
317         packet_header2 = self.create_packet_header_IPv6()
318
319         # generate returning packets (pg1->pg0)
320         pkts2 = self.create_stream(
321             self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
322         )
323
324         # send packets and verify received packets
325         self.send_and_verify_pkts(
326             self.pg1, pkts2, self.pg0, self.compare_rx_tx_packet_End_AS_IPv6_in
327         )
328
329         # log the localsid counters
330         self.logger.info(self.vapi.cli("show sr localsid"))
331
332         # remove SRv6 localSIDs
333         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
334
335         # cleanup interfaces
336         self.teardown_interfaces()
337
338     def run_SRv6_End_AS_IPv4(self, sid_list, test_sid_index, rewrite_src_addr):
339         """Run SRv6 End.AS test with IPv4 traffic."""
340         self.rewrite_src_addr = rewrite_src_addr
341         self.rewrite_sid_list = sid_list[test_sid_index + 1 : :]
342
343         # send traffic to one destination interface
344         # source and destination interfaces are IPv6 only
345         self.setup_interfaces(ipv6=[True, False], ipv4=[True, True])
346
347         # configure route to next segment
348         route = VppIpRoute(
349             self,
350             sid_list[test_sid_index + 1],
351             128,
352             [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
353         )
354         route.add_vpp_config()
355
356         # configure SRv6 localSID behavior
357         cli_str = (
358             "sr localsid address "
359             + sid_list[test_sid_index]
360             + " behavior end.as"
361             + " nh "
362             + self.pg1.remote_ip4
363             + " oif "
364             + self.pg1.name
365             + " iif "
366             + self.pg1.name
367             + " src "
368             + self.rewrite_src_addr
369         )
370         for s in self.rewrite_sid_list:
371             cli_str += " next " + s
372         self.vapi.cli(cli_str)
373
374         # log the localsids
375         self.logger.debug(self.vapi.cli("show sr localsid"))
376
377         # send one packet per packet size
378         count = len(self.pg_packet_sizes)
379
380         # prepare IPv4 in SRv6 headers
381         packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
382             sidlist=sid_list[::-1], segleft=len(sid_list) - test_sid_index - 1
383         )
384
385         # generate packets (pg0->pg1)
386         pkts1 = self.create_stream(
387             self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
388         )
389
390         # send packets and verify received packets
391         self.send_and_verify_pkts(
392             self.pg0, pkts1, self.pg1, self.compare_rx_tx_packet_End_AS_IPv4_out
393         )
394
395         # log the localsid counters
396         self.logger.info(self.vapi.cli("show sr localsid"))
397
398         # prepare IPv6 header for returning packets
399         packet_header2 = self.create_packet_header_IPv4()
400
401         # generate returning packets (pg1->pg0)
402         pkts2 = self.create_stream(
403             self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
404         )
405
406         # send packets and verify received packets
407         self.send_and_verify_pkts(
408             self.pg1, pkts2, self.pg0, self.compare_rx_tx_packet_End_AS_IPv4_in
409         )
410
411         # log the localsid counters
412         self.logger.info(self.vapi.cli("show sr localsid"))
413
414         # remove SRv6 localSIDs
415         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
416
417         # cleanup interfaces
418         self.teardown_interfaces()
419
420     def compare_rx_tx_packet_End_AS_IPv6_in(self, tx_pkt, rx_pkt):
421         """Compare input and output packet after passing End.AS
422
423         :param tx_pkt: transmitted packet
424         :param rx_pkt: received packet
425         """
426
427         # get first (outer) IPv6 header of rx'ed packet
428         rx_ip = rx_pkt.getlayer(IPv6)
429         rx_srh = None
430
431         tx_ip = tx_pkt.getlayer(IPv6)
432
433         # expected segment-list (SRH order)
434         tx_seglist = self.rewrite_sid_list[::-1]
435
436         # received ip.src should be equal to SR Policy source
437         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
438         # received ip.dst should be equal to expected sidlist[lastentry]
439         self.assertEqual(rx_ip.dst, tx_seglist[-1])
440
441         if len(tx_seglist) > 1:
442             # rx'ed packet should have SRH
443             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
444             # get SRH
445             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
446             # rx'ed seglist should be equal to expected seglist
447             self.assertEqual(rx_srh.addresses, tx_seglist)
448             # segleft should be equal to size expected seglist-1
449             self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
450             # segleft should be equal to lastentry
451             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
452             # get payload
453             payload = rx_srh.payload
454         else:
455             # rx'ed packet should NOT have SRH
456             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
457             # get payload
458             payload = rx_ip.payload
459
460         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
461         # except for the hop-limit field
462         #   -> update tx'ed hlim to the expected hlim
463         tx_ip.hlim = tx_ip.hlim - 1
464
465         self.assertEqual(payload, tx_ip)
466
467         self.logger.debug("packet verification: SUCCESS")
468
469     def compare_rx_tx_packet_End_AS_IPv4_in(self, tx_pkt, rx_pkt):
470         """Compare input and output packet after passing End.AS
471
472         :param tx_pkt: transmitted packet
473         :param rx_pkt: received packet
474         """
475
476         # get first (outer) IPv6 header of rx'ed packet
477         rx_ip = rx_pkt.getlayer(IPv6)
478         rx_srh = None
479
480         tx_ip = tx_pkt.getlayer(IP)
481
482         # expected segment-list (SRH order)
483         tx_seglist = self.rewrite_sid_list[::-1]
484
485         # received ip.src should be equal to SR Policy source
486         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
487         # received ip.dst should be equal to expected sidlist[lastentry]
488         self.assertEqual(rx_ip.dst, tx_seglist[-1])
489
490         if len(tx_seglist) > 1:
491             # rx'ed packet should have SRH and IPv4 header
492             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
493             self.assertTrue(rx_ip.payload.haslayer(IP))
494             # get SRH
495             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
496             # rx'ed seglist should be equal to seglist
497             self.assertEqual(rx_srh.addresses, tx_seglist)
498             # segleft should be equal to size seglist-1
499             self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
500             # segleft should be equal to lastentry
501             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
502             payload = rx_srh.payload
503         else:
504             # rx'ed packet should NOT have SRH
505             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
506             # get payload
507             payload = rx_ip.payload
508
509         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
510         # except for the ttl field and ip checksum
511         #   -> adjust tx'ed ttl to expected ttl
512         tx_ip.ttl = tx_ip.ttl - 1
513         #   -> set tx'ed ip checksum to None and let scapy recompute
514         tx_ip.chksum = None
515         # read back the pkt (with str()) to force computing these fields
516         # probably other ways to accomplish this are possible
517         tx_ip = IP(scapy.compat.raw(tx_ip))
518
519         self.assertEqual(payload, tx_ip)
520
521         self.logger.debug("packet verification: SUCCESS")
522
523     def compare_rx_tx_packet_End_AS_L2_in(self, tx_pkt, rx_pkt):
524         """Compare input and output packet after passing End.AS
525
526         :param tx_pkt: transmitted packet
527         :param rx_pkt: received packet
528         """
529
530         # get first (outer) IPv6 header of rx'ed packet
531         rx_ip = rx_pkt.getlayer(IPv6)
532         rx_srh = None
533
534         tx_ether = tx_pkt.getlayer(Ether)
535
536         # expected segment-list (SRH order)
537         tx_seglist = self.rewrite_sid_list[::-1]
538
539         # received ip.src should be equal to SR Policy source
540         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
541         # received ip.dst should be equal to expected sidlist[lastentry]
542         self.assertEqual(rx_ip.dst, tx_seglist[-1])
543
544         if len(tx_seglist) > 1:
545             # rx'ed packet should have SRH
546             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
547             # get SRH
548             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
549             # rx'ed seglist should be equal to seglist
550             self.assertEqual(rx_srh.addresses, tx_seglist)
551             # segleft should be equal to size seglist-1
552             self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
553             # segleft should be equal to lastentry
554             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
555             # nh should be "No Next Header" (143)
556             self.assertEqual(rx_srh.nh, 143)
557             # get payload
558             payload = rx_srh.payload
559         else:
560             # rx'ed packet should NOT have SRH
561             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
562             # get payload
563             payload = rx_ip.payload
564
565         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
566         self.assertEqual(Ether(scapy.compat.raw(payload)), tx_ether)
567
568         self.logger.debug("packet verification: SUCCESS")
569
570     def compare_rx_tx_packet_End_AS_IPv6_out(self, tx_pkt, rx_pkt):
571         """Compare input and output packet after passing End.AS with IPv6
572
573         :param tx_pkt: transmitted packet
574         :param rx_pkt: received packet
575         """
576
577         # get first (outer) IPv6 header of rx'ed packet
578         rx_ip = rx_pkt.getlayer(IPv6)
579
580         tx_ip = tx_pkt.getlayer(IPv6)
581         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
582
583         # verify if rx'ed packet has no SRH
584         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
585
586         # the whole rx_ip pkt should be equal to tx_ip2
587         # except for the hlim field
588         #   -> adjust tx'ed hlim to expected hlim
589         tx_ip2.hlim = tx_ip2.hlim - 1
590
591         self.assertEqual(rx_ip, tx_ip2)
592
593         self.logger.debug("packet verification: SUCCESS")
594
595     def compare_rx_tx_packet_End_AS_IPv4_out(self, tx_pkt, rx_pkt):
596         """Compare input and output packet after passing End.AS with IPv4
597
598         :param tx_pkt: transmitted packet
599         :param rx_pkt: received packet
600         """
601
602         # get IPv4 header of rx'ed packet
603         rx_ip = rx_pkt.getlayer(IP)
604
605         tx_ip = tx_pkt.getlayer(IPv6)
606         tx_ip2 = tx_pkt.getlayer(IP)
607
608         # verify if rx'ed packet has no SRH
609         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
610
611         # the whole rx_ip pkt should be equal to tx_ip2
612         # except for the ttl field and ip checksum
613         #   -> adjust tx'ed ttl to expected ttl
614         tx_ip2.ttl = tx_ip2.ttl - 1
615         #   -> set tx'ed ip checksum to None and let scapy recompute
616         tx_ip2.chksum = None
617         # read back the pkt (with str()) to force computing these fields
618         # probably other ways to accomplish this are possible
619         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
620
621         self.assertEqual(rx_ip, tx_ip2)
622
623         self.logger.debug("packet verification: SUCCESS")
624
625     def compare_rx_tx_packet_End_AS_L2_out(self, tx_pkt, rx_pkt):
626         """Compare input and output packet after passing End.AS with L2
627
628         :param tx_pkt: transmitted packet
629         :param rx_pkt: received packet
630         """
631
632         # get IPv4 header of rx'ed packet
633         rx_eth = rx_pkt.getlayer(Ether)
634
635         tx_ip = tx_pkt.getlayer(IPv6)
636         # we can't just get the 2nd Ether layer
637         # get the Raw content and dissect it as Ether
638         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
639
640         # verify if rx'ed packet has no SRH
641         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
642
643         # the whole rx_eth pkt should be equal to tx_eth1
644         self.assertEqual(rx_eth, tx_eth1)
645
646         self.logger.debug("packet verification: SUCCESS")
647
648     def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count):
649         """Create SRv6 input packet stream for defined interface.
650
651         :param VppInterface src_if: Interface to create packet stream for
652         :param VppInterface dst_if: destination interface of packet stream
653         :param packet_header: Layer3 scapy packet headers,
654                 L2 is added when not provided,
655                 Raw(payload) with packet_info is added
656         :param list packet_sizes: packet stream pckt sizes,sequentially applied
657                to packets in stream have
658         :param int count: number of packets in packet stream
659         :return: list of packets
660         """
661         self.logger.info("Creating packets")
662         pkts = []
663         for i in range(0, count - 1):
664             payload_info = self.create_packet_info(src_if, dst_if)
665             self.logger.debug("Creating packet with index %d" % (payload_info.index))
666             payload = self.info_to_payload(payload_info)
667             # add L2 header if not yet provided in packet_header
668             if packet_header.getlayer(0).name == "Ethernet":
669                 p = packet_header / Raw(payload)
670             else:
671                 p = (
672                     Ether(dst=src_if.local_mac, src=src_if.remote_mac)
673                     / packet_header
674                     / Raw(payload)
675                 )
676             size = packet_sizes[i % len(packet_sizes)]
677             self.logger.debug("Packet size %d" % (size))
678             self.extend_packet(p, size)
679             # we need to store the packet with the automatic fields computed
680             # read back the dumped packet (with str())
681             # to force computing these fields
682             # probably other ways are possible
683             p = Ether(scapy.compat.raw(p))
684             payload_info.data = p.copy()
685             self.logger.debug(ppp("Created packet:", p))
686             pkts.append(p)
687         self.logger.info("Done creating packets")
688         return pkts
689
690     def send_and_verify_pkts(self, input, pkts, output, compare_func):
691         """Send packets and verify received packets using compare_func
692
693         :param input: ingress interface of DUT
694         :param pkts: list of packets to transmit
695         :param output: egress interface of DUT
696         :param compare_func: function to compare in and out packets
697         """
698         # add traffic stream to input interface
699         input.add_stream(pkts)
700
701         # enable capture on all interfaces
702         self.pg_enable_capture(self.pg_interfaces)
703
704         # start traffic
705         self.logger.info("Starting traffic")
706         self.pg_start()
707
708         # get output capture
709         self.logger.info("Getting packet capture")
710         capture = output.get_capture()
711
712         # assert nothing was captured on input interface
713         # input.assert_nothing_captured()
714
715         # verify captured packets
716         self.verify_captured_pkts(output, capture, compare_func)
717
718     def create_packet_header_IPv6(self):
719         """Create packet header: IPv6 header, UDP header
720
721         :param dst: IPv6 destination address
722
723         IPv6 source address is 1234::1
724         IPv6 destination address is 4321::1
725         UDP source port and destination port are 1234
726         """
727
728         p = IPv6(src="1234::1", dst="4321::1") / UDP(sport=1234, dport=1234)
729         return p
730
731     def create_packet_header_IPv6_SRH_IPv6(self, sidlist, segleft):
732         """Create packet header: IPv6 encapsulated in SRv6:
733         IPv6 header with SRH, IPv6 header, UDP header
734
735         :param list sidlist: segment list of outer IPv6 SRH
736         :param int segleft: segments-left field of outer IPv6 SRH
737
738         Outer IPv6 source address is set to 5678::1
739         Outer IPv6 destination address is set to sidlist[segleft]
740         IPv6 source addresses is 1234::1
741         IPv6 destination address is 4321::1
742         UDP source port and destination port are 1234
743         """
744
745         p = (
746             IPv6(src="5678::1", dst=sidlist[segleft])
747             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=41)
748             / IPv6(src="1234::1", dst="4321::1")
749             / UDP(sport=1234, dport=1234)
750         )
751         return p
752
753     def create_packet_header_IPv4(self):
754         """Create packet header: IPv4 header, UDP header
755
756         :param dst: IPv4 destination address
757
758         IPv4 source address is 123.1.1.1
759         IPv4 destination address is 124.1.1.1
760         UDP source port and destination port are 1234
761         """
762
763         p = IP(src="123.1.1.1", dst="124.1.1.1") / UDP(sport=1234, dport=1234)
764         return p
765
766     def create_packet_header_IPv6_SRH_IPv4(self, sidlist, segleft):
767         """Create packet header: IPv4 encapsulated in SRv6:
768         IPv6 header with SRH, IPv4 header, UDP header
769
770         :param ipv4address dst: inner IPv4 destination address
771         :param list sidlist: segment list of outer IPv6 SRH
772         :param int segleft: segments-left field of outer IPv6 SRH
773
774         Outer IPv6 destination address is set to sidlist[segleft]
775         IPv6 source address is 1234::1
776         IPv4 source address is 123.1.1.1
777         IPv4 destination address is 124.1.1.1
778         UDP source port and destination port are 1234
779         """
780
781         p = (
782             IPv6(src="1234::1", dst=sidlist[segleft])
783             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=4)
784             / IP(src="123.1.1.1", dst="124.1.1.1")
785             / UDP(sport=1234, dport=1234)
786         )
787         return p
788
789     def create_packet_header_L2(self, vlan=0):
790         """Create packet header: L2 header
791
792         :param vlan: if vlan!=0 then add 802.1q header
793         """
794         # Note: the dst addr ('00:55:44:33:22:11') is used in
795         # the compare function compare_rx_tx_packet_T_Encaps_L2
796         # to detect presence of L2 in SRH payload
797         p = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
798         etype = 0x8137  # IPX
799         if vlan:
800             # add 802.1q layer
801             p /= Dot1Q(vlan=vlan, type=etype)
802         else:
803             p.type = etype
804         return p
805
806     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
807         """Create packet header: L2 encapsulated in SRv6:
808         IPv6 header with SRH, L2
809
810         :param list sidlist: segment list of outer IPv6 SRH
811         :param int segleft: segments-left field of outer IPv6 SRH
812         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
813
814         Outer IPv6 destination address is set to sidlist[segleft]
815         IPv6 source address is 1234::1
816         """
817         eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
818         etype = 0x8137  # IPX
819         if vlan:
820             # add 802.1q layer
821             eth /= Dot1Q(vlan=vlan, type=etype)
822         else:
823             eth.type = etype
824
825         p = (
826             IPv6(src="1234::1", dst=sidlist[segleft])
827             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=143)
828             / eth
829         )
830         return p
831
832     def get_payload_info(self, packet):
833         """Extract the payload_info from the packet"""
834         # in most cases, payload_info is in packet[Raw]
835         # but packet[Raw] gives the complete payload
836         # (incl L2 header) for the T.Encaps L2 case
837         try:
838             payload_info = self.payload_to_info(packet[Raw])
839
840         except:
841             # remote L2 header from packet[Raw]:
842             # take packet[Raw], convert it to an Ether layer
843             # and then extract Raw from it
844             payload_info = self.payload_to_info(
845                 Ether(scapy.compat.raw(packet[Raw]))[Raw]
846             )
847
848         return payload_info
849
850     def verify_captured_pkts(self, dst_if, capture, compare_func):
851         """
852         Verify captured packet stream for specified interface.
853         Compare ingress with egress packets using the specified compare fn
854
855         :param dst_if: egress interface of DUT
856         :param capture: captured packets
857         :param compare_func: function to compare in and out packet
858         """
859         self.logger.info(
860             "Verifying capture on interface %s using function %s"
861             % (dst_if.name, compare_func.__name__)
862         )
863
864         last_info = dict()
865         for i in self.pg_interfaces:
866             last_info[i.sw_if_index] = None
867         dst_sw_if_index = dst_if.sw_if_index
868
869         for packet in capture:
870             try:
871                 # extract payload_info from packet's payload
872                 payload_info = self.get_payload_info(packet)
873                 packet_index = payload_info.index
874
875                 self.logger.debug("Verifying packet with index %d" % (packet_index))
876                 # packet should have arrived on the expected interface
877                 self.assertEqual(payload_info.dst, dst_sw_if_index)
878                 self.logger.debug(
879                     "Got packet on interface %s: src=%u (idx=%u)"
880                     % (dst_if.name, payload_info.src, packet_index)
881                 )
882
883                 # search for payload_info with same src and dst if_index
884                 # this will give us the transmitted packet
885                 next_info = self.get_next_packet_info_for_interface2(
886                     payload_info.src, dst_sw_if_index, last_info[payload_info.src]
887                 )
888                 last_info[payload_info.src] = next_info
889                 # next_info should not be None
890                 self.assertTrue(next_info is not None)
891                 # index of tx and rx packets should be equal
892                 self.assertEqual(packet_index, next_info.index)
893                 # data field of next_info contains the tx packet
894                 txed_packet = next_info.data
895
896                 self.logger.debug(
897                     ppp("Transmitted packet:", txed_packet)
898                 )  # ppp=Pretty Print Packet
899
900                 self.logger.debug(ppp("Received packet:", packet))
901
902                 # compare rcvd packet with expected packet using compare_func
903                 compare_func(txed_packet, packet)
904
905             except:
906                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
907                 raise
908
909         # have all expected packets arrived?
910         for i in self.pg_interfaces:
911             remaining_packet = self.get_next_packet_info_for_interface2(
912                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
913             )
914             self.assertTrue(
915                 remaining_packet is None,
916                 "Interface %s: Packet expected from interface %s "
917                 "didn't arrive" % (dst_if.name, i.name),
918             )
919
920
921 if __name__ == "__main__":
922     unittest.main(testRunner=VppTestRunner)