5 from framework import VppTestCase
6 from asfframework import VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether
13 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
14 from scapy.layers.inet import IP, UDP
19 class TestSRv6AdFlow(VppTestCase):
20 """SRv6 Flow-based Dynamic Proxy plugin Test Case"""
24 super(TestSRv6AdFlow, self).setUpClass()
27 def tearDownClass(cls):
28 super(TestSRv6AdFlow, cls).tearDownClass()
31 """Perform test setup before each test case."""
32 super(TestSRv6AdFlow, self).setUp()
34 # packet sizes, inclusive L2 overhead
35 self.pg_packet_sizes = [64, 512, 1518, 9018]
38 self.reset_packet_infos()
41 """Clean up test setup after each test case."""
42 self.teardown_interfaces()
44 super(TestSRv6AdFlow, self).tearDown()
46 def configure_interface(
47 self, interface, ipv6=False, ipv4=False, ipv6_table_id=0, ipv4_table_id=0
49 """Configure interface.
50 :param ipv6: configure IPv6 on interface
51 :param ipv4: configure IPv4 on interface
52 :param ipv6_table_id: FIB table_id for IPv6
53 :param ipv4_table_id: FIB table_id for IPv4
55 self.logger.debug("Configuring interface %s" % (interface.name))
57 self.logger.debug("Configuring IPv6")
58 interface.set_table_ip6(ipv6_table_id)
59 interface.config_ip6()
60 interface.resolve_ndp(timeout=5)
62 self.logger.debug("Configuring IPv4")
63 interface.set_table_ip4(ipv4_table_id)
64 interface.config_ip4()
65 interface.resolve_arp()
68 def setup_interfaces(self, ipv6=[], ipv4=[], ipv6_table_id=[], ipv4_table_id=[]):
69 """Create and configure interfaces.
71 :param ipv6: list of interface IPv6 capabilities
72 :param ipv4: list of interface IPv4 capabilities
73 :param ipv6_table_id: list of intf IPv6 FIB table_ids
74 :param ipv4_table_id: list of intf IPv4 FIB table_ids
75 :returns: List of created interfaces.
77 # how many interfaces?
82 self.logger.debug("Creating and configuring %d interfaces" % (count))
84 # fill up ipv6 and ipv4 lists if needed
85 # not enabled (False) is the default
87 ipv6 += (count - len(ipv6)) * [False]
89 ipv4 += (count - len(ipv4)) * [False]
91 # fill up table_id lists if needed
92 # table_id 0 (global) is the default
93 if len(ipv6_table_id) < count:
94 ipv6_table_id += (count - len(ipv6_table_id)) * [0]
95 if len(ipv4_table_id) < count:
96 ipv4_table_id += (count - len(ipv4_table_id)) * [0]
98 # create 'count' pg interfaces
99 self.create_pg_interfaces(range(count))
101 # setup all interfaces
102 for i in range(count):
103 intf = self.pg_interfaces[i]
104 self.configure_interface(
105 intf, ipv6[i], ipv4[i], ipv6_table_id[i], ipv4_table_id[i]
109 self.logger.debug(self.vapi.cli("show ip6 neighbors"))
111 self.logger.debug(self.vapi.cli("show ip4 neighbors"))
112 self.logger.debug(self.vapi.cli("show interface"))
113 self.logger.debug(self.vapi.cli("show hardware"))
115 return self.pg_interfaces
117 def teardown_interfaces(self):
118 """Unconfigure and bring down interface."""
119 self.logger.debug("Tearing down interfaces")
120 # tear down all interfaces
121 # AFAIK they cannot be deleted
122 for i in self.pg_interfaces:
123 self.logger.debug("Tear down interface %s" % (i.name))
129 def test_SRv6_End_AD_IPv6(self):
130 """Test SRv6 End.AD behavior with IPv6 traffic."""
131 self.src_addr = "a0::"
132 self.sid_list = ["a1::", "a2::a6", "a3::"]
133 self.test_sid_index = 1
135 # send traffic to one destination interface
136 # source and destination interfaces are IPv6 only
137 self.setup_interfaces(ipv6=[True, True])
139 # configure route to next segment
142 self.sid_list[self.test_sid_index + 1],
147 self.pg0.sw_if_index,
148 proto=DpoProto.DPO_PROTO_IP6,
152 route.add_vpp_config()
154 # configure SRv6 localSID behavior
156 "sr localsid address "
157 + self.sid_list[self.test_sid_index]
158 + " behavior end.ad.flow"
160 + self.pg1.remote_ip6
166 self.vapi.cli(cli_str)
169 self.logger.debug(self.vapi.cli("show sr localsid"))
171 # send one packet per packet size
172 count = len(self.pg_packet_sizes)
174 # prepare IPv6 in SRv6 headers
175 packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
176 srcaddr=self.src_addr,
177 sidlist=self.sid_list[::-1],
178 segleft=len(self.sid_list) - self.test_sid_index - 1,
181 # generate packets (pg0->pg1)
182 pkts1 = self.create_stream(
183 self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
186 # send packets and verify received packets
187 self.send_and_verify_pkts(
188 self.pg0, pkts1, self.pg1, self.compare_rx_tx_packet_End_AD_IPv6_out
191 # log the localsid counters
192 self.logger.info(self.vapi.cli("show sr localsid"))
194 # prepare IPv6 header for returning packets
195 packet_header2 = self.create_packet_header_IPv6()
197 # generate returning packets (pg1->pg0)
198 pkts2 = self.create_stream(
199 self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
202 # send packets and verify received packets
203 self.send_and_verify_pkts(
204 self.pg1, pkts2, self.pg0, self.compare_rx_tx_packet_End_AD_IPv6_in
207 # log the localsid counters
208 self.logger.info(self.vapi.cli("show sr localsid"))
210 # remove SRv6 localSIDs
211 cli_str = "sr localsid del address " + self.sid_list[self.test_sid_index]
212 self.vapi.cli(cli_str)
215 self.teardown_interfaces()
217 def compare_rx_tx_packet_End_AD_IPv6_out(self, tx_pkt, rx_pkt):
218 """Compare input and output packet after passing End.AD with IPv6
220 :param tx_pkt: transmitted packet
221 :param rx_pkt: received packet
224 # get first (outer) IPv6 header of rx'ed packet
225 rx_ip = rx_pkt.getlayer(IPv6)
227 tx_ip = tx_pkt.getlayer(IPv6)
228 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
230 # verify if rx'ed packet has no SRH
231 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
233 # the whole rx_ip pkt should be equal to tx_ip2
234 # except for the hlim field
235 # -> adjust tx'ed hlim to expected hlim
236 tx_ip2.hlim = tx_ip2.hlim - 1
238 self.assertEqual(rx_ip, tx_ip2)
240 self.logger.debug("packet verification: SUCCESS")
242 def compare_rx_tx_packet_End_AD_IPv6_in(self, tx_pkt, rx_pkt):
243 """Compare input and output packet after passing End.AD
245 :param tx_pkt: transmitted packet
246 :param rx_pkt: received packet
249 # get first (outer) IPv6 header of rx'ed packet
250 rx_ip = rx_pkt.getlayer(IPv6)
251 # received ip.src should be equal to SR Policy source
252 self.assertEqual(rx_ip.src, self.src_addr)
253 # received ip.dst should be equal to expected sidlist next segment
254 self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
256 # rx'ed packet should have SRH
257 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
260 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
261 # rx'ed seglist should be equal to SID-list in reversed order
262 self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
263 # segleft should be equal to previous segleft value minus 1
264 self.assertEqual(rx_srh.segleft, len(self.sid_list) - self.test_sid_index - 2)
265 # lastentry should be equal to the SID-list length minus 1
266 self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
268 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
269 # except for the hop-limit field
270 tx_ip = tx_pkt.getlayer(IPv6)
271 # -> update tx'ed hlim to the expected hlim
274 self.assertEqual(rx_srh.payload, tx_ip)
276 self.logger.debug("packet verification: SUCCESS")
278 def test_SRv6_End_AD_IPv4(self):
279 """Test SRv6 End.AD behavior with IPv4 traffic."""
280 self.src_addr = "a0::"
281 self.sid_list = ["a1::", "a2::a4", "a3::"]
282 self.test_sid_index = 1
284 # send traffic to one destination interface
285 # source and destination interfaces are IPv6 only
286 self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
288 # configure route to next segment
291 self.sid_list[self.test_sid_index + 1],
296 self.pg0.sw_if_index,
297 proto=DpoProto.DPO_PROTO_IP6,
301 route.add_vpp_config()
303 # configure SRv6 localSID behavior
305 "sr localsid address "
306 + self.sid_list[self.test_sid_index]
307 + " behavior end.ad.flow"
309 + self.pg1.remote_ip4
315 self.vapi.cli(cli_str)
318 self.logger.debug(self.vapi.cli("show sr localsid"))
320 # send one packet per packet size
321 count = len(self.pg_packet_sizes)
323 # prepare IPv4 in SRv6 headers
324 packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
325 srcaddr=self.src_addr,
326 sidlist=self.sid_list[::-1],
327 segleft=len(self.sid_list) - self.test_sid_index - 1,
330 # generate packets (pg0->pg1)
331 pkts1 = self.create_stream(
332 self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
335 # send packets and verify received packets
336 self.send_and_verify_pkts(
337 self.pg0, pkts1, self.pg1, self.compare_rx_tx_packet_End_AD_IPv4_out
340 # log the localsid counters
341 self.logger.info(self.vapi.cli("show sr localsid"))
343 # prepare IPv6 header for returning packets
344 packet_header2 = self.create_packet_header_IPv4()
346 # generate returning packets (pg1->pg0)
347 pkts2 = self.create_stream(
348 self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
351 # send packets and verify received packets
352 self.send_and_verify_pkts(
353 self.pg1, pkts2, self.pg0, self.compare_rx_tx_packet_End_AD_IPv4_in
356 # log the localsid counters
357 self.logger.info(self.vapi.cli("show sr localsid"))
359 # remove SRv6 localSIDs
360 cli_str = "sr localsid del address " + self.sid_list[self.test_sid_index]
361 self.vapi.cli(cli_str)
364 self.teardown_interfaces()
366 def compare_rx_tx_packet_End_AD_IPv4_out(self, tx_pkt, rx_pkt):
367 """Compare input and output packet after passing End.AD with IPv4
369 :param tx_pkt: transmitted packet
370 :param rx_pkt: received packet
373 # get IPv4 header of rx'ed packet
374 rx_ip = rx_pkt.getlayer(IP)
376 tx_ip = tx_pkt.getlayer(IPv6)
377 tx_ip2 = tx_pkt.getlayer(IP)
379 # verify if rx'ed packet has no SRH
380 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
382 # the whole rx_ip pkt should be equal to tx_ip2
383 # except for the ttl field and ip checksum
384 # -> adjust tx'ed ttl to expected ttl
385 tx_ip2.ttl = tx_ip2.ttl - 1
386 # -> set tx'ed ip checksum to None and let scapy recompute
388 # read back the pkt (with str()) to force computing these fields
389 # probably other ways to accomplish this are possible
390 tx_ip2 = IP(scapy.compat.raw(tx_ip2))
392 self.assertEqual(rx_ip, tx_ip2)
394 self.logger.debug("packet verification: SUCCESS")
396 def compare_rx_tx_packet_End_AD_IPv4_in(self, tx_pkt, rx_pkt):
397 """Compare input and output packet after passing End.AD
399 :param tx_pkt: transmitted packet
400 :param rx_pkt: received packet
403 # get first (outer) IPv6 header of rx'ed packet
404 rx_ip = rx_pkt.getlayer(IPv6)
405 # received ip.src should be equal to SR Policy source
406 self.assertEqual(rx_ip.src, self.src_addr)
407 # received ip.dst should be equal to expected sidlist next segment
408 self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
410 # rx'ed packet should have SRH
411 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
414 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
415 # rx'ed seglist should be equal to SID-list in reversed order
416 self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
417 # segleft should be equal to previous segleft value minus 1
418 self.assertEqual(rx_srh.segleft, len(self.sid_list) - self.test_sid_index - 2)
419 # lastentry should be equal to the SID-list length minus 1
420 self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
422 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
423 # except for the ttl field and ip checksum
424 tx_ip = tx_pkt.getlayer(IP)
425 # -> adjust tx'ed ttl to expected ttl
426 tx_ip.ttl = tx_ip.ttl - 1
427 # -> set tx'ed ip checksum to None and let scapy recompute
429 # -> read back the pkt (with str()) to force computing these fields
430 # probably other ways to accomplish this are possible
431 self.assertEqual(rx_srh.payload, IP(scapy.compat.raw(tx_ip)))
433 self.logger.debug("packet verification: SUCCESS")
435 def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count):
436 """Create SRv6 input packet stream for defined interface.
438 :param VppInterface src_if: Interface to create packet stream for
439 :param VppInterface dst_if: destination interface of packet stream
440 :param packet_header: Layer3 scapy packet headers,
441 L2 is added when not provided,
442 Raw(payload) with packet_info is added
443 :param list packet_sizes: packet stream pckt sizes,sequentially applied
444 to packets in stream have
445 :param int count: number of packets in packet stream
446 :return: list of packets
448 self.logger.info("Creating packets")
450 for i in range(0, count - 1):
451 payload_info = self.create_packet_info(src_if, dst_if)
452 self.logger.debug("Creating packet with index %d" % (payload_info.index))
453 payload = self.info_to_payload(payload_info)
454 # add L2 header if not yet provided in packet_header
455 if packet_header.getlayer(0).name == "Ethernet":
456 p = packet_header / Raw(payload)
459 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
463 size = packet_sizes[i % len(packet_sizes)]
464 self.logger.debug("Packet size %d" % (size))
465 self.extend_packet(p, size)
466 # we need to store the packet with the automatic fields computed
467 # read back the dumped packet (with str())
468 # to force computing these fields
469 # probably other ways are possible
470 p = Ether(scapy.compat.raw(p))
471 payload_info.data = p.copy()
472 self.logger.debug(ppp("Created packet:", p))
474 self.logger.info("Done creating packets")
477 def send_and_verify_pkts(self, input, pkts, output, compare_func):
478 """Send packets and verify received packets using compare_func
480 :param input: ingress interface of DUT
481 :param pkts: list of packets to transmit
482 :param output: egress interface of DUT
483 :param compare_func: function to compare in and out packets
485 # add traffic stream to input interface
486 input.add_stream(pkts)
488 # enable capture on all interfaces
489 self.pg_enable_capture(self.pg_interfaces)
492 self.logger.info("Starting traffic")
496 self.logger.info("Getting packet capture")
497 capture = output.get_capture()
499 # assert nothing was captured on input interface
500 # input.assert_nothing_captured()
502 # verify captured packets
503 self.verify_captured_pkts(output, capture, compare_func)
505 def create_packet_header_IPv6(
506 self, saddr="1234::1", daddr="4321::1", sport=1234, dport=1234
508 """Create packet header: IPv6 header, UDP header
510 :param dst: IPv6 destination address
512 IPv6 source address is 1234::1
513 IPv6 destination address is 4321::1
514 UDP source port and destination port are 1234
517 p = IPv6(src=saddr, dst=daddr) / UDP(sport=sport, dport=dport)
520 def create_packet_header_IPv6_SRH_IPv6(
530 """Create packet header: IPv6 encapsulated in SRv6:
531 IPv6 header with SRH, IPv6 header, UDP header
533 :param int srcaddr: outer source address
534 :param list sidlist: segment list of outer IPv6 SRH
535 :param int segleft: segments-left field of outer IPv6 SRH
537 Outer IPv6 source address is set to srcaddr
538 Outer IPv6 destination address is set to sidlist[segleft]
539 Inner IPv6 source addresses is 1234::1
540 Inner IPv6 destination address is 4321::1
541 UDP source port and destination port are 1234
545 IPv6(src=srcaddr, dst=sidlist[segleft])
546 / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=41)
547 / IPv6(src=insrc, dst=indst)
548 / UDP(sport=sport, dport=dport)
552 def create_packet_header_IPv4(self):
553 """Create packet header: IPv4 header, UDP header
555 :param dst: IPv4 destination address
557 IPv4 source address is 123.1.1.1
558 IPv4 destination address is 124.1.1.1
559 UDP source port and destination port are 1234
562 p = IP(src="123.1.1.1", dst="124.1.1.1") / UDP(sport=1234, dport=1234)
565 def create_packet_header_IPv6_SRH_IPv4(self, srcaddr, sidlist, segleft):
566 """Create packet header: IPv4 encapsulated in SRv6:
567 IPv6 header with SRH, IPv4 header, UDP header
569 :param int srcaddr: outer source address
570 :param list sidlist: segment list of outer IPv6 SRH
571 :param int segleft: segments-left field of outer IPv6 SRH
573 Outer IPv6 source address is set to srcaddr
574 Outer IPv6 destination address is set to sidlist[segleft]
575 Inner IPv4 source address is 123.1.1.1
576 Inner IPv4 destination address is 124.1.1.1
577 UDP source port and destination port are 1234
581 IPv6(src=srcaddr, dst=sidlist[segleft])
582 / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=4)
583 / IP(src="123.1.1.1", dst="124.1.1.1")
584 / UDP(sport=1234, dport=1234)
588 def get_payload_info(self, packet):
589 """Extract the payload_info from the packet"""
590 # in most cases, payload_info is in packet[Raw]
591 # but packet[Raw] gives the complete payload
592 # (incl L2 header) for the T.Encaps L2 case
594 payload_info = self.payload_to_info(packet[Raw])
597 # remote L2 header from packet[Raw]:
598 # take packet[Raw], convert it to an Ether layer
599 # and then extract Raw from it
600 payload_info = self.payload_to_info(
601 Ether(scapy.compat.raw(packet[Raw]))[Raw]
606 def verify_captured_pkts(self, dst_if, capture, compare_func):
608 Verify captured packet stream for specified interface.
609 Compare ingress with egress packets using the specified compare fn
611 :param dst_if: egress interface of DUT
612 :param capture: captured packets
613 :param compare_func: function to compare in and out packet
616 "Verifying capture on interface %s using function %s"
617 % (dst_if.name, compare_func.__name__)
621 for i in self.pg_interfaces:
622 last_info[i.sw_if_index] = None
623 dst_sw_if_index = dst_if.sw_if_index
625 for packet in capture:
627 # extract payload_info from packet's payload
628 payload_info = self.get_payload_info(packet)
629 packet_index = payload_info.index
631 self.logger.debug("Verifying packet with index %d" % (packet_index))
632 # packet should have arrived on the expected interface
633 self.assertEqual(payload_info.dst, dst_sw_if_index)
635 "Got packet on interface %s: src=%u (idx=%u)"
636 % (dst_if.name, payload_info.src, packet_index)
639 # search for payload_info with same src and dst if_index
640 # this will give us the transmitted packet
641 next_info = self.get_next_packet_info_for_interface2(
642 payload_info.src, dst_sw_if_index, last_info[payload_info.src]
644 last_info[payload_info.src] = next_info
645 # next_info should not be None
646 self.assertTrue(next_info is not None)
647 # index of tx and rx packets should be equal
648 self.assertEqual(packet_index, next_info.index)
649 # data field of next_info contains the tx packet
650 txed_packet = next_info.data
653 ppp("Transmitted packet:", txed_packet)
654 ) # ppp=Pretty Print Packet
656 self.logger.debug(ppp("Received packet:", packet))
658 # compare rcvd packet with expected packet using compare_func
659 compare_func(txed_packet, packet)
662 self.logger.error(ppp("Unexpected or invalid packet:", packet))
665 # have all expected packets arrived?
666 for i in self.pg_interfaces:
667 remaining_packet = self.get_next_packet_info_for_interface2(
668 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
671 remaining_packet is None,
672 "Interface %s: Packet expected from interface %s "
673 "didn't arrive" % (dst_if.name, i.name),
677 if __name__ == "__main__":
678 unittest.main(testRunner=VppTestRunner)