fib: fib api updates
[vpp.git] / test / test_srv6_as.py
1 #!/usr/bin/env python
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11
12 import scapy.compat
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
16 from scapy.layers.inet import IP, UDP
17
18 from scapy.utils import inet_pton, inet_ntop
19
20 from util import ppp
21
22
23 class TestSRv6(VppTestCase):
24     """ SRv6 Static Proxy plugin Test Case """
25
26     @classmethod
27     def setUpClass(self):
28         super(TestSRv6, self).setUpClass()
29
30     @classmethod
31     def tearDownClass(cls):
32         super(TestSRv6, cls).tearDownClass()
33
34     def setUp(self):
35         """ Perform test setup before each test case.
36         """
37         super(TestSRv6, self).setUp()
38
39         # packet sizes, inclusive L2 overhead
40         self.pg_packet_sizes = [64, 512, 1518, 9018]
41
42         # reset packet_infos
43         self.reset_packet_infos()
44
45     def tearDown(self):
46         """ Clean up test setup after each test case.
47         """
48         self.teardown_interfaces()
49
50         super(TestSRv6, self).tearDown()
51
52     def configure_interface(self,
53                             interface,
54                             ipv6=False, ipv4=False,
55                             ipv6_table_id=0, ipv4_table_id=0):
56         """ Configure interface.
57         :param ipv6: configure IPv6 on interface
58         :param ipv4: configure IPv4 on interface
59         :param ipv6_table_id: FIB table_id for IPv6
60         :param ipv4_table_id: FIB table_id for IPv4
61         """
62         self.logger.debug("Configuring interface %s" % (interface.name))
63         if ipv6:
64             self.logger.debug("Configuring IPv6")
65             interface.set_table_ip6(ipv6_table_id)
66             interface.config_ip6()
67             interface.resolve_ndp(timeout=5)
68         if ipv4:
69             self.logger.debug("Configuring IPv4")
70             interface.set_table_ip4(ipv4_table_id)
71             interface.config_ip4()
72             interface.resolve_arp()
73         interface.admin_up()
74
75     def setup_interfaces(self, ipv6=[], ipv4=[],
76                          ipv6_table_id=[], ipv4_table_id=[]):
77         """ Create and configure interfaces.
78
79         :param ipv6: list of interface IPv6 capabilities
80         :param ipv4: list of interface IPv4 capabilities
81         :param ipv6_table_id: list of intf IPv6 FIB table_ids
82         :param ipv4_table_id: list of intf IPv4 FIB table_ids
83         :returns: List of created interfaces.
84         """
85         # how many interfaces?
86         if len(ipv6):
87             count = len(ipv6)
88         else:
89             count = len(ipv4)
90         self.logger.debug("Creating and configuring %d interfaces" % (count))
91
92         # fill up ipv6 and ipv4 lists if needed
93         # not enabled (False) is the default
94         if len(ipv6) < count:
95             ipv6 += (count - len(ipv6)) * [False]
96         if len(ipv4) < count:
97             ipv4 += (count - len(ipv4)) * [False]
98
99         # fill up table_id lists if needed
100         # table_id 0 (global) is the default
101         if len(ipv6_table_id) < count:
102             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
103         if len(ipv4_table_id) < count:
104             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
105
106         # create 'count' pg interfaces
107         self.create_pg_interfaces(range(count))
108
109         # setup all interfaces
110         for i in range(count):
111             intf = self.pg_interfaces[i]
112             self.configure_interface(intf,
113                                      ipv6[i], ipv4[i],
114                                      ipv6_table_id[i], ipv4_table_id[i])
115
116         if any(ipv6):
117             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
118         if any(ipv4):
119             self.logger.debug(self.vapi.cli("show ip arp"))
120         self.logger.debug(self.vapi.cli("show interface"))
121         self.logger.debug(self.vapi.cli("show hardware"))
122
123         return self.pg_interfaces
124
125     def teardown_interfaces(self):
126         """ Unconfigure and bring down interface.
127         """
128         self.logger.debug("Tearing down interfaces")
129         # tear down all interfaces
130         # AFAIK they cannot be deleted
131         for i in self.pg_interfaces:
132             self.logger.debug("Tear down interface %s" % (i.name))
133             i.admin_down()
134             i.unconfig()
135             i.set_table_ip4(0)
136             i.set_table_ip6(0)
137
138     def test_SRv6_End_AS_IPv6_noSRH(self):
139         """ Test SRv6 End.AS behavior with IPv6 traffic and no SRH rewrite.
140         """
141         self.run_SRv6_End_AS_IPv6(
142             sid_list=['a1::', 'a2::a6', 'a3::'],
143             test_sid_index=1,
144             rewrite_src_addr='a2::')
145
146     def test_SRv6_End_AS_IPv6_SRH(self):
147         """ Test SRv6 End.AS behavior with IPv6 traffic and SRH rewrite.
148         """
149         self.run_SRv6_End_AS_IPv6(
150             sid_list=['a1::a6', 'a2::', 'a3::'],
151             test_sid_index=0,
152             rewrite_src_addr='a1::')
153
154     def test_SRv6_End_AS_IPv4_noSRH(self):
155         """ Test SRv6 End.AS behavior with IPv4 traffic and no SRH rewrite.
156         """
157         self.run_SRv6_End_AS_IPv4(
158             sid_list=['a1::', 'a2::a6', 'a3::'],
159             test_sid_index=1,
160             rewrite_src_addr='a2::')
161
162     def test_SRv6_End_AS_IPv4_SRH(self):
163         """ Test SRv6 End.AS behavior with IPv4 traffic and SRH rewrite.
164         """
165         self.run_SRv6_End_AS_IPv4(
166             sid_list=['a1::a6', 'a2::', 'a3::'],
167             test_sid_index=0,
168             rewrite_src_addr='a1::')
169
170     def test_SRv6_End_AS_L2_noSRH(self):
171         """ Test SRv6 End.AS behavior with L2 traffic and no SRH rewrite.
172         """
173         self.run_SRv6_End_AS_L2(
174             sid_list=['a1::', 'a2::a6', 'a3::'],
175             test_sid_index=1,
176             rewrite_src_addr='a2::')
177
178     def test_SRv6_End_AS_L2_SRH(self):
179         """ Test SRv6 End.AS behavior with L2 traffic and SRH rewrite.
180         """
181         self.run_SRv6_End_AS_L2(
182             sid_list=['a1::a6', 'a2::', 'a3::'],
183             test_sid_index=0,
184             rewrite_src_addr='a1::')
185
186     def run_SRv6_End_AS_L2(self, sid_list, test_sid_index, rewrite_src_addr):
187         """ Run SRv6 End.AS test with L2 traffic.
188         """
189         self.rewrite_src_addr = rewrite_src_addr
190         self.rewrite_sid_list = sid_list[test_sid_index + 1::]
191
192         # send traffic to one destination interface
193         # source and destination interfaces are IPv6 only
194         self.setup_interfaces(ipv6=[True, False])
195
196         # configure route to next segment
197         route = VppIpRoute(self, sid_list[test_sid_index + 1], 128,
198                            [VppRoutePath(self.pg0.remote_ip6,
199                                          self.pg0.sw_if_index)])
200         route.add_vpp_config()
201
202         # configure SRv6 localSID behavior
203         cli_str = "sr localsid address " + sid_list[test_sid_index] \
204             + " behavior end.as" \
205             + " oif " + self.pg1.name \
206             + " iif " + self.pg1.name \
207             + " src " + self.rewrite_src_addr
208         for s in self.rewrite_sid_list:
209             cli_str += " next " + s
210         self.vapi.cli(cli_str)
211
212         # log the localsids
213         self.logger.debug(self.vapi.cli("show sr localsid"))
214
215         # send one packet per packet size
216         count = len(self.pg_packet_sizes)
217
218         # prepare L2 in SRv6 headers
219         packet_header1 = self.create_packet_header_IPv6_SRH_L2(
220                         sidlist=sid_list[::-1],
221                         segleft=len(sid_list) - test_sid_index - 1,
222                         vlan=0)
223
224         # generate packets (pg0->pg1)
225         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
226                                    self.pg_packet_sizes, count)
227
228         # send packets and verify received packets
229         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
230                                   self.compare_rx_tx_packet_End_AS_L2_out)
231
232         # log the localsid counters
233         self.logger.info(self.vapi.cli("show sr localsid"))
234
235         # prepare L2 header for returning packets
236         packet_header2 = self.create_packet_header_L2()
237
238         # generate returning packets (pg1->pg0)
239         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
240                                    self.pg_packet_sizes, count)
241
242         # send packets and verify received packets
243         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
244                                   self.compare_rx_tx_packet_End_AS_L2_in)
245
246         # log the localsid counters
247         self.logger.info(self.vapi.cli("show sr localsid"))
248
249         # remove SRv6 localSIDs
250         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
251
252         # cleanup interfaces
253         self.teardown_interfaces()
254
255     def run_SRv6_End_AS_IPv6(self, sid_list, test_sid_index, rewrite_src_addr):
256         """ Run SRv6 End.AS test with IPv6 traffic.
257         """
258         self.rewrite_src_addr = rewrite_src_addr
259         self.rewrite_sid_list = sid_list[test_sid_index + 1::]
260
261         # send traffic to one destination interface
262         # source and destination interfaces are IPv6 only
263         self.setup_interfaces(ipv6=[True, True])
264
265         # configure route to next segment
266         route = VppIpRoute(self, sid_list[test_sid_index + 1], 128,
267                            [VppRoutePath(self.pg0.remote_ip6,
268                                          self.pg0.sw_if_index)])
269         route.add_vpp_config()
270
271         # configure SRv6 localSID behavior
272         cli_str = "sr localsid address " + sid_list[test_sid_index] \
273             + " behavior end.as" \
274             + " nh " + self.pg1.remote_ip6 \
275             + " oif " + self.pg1.name \
276             + " iif " + self.pg1.name \
277             + " src " + self.rewrite_src_addr
278         for s in self.rewrite_sid_list:
279             cli_str += " next " + s
280         self.vapi.cli(cli_str)
281
282         # log the localsids
283         self.logger.debug(self.vapi.cli("show sr localsid"))
284
285         # send one packet per packet size
286         count = len(self.pg_packet_sizes)
287
288         # prepare IPv6 in SRv6 headers
289         packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
290                         sidlist=sid_list[::-1],
291                         segleft=len(sid_list) - test_sid_index - 1)
292
293         # generate packets (pg0->pg1)
294         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
295                                    self.pg_packet_sizes, count)
296
297         # send packets and verify received packets
298         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
299                                   self.compare_rx_tx_packet_End_AS_IPv6_out)
300
301         # log the localsid counters
302         self.logger.info(self.vapi.cli("show sr localsid"))
303
304         # prepare IPv6 header for returning packets
305         packet_header2 = self.create_packet_header_IPv6()
306
307         # generate returning packets (pg1->pg0)
308         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
309                                    self.pg_packet_sizes, count)
310
311         # send packets and verify received packets
312         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
313                                   self.compare_rx_tx_packet_End_AS_IPv6_in)
314
315         # log the localsid counters
316         self.logger.info(self.vapi.cli("show sr localsid"))
317
318         # remove SRv6 localSIDs
319         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
320
321         # cleanup interfaces
322         self.teardown_interfaces()
323
324     def run_SRv6_End_AS_IPv4(self, sid_list, test_sid_index, rewrite_src_addr):
325         """ Run SRv6 End.AS test with IPv4 traffic.
326         """
327         self.rewrite_src_addr = rewrite_src_addr
328         self.rewrite_sid_list = sid_list[test_sid_index + 1::]
329
330         # send traffic to one destination interface
331         # source and destination interfaces are IPv6 only
332         self.setup_interfaces(ipv6=[True, False], ipv4=[True, True])
333
334         # configure route to next segment
335         route = VppIpRoute(self, sid_list[test_sid_index + 1], 128,
336                            [VppRoutePath(self.pg0.remote_ip6,
337                                          self.pg0.sw_if_index)])
338         route.add_vpp_config()
339
340         # configure SRv6 localSID behavior
341         cli_str = "sr localsid address " + sid_list[test_sid_index] \
342             + " behavior end.as" \
343             + " nh " + self.pg1.remote_ip4 \
344             + " oif " + self.pg1.name \
345             + " iif " + self.pg1.name \
346             + " src " + self.rewrite_src_addr
347         for s in self.rewrite_sid_list:
348             cli_str += " next " + s
349         self.vapi.cli(cli_str)
350
351         # log the localsids
352         self.logger.debug(self.vapi.cli("show sr localsid"))
353
354         # send one packet per packet size
355         count = len(self.pg_packet_sizes)
356
357         # prepare IPv4 in SRv6 headers
358         packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
359                         sidlist=sid_list[::-1],
360                         segleft=len(sid_list) - test_sid_index - 1)
361
362         # generate packets (pg0->pg1)
363         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
364                                    self.pg_packet_sizes, count)
365
366         # send packets and verify received packets
367         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
368                                   self.compare_rx_tx_packet_End_AS_IPv4_out)
369
370         # log the localsid counters
371         self.logger.info(self.vapi.cli("show sr localsid"))
372
373         # prepare IPv6 header for returning packets
374         packet_header2 = self.create_packet_header_IPv4()
375
376         # generate returning packets (pg1->pg0)
377         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
378                                    self.pg_packet_sizes, count)
379
380         # send packets and verify received packets
381         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
382                                   self.compare_rx_tx_packet_End_AS_IPv4_in)
383
384         # log the localsid counters
385         self.logger.info(self.vapi.cli("show sr localsid"))
386
387         # remove SRv6 localSIDs
388         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
389
390         # cleanup interfaces
391         self.teardown_interfaces()
392
393     def compare_rx_tx_packet_End_AS_IPv6_in(self, tx_pkt, rx_pkt):
394         """ Compare input and output packet after passing End.AS
395
396         :param tx_pkt: transmitted packet
397         :param rx_pkt: received packet
398         """
399
400         # get first (outer) IPv6 header of rx'ed packet
401         rx_ip = rx_pkt.getlayer(IPv6)
402         rx_srh = None
403
404         tx_ip = tx_pkt.getlayer(IPv6)
405
406         # expected segment-list (SRH order)
407         tx_seglist = self.rewrite_sid_list[::-1]
408
409         # received ip.src should be equal to SR Policy source
410         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
411         # received ip.dst should be equal to expected sidlist[lastentry]
412         self.assertEqual(rx_ip.dst, tx_seglist[-1])
413
414         if len(tx_seglist) > 1:
415             # rx'ed packet should have SRH
416             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
417             # get SRH
418             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
419             # rx'ed seglist should be equal to expected seglist
420             self.assertEqual(rx_srh.addresses, tx_seglist)
421             # segleft should be equal to size expected seglist-1
422             self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
423             # segleft should be equal to lastentry
424             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
425             # get payload
426             payload = rx_srh.payload
427         else:
428             # rx'ed packet should NOT have SRH
429             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
430             # get payload
431             payload = rx_ip.payload
432
433         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
434         # except for the hop-limit field
435         #   -> update tx'ed hlim to the expected hlim
436         tx_ip.hlim = tx_ip.hlim - 1
437
438         self.assertEqual(payload, tx_ip)
439
440         self.logger.debug("packet verification: SUCCESS")
441
442     def compare_rx_tx_packet_End_AS_IPv4_in(self, tx_pkt, rx_pkt):
443         """ Compare input and output packet after passing End.AS
444
445         :param tx_pkt: transmitted packet
446         :param rx_pkt: received packet
447         """
448
449         # get first (outer) IPv6 header of rx'ed packet
450         rx_ip = rx_pkt.getlayer(IPv6)
451         rx_srh = None
452
453         tx_ip = tx_pkt.getlayer(IP)
454
455         # expected segment-list (SRH order)
456         tx_seglist = self.rewrite_sid_list[::-1]
457
458         # received ip.src should be equal to SR Policy source
459         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
460         # received ip.dst should be equal to expected sidlist[lastentry]
461         self.assertEqual(rx_ip.dst, tx_seglist[-1])
462
463         if len(tx_seglist) > 1:
464             # rx'ed packet should have SRH and IPv4 header
465             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
466             self.assertTrue(rx_ip.payload.haslayer(IP))
467             # get SRH
468             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
469             # rx'ed seglist should be equal to seglist
470             self.assertEqual(rx_srh.addresses, tx_seglist)
471             # segleft should be equal to size seglist-1
472             self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
473             # segleft should be equal to lastentry
474             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
475             payload = rx_srh.payload
476         else:
477             # rx'ed packet should NOT have SRH
478             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
479             # get payload
480             payload = rx_ip.payload
481
482         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
483         # except for the ttl field and ip checksum
484         #   -> adjust tx'ed ttl to expected ttl
485         tx_ip.ttl = tx_ip.ttl - 1
486         #   -> set tx'ed ip checksum to None and let scapy recompute
487         tx_ip.chksum = None
488         # read back the pkt (with str()) to force computing these fields
489         # probably other ways to accomplish this are possible
490         tx_ip = IP(scapy.compat.raw(tx_ip))
491
492         self.assertEqual(payload, tx_ip)
493
494         self.logger.debug("packet verification: SUCCESS")
495
496     def compare_rx_tx_packet_End_AS_L2_in(self, tx_pkt, rx_pkt):
497         """ Compare input and output packet after passing End.AS
498
499         :param tx_pkt: transmitted packet
500         :param rx_pkt: received packet
501         """
502
503         # get first (outer) IPv6 header of rx'ed packet
504         rx_ip = rx_pkt.getlayer(IPv6)
505         rx_srh = None
506
507         tx_ether = tx_pkt.getlayer(Ether)
508
509         # expected segment-list (SRH order)
510         tx_seglist = self.rewrite_sid_list[::-1]
511
512         # received ip.src should be equal to SR Policy source
513         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
514         # received ip.dst should be equal to expected sidlist[lastentry]
515         self.assertEqual(rx_ip.dst, tx_seglist[-1])
516
517         if len(tx_seglist) > 1:
518             # rx'ed packet should have SRH
519             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
520             # get SRH
521             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
522             # rx'ed seglist should be equal to seglist
523             self.assertEqual(rx_srh.addresses, tx_seglist)
524             # segleft should be equal to size seglist-1
525             self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
526             # segleft should be equal to lastentry
527             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
528             # nh should be "No Next Header" (59)
529             self.assertEqual(rx_srh.nh, 59)
530             # get payload
531             payload = rx_srh.payload
532         else:
533             # rx'ed packet should NOT have SRH
534             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
535             # get payload
536             payload = rx_ip.payload
537
538         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
539         self.assertEqual(Ether(scapy.compat.raw(payload)), tx_ether)
540
541         self.logger.debug("packet verification: SUCCESS")
542
543     def compare_rx_tx_packet_End_AS_IPv6_out(self, tx_pkt, rx_pkt):
544         """ Compare input and output packet after passing End.AS with IPv6
545
546         :param tx_pkt: transmitted packet
547         :param rx_pkt: received packet
548         """
549
550         # get first (outer) IPv6 header of rx'ed packet
551         rx_ip = rx_pkt.getlayer(IPv6)
552
553         tx_ip = tx_pkt.getlayer(IPv6)
554         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
555
556         # verify if rx'ed packet has no SRH
557         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
558
559         # the whole rx_ip pkt should be equal to tx_ip2
560         # except for the hlim field
561         #   -> adjust tx'ed hlim to expected hlim
562         tx_ip2.hlim = tx_ip2.hlim - 1
563
564         self.assertEqual(rx_ip, tx_ip2)
565
566         self.logger.debug("packet verification: SUCCESS")
567
568     def compare_rx_tx_packet_End_AS_IPv4_out(self, tx_pkt, rx_pkt):
569         """ Compare input and output packet after passing End.AS with IPv4
570
571         :param tx_pkt: transmitted packet
572         :param rx_pkt: received packet
573         """
574
575         # get IPv4 header of rx'ed packet
576         rx_ip = rx_pkt.getlayer(IP)
577
578         tx_ip = tx_pkt.getlayer(IPv6)
579         tx_ip2 = tx_pkt.getlayer(IP)
580
581         # verify if rx'ed packet has no SRH
582         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
583
584         # the whole rx_ip pkt should be equal to tx_ip2
585         # except for the ttl field and ip checksum
586         #   -> adjust tx'ed ttl to expected ttl
587         tx_ip2.ttl = tx_ip2.ttl - 1
588         #   -> set tx'ed ip checksum to None and let scapy recompute
589         tx_ip2.chksum = None
590         # read back the pkt (with str()) to force computing these fields
591         # probably other ways to accomplish this are possible
592         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
593
594         self.assertEqual(rx_ip, tx_ip2)
595
596         self.logger.debug("packet verification: SUCCESS")
597
598     def compare_rx_tx_packet_End_AS_L2_out(self, tx_pkt, rx_pkt):
599         """ Compare input and output packet after passing End.AS with L2
600
601         :param tx_pkt: transmitted packet
602         :param rx_pkt: received packet
603         """
604
605         # get IPv4 header of rx'ed packet
606         rx_eth = rx_pkt.getlayer(Ether)
607
608         tx_ip = tx_pkt.getlayer(IPv6)
609         # we can't just get the 2nd Ether layer
610         # get the Raw content and dissect it as Ether
611         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
612
613         # verify if rx'ed packet has no SRH
614         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
615
616         # the whole rx_eth pkt should be equal to tx_eth1
617         self.assertEqual(rx_eth, tx_eth1)
618
619         self.logger.debug("packet verification: SUCCESS")
620
621     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
622                       count):
623         """Create SRv6 input packet stream for defined interface.
624
625         :param VppInterface src_if: Interface to create packet stream for
626         :param VppInterface dst_if: destination interface of packet stream
627         :param packet_header: Layer3 scapy packet headers,
628                 L2 is added when not provided,
629                 Raw(payload) with packet_info is added
630         :param list packet_sizes: packet stream pckt sizes,sequentially applied
631                to packets in stream have
632         :param int count: number of packets in packet stream
633         :return: list of packets
634         """
635         self.logger.info("Creating packets")
636         pkts = []
637         for i in range(0, count-1):
638             payload_info = self.create_packet_info(src_if, dst_if)
639             self.logger.debug(
640                 "Creating packet with index %d" % (payload_info.index))
641             payload = self.info_to_payload(payload_info)
642             # add L2 header if not yet provided in packet_header
643             if packet_header.getlayer(0).name == 'Ethernet':
644                 p = (packet_header /
645                      Raw(payload))
646             else:
647                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
648                      packet_header /
649                      Raw(payload))
650             size = packet_sizes[i % len(packet_sizes)]
651             self.logger.debug("Packet size %d" % (size))
652             self.extend_packet(p, size)
653             # we need to store the packet with the automatic fields computed
654             # read back the dumped packet (with str())
655             # to force computing these fields
656             # probably other ways are possible
657             p = Ether(scapy.compat.raw(p))
658             payload_info.data = p.copy()
659             self.logger.debug(ppp("Created packet:", p))
660             pkts.append(p)
661         self.logger.info("Done creating packets")
662         return pkts
663
664     def send_and_verify_pkts(self, input, pkts, output, compare_func):
665         """Send packets and verify received packets using compare_func
666
667         :param input: ingress interface of DUT
668         :param pkts: list of packets to transmit
669         :param output: egress interface of DUT
670         :param compare_func: function to compare in and out packets
671         """
672         # add traffic stream to input interface
673         input.add_stream(pkts)
674
675         # enable capture on all interfaces
676         self.pg_enable_capture(self.pg_interfaces)
677
678         # start traffic
679         self.logger.info("Starting traffic")
680         self.pg_start()
681
682         # get output capture
683         self.logger.info("Getting packet capture")
684         capture = output.get_capture()
685
686         # assert nothing was captured on input interface
687         # input.assert_nothing_captured()
688
689         # verify captured packets
690         self.verify_captured_pkts(output, capture, compare_func)
691
692     def create_packet_header_IPv6(self):
693         """Create packet header: IPv6 header, UDP header
694
695         :param dst: IPv6 destination address
696
697         IPv6 source address is 1234::1
698         IPv6 destination address is 4321::1
699         UDP source port and destination port are 1234
700         """
701
702         p = (IPv6(src='1234::1', dst='4321::1') /
703              UDP(sport=1234, dport=1234))
704         return p
705
706     def create_packet_header_IPv6_SRH_IPv6(self, sidlist, segleft):
707         """Create packet header: IPv6 encapsulated in SRv6:
708         IPv6 header with SRH, IPv6 header, UDP header
709
710         :param list sidlist: segment list of outer IPv6 SRH
711         :param int segleft: segments-left field of outer IPv6 SRH
712
713         Outer IPv6 source address is set to 5678::1
714         Outer IPv6 destination address is set to sidlist[segleft]
715         IPv6 source addresses is 1234::1
716         IPv6 destination address is 4321::1
717         UDP source port and destination port are 1234
718         """
719
720         p = (IPv6(src='5678::1', dst=sidlist[segleft]) /
721              IPv6ExtHdrSegmentRouting(addresses=sidlist,
722                                       segleft=segleft, nh=41) /
723              IPv6(src='1234::1', dst='4321::1') /
724              UDP(sport=1234, dport=1234))
725         return p
726
727     def create_packet_header_IPv4(self):
728         """Create packet header: IPv4 header, UDP header
729
730         :param dst: IPv4 destination address
731
732         IPv4 source address is 123.1.1.1
733         IPv4 destination address is 124.1.1.1
734         UDP source port and destination port are 1234
735         """
736
737         p = (IP(src='123.1.1.1', dst='124.1.1.1') /
738              UDP(sport=1234, dport=1234))
739         return p
740
741     def create_packet_header_IPv6_SRH_IPv4(self, sidlist, segleft):
742         """Create packet header: IPv4 encapsulated in SRv6:
743         IPv6 header with SRH, IPv4 header, UDP header
744
745         :param ipv4address dst: inner IPv4 destination address
746         :param list sidlist: segment list of outer IPv6 SRH
747         :param int segleft: segments-left field of outer IPv6 SRH
748
749         Outer IPv6 destination address is set to sidlist[segleft]
750         IPv6 source address is 1234::1
751         IPv4 source address is 123.1.1.1
752         IPv4 destination address is 124.1.1.1
753         UDP source port and destination port are 1234
754         """
755
756         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
757              IPv6ExtHdrSegmentRouting(addresses=sidlist,
758                                       segleft=segleft, nh=4) /
759              IP(src='123.1.1.1', dst='124.1.1.1') /
760              UDP(sport=1234, dport=1234))
761         return p
762
763     def create_packet_header_L2(self, vlan=0):
764         """Create packet header: L2 header
765
766         :param vlan: if vlan!=0 then add 802.1q header
767         """
768         # Note: the dst addr ('00:55:44:33:22:11') is used in
769         # the compare function compare_rx_tx_packet_T_Encaps_L2
770         # to detect presence of L2 in SRH payload
771         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
772         etype = 0x8137  # IPX
773         if vlan:
774             # add 802.1q layer
775             p /= Dot1Q(vlan=vlan, type=etype)
776         else:
777             p.type = etype
778         return p
779
780     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
781         """Create packet header: L2 encapsulated in SRv6:
782         IPv6 header with SRH, L2
783
784         :param list sidlist: segment list of outer IPv6 SRH
785         :param int segleft: segments-left field of outer IPv6 SRH
786         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
787
788         Outer IPv6 destination address is set to sidlist[segleft]
789         IPv6 source address is 1234::1
790         """
791         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
792         etype = 0x8137  # IPX
793         if vlan:
794             # add 802.1q layer
795             eth /= Dot1Q(vlan=vlan, type=etype)
796         else:
797             eth.type = etype
798
799         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
800              IPv6ExtHdrSegmentRouting(addresses=sidlist,
801                                       segleft=segleft, nh=59) /
802              eth)
803         return p
804
805     def get_payload_info(self, packet):
806         """ Extract the payload_info from the packet
807         """
808         # in most cases, payload_info is in packet[Raw]
809         # but packet[Raw] gives the complete payload
810         # (incl L2 header) for the T.Encaps L2 case
811         try:
812             payload_info = self.payload_to_info(packet[Raw])
813
814         except:
815             # remote L2 header from packet[Raw]:
816             # take packet[Raw], convert it to an Ether layer
817             # and then extract Raw from it
818             payload_info = self.payload_to_info(
819                 Ether(scapy.compat.raw(packet[Raw]))[Raw])
820
821         return payload_info
822
823     def verify_captured_pkts(self, dst_if, capture, compare_func):
824         """
825         Verify captured packet stream for specified interface.
826         Compare ingress with egress packets using the specified compare fn
827
828         :param dst_if: egress interface of DUT
829         :param capture: captured packets
830         :param compare_func: function to compare in and out packet
831         """
832         self.logger.info("Verifying capture on interface %s using function %s"
833                          % (dst_if.name, compare_func.__name__))
834
835         last_info = dict()
836         for i in self.pg_interfaces:
837             last_info[i.sw_if_index] = None
838         dst_sw_if_index = dst_if.sw_if_index
839
840         for packet in capture:
841             try:
842                 # extract payload_info from packet's payload
843                 payload_info = self.get_payload_info(packet)
844                 packet_index = payload_info.index
845
846                 self.logger.debug("Verifying packet with index %d"
847                                   % (packet_index))
848                 # packet should have arrived on the expected interface
849                 self.assertEqual(payload_info.dst, dst_sw_if_index)
850                 self.logger.debug(
851                     "Got packet on interface %s: src=%u (idx=%u)" %
852                     (dst_if.name, payload_info.src, packet_index))
853
854                 # search for payload_info with same src and dst if_index
855                 # this will give us the transmitted packet
856                 next_info = self.get_next_packet_info_for_interface2(
857                     payload_info.src, dst_sw_if_index,
858                     last_info[payload_info.src])
859                 last_info[payload_info.src] = next_info
860                 # next_info should not be None
861                 self.assertTrue(next_info is not None)
862                 # index of tx and rx packets should be equal
863                 self.assertEqual(packet_index, next_info.index)
864                 # data field of next_info contains the tx packet
865                 txed_packet = next_info.data
866
867                 self.logger.debug(ppp("Transmitted packet:",
868                                       txed_packet))  # ppp=Pretty Print Packet
869
870                 self.logger.debug(ppp("Received packet:", packet))
871
872                 # compare rcvd packet with expected packet using compare_func
873                 compare_func(txed_packet, packet)
874
875             except:
876                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
877                 raise
878
879         # have all expected packets arrived?
880         for i in self.pg_interfaces:
881             remaining_packet = self.get_next_packet_info_for_interface2(
882                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
883             self.assertTrue(remaining_packet is None,
884                             "Interface %s: Packet expected from interface %s "
885                             "didn't arrive" % (dst_if.name, i.name))
886
887
888 if __name__ == '__main__':
889     unittest.main(testRunner=VppTestRunner)