Tests Cleanup: Fix missing calls to setUpClass/tearDownClass.
[vpp.git] / test / test_srv6_as.py
1 #!/usr/bin/env python
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11
12 import scapy.compat
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
16 from scapy.layers.inet import IP, UDP
17
18 from scapy.utils import inet_pton, inet_ntop
19
20 from util import ppp
21
22
23 class TestSRv6(VppTestCase):
24     """ SRv6 Static Proxy plugin Test Case """
25
26     @classmethod
27     def setUpClass(self):
28         super(TestSRv6, self).setUpClass()
29
30     @classmethod
31     def tearDownClass(cls):
32         super(TestSRv6, cls).tearDownClass()
33
34     def setUp(self):
35         """ Perform test setup before each test case.
36         """
37         super(TestSRv6, self).setUp()
38
39         # packet sizes, inclusive L2 overhead
40         self.pg_packet_sizes = [64, 512, 1518, 9018]
41
42         # reset packet_infos
43         self.reset_packet_infos()
44
45     def tearDown(self):
46         """ Clean up test setup after each test case.
47         """
48         self.teardown_interfaces()
49
50         super(TestSRv6, self).tearDown()
51
52     def configure_interface(self,
53                             interface,
54                             ipv6=False, ipv4=False,
55                             ipv6_table_id=0, ipv4_table_id=0):
56         """ Configure interface.
57         :param ipv6: configure IPv6 on interface
58         :param ipv4: configure IPv4 on interface
59         :param ipv6_table_id: FIB table_id for IPv6
60         :param ipv4_table_id: FIB table_id for IPv4
61         """
62         self.logger.debug("Configuring interface %s" % (interface.name))
63         if ipv6:
64             self.logger.debug("Configuring IPv6")
65             interface.set_table_ip6(ipv6_table_id)
66             interface.config_ip6()
67             interface.resolve_ndp(timeout=5)
68         if ipv4:
69             self.logger.debug("Configuring IPv4")
70             interface.set_table_ip4(ipv4_table_id)
71             interface.config_ip4()
72             interface.resolve_arp()
73         interface.admin_up()
74
75     def setup_interfaces(self, ipv6=[], ipv4=[],
76                          ipv6_table_id=[], ipv4_table_id=[]):
77         """ Create and configure interfaces.
78
79         :param ipv6: list of interface IPv6 capabilities
80         :param ipv4: list of interface IPv4 capabilities
81         :param ipv6_table_id: list of intf IPv6 FIB table_ids
82         :param ipv4_table_id: list of intf IPv4 FIB table_ids
83         :returns: List of created interfaces.
84         """
85         # how many interfaces?
86         if len(ipv6):
87             count = len(ipv6)
88         else:
89             count = len(ipv4)
90         self.logger.debug("Creating and configuring %d interfaces" % (count))
91
92         # fill up ipv6 and ipv4 lists if needed
93         # not enabled (False) is the default
94         if len(ipv6) < count:
95             ipv6 += (count - len(ipv6)) * [False]
96         if len(ipv4) < count:
97             ipv4 += (count - len(ipv4)) * [False]
98
99         # fill up table_id lists if needed
100         # table_id 0 (global) is the default
101         if len(ipv6_table_id) < count:
102             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
103         if len(ipv4_table_id) < count:
104             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
105
106         # create 'count' pg interfaces
107         self.create_pg_interfaces(range(count))
108
109         # setup all interfaces
110         for i in range(count):
111             intf = self.pg_interfaces[i]
112             self.configure_interface(intf,
113                                      ipv6[i], ipv4[i],
114                                      ipv6_table_id[i], ipv4_table_id[i])
115
116         if any(ipv6):
117             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
118         if any(ipv4):
119             self.logger.debug(self.vapi.cli("show ip arp"))
120         self.logger.debug(self.vapi.cli("show interface"))
121         self.logger.debug(self.vapi.cli("show hardware"))
122
123         return self.pg_interfaces
124
125     def teardown_interfaces(self):
126         """ Unconfigure and bring down interface.
127         """
128         self.logger.debug("Tearing down interfaces")
129         # tear down all interfaces
130         # AFAIK they cannot be deleted
131         for i in self.pg_interfaces:
132             self.logger.debug("Tear down interface %s" % (i.name))
133             i.admin_down()
134             i.unconfig()
135             i.set_table_ip4(0)
136             i.set_table_ip6(0)
137
138     def test_SRv6_End_AS_IPv6_noSRH(self):
139         """ Test SRv6 End.AS behavior with IPv6 traffic and no SRH rewrite.
140         """
141         self.run_SRv6_End_AS_IPv6(
142             sid_list=['a1::', 'a2::a6', 'a3::'],
143             test_sid_index=1,
144             rewrite_src_addr='a2::')
145
146     def test_SRv6_End_AS_IPv6_SRH(self):
147         """ Test SRv6 End.AS behavior with IPv6 traffic and SRH rewrite.
148         """
149         self.run_SRv6_End_AS_IPv6(
150             sid_list=['a1::a6', 'a2::', 'a3::'],
151             test_sid_index=0,
152             rewrite_src_addr='a1::')
153
154     def test_SRv6_End_AS_IPv4_noSRH(self):
155         """ Test SRv6 End.AS behavior with IPv4 traffic and no SRH rewrite.
156         """
157         self.run_SRv6_End_AS_IPv4(
158             sid_list=['a1::', 'a2::a6', 'a3::'],
159             test_sid_index=1,
160             rewrite_src_addr='a2::')
161
162     def test_SRv6_End_AS_IPv4_SRH(self):
163         """ Test SRv6 End.AS behavior with IPv4 traffic and SRH rewrite.
164         """
165         self.run_SRv6_End_AS_IPv4(
166             sid_list=['a1::a6', 'a2::', 'a3::'],
167             test_sid_index=0,
168             rewrite_src_addr='a1::')
169
170     def test_SRv6_End_AS_L2_noSRH(self):
171         """ Test SRv6 End.AS behavior with L2 traffic and no SRH rewrite.
172         """
173         self.run_SRv6_End_AS_L2(
174             sid_list=['a1::', 'a2::a6', 'a3::'],
175             test_sid_index=1,
176             rewrite_src_addr='a2::')
177
178     def test_SRv6_End_AS_L2_SRH(self):
179         """ Test SRv6 End.AS behavior with L2 traffic and SRH rewrite.
180         """
181         self.run_SRv6_End_AS_L2(
182             sid_list=['a1::a6', 'a2::', 'a3::'],
183             test_sid_index=0,
184             rewrite_src_addr='a1::')
185
186     def run_SRv6_End_AS_L2(self, sid_list, test_sid_index, rewrite_src_addr):
187         """ Run SRv6 End.AS test with L2 traffic.
188         """
189         self.rewrite_src_addr = rewrite_src_addr
190         self.rewrite_sid_list = sid_list[test_sid_index + 1::]
191
192         # send traffic to one destination interface
193         # source and destination interfaces are IPv6 only
194         self.setup_interfaces(ipv6=[True, False])
195
196         # configure route to next segment
197         route = VppIpRoute(self, sid_list[test_sid_index + 1], 128,
198                            [VppRoutePath(self.pg0.remote_ip6,
199                                          self.pg0.sw_if_index,
200                                          proto=DpoProto.DPO_PROTO_IP6)],
201                            is_ip6=1)
202         route.add_vpp_config()
203
204         # configure SRv6 localSID behavior
205         cli_str = "sr localsid address " + sid_list[test_sid_index] \
206             + " behavior end.as" \
207             + " oif " + self.pg1.name \
208             + " iif " + self.pg1.name \
209             + " src " + self.rewrite_src_addr
210         for s in self.rewrite_sid_list:
211             cli_str += " next " + s
212         self.vapi.cli(cli_str)
213
214         # log the localsids
215         self.logger.debug(self.vapi.cli("show sr localsid"))
216
217         # send one packet per packet size
218         count = len(self.pg_packet_sizes)
219
220         # prepare L2 in SRv6 headers
221         packet_header1 = self.create_packet_header_IPv6_SRH_L2(
222                         sidlist=sid_list[::-1],
223                         segleft=len(sid_list) - test_sid_index - 1,
224                         vlan=0)
225
226         # generate packets (pg0->pg1)
227         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
228                                    self.pg_packet_sizes, count)
229
230         # send packets and verify received packets
231         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
232                                   self.compare_rx_tx_packet_End_AS_L2_out)
233
234         # log the localsid counters
235         self.logger.info(self.vapi.cli("show sr localsid"))
236
237         # prepare L2 header for returning packets
238         packet_header2 = self.create_packet_header_L2()
239
240         # generate returning packets (pg1->pg0)
241         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
242                                    self.pg_packet_sizes, count)
243
244         # send packets and verify received packets
245         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
246                                   self.compare_rx_tx_packet_End_AS_L2_in)
247
248         # log the localsid counters
249         self.logger.info(self.vapi.cli("show sr localsid"))
250
251         # remove SRv6 localSIDs
252         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
253
254         # cleanup interfaces
255         self.teardown_interfaces()
256
257     def run_SRv6_End_AS_IPv6(self, sid_list, test_sid_index, rewrite_src_addr):
258         """ Run SRv6 End.AS test with IPv6 traffic.
259         """
260         self.rewrite_src_addr = rewrite_src_addr
261         self.rewrite_sid_list = sid_list[test_sid_index + 1::]
262
263         # send traffic to one destination interface
264         # source and destination interfaces are IPv6 only
265         self.setup_interfaces(ipv6=[True, True])
266
267         # configure route to next segment
268         route = VppIpRoute(self, sid_list[test_sid_index + 1], 128,
269                            [VppRoutePath(self.pg0.remote_ip6,
270                                          self.pg0.sw_if_index,
271                                          proto=DpoProto.DPO_PROTO_IP6)],
272                            is_ip6=1)
273         route.add_vpp_config()
274
275         # configure SRv6 localSID behavior
276         cli_str = "sr localsid address " + sid_list[test_sid_index] \
277             + " behavior end.as" \
278             + " nh " + self.pg1.remote_ip6 \
279             + " oif " + self.pg1.name \
280             + " iif " + self.pg1.name \
281             + " src " + self.rewrite_src_addr
282         for s in self.rewrite_sid_list:
283             cli_str += " next " + s
284         self.vapi.cli(cli_str)
285
286         # log the localsids
287         self.logger.debug(self.vapi.cli("show sr localsid"))
288
289         # send one packet per packet size
290         count = len(self.pg_packet_sizes)
291
292         # prepare IPv6 in SRv6 headers
293         packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
294                         sidlist=sid_list[::-1],
295                         segleft=len(sid_list) - test_sid_index - 1)
296
297         # generate packets (pg0->pg1)
298         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
299                                    self.pg_packet_sizes, count)
300
301         # send packets and verify received packets
302         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
303                                   self.compare_rx_tx_packet_End_AS_IPv6_out)
304
305         # log the localsid counters
306         self.logger.info(self.vapi.cli("show sr localsid"))
307
308         # prepare IPv6 header for returning packets
309         packet_header2 = self.create_packet_header_IPv6()
310
311         # generate returning packets (pg1->pg0)
312         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
313                                    self.pg_packet_sizes, count)
314
315         # send packets and verify received packets
316         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
317                                   self.compare_rx_tx_packet_End_AS_IPv6_in)
318
319         # log the localsid counters
320         self.logger.info(self.vapi.cli("show sr localsid"))
321
322         # remove SRv6 localSIDs
323         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
324
325         # cleanup interfaces
326         self.teardown_interfaces()
327
328     def run_SRv6_End_AS_IPv4(self, sid_list, test_sid_index, rewrite_src_addr):
329         """ Run SRv6 End.AS test with IPv4 traffic.
330         """
331         self.rewrite_src_addr = rewrite_src_addr
332         self.rewrite_sid_list = sid_list[test_sid_index + 1::]
333
334         # send traffic to one destination interface
335         # source and destination interfaces are IPv6 only
336         self.setup_interfaces(ipv6=[True, False], ipv4=[True, True])
337
338         # configure route to next segment
339         route = VppIpRoute(self, sid_list[test_sid_index + 1], 128,
340                            [VppRoutePath(self.pg0.remote_ip6,
341                                          self.pg0.sw_if_index,
342                                          proto=DpoProto.DPO_PROTO_IP6)],
343                            is_ip6=1)
344         route.add_vpp_config()
345
346         # configure SRv6 localSID behavior
347         cli_str = "sr localsid address " + sid_list[test_sid_index] \
348             + " behavior end.as" \
349             + " nh " + self.pg1.remote_ip4 \
350             + " oif " + self.pg1.name \
351             + " iif " + self.pg1.name \
352             + " src " + self.rewrite_src_addr
353         for s in self.rewrite_sid_list:
354             cli_str += " next " + s
355         self.vapi.cli(cli_str)
356
357         # log the localsids
358         self.logger.debug(self.vapi.cli("show sr localsid"))
359
360         # send one packet per packet size
361         count = len(self.pg_packet_sizes)
362
363         # prepare IPv4 in SRv6 headers
364         packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
365                         sidlist=sid_list[::-1],
366                         segleft=len(sid_list) - test_sid_index - 1)
367
368         # generate packets (pg0->pg1)
369         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
370                                    self.pg_packet_sizes, count)
371
372         # send packets and verify received packets
373         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
374                                   self.compare_rx_tx_packet_End_AS_IPv4_out)
375
376         # log the localsid counters
377         self.logger.info(self.vapi.cli("show sr localsid"))
378
379         # prepare IPv6 header for returning packets
380         packet_header2 = self.create_packet_header_IPv4()
381
382         # generate returning packets (pg1->pg0)
383         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
384                                    self.pg_packet_sizes, count)
385
386         # send packets and verify received packets
387         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
388                                   self.compare_rx_tx_packet_End_AS_IPv4_in)
389
390         # log the localsid counters
391         self.logger.info(self.vapi.cli("show sr localsid"))
392
393         # remove SRv6 localSIDs
394         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
395
396         # cleanup interfaces
397         self.teardown_interfaces()
398
399     def compare_rx_tx_packet_End_AS_IPv6_in(self, tx_pkt, rx_pkt):
400         """ Compare input and output packet after passing End.AS
401
402         :param tx_pkt: transmitted packet
403         :param rx_pkt: received packet
404         """
405
406         # get first (outer) IPv6 header of rx'ed packet
407         rx_ip = rx_pkt.getlayer(IPv6)
408         rx_srh = None
409
410         tx_ip = tx_pkt.getlayer(IPv6)
411
412         # expected segment-list (SRH order)
413         tx_seglist = self.rewrite_sid_list[::-1]
414
415         # received ip.src should be equal to SR Policy source
416         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
417         # received ip.dst should be equal to expected sidlist[lastentry]
418         self.assertEqual(rx_ip.dst, tx_seglist[-1])
419
420         if len(tx_seglist) > 1:
421             # rx'ed packet should have SRH
422             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
423             # get SRH
424             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
425             # rx'ed seglist should be equal to expected seglist
426             self.assertEqual(rx_srh.addresses, tx_seglist)
427             # segleft should be equal to size expected seglist-1
428             self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
429             # segleft should be equal to lastentry
430             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
431             # get payload
432             payload = rx_srh.payload
433         else:
434             # rx'ed packet should NOT have SRH
435             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
436             # get payload
437             payload = rx_ip.payload
438
439         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
440         # except for the hop-limit field
441         #   -> update tx'ed hlim to the expected hlim
442         tx_ip.hlim = tx_ip.hlim - 1
443
444         self.assertEqual(payload, tx_ip)
445
446         self.logger.debug("packet verification: SUCCESS")
447
448     def compare_rx_tx_packet_End_AS_IPv4_in(self, tx_pkt, rx_pkt):
449         """ Compare input and output packet after passing End.AS
450
451         :param tx_pkt: transmitted packet
452         :param rx_pkt: received packet
453         """
454
455         # get first (outer) IPv6 header of rx'ed packet
456         rx_ip = rx_pkt.getlayer(IPv6)
457         rx_srh = None
458
459         tx_ip = tx_pkt.getlayer(IP)
460
461         # expected segment-list (SRH order)
462         tx_seglist = self.rewrite_sid_list[::-1]
463
464         # received ip.src should be equal to SR Policy source
465         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
466         # received ip.dst should be equal to expected sidlist[lastentry]
467         self.assertEqual(rx_ip.dst, tx_seglist[-1])
468
469         if len(tx_seglist) > 1:
470             # rx'ed packet should have SRH and IPv4 header
471             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
472             self.assertTrue(rx_ip.payload.haslayer(IP))
473             # get SRH
474             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
475             # rx'ed seglist should be equal to seglist
476             self.assertEqual(rx_srh.addresses, tx_seglist)
477             # segleft should be equal to size seglist-1
478             self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
479             # segleft should be equal to lastentry
480             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
481             payload = rx_srh.payload
482         else:
483             # rx'ed packet should NOT have SRH
484             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
485             # get payload
486             payload = rx_ip.payload
487
488         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
489         # except for the ttl field and ip checksum
490         #   -> adjust tx'ed ttl to expected ttl
491         tx_ip.ttl = tx_ip.ttl - 1
492         #   -> set tx'ed ip checksum to None and let scapy recompute
493         tx_ip.chksum = None
494         # read back the pkt (with str()) to force computing these fields
495         # probably other ways to accomplish this are possible
496         tx_ip = IP(scapy.compat.raw(tx_ip))
497
498         self.assertEqual(payload, tx_ip)
499
500         self.logger.debug("packet verification: SUCCESS")
501
502     def compare_rx_tx_packet_End_AS_L2_in(self, tx_pkt, rx_pkt):
503         """ Compare input and output packet after passing End.AS
504
505         :param tx_pkt: transmitted packet
506         :param rx_pkt: received packet
507         """
508
509         # get first (outer) IPv6 header of rx'ed packet
510         rx_ip = rx_pkt.getlayer(IPv6)
511         rx_srh = None
512
513         tx_ether = tx_pkt.getlayer(Ether)
514
515         # expected segment-list (SRH order)
516         tx_seglist = self.rewrite_sid_list[::-1]
517
518         # received ip.src should be equal to SR Policy source
519         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
520         # received ip.dst should be equal to expected sidlist[lastentry]
521         self.assertEqual(rx_ip.dst, tx_seglist[-1])
522
523         if len(tx_seglist) > 1:
524             # rx'ed packet should have SRH
525             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
526             # get SRH
527             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
528             # rx'ed seglist should be equal to seglist
529             self.assertEqual(rx_srh.addresses, tx_seglist)
530             # segleft should be equal to size seglist-1
531             self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
532             # segleft should be equal to lastentry
533             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
534             # nh should be "No Next Header" (59)
535             self.assertEqual(rx_srh.nh, 59)
536             # get payload
537             payload = rx_srh.payload
538         else:
539             # rx'ed packet should NOT have SRH
540             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
541             # get payload
542             payload = rx_ip.payload
543
544         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
545         self.assertEqual(Ether(scapy.compat.raw(payload)), tx_ether)
546
547         self.logger.debug("packet verification: SUCCESS")
548
549     def compare_rx_tx_packet_End_AS_IPv6_out(self, tx_pkt, rx_pkt):
550         """ Compare input and output packet after passing End.AS with IPv6
551
552         :param tx_pkt: transmitted packet
553         :param rx_pkt: received packet
554         """
555
556         # get first (outer) IPv6 header of rx'ed packet
557         rx_ip = rx_pkt.getlayer(IPv6)
558
559         tx_ip = tx_pkt.getlayer(IPv6)
560         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
561
562         # verify if rx'ed packet has no SRH
563         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
564
565         # the whole rx_ip pkt should be equal to tx_ip2
566         # except for the hlim field
567         #   -> adjust tx'ed hlim to expected hlim
568         tx_ip2.hlim = tx_ip2.hlim - 1
569
570         self.assertEqual(rx_ip, tx_ip2)
571
572         self.logger.debug("packet verification: SUCCESS")
573
574     def compare_rx_tx_packet_End_AS_IPv4_out(self, tx_pkt, rx_pkt):
575         """ Compare input and output packet after passing End.AS with IPv4
576
577         :param tx_pkt: transmitted packet
578         :param rx_pkt: received packet
579         """
580
581         # get IPv4 header of rx'ed packet
582         rx_ip = rx_pkt.getlayer(IP)
583
584         tx_ip = tx_pkt.getlayer(IPv6)
585         tx_ip2 = tx_pkt.getlayer(IP)
586
587         # verify if rx'ed packet has no SRH
588         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
589
590         # the whole rx_ip pkt should be equal to tx_ip2
591         # except for the ttl field and ip checksum
592         #   -> adjust tx'ed ttl to expected ttl
593         tx_ip2.ttl = tx_ip2.ttl - 1
594         #   -> set tx'ed ip checksum to None and let scapy recompute
595         tx_ip2.chksum = None
596         # read back the pkt (with str()) to force computing these fields
597         # probably other ways to accomplish this are possible
598         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
599
600         self.assertEqual(rx_ip, tx_ip2)
601
602         self.logger.debug("packet verification: SUCCESS")
603
604     def compare_rx_tx_packet_End_AS_L2_out(self, tx_pkt, rx_pkt):
605         """ Compare input and output packet after passing End.AS with L2
606
607         :param tx_pkt: transmitted packet
608         :param rx_pkt: received packet
609         """
610
611         # get IPv4 header of rx'ed packet
612         rx_eth = rx_pkt.getlayer(Ether)
613
614         tx_ip = tx_pkt.getlayer(IPv6)
615         # we can't just get the 2nd Ether layer
616         # get the Raw content and dissect it as Ether
617         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
618
619         # verify if rx'ed packet has no SRH
620         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
621
622         # the whole rx_eth pkt should be equal to tx_eth1
623         self.assertEqual(rx_eth, tx_eth1)
624
625         self.logger.debug("packet verification: SUCCESS")
626
627     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
628                       count):
629         """Create SRv6 input packet stream for defined interface.
630
631         :param VppInterface src_if: Interface to create packet stream for
632         :param VppInterface dst_if: destination interface of packet stream
633         :param packet_header: Layer3 scapy packet headers,
634                 L2 is added when not provided,
635                 Raw(payload) with packet_info is added
636         :param list packet_sizes: packet stream pckt sizes,sequentially applied
637                to packets in stream have
638         :param int count: number of packets in packet stream
639         :return: list of packets
640         """
641         self.logger.info("Creating packets")
642         pkts = []
643         for i in range(0, count-1):
644             payload_info = self.create_packet_info(src_if, dst_if)
645             self.logger.debug(
646                 "Creating packet with index %d" % (payload_info.index))
647             payload = self.info_to_payload(payload_info)
648             # add L2 header if not yet provided in packet_header
649             if packet_header.getlayer(0).name == 'Ethernet':
650                 p = (packet_header /
651                      Raw(payload))
652             else:
653                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
654                      packet_header /
655                      Raw(payload))
656             size = packet_sizes[i % len(packet_sizes)]
657             self.logger.debug("Packet size %d" % (size))
658             self.extend_packet(p, size)
659             # we need to store the packet with the automatic fields computed
660             # read back the dumped packet (with str())
661             # to force computing these fields
662             # probably other ways are possible
663             p = Ether(scapy.compat.raw(p))
664             payload_info.data = p.copy()
665             self.logger.debug(ppp("Created packet:", p))
666             pkts.append(p)
667         self.logger.info("Done creating packets")
668         return pkts
669
670     def send_and_verify_pkts(self, input, pkts, output, compare_func):
671         """Send packets and verify received packets using compare_func
672
673         :param input: ingress interface of DUT
674         :param pkts: list of packets to transmit
675         :param output: egress interface of DUT
676         :param compare_func: function to compare in and out packets
677         """
678         # add traffic stream to input interface
679         input.add_stream(pkts)
680
681         # enable capture on all interfaces
682         self.pg_enable_capture(self.pg_interfaces)
683
684         # start traffic
685         self.logger.info("Starting traffic")
686         self.pg_start()
687
688         # get output capture
689         self.logger.info("Getting packet capture")
690         capture = output.get_capture()
691
692         # assert nothing was captured on input interface
693         # input.assert_nothing_captured()
694
695         # verify captured packets
696         self.verify_captured_pkts(output, capture, compare_func)
697
698     def create_packet_header_IPv6(self):
699         """Create packet header: IPv6 header, UDP header
700
701         :param dst: IPv6 destination address
702
703         IPv6 source address is 1234::1
704         IPv6 destination address is 4321::1
705         UDP source port and destination port are 1234
706         """
707
708         p = (IPv6(src='1234::1', dst='4321::1') /
709              UDP(sport=1234, dport=1234))
710         return p
711
712     def create_packet_header_IPv6_SRH_IPv6(self, sidlist, segleft):
713         """Create packet header: IPv6 encapsulated in SRv6:
714         IPv6 header with SRH, IPv6 header, UDP header
715
716         :param list sidlist: segment list of outer IPv6 SRH
717         :param int segleft: segments-left field of outer IPv6 SRH
718
719         Outer IPv6 source address is set to 5678::1
720         Outer IPv6 destination address is set to sidlist[segleft]
721         IPv6 source addresses is 1234::1
722         IPv6 destination address is 4321::1
723         UDP source port and destination port are 1234
724         """
725
726         p = (IPv6(src='5678::1', dst=sidlist[segleft]) /
727              IPv6ExtHdrSegmentRouting(addresses=sidlist,
728                                       segleft=segleft, nh=41) /
729              IPv6(src='1234::1', dst='4321::1') /
730              UDP(sport=1234, dport=1234))
731         return p
732
733     def create_packet_header_IPv4(self):
734         """Create packet header: IPv4 header, UDP header
735
736         :param dst: IPv4 destination address
737
738         IPv4 source address is 123.1.1.1
739         IPv4 destination address is 124.1.1.1
740         UDP source port and destination port are 1234
741         """
742
743         p = (IP(src='123.1.1.1', dst='124.1.1.1') /
744              UDP(sport=1234, dport=1234))
745         return p
746
747     def create_packet_header_IPv6_SRH_IPv4(self, sidlist, segleft):
748         """Create packet header: IPv4 encapsulated in SRv6:
749         IPv6 header with SRH, IPv4 header, UDP header
750
751         :param ipv4address dst: inner IPv4 destination address
752         :param list sidlist: segment list of outer IPv6 SRH
753         :param int segleft: segments-left field of outer IPv6 SRH
754
755         Outer IPv6 destination address is set to sidlist[segleft]
756         IPv6 source address is 1234::1
757         IPv4 source address is 123.1.1.1
758         IPv4 destination address is 124.1.1.1
759         UDP source port and destination port are 1234
760         """
761
762         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
763              IPv6ExtHdrSegmentRouting(addresses=sidlist,
764                                       segleft=segleft, nh=4) /
765              IP(src='123.1.1.1', dst='124.1.1.1') /
766              UDP(sport=1234, dport=1234))
767         return p
768
769     def create_packet_header_L2(self, vlan=0):
770         """Create packet header: L2 header
771
772         :param vlan: if vlan!=0 then add 802.1q header
773         """
774         # Note: the dst addr ('00:55:44:33:22:11') is used in
775         # the compare function compare_rx_tx_packet_T_Encaps_L2
776         # to detect presence of L2 in SRH payload
777         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
778         etype = 0x8137  # IPX
779         if vlan:
780             # add 802.1q layer
781             p /= Dot1Q(vlan=vlan, type=etype)
782         else:
783             p.type = etype
784         return p
785
786     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
787         """Create packet header: L2 encapsulated in SRv6:
788         IPv6 header with SRH, L2
789
790         :param list sidlist: segment list of outer IPv6 SRH
791         :param int segleft: segments-left field of outer IPv6 SRH
792         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
793
794         Outer IPv6 destination address is set to sidlist[segleft]
795         IPv6 source address is 1234::1
796         """
797         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
798         etype = 0x8137  # IPX
799         if vlan:
800             # add 802.1q layer
801             eth /= Dot1Q(vlan=vlan, type=etype)
802         else:
803             eth.type = etype
804
805         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
806              IPv6ExtHdrSegmentRouting(addresses=sidlist,
807                                       segleft=segleft, nh=59) /
808              eth)
809         return p
810
811     def get_payload_info(self, packet):
812         """ Extract the payload_info from the packet
813         """
814         # in most cases, payload_info is in packet[Raw]
815         # but packet[Raw] gives the complete payload
816         # (incl L2 header) for the T.Encaps L2 case
817         try:
818             payload_info = self.payload_to_info(packet[Raw])
819
820         except:
821             # remote L2 header from packet[Raw]:
822             # take packet[Raw], convert it to an Ether layer
823             # and then extract Raw from it
824             payload_info = self.payload_to_info(
825                 Ether(scapy.compat.raw(packet[Raw]))[Raw])
826
827         return payload_info
828
829     def verify_captured_pkts(self, dst_if, capture, compare_func):
830         """
831         Verify captured packet stream for specified interface.
832         Compare ingress with egress packets using the specified compare fn
833
834         :param dst_if: egress interface of DUT
835         :param capture: captured packets
836         :param compare_func: function to compare in and out packet
837         """
838         self.logger.info("Verifying capture on interface %s using function %s"
839                          % (dst_if.name, compare_func.__name__))
840
841         last_info = dict()
842         for i in self.pg_interfaces:
843             last_info[i.sw_if_index] = None
844         dst_sw_if_index = dst_if.sw_if_index
845
846         for packet in capture:
847             try:
848                 # extract payload_info from packet's payload
849                 payload_info = self.get_payload_info(packet)
850                 packet_index = payload_info.index
851
852                 self.logger.debug("Verifying packet with index %d"
853                                   % (packet_index))
854                 # packet should have arrived on the expected interface
855                 self.assertEqual(payload_info.dst, dst_sw_if_index)
856                 self.logger.debug(
857                     "Got packet on interface %s: src=%u (idx=%u)" %
858                     (dst_if.name, payload_info.src, packet_index))
859
860                 # search for payload_info with same src and dst if_index
861                 # this will give us the transmitted packet
862                 next_info = self.get_next_packet_info_for_interface2(
863                     payload_info.src, dst_sw_if_index,
864                     last_info[payload_info.src])
865                 last_info[payload_info.src] = next_info
866                 # next_info should not be None
867                 self.assertTrue(next_info is not None)
868                 # index of tx and rx packets should be equal
869                 self.assertEqual(packet_index, next_info.index)
870                 # data field of next_info contains the tx packet
871                 txed_packet = next_info.data
872
873                 self.logger.debug(ppp("Transmitted packet:",
874                                       txed_packet))  # ppp=Pretty Print Packet
875
876                 self.logger.debug(ppp("Received packet:", packet))
877
878                 # compare rcvd packet with expected packet using compare_func
879                 compare_func(txed_packet, packet)
880
881             except:
882                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
883                 raise
884
885         # have all expected packets arrived?
886         for i in self.pg_interfaces:
887             remaining_packet = self.get_next_packet_info_for_interface2(
888                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
889             self.assertTrue(remaining_packet is None,
890                             "Interface %s: Packet expected from interface %s "
891                             "didn't arrive" % (dst_if.name, i.name))
892
893
894 if __name__ == '__main__':
895     unittest.main(testRunner=VppTestRunner)