9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
36 def check_ip_checksum(self, pkt):
38 Check IP checksum of the packet
40 :param pkt: Packet to check IP checksum
42 new = pkt.__class__(str(pkt))
44 new = new.__class__(str(new))
45 self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
47 def check_tcp_checksum(self, pkt):
49 Check TCP checksum in IP packet
51 :param pkt: Packet to check TCP checksum
53 new = pkt.__class__(str(pkt))
55 new = new.__class__(str(new))
56 self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
58 def check_udp_checksum(self, pkt):
60 Check UDP checksum in IP packet
62 :param pkt: Packet to check UDP checksum
64 new = pkt.__class__(str(pkt))
66 new = new.__class__(str(new))
67 self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
69 def check_icmp_errror_embedded(self, pkt):
71 Check ICMP error embeded packet checksum
73 :param pkt: Packet to check ICMP error embeded packet checksum
75 if pkt.haslayer(IPerror):
76 new = pkt.__class__(str(pkt))
77 del new['IPerror'].chksum
78 new = new.__class__(str(new))
79 self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
81 if pkt.haslayer(TCPerror):
82 new = pkt.__class__(str(pkt))
83 del new['TCPerror'].chksum
84 new = new.__class__(str(new))
85 self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
87 if pkt.haslayer(UDPerror):
88 if pkt['UDPerror'].chksum != 0:
89 new = pkt.__class__(str(pkt))
90 del new['UDPerror'].chksum
91 new = new.__class__(str(new))
92 self.assertEqual(new['UDPerror'].chksum,
93 pkt['UDPerror'].chksum)
95 if pkt.haslayer(ICMPerror):
96 del new['ICMPerror'].chksum
97 new = new.__class__(str(new))
98 self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
100 def check_icmp_checksum(self, pkt):
102 Check ICMP checksum in IPv4 packet
104 :param pkt: Packet to check ICMP checksum
106 new = pkt.__class__(str(pkt))
107 del new['ICMP'].chksum
108 new = new.__class__(str(new))
109 self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110 if pkt.haslayer(IPerror):
111 self.check_icmp_errror_embedded(pkt)
113 def check_icmpv6_checksum(self, pkt):
115 Check ICMPv6 checksum in IPv4 packet
117 :param pkt: Packet to check ICMPv6 checksum
119 new = pkt.__class__(str(pkt))
120 if pkt.haslayer(ICMPv6DestUnreach):
121 del new['ICMPv6DestUnreach'].cksum
122 new = new.__class__(str(new))
123 self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124 pkt['ICMPv6DestUnreach'].cksum)
125 self.check_icmp_errror_embedded(pkt)
126 if pkt.haslayer(ICMPv6EchoRequest):
127 del new['ICMPv6EchoRequest'].cksum
128 new = new.__class__(str(new))
129 self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130 pkt['ICMPv6EchoRequest'].cksum)
131 if pkt.haslayer(ICMPv6EchoReply):
132 del new['ICMPv6EchoReply'].cksum
133 new = new.__class__(str(new))
134 self.assertEqual(new['ICMPv6EchoReply'].cksum,
135 pkt['ICMPv6EchoReply'].cksum)
137 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
139 Create packet stream for inside network
141 :param in_if: Inside interface
142 :param out_if: Outside interface
143 :param dst_ip: Destination address
144 :param ttl: TTL of generated packets
147 dst_ip = out_if.remote_ip4
151 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153 TCP(sport=self.tcp_port_in, dport=20))
157 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159 UDP(sport=self.udp_port_in, dport=20))
163 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165 ICMP(id=self.icmp_id_in, type='echo-request'))
170 def compose_ip6(self, ip4, pref, plen):
172 Compose IPv4-embedded IPv6 addresses
174 :param ip4: IPv4 address
175 :param pref: IPv6 prefix
176 :param plen: IPv6 prefix length
177 :returns: IPv4-embedded IPv6 addresses
179 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
195 pref_n[10] = ip4_n[3]
199 pref_n[10] = ip4_n[2]
200 pref_n[11] = ip4_n[3]
203 pref_n[10] = ip4_n[1]
204 pref_n[11] = ip4_n[2]
205 pref_n[12] = ip4_n[3]
207 pref_n[12] = ip4_n[0]
208 pref_n[13] = ip4_n[1]
209 pref_n[14] = ip4_n[2]
210 pref_n[15] = ip4_n[3]
211 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
213 def extract_ip4(self, ip6, plen):
215 Extract IPv4 address embedded in IPv6 addresses
217 :param ip6: IPv6 address
218 :param plen: IPv6 prefix length
219 :returns: extracted IPv4 address
221 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
253 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
255 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
257 Create IPv6 packet stream for inside network
259 :param in_if: Inside interface
260 :param out_if: Outside interface
261 :param ttl: Hop Limit of generated packets
262 :param pref: NAT64 prefix
263 :param plen: NAT64 prefix length
267 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
269 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
272 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274 TCP(sport=self.tcp_port_in, dport=20))
278 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280 UDP(sport=self.udp_port_in, dport=20))
284 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286 ICMPv6EchoRequest(id=self.icmp_id_in))
291 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292 use_inside_ports=False):
294 Create packet stream for outside network
296 :param out_if: Outside interface
297 :param dst_ip: Destination IP address (Default use global NAT address)
298 :param ttl: TTL of generated packets
299 :param use_inside_ports: Use inside NAT ports as destination ports
300 instead of outside ports
303 dst_ip = self.nat_addr
304 if not use_inside_ports:
305 tcp_port = self.tcp_port_out
306 udp_port = self.udp_port_out
307 icmp_id = self.icmp_id_out
309 tcp_port = self.tcp_port_in
310 udp_port = self.udp_port_in
311 icmp_id = self.icmp_id_in
314 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316 TCP(dport=tcp_port, sport=20))
320 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322 UDP(dport=udp_port, sport=20))
326 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328 ICMP(id=icmp_id, type='echo-reply'))
333 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
335 Create packet stream for outside network
337 :param out_if: Outside interface
338 :param dst_ip: Destination IP address (Default use global NAT address)
339 :param hl: HL of generated packets
343 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345 TCP(dport=self.tcp_port_out, sport=20))
349 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351 UDP(dport=self.udp_port_out, sport=20))
355 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357 ICMPv6EchoReply(id=self.icmp_id_out))
362 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363 packet_num=3, dst_ip=None, is_ip6=False):
365 Verify captured packets on outside network
367 :param capture: Captured packets
368 :param nat_ip: Translated IP address (Default use global NAT address)
369 :param same_port: Sorce port number is not translated (Default False)
370 :param packet_num: Expected number of packets (Default 3)
371 :param dst_ip: Destination IP address (Default do not verify)
372 :param is_ip6: If L3 protocol is IPv6 (Default False)
376 ICMP46 = ICMPv6EchoRequest
381 nat_ip = self.nat_addr
382 self.assertEqual(packet_num, len(capture))
383 for packet in capture:
386 self.check_ip_checksum(packet)
387 self.assertEqual(packet[IP46].src, nat_ip)
388 if dst_ip is not None:
389 self.assertEqual(packet[IP46].dst, dst_ip)
390 if packet.haslayer(TCP):
392 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
395 packet[TCP].sport, self.tcp_port_in)
396 self.tcp_port_out = packet[TCP].sport
397 self.check_tcp_checksum(packet)
398 elif packet.haslayer(UDP):
400 self.assertEqual(packet[UDP].sport, self.udp_port_in)
403 packet[UDP].sport, self.udp_port_in)
404 self.udp_port_out = packet[UDP].sport
407 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
409 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410 self.icmp_id_out = packet[ICMP46].id
412 self.check_icmpv6_checksum(packet)
414 self.check_icmp_checksum(packet)
416 self.logger.error(ppp("Unexpected or invalid packet "
417 "(outside network):", packet))
420 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421 packet_num=3, dst_ip=None):
423 Verify captured packets on outside network
425 :param capture: Captured packets
426 :param nat_ip: Translated IP address
427 :param same_port: Sorce port number is not translated (Default False)
428 :param packet_num: Expected number of packets (Default 3)
429 :param dst_ip: Destination IP address (Default do not verify)
431 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
434 def verify_capture_in(self, capture, in_if, packet_num=3):
436 Verify captured packets on inside network
438 :param capture: Captured packets
439 :param in_if: Inside interface
440 :param packet_num: Expected number of packets (Default 3)
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
445 self.check_ip_checksum(packet)
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 if packet.haslayer(TCP):
448 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449 self.check_tcp_checksum(packet)
450 elif packet.haslayer(UDP):
451 self.assertEqual(packet[UDP].dport, self.udp_port_in)
453 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454 self.check_icmp_checksum(packet)
456 self.logger.error(ppp("Unexpected or invalid packet "
457 "(inside network):", packet))
460 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
462 Verify captured IPv6 packets on inside network
464 :param capture: Captured packets
465 :param src_ip: Source IP
466 :param dst_ip: Destination IP address
467 :param packet_num: Expected number of packets (Default 3)
469 self.assertEqual(packet_num, len(capture))
470 for packet in capture:
472 self.assertEqual(packet[IPv6].src, src_ip)
473 self.assertEqual(packet[IPv6].dst, dst_ip)
474 if packet.haslayer(TCP):
475 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476 self.check_tcp_checksum(packet)
477 elif packet.haslayer(UDP):
478 self.assertEqual(packet[UDP].dport, self.udp_port_in)
479 self.check_udp_checksum(packet)
481 self.assertEqual(packet[ICMPv6EchoReply].id,
483 self.check_icmpv6_checksum(packet)
485 self.logger.error(ppp("Unexpected or invalid packet "
486 "(inside network):", packet))
489 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
491 Verify captured packet that don't have to be translated
493 :param capture: Captured packets
494 :param ingress_if: Ingress interface
495 :param egress_if: Egress interface
497 for packet in capture:
499 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501 if packet.haslayer(TCP):
502 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503 elif packet.haslayer(UDP):
504 self.assertEqual(packet[UDP].sport, self.udp_port_in)
506 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
508 self.logger.error(ppp("Unexpected or invalid packet "
509 "(inside network):", packet))
512 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513 packet_num=3, icmp_type=11):
515 Verify captured packets with ICMP errors on outside network
517 :param capture: Captured packets
518 :param src_ip: Translated IP address or IP address of VPP
519 (Default use global NAT address)
520 :param packet_num: Expected number of packets (Default 3)
521 :param icmp_type: Type of error ICMP packet
522 we are expecting (Default 11)
525 src_ip = self.nat_addr
526 self.assertEqual(packet_num, len(capture))
527 for packet in capture:
529 self.assertEqual(packet[IP].src, src_ip)
530 self.assertTrue(packet.haslayer(ICMP))
532 self.assertEqual(icmp.type, icmp_type)
533 self.assertTrue(icmp.haslayer(IPerror))
534 inner_ip = icmp[IPerror]
535 if inner_ip.haslayer(TCPerror):
536 self.assertEqual(inner_ip[TCPerror].dport,
538 elif inner_ip.haslayer(UDPerror):
539 self.assertEqual(inner_ip[UDPerror].dport,
542 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
544 self.logger.error(ppp("Unexpected or invalid packet "
545 "(outside network):", packet))
548 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
551 Verify captured packets with ICMP errors on inside network
553 :param capture: Captured packets
554 :param in_if: Inside interface
555 :param packet_num: Expected number of packets (Default 3)
556 :param icmp_type: Type of error ICMP packet
557 we are expecting (Default 11)
559 self.assertEqual(packet_num, len(capture))
560 for packet in capture:
562 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563 self.assertTrue(packet.haslayer(ICMP))
565 self.assertEqual(icmp.type, icmp_type)
566 self.assertTrue(icmp.haslayer(IPerror))
567 inner_ip = icmp[IPerror]
568 if inner_ip.haslayer(TCPerror):
569 self.assertEqual(inner_ip[TCPerror].sport,
571 elif inner_ip.haslayer(UDPerror):
572 self.assertEqual(inner_ip[UDPerror].sport,
575 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
577 self.logger.error(ppp("Unexpected or invalid packet "
578 "(inside network):", packet))
581 def create_stream_frag(self, src_if, dst, sport, dport, data):
583 Create fragmented packet stream
585 :param src_if: Source interface
586 :param dst: Destination IPv4 address
587 :param sport: Source TCP port
588 :param dport: Destination TCP port
589 :param data: Payload data
592 id = random.randint(0, 65535)
593 p = (IP(src=src_if.remote_ip4, dst=dst) /
594 TCP(sport=sport, dport=dport) /
596 p = p.__class__(str(p))
597 chksum = p['TCP'].chksum
599 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601 TCP(sport=sport, dport=dport, chksum=chksum) /
604 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606 proto=IP_PROTOS.tcp) /
609 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
616 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617 pref=None, plen=0, frag_size=128):
619 Create fragmented packet stream
621 :param src_if: Source interface
622 :param dst: Destination IPv4 address
623 :param sport: Source TCP port
624 :param dport: Destination TCP port
625 :param data: Payload data
626 :param pref: NAT64 prefix
627 :param plen: NAT64 prefix length
628 :param fragsize: size of fragments
632 dst_ip6 = ''.join(['64:ff9b::', dst])
634 dst_ip6 = self.compose_ip6(dst, pref, plen)
636 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639 TCP(sport=sport, dport=dport) /
642 return fragment6(p, frag_size)
644 def reass_frags_and_verify(self, frags, src, dst):
646 Reassemble and verify fragmented packet
648 :param frags: Captured fragments
649 :param src: Source IPv4 address to verify
650 :param dst: Destination IPv4 address to verify
652 :returns: Reassembled IPv4 packet
654 buffer = StringIO.StringIO()
656 self.assertEqual(p[IP].src, src)
657 self.assertEqual(p[IP].dst, dst)
658 self.check_ip_checksum(p)
659 buffer.seek(p[IP].frag * 8)
660 buffer.write(p[IP].payload)
661 ip = frags[0].getlayer(IP)
662 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663 proto=frags[0][IP].proto)
664 if ip.proto == IP_PROTOS.tcp:
665 p = (ip / TCP(buffer.getvalue()))
666 self.check_tcp_checksum(p)
667 elif ip.proto == IP_PROTOS.udp:
668 p = (ip / UDP(buffer.getvalue()))
671 def reass_frags_and_verify_ip6(self, frags, src, dst):
673 Reassemble and verify fragmented packet
675 :param frags: Captured fragments
676 :param src: Source IPv6 address to verify
677 :param dst: Destination IPv6 address to verify
679 :returns: Reassembled IPv6 packet
681 buffer = StringIO.StringIO()
683 self.assertEqual(p[IPv6].src, src)
684 self.assertEqual(p[IPv6].dst, dst)
685 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686 buffer.write(p[IPv6ExtHdrFragment].payload)
687 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688 nh=frags[0][IPv6ExtHdrFragment].nh)
689 if ip.nh == IP_PROTOS.tcp:
690 p = (ip / TCP(buffer.getvalue()))
691 self.check_tcp_checksum(p)
692 elif ip.nh == IP_PROTOS.udp:
693 p = (ip / UDP(buffer.getvalue()))
696 def verify_ipfix_nat44_ses(self, data):
698 Verify IPFIX NAT44 session create/delete event
700 :param data: Decoded IPFIX data records
702 nat44_ses_create_num = 0
703 nat44_ses_delete_num = 0
704 self.assertEqual(6, len(data))
707 self.assertIn(ord(record[230]), [4, 5])
708 if ord(record[230]) == 4:
709 nat44_ses_create_num += 1
711 nat44_ses_delete_num += 1
713 self.assertEqual(self.pg0.remote_ip4n, record[8])
714 # postNATSourceIPv4Address
715 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
718 self.assertEqual(struct.pack("!I", 0), record[234])
719 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720 if IP_PROTOS.icmp == ord(record[4]):
721 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722 self.assertEqual(struct.pack("!H", self.icmp_id_out),
724 elif IP_PROTOS.tcp == ord(record[4]):
725 self.assertEqual(struct.pack("!H", self.tcp_port_in),
727 self.assertEqual(struct.pack("!H", self.tcp_port_out),
729 elif IP_PROTOS.udp == ord(record[4]):
730 self.assertEqual(struct.pack("!H", self.udp_port_in),
732 self.assertEqual(struct.pack("!H", self.udp_port_out),
735 self.fail("Invalid protocol")
736 self.assertEqual(3, nat44_ses_create_num)
737 self.assertEqual(3, nat44_ses_delete_num)
739 def verify_ipfix_addr_exhausted(self, data):
741 Verify IPFIX NAT addresses event
743 :param data: Decoded IPFIX data records
745 self.assertEqual(1, len(data))
748 self.assertEqual(ord(record[230]), 3)
750 self.assertEqual(struct.pack("!I", 0), record[283])
753 class TestNAT44(MethodHolder):
754 """ NAT44 Test Cases """
758 super(TestNAT44, cls).setUpClass()
761 cls.tcp_port_in = 6303
762 cls.tcp_port_out = 6303
763 cls.udp_port_in = 6304
764 cls.udp_port_out = 6304
765 cls.icmp_id_in = 6305
766 cls.icmp_id_out = 6305
767 cls.nat_addr = '10.0.0.3'
768 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
769 cls.ipfix_src_port = 4739
770 cls.ipfix_domain_id = 1
772 cls.create_pg_interfaces(range(10))
773 cls.interfaces = list(cls.pg_interfaces[0:4])
775 for i in cls.interfaces:
780 cls.pg0.generate_remote_hosts(3)
781 cls.pg0.configure_ipv4_neighbors()
783 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
784 cls.vapi.ip_table_add_del(10, is_add=1)
785 cls.vapi.ip_table_add_del(20, is_add=1)
787 cls.pg4._local_ip4 = "172.16.255.1"
788 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
789 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
790 cls.pg4.set_table_ip4(10)
791 cls.pg5._local_ip4 = "172.17.255.3"
792 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
793 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
794 cls.pg5.set_table_ip4(10)
795 cls.pg6._local_ip4 = "172.16.255.1"
796 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
797 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
798 cls.pg6.set_table_ip4(20)
799 for i in cls.overlapping_interfaces:
807 cls.pg9.generate_remote_hosts(2)
809 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
810 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
814 cls.pg9.resolve_arp()
815 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
816 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
817 cls.pg9.resolve_arp()
820 super(TestNAT44, cls).tearDownClass()
823 def clear_nat44(self):
825 Clear NAT44 configuration.
827 # I found no elegant way to do this
828 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
829 dst_address_length=32,
830 next_hop_address=self.pg7.remote_ip4n,
831 next_hop_sw_if_index=self.pg7.sw_if_index,
833 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
834 dst_address_length=32,
835 next_hop_address=self.pg8.remote_ip4n,
836 next_hop_sw_if_index=self.pg8.sw_if_index,
839 for intf in [self.pg7, self.pg8]:
840 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
842 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
847 if self.pg7.has_ip4_config:
848 self.pg7.unconfig_ip4()
850 interfaces = self.vapi.nat44_interface_addr_dump()
851 for intf in interfaces:
852 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
853 twice_nat=intf.twice_nat,
856 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
857 domain_id=self.ipfix_domain_id)
858 self.ipfix_src_port = 4739
859 self.ipfix_domain_id = 1
861 interfaces = self.vapi.nat44_interface_dump()
862 for intf in interfaces:
863 if intf.is_inside > 1:
864 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
867 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
871 interfaces = self.vapi.nat44_interface_output_feature_dump()
872 for intf in interfaces:
873 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
877 static_mappings = self.vapi.nat44_static_mapping_dump()
878 for sm in static_mappings:
879 self.vapi.nat44_add_del_static_mapping(
881 sm.external_ip_address,
882 local_port=sm.local_port,
883 external_port=sm.external_port,
884 addr_only=sm.addr_only,
886 protocol=sm.protocol,
887 twice_nat=sm.twice_nat,
890 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
891 for lb_sm in lb_static_mappings:
892 self.vapi.nat44_add_del_lb_static_mapping(
897 twice_nat=lb_sm.twice_nat,
902 identity_mappings = self.vapi.nat44_identity_mapping_dump()
903 for id_m in identity_mappings:
904 self.vapi.nat44_add_del_identity_mapping(
905 addr_only=id_m.addr_only,
908 sw_if_index=id_m.sw_if_index,
910 protocol=id_m.protocol,
913 adresses = self.vapi.nat44_address_dump()
914 for addr in adresses:
915 self.vapi.nat44_add_del_address_range(addr.ip_address,
917 twice_nat=addr.twice_nat,
920 self.vapi.nat_set_reass()
921 self.vapi.nat_set_reass(is_ip6=1)
923 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
924 local_port=0, external_port=0, vrf_id=0,
925 is_add=1, external_sw_if_index=0xFFFFFFFF,
926 proto=0, twice_nat=0):
928 Add/delete NAT44 static mapping
930 :param local_ip: Local IP address
931 :param external_ip: External IP address
932 :param local_port: Local port number (Optional)
933 :param external_port: External port number (Optional)
934 :param vrf_id: VRF ID (Default 0)
935 :param is_add: 1 if add, 0 if delete (Default add)
936 :param external_sw_if_index: External interface instead of IP address
937 :param proto: IP protocol (Mandatory if port specified)
938 :param twice_nat: 1 if translate external host address and port
941 if local_port and external_port:
943 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
944 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
945 self.vapi.nat44_add_del_static_mapping(
948 external_sw_if_index,
957 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
959 Add/delete NAT44 address
961 :param ip: IP address
962 :param is_add: 1 if add, 0 if delete (Default add)
963 :param twice_nat: twice NAT address for extenal hosts
965 nat_addr = socket.inet_pton(socket.AF_INET, ip)
966 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
970 def test_dynamic(self):
971 """ NAT44 dynamic translation test """
973 self.nat44_add_address(self.nat_addr)
974 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
975 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
979 pkts = self.create_stream_in(self.pg0, self.pg1)
980 self.pg0.add_stream(pkts)
981 self.pg_enable_capture(self.pg_interfaces)
983 capture = self.pg1.get_capture(len(pkts))
984 self.verify_capture_out(capture)
987 pkts = self.create_stream_out(self.pg1)
988 self.pg1.add_stream(pkts)
989 self.pg_enable_capture(self.pg_interfaces)
991 capture = self.pg0.get_capture(len(pkts))
992 self.verify_capture_in(capture, self.pg0)
994 def test_dynamic_icmp_errors_in2out_ttl_1(self):
995 """ NAT44 handling of client packets with TTL=1 """
997 self.nat44_add_address(self.nat_addr)
998 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
999 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1002 # Client side - generate traffic
1003 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1004 self.pg0.add_stream(pkts)
1005 self.pg_enable_capture(self.pg_interfaces)
1008 # Client side - verify ICMP type 11 packets
1009 capture = self.pg0.get_capture(len(pkts))
1010 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1012 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1013 """ NAT44 handling of server packets with TTL=1 """
1015 self.nat44_add_address(self.nat_addr)
1016 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1017 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1020 # Client side - create sessions
1021 pkts = self.create_stream_in(self.pg0, self.pg1)
1022 self.pg0.add_stream(pkts)
1023 self.pg_enable_capture(self.pg_interfaces)
1026 # Server side - generate traffic
1027 capture = self.pg1.get_capture(len(pkts))
1028 self.verify_capture_out(capture)
1029 pkts = self.create_stream_out(self.pg1, ttl=1)
1030 self.pg1.add_stream(pkts)
1031 self.pg_enable_capture(self.pg_interfaces)
1034 # Server side - verify ICMP type 11 packets
1035 capture = self.pg1.get_capture(len(pkts))
1036 self.verify_capture_out_with_icmp_errors(capture,
1037 src_ip=self.pg1.local_ip4)
1039 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1040 """ NAT44 handling of error responses to client packets with TTL=2 """
1042 self.nat44_add_address(self.nat_addr)
1043 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1044 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1047 # Client side - generate traffic
1048 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1049 self.pg0.add_stream(pkts)
1050 self.pg_enable_capture(self.pg_interfaces)
1053 # Server side - simulate ICMP type 11 response
1054 capture = self.pg1.get_capture(len(pkts))
1055 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1056 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1057 ICMP(type=11) / packet[IP] for packet in capture]
1058 self.pg1.add_stream(pkts)
1059 self.pg_enable_capture(self.pg_interfaces)
1062 # Client side - verify ICMP type 11 packets
1063 capture = self.pg0.get_capture(len(pkts))
1064 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1066 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1067 """ NAT44 handling of error responses to server packets with TTL=2 """
1069 self.nat44_add_address(self.nat_addr)
1070 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1071 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1074 # Client side - create sessions
1075 pkts = self.create_stream_in(self.pg0, self.pg1)
1076 self.pg0.add_stream(pkts)
1077 self.pg_enable_capture(self.pg_interfaces)
1080 # Server side - generate traffic
1081 capture = self.pg1.get_capture(len(pkts))
1082 self.verify_capture_out(capture)
1083 pkts = self.create_stream_out(self.pg1, ttl=2)
1084 self.pg1.add_stream(pkts)
1085 self.pg_enable_capture(self.pg_interfaces)
1088 # Client side - simulate ICMP type 11 response
1089 capture = self.pg0.get_capture(len(pkts))
1090 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1091 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1092 ICMP(type=11) / packet[IP] for packet in capture]
1093 self.pg0.add_stream(pkts)
1094 self.pg_enable_capture(self.pg_interfaces)
1097 # Server side - verify ICMP type 11 packets
1098 capture = self.pg1.get_capture(len(pkts))
1099 self.verify_capture_out_with_icmp_errors(capture)
1101 def test_ping_out_interface_from_outside(self):
1102 """ Ping NAT44 out interface from outside network """
1104 self.nat44_add_address(self.nat_addr)
1105 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1106 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1109 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1110 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1111 ICMP(id=self.icmp_id_out, type='echo-request'))
1113 self.pg1.add_stream(pkts)
1114 self.pg_enable_capture(self.pg_interfaces)
1116 capture = self.pg1.get_capture(len(pkts))
1117 self.assertEqual(1, len(capture))
1120 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1121 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1122 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1123 self.assertEqual(packet[ICMP].type, 0) # echo reply
1125 self.logger.error(ppp("Unexpected or invalid packet "
1126 "(outside network):", packet))
1129 def test_ping_internal_host_from_outside(self):
1130 """ Ping internal host from outside network """
1132 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1133 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1134 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1138 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1139 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1140 ICMP(id=self.icmp_id_out, type='echo-request'))
1141 self.pg1.add_stream(pkt)
1142 self.pg_enable_capture(self.pg_interfaces)
1144 capture = self.pg0.get_capture(1)
1145 self.verify_capture_in(capture, self.pg0, packet_num=1)
1146 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1149 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1150 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1151 ICMP(id=self.icmp_id_in, type='echo-reply'))
1152 self.pg0.add_stream(pkt)
1153 self.pg_enable_capture(self.pg_interfaces)
1155 capture = self.pg1.get_capture(1)
1156 self.verify_capture_out(capture, same_port=True, packet_num=1)
1157 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1159 def test_forwarding(self):
1160 """ NAT44 forwarding test """
1162 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1163 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1165 self.vapi.nat44_forwarding_enable_disable(1)
1167 real_ip = self.pg0.remote_ip4n
1168 alias_ip = self.nat_addr_n
1169 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1170 external_ip=alias_ip)
1173 # in2out - static mapping match
1175 pkts = self.create_stream_out(self.pg1)
1176 self.pg1.add_stream(pkts)
1177 self.pg_enable_capture(self.pg_interfaces)
1179 capture = self.pg0.get_capture(len(pkts))
1180 self.verify_capture_in(capture, self.pg0)
1182 pkts = self.create_stream_in(self.pg0, self.pg1)
1183 self.pg0.add_stream(pkts)
1184 self.pg_enable_capture(self.pg_interfaces)
1186 capture = self.pg1.get_capture(len(pkts))
1187 self.verify_capture_out(capture, same_port=True)
1189 # in2out - no static mapping match
1191 host0 = self.pg0.remote_hosts[0]
1192 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1194 pkts = self.create_stream_out(self.pg1,
1195 dst_ip=self.pg0.remote_ip4,
1196 use_inside_ports=True)
1197 self.pg1.add_stream(pkts)
1198 self.pg_enable_capture(self.pg_interfaces)
1200 capture = self.pg0.get_capture(len(pkts))
1201 self.verify_capture_in(capture, self.pg0)
1203 pkts = self.create_stream_in(self.pg0, self.pg1)
1204 self.pg0.add_stream(pkts)
1205 self.pg_enable_capture(self.pg_interfaces)
1207 capture = self.pg1.get_capture(len(pkts))
1208 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1211 self.pg0.remote_hosts[0] = host0
1214 self.vapi.nat44_forwarding_enable_disable(0)
1215 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1216 external_ip=alias_ip,
1219 def test_static_in(self):
1220 """ 1:1 NAT initialized from inside network """
1222 nat_ip = "10.0.0.10"
1223 self.tcp_port_out = 6303
1224 self.udp_port_out = 6304
1225 self.icmp_id_out = 6305
1227 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1228 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1229 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1233 pkts = self.create_stream_in(self.pg0, self.pg1)
1234 self.pg0.add_stream(pkts)
1235 self.pg_enable_capture(self.pg_interfaces)
1237 capture = self.pg1.get_capture(len(pkts))
1238 self.verify_capture_out(capture, nat_ip, True)
1241 pkts = self.create_stream_out(self.pg1, nat_ip)
1242 self.pg1.add_stream(pkts)
1243 self.pg_enable_capture(self.pg_interfaces)
1245 capture = self.pg0.get_capture(len(pkts))
1246 self.verify_capture_in(capture, self.pg0)
1248 def test_static_out(self):
1249 """ 1:1 NAT initialized from outside network """
1251 nat_ip = "10.0.0.20"
1252 self.tcp_port_out = 6303
1253 self.udp_port_out = 6304
1254 self.icmp_id_out = 6305
1256 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1257 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1258 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1262 pkts = self.create_stream_out(self.pg1, nat_ip)
1263 self.pg1.add_stream(pkts)
1264 self.pg_enable_capture(self.pg_interfaces)
1266 capture = self.pg0.get_capture(len(pkts))
1267 self.verify_capture_in(capture, self.pg0)
1270 pkts = self.create_stream_in(self.pg0, self.pg1)
1271 self.pg0.add_stream(pkts)
1272 self.pg_enable_capture(self.pg_interfaces)
1274 capture = self.pg1.get_capture(len(pkts))
1275 self.verify_capture_out(capture, nat_ip, True)
1277 def test_static_with_port_in(self):
1278 """ 1:1 NAPT initialized from inside network """
1280 self.tcp_port_out = 3606
1281 self.udp_port_out = 3607
1282 self.icmp_id_out = 3608
1284 self.nat44_add_address(self.nat_addr)
1285 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1286 self.tcp_port_in, self.tcp_port_out,
1287 proto=IP_PROTOS.tcp)
1288 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1289 self.udp_port_in, self.udp_port_out,
1290 proto=IP_PROTOS.udp)
1291 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1292 self.icmp_id_in, self.icmp_id_out,
1293 proto=IP_PROTOS.icmp)
1294 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1295 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1299 pkts = self.create_stream_in(self.pg0, self.pg1)
1300 self.pg0.add_stream(pkts)
1301 self.pg_enable_capture(self.pg_interfaces)
1303 capture = self.pg1.get_capture(len(pkts))
1304 self.verify_capture_out(capture)
1307 pkts = self.create_stream_out(self.pg1)
1308 self.pg1.add_stream(pkts)
1309 self.pg_enable_capture(self.pg_interfaces)
1311 capture = self.pg0.get_capture(len(pkts))
1312 self.verify_capture_in(capture, self.pg0)
1314 def test_static_with_port_out(self):
1315 """ 1:1 NAPT initialized from outside network """
1317 self.tcp_port_out = 30606
1318 self.udp_port_out = 30607
1319 self.icmp_id_out = 30608
1321 self.nat44_add_address(self.nat_addr)
1322 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1323 self.tcp_port_in, self.tcp_port_out,
1324 proto=IP_PROTOS.tcp)
1325 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1326 self.udp_port_in, self.udp_port_out,
1327 proto=IP_PROTOS.udp)
1328 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1329 self.icmp_id_in, self.icmp_id_out,
1330 proto=IP_PROTOS.icmp)
1331 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1332 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1336 pkts = self.create_stream_out(self.pg1)
1337 self.pg1.add_stream(pkts)
1338 self.pg_enable_capture(self.pg_interfaces)
1340 capture = self.pg0.get_capture(len(pkts))
1341 self.verify_capture_in(capture, self.pg0)
1344 pkts = self.create_stream_in(self.pg0, self.pg1)
1345 self.pg0.add_stream(pkts)
1346 self.pg_enable_capture(self.pg_interfaces)
1348 capture = self.pg1.get_capture(len(pkts))
1349 self.verify_capture_out(capture)
1351 def test_static_vrf_aware(self):
1352 """ 1:1 NAT VRF awareness """
1354 nat_ip1 = "10.0.0.30"
1355 nat_ip2 = "10.0.0.40"
1356 self.tcp_port_out = 6303
1357 self.udp_port_out = 6304
1358 self.icmp_id_out = 6305
1360 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1362 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1364 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1366 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1367 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1369 # inside interface VRF match NAT44 static mapping VRF
1370 pkts = self.create_stream_in(self.pg4, self.pg3)
1371 self.pg4.add_stream(pkts)
1372 self.pg_enable_capture(self.pg_interfaces)
1374 capture = self.pg3.get_capture(len(pkts))
1375 self.verify_capture_out(capture, nat_ip1, True)
1377 # inside interface VRF don't match NAT44 static mapping VRF (packets
1379 pkts = self.create_stream_in(self.pg0, self.pg3)
1380 self.pg0.add_stream(pkts)
1381 self.pg_enable_capture(self.pg_interfaces)
1383 self.pg3.assert_nothing_captured()
1385 def test_identity_nat(self):
1386 """ Identity NAT """
1388 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1389 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1390 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1393 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1394 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1395 TCP(sport=12345, dport=56789))
1396 self.pg1.add_stream(p)
1397 self.pg_enable_capture(self.pg_interfaces)
1399 capture = self.pg0.get_capture(1)
1404 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1405 self.assertEqual(ip.src, self.pg1.remote_ip4)
1406 self.assertEqual(tcp.dport, 56789)
1407 self.assertEqual(tcp.sport, 12345)
1408 self.check_tcp_checksum(p)
1409 self.check_ip_checksum(p)
1411 self.logger.error(ppp("Unexpected or invalid packet:", p))
1414 def test_static_lb(self):
1415 """ NAT44 local service load balancing """
1416 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1419 server1 = self.pg0.remote_hosts[0]
1420 server2 = self.pg0.remote_hosts[1]
1422 locals = [{'addr': server1.ip4n,
1425 {'addr': server2.ip4n,
1429 self.nat44_add_address(self.nat_addr)
1430 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1433 local_num=len(locals),
1435 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1436 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1439 # from client to service
1440 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1441 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1442 TCP(sport=12345, dport=external_port))
1443 self.pg1.add_stream(p)
1444 self.pg_enable_capture(self.pg_interfaces)
1446 capture = self.pg0.get_capture(1)
1452 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1453 if ip.dst == server1.ip4:
1457 self.assertEqual(tcp.dport, local_port)
1458 self.check_tcp_checksum(p)
1459 self.check_ip_checksum(p)
1461 self.logger.error(ppp("Unexpected or invalid packet:", p))
1464 # from service back to client
1465 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1466 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1467 TCP(sport=local_port, dport=12345))
1468 self.pg0.add_stream(p)
1469 self.pg_enable_capture(self.pg_interfaces)
1471 capture = self.pg1.get_capture(1)
1476 self.assertEqual(ip.src, self.nat_addr)
1477 self.assertEqual(tcp.sport, external_port)
1478 self.check_tcp_checksum(p)
1479 self.check_ip_checksum(p)
1481 self.logger.error(ppp("Unexpected or invalid packet:", p))
1487 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1489 for client in clients:
1490 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1491 IP(src=client, dst=self.nat_addr) /
1492 TCP(sport=12345, dport=external_port))
1494 self.pg1.add_stream(pkts)
1495 self.pg_enable_capture(self.pg_interfaces)
1497 capture = self.pg0.get_capture(len(pkts))
1499 if p[IP].dst == server1.ip4:
1503 self.assertTrue(server1_n > server2_n)
1505 def test_multiple_inside_interfaces(self):
1506 """ NAT44 multiple non-overlapping address space inside interfaces """
1508 self.nat44_add_address(self.nat_addr)
1509 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1510 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1511 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1514 # between two NAT44 inside interfaces (no translation)
1515 pkts = self.create_stream_in(self.pg0, self.pg1)
1516 self.pg0.add_stream(pkts)
1517 self.pg_enable_capture(self.pg_interfaces)
1519 capture = self.pg1.get_capture(len(pkts))
1520 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1522 # from NAT44 inside to interface without NAT44 feature (no translation)
1523 pkts = self.create_stream_in(self.pg0, self.pg2)
1524 self.pg0.add_stream(pkts)
1525 self.pg_enable_capture(self.pg_interfaces)
1527 capture = self.pg2.get_capture(len(pkts))
1528 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1530 # in2out 1st interface
1531 pkts = self.create_stream_in(self.pg0, self.pg3)
1532 self.pg0.add_stream(pkts)
1533 self.pg_enable_capture(self.pg_interfaces)
1535 capture = self.pg3.get_capture(len(pkts))
1536 self.verify_capture_out(capture)
1538 # out2in 1st interface
1539 pkts = self.create_stream_out(self.pg3)
1540 self.pg3.add_stream(pkts)
1541 self.pg_enable_capture(self.pg_interfaces)
1543 capture = self.pg0.get_capture(len(pkts))
1544 self.verify_capture_in(capture, self.pg0)
1546 # in2out 2nd interface
1547 pkts = self.create_stream_in(self.pg1, self.pg3)
1548 self.pg1.add_stream(pkts)
1549 self.pg_enable_capture(self.pg_interfaces)
1551 capture = self.pg3.get_capture(len(pkts))
1552 self.verify_capture_out(capture)
1554 # out2in 2nd interface
1555 pkts = self.create_stream_out(self.pg3)
1556 self.pg3.add_stream(pkts)
1557 self.pg_enable_capture(self.pg_interfaces)
1559 capture = self.pg1.get_capture(len(pkts))
1560 self.verify_capture_in(capture, self.pg1)
1562 def test_inside_overlapping_interfaces(self):
1563 """ NAT44 multiple inside interfaces with overlapping address space """
1565 static_nat_ip = "10.0.0.10"
1566 self.nat44_add_address(self.nat_addr)
1567 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1569 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1570 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1571 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1572 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1575 # between NAT44 inside interfaces with same VRF (no translation)
1576 pkts = self.create_stream_in(self.pg4, self.pg5)
1577 self.pg4.add_stream(pkts)
1578 self.pg_enable_capture(self.pg_interfaces)
1580 capture = self.pg5.get_capture(len(pkts))
1581 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1583 # between NAT44 inside interfaces with different VRF (hairpinning)
1584 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1585 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1586 TCP(sport=1234, dport=5678))
1587 self.pg4.add_stream(p)
1588 self.pg_enable_capture(self.pg_interfaces)
1590 capture = self.pg6.get_capture(1)
1595 self.assertEqual(ip.src, self.nat_addr)
1596 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1597 self.assertNotEqual(tcp.sport, 1234)
1598 self.assertEqual(tcp.dport, 5678)
1600 self.logger.error(ppp("Unexpected or invalid packet:", p))
1603 # in2out 1st interface
1604 pkts = self.create_stream_in(self.pg4, self.pg3)
1605 self.pg4.add_stream(pkts)
1606 self.pg_enable_capture(self.pg_interfaces)
1608 capture = self.pg3.get_capture(len(pkts))
1609 self.verify_capture_out(capture)
1611 # out2in 1st interface
1612 pkts = self.create_stream_out(self.pg3)
1613 self.pg3.add_stream(pkts)
1614 self.pg_enable_capture(self.pg_interfaces)
1616 capture = self.pg4.get_capture(len(pkts))
1617 self.verify_capture_in(capture, self.pg4)
1619 # in2out 2nd interface
1620 pkts = self.create_stream_in(self.pg5, self.pg3)
1621 self.pg5.add_stream(pkts)
1622 self.pg_enable_capture(self.pg_interfaces)
1624 capture = self.pg3.get_capture(len(pkts))
1625 self.verify_capture_out(capture)
1627 # out2in 2nd interface
1628 pkts = self.create_stream_out(self.pg3)
1629 self.pg3.add_stream(pkts)
1630 self.pg_enable_capture(self.pg_interfaces)
1632 capture = self.pg5.get_capture(len(pkts))
1633 self.verify_capture_in(capture, self.pg5)
1636 addresses = self.vapi.nat44_address_dump()
1637 self.assertEqual(len(addresses), 1)
1638 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1639 self.assertEqual(len(sessions), 3)
1640 for session in sessions:
1641 self.assertFalse(session.is_static)
1642 self.assertEqual(session.inside_ip_address[0:4],
1643 self.pg5.remote_ip4n)
1644 self.assertEqual(session.outside_ip_address,
1645 addresses[0].ip_address)
1646 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1647 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1648 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1649 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1650 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1651 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1652 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1653 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1654 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1656 # in2out 3rd interface
1657 pkts = self.create_stream_in(self.pg6, self.pg3)
1658 self.pg6.add_stream(pkts)
1659 self.pg_enable_capture(self.pg_interfaces)
1661 capture = self.pg3.get_capture(len(pkts))
1662 self.verify_capture_out(capture, static_nat_ip, True)
1664 # out2in 3rd interface
1665 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1666 self.pg3.add_stream(pkts)
1667 self.pg_enable_capture(self.pg_interfaces)
1669 capture = self.pg6.get_capture(len(pkts))
1670 self.verify_capture_in(capture, self.pg6)
1672 # general user and session dump verifications
1673 users = self.vapi.nat44_user_dump()
1674 self.assertTrue(len(users) >= 3)
1675 addresses = self.vapi.nat44_address_dump()
1676 self.assertEqual(len(addresses), 1)
1678 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1680 for session in sessions:
1681 self.assertEqual(user.ip_address, session.inside_ip_address)
1682 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1683 self.assertTrue(session.protocol in
1684 [IP_PROTOS.tcp, IP_PROTOS.udp,
1688 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1689 self.assertTrue(len(sessions) >= 4)
1690 for session in sessions:
1691 self.assertFalse(session.is_static)
1692 self.assertEqual(session.inside_ip_address[0:4],
1693 self.pg4.remote_ip4n)
1694 self.assertEqual(session.outside_ip_address,
1695 addresses[0].ip_address)
1698 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1699 self.assertTrue(len(sessions) >= 3)
1700 for session in sessions:
1701 self.assertTrue(session.is_static)
1702 self.assertEqual(session.inside_ip_address[0:4],
1703 self.pg6.remote_ip4n)
1704 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1705 map(int, static_nat_ip.split('.')))
1706 self.assertTrue(session.inside_port in
1707 [self.tcp_port_in, self.udp_port_in,
1710 def test_hairpinning(self):
1711 """ NAT44 hairpinning - 1:1 NAPT """
1713 host = self.pg0.remote_hosts[0]
1714 server = self.pg0.remote_hosts[1]
1717 server_in_port = 5678
1718 server_out_port = 8765
1720 self.nat44_add_address(self.nat_addr)
1721 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1722 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1724 # add static mapping for server
1725 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1726 server_in_port, server_out_port,
1727 proto=IP_PROTOS.tcp)
1729 # send packet from host to server
1730 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1731 IP(src=host.ip4, dst=self.nat_addr) /
1732 TCP(sport=host_in_port, dport=server_out_port))
1733 self.pg0.add_stream(p)
1734 self.pg_enable_capture(self.pg_interfaces)
1736 capture = self.pg0.get_capture(1)
1741 self.assertEqual(ip.src, self.nat_addr)
1742 self.assertEqual(ip.dst, server.ip4)
1743 self.assertNotEqual(tcp.sport, host_in_port)
1744 self.assertEqual(tcp.dport, server_in_port)
1745 self.check_tcp_checksum(p)
1746 host_out_port = tcp.sport
1748 self.logger.error(ppp("Unexpected or invalid packet:", p))
1751 # send reply from server to host
1752 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1753 IP(src=server.ip4, dst=self.nat_addr) /
1754 TCP(sport=server_in_port, dport=host_out_port))
1755 self.pg0.add_stream(p)
1756 self.pg_enable_capture(self.pg_interfaces)
1758 capture = self.pg0.get_capture(1)
1763 self.assertEqual(ip.src, self.nat_addr)
1764 self.assertEqual(ip.dst, host.ip4)
1765 self.assertEqual(tcp.sport, server_out_port)
1766 self.assertEqual(tcp.dport, host_in_port)
1767 self.check_tcp_checksum(p)
1769 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1772 def test_hairpinning2(self):
1773 """ NAT44 hairpinning - 1:1 NAT"""
1775 server1_nat_ip = "10.0.0.10"
1776 server2_nat_ip = "10.0.0.11"
1777 host = self.pg0.remote_hosts[0]
1778 server1 = self.pg0.remote_hosts[1]
1779 server2 = self.pg0.remote_hosts[2]
1780 server_tcp_port = 22
1781 server_udp_port = 20
1783 self.nat44_add_address(self.nat_addr)
1784 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1785 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1788 # add static mapping for servers
1789 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1790 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1794 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1795 IP(src=host.ip4, dst=server1_nat_ip) /
1796 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1798 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1799 IP(src=host.ip4, dst=server1_nat_ip) /
1800 UDP(sport=self.udp_port_in, dport=server_udp_port))
1802 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1803 IP(src=host.ip4, dst=server1_nat_ip) /
1804 ICMP(id=self.icmp_id_in, type='echo-request'))
1806 self.pg0.add_stream(pkts)
1807 self.pg_enable_capture(self.pg_interfaces)
1809 capture = self.pg0.get_capture(len(pkts))
1810 for packet in capture:
1812 self.assertEqual(packet[IP].src, self.nat_addr)
1813 self.assertEqual(packet[IP].dst, server1.ip4)
1814 if packet.haslayer(TCP):
1815 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1816 self.assertEqual(packet[TCP].dport, server_tcp_port)
1817 self.tcp_port_out = packet[TCP].sport
1818 self.check_tcp_checksum(packet)
1819 elif packet.haslayer(UDP):
1820 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1821 self.assertEqual(packet[UDP].dport, server_udp_port)
1822 self.udp_port_out = packet[UDP].sport
1824 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1825 self.icmp_id_out = packet[ICMP].id
1827 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1832 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1833 IP(src=server1.ip4, dst=self.nat_addr) /
1834 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1836 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1837 IP(src=server1.ip4, dst=self.nat_addr) /
1838 UDP(sport=server_udp_port, dport=self.udp_port_out))
1840 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1841 IP(src=server1.ip4, dst=self.nat_addr) /
1842 ICMP(id=self.icmp_id_out, type='echo-reply'))
1844 self.pg0.add_stream(pkts)
1845 self.pg_enable_capture(self.pg_interfaces)
1847 capture = self.pg0.get_capture(len(pkts))
1848 for packet in capture:
1850 self.assertEqual(packet[IP].src, server1_nat_ip)
1851 self.assertEqual(packet[IP].dst, host.ip4)
1852 if packet.haslayer(TCP):
1853 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1854 self.assertEqual(packet[TCP].sport, server_tcp_port)
1855 self.check_tcp_checksum(packet)
1856 elif packet.haslayer(UDP):
1857 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1858 self.assertEqual(packet[UDP].sport, server_udp_port)
1860 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1862 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1865 # server2 to server1
1867 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1868 IP(src=server2.ip4, dst=server1_nat_ip) /
1869 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1871 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1872 IP(src=server2.ip4, dst=server1_nat_ip) /
1873 UDP(sport=self.udp_port_in, dport=server_udp_port))
1875 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1876 IP(src=server2.ip4, dst=server1_nat_ip) /
1877 ICMP(id=self.icmp_id_in, type='echo-request'))
1879 self.pg0.add_stream(pkts)
1880 self.pg_enable_capture(self.pg_interfaces)
1882 capture = self.pg0.get_capture(len(pkts))
1883 for packet in capture:
1885 self.assertEqual(packet[IP].src, server2_nat_ip)
1886 self.assertEqual(packet[IP].dst, server1.ip4)
1887 if packet.haslayer(TCP):
1888 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1889 self.assertEqual(packet[TCP].dport, server_tcp_port)
1890 self.tcp_port_out = packet[TCP].sport
1891 self.check_tcp_checksum(packet)
1892 elif packet.haslayer(UDP):
1893 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1894 self.assertEqual(packet[UDP].dport, server_udp_port)
1895 self.udp_port_out = packet[UDP].sport
1897 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1898 self.icmp_id_out = packet[ICMP].id
1900 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1903 # server1 to server2
1905 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1906 IP(src=server1.ip4, dst=server2_nat_ip) /
1907 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1909 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1910 IP(src=server1.ip4, dst=server2_nat_ip) /
1911 UDP(sport=server_udp_port, dport=self.udp_port_out))
1913 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1914 IP(src=server1.ip4, dst=server2_nat_ip) /
1915 ICMP(id=self.icmp_id_out, type='echo-reply'))
1917 self.pg0.add_stream(pkts)
1918 self.pg_enable_capture(self.pg_interfaces)
1920 capture = self.pg0.get_capture(len(pkts))
1921 for packet in capture:
1923 self.assertEqual(packet[IP].src, server1_nat_ip)
1924 self.assertEqual(packet[IP].dst, server2.ip4)
1925 if packet.haslayer(TCP):
1926 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1927 self.assertEqual(packet[TCP].sport, server_tcp_port)
1928 self.check_tcp_checksum(packet)
1929 elif packet.haslayer(UDP):
1930 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1931 self.assertEqual(packet[UDP].sport, server_udp_port)
1933 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1935 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1938 def test_max_translations_per_user(self):
1939 """ MAX translations per user - recycle the least recently used """
1941 self.nat44_add_address(self.nat_addr)
1942 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1943 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1946 # get maximum number of translations per user
1947 nat44_config = self.vapi.nat_show_config()
1949 # send more than maximum number of translations per user packets
1950 pkts_num = nat44_config.max_translations_per_user + 5
1952 for port in range(0, pkts_num):
1953 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1954 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1955 TCP(sport=1025 + port))
1957 self.pg0.add_stream(pkts)
1958 self.pg_enable_capture(self.pg_interfaces)
1961 # verify number of translated packet
1962 self.pg1.get_capture(pkts_num)
1964 def test_interface_addr(self):
1965 """ Acquire NAT44 addresses from interface """
1966 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1968 # no address in NAT pool
1969 adresses = self.vapi.nat44_address_dump()
1970 self.assertEqual(0, len(adresses))
1972 # configure interface address and check NAT address pool
1973 self.pg7.config_ip4()
1974 adresses = self.vapi.nat44_address_dump()
1975 self.assertEqual(1, len(adresses))
1976 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1978 # remove interface address and check NAT address pool
1979 self.pg7.unconfig_ip4()
1980 adresses = self.vapi.nat44_address_dump()
1981 self.assertEqual(0, len(adresses))
1983 def test_interface_addr_static_mapping(self):
1984 """ Static mapping with addresses from interface """
1985 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1986 self.nat44_add_static_mapping(
1988 external_sw_if_index=self.pg7.sw_if_index)
1990 # static mappings with external interface
1991 static_mappings = self.vapi.nat44_static_mapping_dump()
1992 self.assertEqual(1, len(static_mappings))
1993 self.assertEqual(self.pg7.sw_if_index,
1994 static_mappings[0].external_sw_if_index)
1996 # configure interface address and check static mappings
1997 self.pg7.config_ip4()
1998 static_mappings = self.vapi.nat44_static_mapping_dump()
1999 self.assertEqual(1, len(static_mappings))
2000 self.assertEqual(static_mappings[0].external_ip_address[0:4],
2001 self.pg7.local_ip4n)
2002 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2004 # remove interface address and check static mappings
2005 self.pg7.unconfig_ip4()
2006 static_mappings = self.vapi.nat44_static_mapping_dump()
2007 self.assertEqual(0, len(static_mappings))
2009 def test_interface_addr_identity_nat(self):
2010 """ Identity NAT with addresses from interface """
2013 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2014 self.vapi.nat44_add_del_identity_mapping(
2015 sw_if_index=self.pg7.sw_if_index,
2017 protocol=IP_PROTOS.tcp,
2020 # identity mappings with external interface
2021 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2022 self.assertEqual(1, len(identity_mappings))
2023 self.assertEqual(self.pg7.sw_if_index,
2024 identity_mappings[0].sw_if_index)
2026 # configure interface address and check identity mappings
2027 self.pg7.config_ip4()
2028 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2029 self.assertEqual(1, len(identity_mappings))
2030 self.assertEqual(identity_mappings[0].ip_address,
2031 self.pg7.local_ip4n)
2032 self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2033 self.assertEqual(port, identity_mappings[0].port)
2034 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2036 # remove interface address and check identity mappings
2037 self.pg7.unconfig_ip4()
2038 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2039 self.assertEqual(0, len(identity_mappings))
2041 def test_ipfix_nat44_sess(self):
2042 """ IPFIX logging NAT44 session created/delted """
2043 self.ipfix_domain_id = 10
2044 self.ipfix_src_port = 20202
2045 colector_port = 30303
2046 bind_layers(UDP, IPFIX, dport=30303)
2047 self.nat44_add_address(self.nat_addr)
2048 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2049 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2051 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2052 src_address=self.pg3.local_ip4n,
2054 template_interval=10,
2055 collector_port=colector_port)
2056 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2057 src_port=self.ipfix_src_port)
2059 pkts = self.create_stream_in(self.pg0, self.pg1)
2060 self.pg0.add_stream(pkts)
2061 self.pg_enable_capture(self.pg_interfaces)
2063 capture = self.pg1.get_capture(len(pkts))
2064 self.verify_capture_out(capture)
2065 self.nat44_add_address(self.nat_addr, is_add=0)
2066 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2067 capture = self.pg3.get_capture(3)
2068 ipfix = IPFIXDecoder()
2069 # first load template
2071 self.assertTrue(p.haslayer(IPFIX))
2072 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2073 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2074 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2075 self.assertEqual(p[UDP].dport, colector_port)
2076 self.assertEqual(p[IPFIX].observationDomainID,
2077 self.ipfix_domain_id)
2078 if p.haslayer(Template):
2079 ipfix.add_template(p.getlayer(Template))
2080 # verify events in data set
2082 if p.haslayer(Data):
2083 data = ipfix.decode_data_set(p.getlayer(Set))
2084 self.verify_ipfix_nat44_ses(data)
2086 def test_ipfix_addr_exhausted(self):
2087 """ IPFIX logging NAT addresses exhausted """
2088 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2089 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2091 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2092 src_address=self.pg3.local_ip4n,
2094 template_interval=10)
2095 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2096 src_port=self.ipfix_src_port)
2098 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2099 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2101 self.pg0.add_stream(p)
2102 self.pg_enable_capture(self.pg_interfaces)
2104 capture = self.pg1.get_capture(0)
2105 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2106 capture = self.pg3.get_capture(3)
2107 ipfix = IPFIXDecoder()
2108 # first load template
2110 self.assertTrue(p.haslayer(IPFIX))
2111 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2112 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2113 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2114 self.assertEqual(p[UDP].dport, 4739)
2115 self.assertEqual(p[IPFIX].observationDomainID,
2116 self.ipfix_domain_id)
2117 if p.haslayer(Template):
2118 ipfix.add_template(p.getlayer(Template))
2119 # verify events in data set
2121 if p.haslayer(Data):
2122 data = ipfix.decode_data_set(p.getlayer(Set))
2123 self.verify_ipfix_addr_exhausted(data)
2125 def test_pool_addr_fib(self):
2126 """ NAT44 add pool addresses to FIB """
2127 static_addr = '10.0.0.10'
2128 self.nat44_add_address(self.nat_addr)
2129 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2130 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2132 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2135 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2136 ARP(op=ARP.who_has, pdst=self.nat_addr,
2137 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2138 self.pg1.add_stream(p)
2139 self.pg_enable_capture(self.pg_interfaces)
2141 capture = self.pg1.get_capture(1)
2142 self.assertTrue(capture[0].haslayer(ARP))
2143 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2146 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2147 ARP(op=ARP.who_has, pdst=static_addr,
2148 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2149 self.pg1.add_stream(p)
2150 self.pg_enable_capture(self.pg_interfaces)
2152 capture = self.pg1.get_capture(1)
2153 self.assertTrue(capture[0].haslayer(ARP))
2154 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2156 # send ARP to non-NAT44 interface
2157 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2158 ARP(op=ARP.who_has, pdst=self.nat_addr,
2159 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2160 self.pg2.add_stream(p)
2161 self.pg_enable_capture(self.pg_interfaces)
2163 capture = self.pg1.get_capture(0)
2165 # remove addresses and verify
2166 self.nat44_add_address(self.nat_addr, is_add=0)
2167 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2170 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2171 ARP(op=ARP.who_has, pdst=self.nat_addr,
2172 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2173 self.pg1.add_stream(p)
2174 self.pg_enable_capture(self.pg_interfaces)
2176 capture = self.pg1.get_capture(0)
2178 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2179 ARP(op=ARP.who_has, pdst=static_addr,
2180 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2181 self.pg1.add_stream(p)
2182 self.pg_enable_capture(self.pg_interfaces)
2184 capture = self.pg1.get_capture(0)
2186 def test_vrf_mode(self):
2187 """ NAT44 tenant VRF aware address pool mode """
2191 nat_ip1 = "10.0.0.10"
2192 nat_ip2 = "10.0.0.11"
2194 self.pg0.unconfig_ip4()
2195 self.pg1.unconfig_ip4()
2196 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2197 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2198 self.pg0.set_table_ip4(vrf_id1)
2199 self.pg1.set_table_ip4(vrf_id2)
2200 self.pg0.config_ip4()
2201 self.pg1.config_ip4()
2203 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2204 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2205 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2206 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2207 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2211 pkts = self.create_stream_in(self.pg0, self.pg2)
2212 self.pg0.add_stream(pkts)
2213 self.pg_enable_capture(self.pg_interfaces)
2215 capture = self.pg2.get_capture(len(pkts))
2216 self.verify_capture_out(capture, nat_ip1)
2219 pkts = self.create_stream_in(self.pg1, self.pg2)
2220 self.pg1.add_stream(pkts)
2221 self.pg_enable_capture(self.pg_interfaces)
2223 capture = self.pg2.get_capture(len(pkts))
2224 self.verify_capture_out(capture, nat_ip2)
2226 self.pg0.unconfig_ip4()
2227 self.pg1.unconfig_ip4()
2228 self.pg0.set_table_ip4(0)
2229 self.pg1.set_table_ip4(0)
2230 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2231 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2233 def test_vrf_feature_independent(self):
2234 """ NAT44 tenant VRF independent address pool mode """
2236 nat_ip1 = "10.0.0.10"
2237 nat_ip2 = "10.0.0.11"
2239 self.nat44_add_address(nat_ip1)
2240 self.nat44_add_address(nat_ip2, vrf_id=99)
2241 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2242 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2243 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2247 pkts = self.create_stream_in(self.pg0, self.pg2)
2248 self.pg0.add_stream(pkts)
2249 self.pg_enable_capture(self.pg_interfaces)
2251 capture = self.pg2.get_capture(len(pkts))
2252 self.verify_capture_out(capture, nat_ip1)
2255 pkts = self.create_stream_in(self.pg1, self.pg2)
2256 self.pg1.add_stream(pkts)
2257 self.pg_enable_capture(self.pg_interfaces)
2259 capture = self.pg2.get_capture(len(pkts))
2260 self.verify_capture_out(capture, nat_ip1)
2262 def test_dynamic_ipless_interfaces(self):
2263 """ NAT44 interfaces without configured IP address """
2265 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2266 mactobinary(self.pg7.remote_mac),
2267 self.pg7.remote_ip4n,
2269 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2270 mactobinary(self.pg8.remote_mac),
2271 self.pg8.remote_ip4n,
2274 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2275 dst_address_length=32,
2276 next_hop_address=self.pg7.remote_ip4n,
2277 next_hop_sw_if_index=self.pg7.sw_if_index)
2278 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2279 dst_address_length=32,
2280 next_hop_address=self.pg8.remote_ip4n,
2281 next_hop_sw_if_index=self.pg8.sw_if_index)
2283 self.nat44_add_address(self.nat_addr)
2284 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2285 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2289 pkts = self.create_stream_in(self.pg7, self.pg8)
2290 self.pg7.add_stream(pkts)
2291 self.pg_enable_capture(self.pg_interfaces)
2293 capture = self.pg8.get_capture(len(pkts))
2294 self.verify_capture_out(capture)
2297 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2298 self.pg8.add_stream(pkts)
2299 self.pg_enable_capture(self.pg_interfaces)
2301 capture = self.pg7.get_capture(len(pkts))
2302 self.verify_capture_in(capture, self.pg7)
2304 def test_static_ipless_interfaces(self):
2305 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2307 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2308 mactobinary(self.pg7.remote_mac),
2309 self.pg7.remote_ip4n,
2311 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2312 mactobinary(self.pg8.remote_mac),
2313 self.pg8.remote_ip4n,
2316 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2317 dst_address_length=32,
2318 next_hop_address=self.pg7.remote_ip4n,
2319 next_hop_sw_if_index=self.pg7.sw_if_index)
2320 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2321 dst_address_length=32,
2322 next_hop_address=self.pg8.remote_ip4n,
2323 next_hop_sw_if_index=self.pg8.sw_if_index)
2325 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2326 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2327 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2331 pkts = self.create_stream_out(self.pg8)
2332 self.pg8.add_stream(pkts)
2333 self.pg_enable_capture(self.pg_interfaces)
2335 capture = self.pg7.get_capture(len(pkts))
2336 self.verify_capture_in(capture, self.pg7)
2339 pkts = self.create_stream_in(self.pg7, self.pg8)
2340 self.pg7.add_stream(pkts)
2341 self.pg_enable_capture(self.pg_interfaces)
2343 capture = self.pg8.get_capture(len(pkts))
2344 self.verify_capture_out(capture, self.nat_addr, True)
2346 def test_static_with_port_ipless_interfaces(self):
2347 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2349 self.tcp_port_out = 30606
2350 self.udp_port_out = 30607
2351 self.icmp_id_out = 30608
2353 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2354 mactobinary(self.pg7.remote_mac),
2355 self.pg7.remote_ip4n,
2357 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2358 mactobinary(self.pg8.remote_mac),
2359 self.pg8.remote_ip4n,
2362 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2363 dst_address_length=32,
2364 next_hop_address=self.pg7.remote_ip4n,
2365 next_hop_sw_if_index=self.pg7.sw_if_index)
2366 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2367 dst_address_length=32,
2368 next_hop_address=self.pg8.remote_ip4n,
2369 next_hop_sw_if_index=self.pg8.sw_if_index)
2371 self.nat44_add_address(self.nat_addr)
2372 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2373 self.tcp_port_in, self.tcp_port_out,
2374 proto=IP_PROTOS.tcp)
2375 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2376 self.udp_port_in, self.udp_port_out,
2377 proto=IP_PROTOS.udp)
2378 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2379 self.icmp_id_in, self.icmp_id_out,
2380 proto=IP_PROTOS.icmp)
2381 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2382 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2386 pkts = self.create_stream_out(self.pg8)
2387 self.pg8.add_stream(pkts)
2388 self.pg_enable_capture(self.pg_interfaces)
2390 capture = self.pg7.get_capture(len(pkts))
2391 self.verify_capture_in(capture, self.pg7)
2394 pkts = self.create_stream_in(self.pg7, self.pg8)
2395 self.pg7.add_stream(pkts)
2396 self.pg_enable_capture(self.pg_interfaces)
2398 capture = self.pg8.get_capture(len(pkts))
2399 self.verify_capture_out(capture)
2401 def test_static_unknown_proto(self):
2402 """ 1:1 NAT translate packet with unknown protocol """
2403 nat_ip = "10.0.0.10"
2404 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2405 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2406 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2410 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2411 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2413 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2414 TCP(sport=1234, dport=1234))
2415 self.pg0.add_stream(p)
2416 self.pg_enable_capture(self.pg_interfaces)
2418 p = self.pg1.get_capture(1)
2421 self.assertEqual(packet[IP].src, nat_ip)
2422 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2423 self.assertTrue(packet.haslayer(GRE))
2424 self.check_ip_checksum(packet)
2426 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2430 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2431 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2433 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2434 TCP(sport=1234, dport=1234))
2435 self.pg1.add_stream(p)
2436 self.pg_enable_capture(self.pg_interfaces)
2438 p = self.pg0.get_capture(1)
2441 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2442 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2443 self.assertTrue(packet.haslayer(GRE))
2444 self.check_ip_checksum(packet)
2446 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2449 def test_hairpinning_static_unknown_proto(self):
2450 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2452 host = self.pg0.remote_hosts[0]
2453 server = self.pg0.remote_hosts[1]
2455 host_nat_ip = "10.0.0.10"
2456 server_nat_ip = "10.0.0.11"
2458 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2459 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2460 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2461 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2465 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2466 IP(src=host.ip4, dst=server_nat_ip) /
2468 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2469 TCP(sport=1234, dport=1234))
2470 self.pg0.add_stream(p)
2471 self.pg_enable_capture(self.pg_interfaces)
2473 p = self.pg0.get_capture(1)
2476 self.assertEqual(packet[IP].src, host_nat_ip)
2477 self.assertEqual(packet[IP].dst, server.ip4)
2478 self.assertTrue(packet.haslayer(GRE))
2479 self.check_ip_checksum(packet)
2481 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2485 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2486 IP(src=server.ip4, dst=host_nat_ip) /
2488 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2489 TCP(sport=1234, dport=1234))
2490 self.pg0.add_stream(p)
2491 self.pg_enable_capture(self.pg_interfaces)
2493 p = self.pg0.get_capture(1)
2496 self.assertEqual(packet[IP].src, server_nat_ip)
2497 self.assertEqual(packet[IP].dst, host.ip4)
2498 self.assertTrue(packet.haslayer(GRE))
2499 self.check_ip_checksum(packet)
2501 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2504 def test_unknown_proto(self):
2505 """ NAT44 translate packet with unknown protocol """
2506 self.nat44_add_address(self.nat_addr)
2507 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2508 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2512 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2513 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2514 TCP(sport=self.tcp_port_in, dport=20))
2515 self.pg0.add_stream(p)
2516 self.pg_enable_capture(self.pg_interfaces)
2518 p = self.pg1.get_capture(1)
2520 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2521 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2523 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2524 TCP(sport=1234, dport=1234))
2525 self.pg0.add_stream(p)
2526 self.pg_enable_capture(self.pg_interfaces)
2528 p = self.pg1.get_capture(1)
2531 self.assertEqual(packet[IP].src, self.nat_addr)
2532 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2533 self.assertTrue(packet.haslayer(GRE))
2534 self.check_ip_checksum(packet)
2536 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2540 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2541 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2543 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2544 TCP(sport=1234, dport=1234))
2545 self.pg1.add_stream(p)
2546 self.pg_enable_capture(self.pg_interfaces)
2548 p = self.pg0.get_capture(1)
2551 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2552 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2553 self.assertTrue(packet.haslayer(GRE))
2554 self.check_ip_checksum(packet)
2556 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2559 def test_hairpinning_unknown_proto(self):
2560 """ NAT44 translate packet with unknown protocol - hairpinning """
2561 host = self.pg0.remote_hosts[0]
2562 server = self.pg0.remote_hosts[1]
2565 server_in_port = 5678
2566 server_out_port = 8765
2567 server_nat_ip = "10.0.0.11"
2569 self.nat44_add_address(self.nat_addr)
2570 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2571 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2574 # add static mapping for server
2575 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2578 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2579 IP(src=host.ip4, dst=server_nat_ip) /
2580 TCP(sport=host_in_port, dport=server_out_port))
2581 self.pg0.add_stream(p)
2582 self.pg_enable_capture(self.pg_interfaces)
2584 capture = self.pg0.get_capture(1)
2586 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2587 IP(src=host.ip4, dst=server_nat_ip) /
2589 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2590 TCP(sport=1234, dport=1234))
2591 self.pg0.add_stream(p)
2592 self.pg_enable_capture(self.pg_interfaces)
2594 p = self.pg0.get_capture(1)
2597 self.assertEqual(packet[IP].src, self.nat_addr)
2598 self.assertEqual(packet[IP].dst, server.ip4)
2599 self.assertTrue(packet.haslayer(GRE))
2600 self.check_ip_checksum(packet)
2602 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2606 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2607 IP(src=server.ip4, dst=self.nat_addr) /
2609 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2610 TCP(sport=1234, dport=1234))
2611 self.pg0.add_stream(p)
2612 self.pg_enable_capture(self.pg_interfaces)
2614 p = self.pg0.get_capture(1)
2617 self.assertEqual(packet[IP].src, server_nat_ip)
2618 self.assertEqual(packet[IP].dst, host.ip4)
2619 self.assertTrue(packet.haslayer(GRE))
2620 self.check_ip_checksum(packet)
2622 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2625 def test_output_feature(self):
2626 """ NAT44 interface output feature (in2out postrouting) """
2627 self.nat44_add_address(self.nat_addr)
2628 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2629 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2630 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2634 pkts = self.create_stream_in(self.pg0, self.pg3)
2635 self.pg0.add_stream(pkts)
2636 self.pg_enable_capture(self.pg_interfaces)
2638 capture = self.pg3.get_capture(len(pkts))
2639 self.verify_capture_out(capture)
2642 pkts = self.create_stream_out(self.pg3)
2643 self.pg3.add_stream(pkts)
2644 self.pg_enable_capture(self.pg_interfaces)
2646 capture = self.pg0.get_capture(len(pkts))
2647 self.verify_capture_in(capture, self.pg0)
2649 # from non-NAT interface to NAT inside interface
2650 pkts = self.create_stream_in(self.pg2, self.pg0)
2651 self.pg2.add_stream(pkts)
2652 self.pg_enable_capture(self.pg_interfaces)
2654 capture = self.pg0.get_capture(len(pkts))
2655 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2657 def test_output_feature_vrf_aware(self):
2658 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2659 nat_ip_vrf10 = "10.0.0.10"
2660 nat_ip_vrf20 = "10.0.0.20"
2662 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2663 dst_address_length=32,
2664 next_hop_address=self.pg3.remote_ip4n,
2665 next_hop_sw_if_index=self.pg3.sw_if_index,
2667 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2668 dst_address_length=32,
2669 next_hop_address=self.pg3.remote_ip4n,
2670 next_hop_sw_if_index=self.pg3.sw_if_index,
2673 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2674 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2675 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2676 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2677 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2681 pkts = self.create_stream_in(self.pg4, self.pg3)
2682 self.pg4.add_stream(pkts)
2683 self.pg_enable_capture(self.pg_interfaces)
2685 capture = self.pg3.get_capture(len(pkts))
2686 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2689 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2690 self.pg3.add_stream(pkts)
2691 self.pg_enable_capture(self.pg_interfaces)
2693 capture = self.pg4.get_capture(len(pkts))
2694 self.verify_capture_in(capture, self.pg4)
2697 pkts = self.create_stream_in(self.pg6, self.pg3)
2698 self.pg6.add_stream(pkts)
2699 self.pg_enable_capture(self.pg_interfaces)
2701 capture = self.pg3.get_capture(len(pkts))
2702 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2705 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2706 self.pg3.add_stream(pkts)
2707 self.pg_enable_capture(self.pg_interfaces)
2709 capture = self.pg6.get_capture(len(pkts))
2710 self.verify_capture_in(capture, self.pg6)
2712 def test_output_feature_hairpinning(self):
2713 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2714 host = self.pg0.remote_hosts[0]
2715 server = self.pg0.remote_hosts[1]
2718 server_in_port = 5678
2719 server_out_port = 8765
2721 self.nat44_add_address(self.nat_addr)
2722 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2723 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2726 # add static mapping for server
2727 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2728 server_in_port, server_out_port,
2729 proto=IP_PROTOS.tcp)
2731 # send packet from host to server
2732 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2733 IP(src=host.ip4, dst=self.nat_addr) /
2734 TCP(sport=host_in_port, dport=server_out_port))
2735 self.pg0.add_stream(p)
2736 self.pg_enable_capture(self.pg_interfaces)
2738 capture = self.pg0.get_capture(1)
2743 self.assertEqual(ip.src, self.nat_addr)
2744 self.assertEqual(ip.dst, server.ip4)
2745 self.assertNotEqual(tcp.sport, host_in_port)
2746 self.assertEqual(tcp.dport, server_in_port)
2747 self.check_tcp_checksum(p)
2748 host_out_port = tcp.sport
2750 self.logger.error(ppp("Unexpected or invalid packet:", p))
2753 # send reply from server to host
2754 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2755 IP(src=server.ip4, dst=self.nat_addr) /
2756 TCP(sport=server_in_port, dport=host_out_port))
2757 self.pg0.add_stream(p)
2758 self.pg_enable_capture(self.pg_interfaces)
2760 capture = self.pg0.get_capture(1)
2765 self.assertEqual(ip.src, self.nat_addr)
2766 self.assertEqual(ip.dst, host.ip4)
2767 self.assertEqual(tcp.sport, server_out_port)
2768 self.assertEqual(tcp.dport, host_in_port)
2769 self.check_tcp_checksum(p)
2771 self.logger.error(ppp("Unexpected or invalid packet:"), p)
2774 def test_one_armed_nat44(self):
2775 """ One armed NAT44 """
2776 remote_host = self.pg9.remote_hosts[0]
2777 local_host = self.pg9.remote_hosts[1]
2780 self.nat44_add_address(self.nat_addr)
2781 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2782 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2786 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2787 IP(src=local_host.ip4, dst=remote_host.ip4) /
2788 TCP(sport=12345, dport=80))
2789 self.pg9.add_stream(p)
2790 self.pg_enable_capture(self.pg_interfaces)
2792 capture = self.pg9.get_capture(1)
2797 self.assertEqual(ip.src, self.nat_addr)
2798 self.assertEqual(ip.dst, remote_host.ip4)
2799 self.assertNotEqual(tcp.sport, 12345)
2800 external_port = tcp.sport
2801 self.assertEqual(tcp.dport, 80)
2802 self.check_tcp_checksum(p)
2803 self.check_ip_checksum(p)
2805 self.logger.error(ppp("Unexpected or invalid packet:", p))
2809 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2810 IP(src=remote_host.ip4, dst=self.nat_addr) /
2811 TCP(sport=80, dport=external_port))
2812 self.pg9.add_stream(p)
2813 self.pg_enable_capture(self.pg_interfaces)
2815 capture = self.pg9.get_capture(1)
2820 self.assertEqual(ip.src, remote_host.ip4)
2821 self.assertEqual(ip.dst, local_host.ip4)
2822 self.assertEqual(tcp.sport, 80)
2823 self.assertEqual(tcp.dport, 12345)
2824 self.check_tcp_checksum(p)
2825 self.check_ip_checksum(p)
2827 self.logger.error(ppp("Unexpected or invalid packet:", p))
2830 def test_del_session(self):
2831 """ Delete NAT44 session """
2832 self.nat44_add_address(self.nat_addr)
2833 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2834 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2837 pkts = self.create_stream_in(self.pg0, self.pg1)
2838 self.pg0.add_stream(pkts)
2839 self.pg_enable_capture(self.pg_interfaces)
2841 capture = self.pg1.get_capture(len(pkts))
2843 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2844 nsessions = len(sessions)
2846 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2847 sessions[0].inside_port,
2848 sessions[0].protocol)
2849 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2850 sessions[1].outside_port,
2851 sessions[1].protocol,
2854 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2855 self.assertEqual(nsessions - len(sessions), 2)
2857 def test_set_get_reass(self):
2858 """ NAT44 set/get virtual fragmentation reassembly """
2859 reas_cfg1 = self.vapi.nat_get_reass()
2861 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2862 max_reass=reas_cfg1.ip4_max_reass * 2,
2863 max_frag=reas_cfg1.ip4_max_frag * 2)
2865 reas_cfg2 = self.vapi.nat_get_reass()
2867 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2868 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2869 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2871 self.vapi.nat_set_reass(drop_frag=1)
2872 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2874 def test_frag_in_order(self):
2875 """ NAT44 translate fragments arriving in order """
2876 self.nat44_add_address(self.nat_addr)
2877 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2878 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2881 data = "A" * 4 + "B" * 16 + "C" * 3
2882 self.tcp_port_in = random.randint(1025, 65535)
2884 reass = self.vapi.nat_reass_dump()
2885 reass_n_start = len(reass)
2888 pkts = self.create_stream_frag(self.pg0,
2889 self.pg1.remote_ip4,
2893 self.pg0.add_stream(pkts)
2894 self.pg_enable_capture(self.pg_interfaces)
2896 frags = self.pg1.get_capture(len(pkts))
2897 p = self.reass_frags_and_verify(frags,
2899 self.pg1.remote_ip4)
2900 self.assertEqual(p[TCP].dport, 20)
2901 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2902 self.tcp_port_out = p[TCP].sport
2903 self.assertEqual(data, p[Raw].load)
2906 pkts = self.create_stream_frag(self.pg1,
2911 self.pg1.add_stream(pkts)
2912 self.pg_enable_capture(self.pg_interfaces)
2914 frags = self.pg0.get_capture(len(pkts))
2915 p = self.reass_frags_and_verify(frags,
2916 self.pg1.remote_ip4,
2917 self.pg0.remote_ip4)
2918 self.assertEqual(p[TCP].sport, 20)
2919 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2920 self.assertEqual(data, p[Raw].load)
2922 reass = self.vapi.nat_reass_dump()
2923 reass_n_end = len(reass)
2925 self.assertEqual(reass_n_end - reass_n_start, 2)
2927 def test_reass_hairpinning(self):
2928 """ NAT44 fragments hairpinning """
2929 host = self.pg0.remote_hosts[0]
2930 server = self.pg0.remote_hosts[1]
2931 host_in_port = random.randint(1025, 65535)
2933 server_in_port = random.randint(1025, 65535)
2934 server_out_port = random.randint(1025, 65535)
2935 data = "A" * 4 + "B" * 16 + "C" * 3
2937 self.nat44_add_address(self.nat_addr)
2938 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2939 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2941 # add static mapping for server
2942 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2943 server_in_port, server_out_port,
2944 proto=IP_PROTOS.tcp)
2946 # send packet from host to server
2947 pkts = self.create_stream_frag(self.pg0,
2952 self.pg0.add_stream(pkts)
2953 self.pg_enable_capture(self.pg_interfaces)
2955 frags = self.pg0.get_capture(len(pkts))
2956 p = self.reass_frags_and_verify(frags,
2959 self.assertNotEqual(p[TCP].sport, host_in_port)
2960 self.assertEqual(p[TCP].dport, server_in_port)
2961 self.assertEqual(data, p[Raw].load)
2963 def test_frag_out_of_order(self):
2964 """ NAT44 translate fragments arriving out of order """
2965 self.nat44_add_address(self.nat_addr)
2966 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2967 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2970 data = "A" * 4 + "B" * 16 + "C" * 3
2971 random.randint(1025, 65535)
2974 pkts = self.create_stream_frag(self.pg0,
2975 self.pg1.remote_ip4,
2980 self.pg0.add_stream(pkts)
2981 self.pg_enable_capture(self.pg_interfaces)
2983 frags = self.pg1.get_capture(len(pkts))
2984 p = self.reass_frags_and_verify(frags,
2986 self.pg1.remote_ip4)
2987 self.assertEqual(p[TCP].dport, 20)
2988 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2989 self.tcp_port_out = p[TCP].sport
2990 self.assertEqual(data, p[Raw].load)
2993 pkts = self.create_stream_frag(self.pg1,
2999 self.pg1.add_stream(pkts)
3000 self.pg_enable_capture(self.pg_interfaces)
3002 frags = self.pg0.get_capture(len(pkts))
3003 p = self.reass_frags_and_verify(frags,
3004 self.pg1.remote_ip4,
3005 self.pg0.remote_ip4)
3006 self.assertEqual(p[TCP].sport, 20)
3007 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3008 self.assertEqual(data, p[Raw].load)
3010 def test_port_restricted(self):
3011 """ Port restricted NAT44 (MAP-E CE) """
3012 self.nat44_add_address(self.nat_addr)
3013 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3014 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3016 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3017 "psid-offset 6 psid-len 6")
3019 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3020 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3021 TCP(sport=4567, dport=22))
3022 self.pg0.add_stream(p)
3023 self.pg_enable_capture(self.pg_interfaces)
3025 capture = self.pg1.get_capture(1)
3030 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3031 self.assertEqual(ip.src, self.nat_addr)
3032 self.assertEqual(tcp.dport, 22)
3033 self.assertNotEqual(tcp.sport, 4567)
3034 self.assertEqual((tcp.sport >> 6) & 63, 10)
3035 self.check_tcp_checksum(p)
3036 self.check_ip_checksum(p)
3038 self.logger.error(ppp("Unexpected or invalid packet:", p))
3041 def test_twice_nat(self):
3043 twice_nat_addr = '10.0.1.3'
3048 self.nat44_add_address(self.nat_addr)
3049 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3050 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3051 port_in, port_out, proto=IP_PROTOS.tcp,
3053 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3054 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3057 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3058 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3059 TCP(sport=eh_port_out, dport=port_out))
3060 self.pg1.add_stream(p)
3061 self.pg_enable_capture(self.pg_interfaces)
3063 capture = self.pg0.get_capture(1)
3068 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3069 self.assertEqual(ip.src, twice_nat_addr)
3070 self.assertEqual(tcp.dport, port_in)
3071 self.assertNotEqual(tcp.sport, eh_port_out)
3072 eh_port_in = tcp.sport
3073 self.check_tcp_checksum(p)
3074 self.check_ip_checksum(p)
3076 self.logger.error(ppp("Unexpected or invalid packet:", p))
3079 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3080 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3081 TCP(sport=port_in, dport=eh_port_in))
3082 self.pg0.add_stream(p)
3083 self.pg_enable_capture(self.pg_interfaces)
3085 capture = self.pg1.get_capture(1)
3090 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3091 self.assertEqual(ip.src, self.nat_addr)
3092 self.assertEqual(tcp.dport, eh_port_out)
3093 self.assertEqual(tcp.sport, port_out)
3094 self.check_tcp_checksum(p)
3095 self.check_ip_checksum(p)
3097 self.logger.error(ppp("Unexpected or invalid packet:", p))
3100 def test_twice_nat_lb(self):
3101 """ Twice NAT44 local service load balancing """
3102 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3103 twice_nat_addr = '10.0.1.3'
3108 server1 = self.pg0.remote_hosts[0]
3109 server2 = self.pg0.remote_hosts[1]
3111 locals = [{'addr': server1.ip4n,
3114 {'addr': server2.ip4n,
3118 self.nat44_add_address(self.nat_addr)
3119 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3121 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3125 local_num=len(locals),
3127 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3128 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3131 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3132 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3133 TCP(sport=eh_port_out, dport=external_port))
3134 self.pg1.add_stream(p)
3135 self.pg_enable_capture(self.pg_interfaces)
3137 capture = self.pg0.get_capture(1)
3143 self.assertEqual(ip.src, twice_nat_addr)
3144 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3145 if ip.dst == server1.ip4:
3149 self.assertNotEqual(tcp.sport, eh_port_out)
3150 eh_port_in = tcp.sport
3151 self.assertEqual(tcp.dport, local_port)
3152 self.check_tcp_checksum(p)
3153 self.check_ip_checksum(p)
3155 self.logger.error(ppp("Unexpected or invalid packet:", p))
3158 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3159 IP(src=server.ip4, dst=twice_nat_addr) /
3160 TCP(sport=local_port, dport=eh_port_in))
3161 self.pg0.add_stream(p)
3162 self.pg_enable_capture(self.pg_interfaces)
3164 capture = self.pg1.get_capture(1)
3169 self.assertEqual(ip.src, self.nat_addr)
3170 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3171 self.assertEqual(tcp.sport, external_port)
3172 self.assertEqual(tcp.dport, eh_port_out)
3173 self.check_tcp_checksum(p)
3174 self.check_ip_checksum(p)
3176 self.logger.error(ppp("Unexpected or invalid packet:", p))
3179 def test_twice_nat_interface_addr(self):
3180 """ Acquire twice NAT44 addresses from interface """
3181 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3183 # no address in NAT pool
3184 adresses = self.vapi.nat44_address_dump()
3185 self.assertEqual(0, len(adresses))
3187 # configure interface address and check NAT address pool
3188 self.pg7.config_ip4()
3189 adresses = self.vapi.nat44_address_dump()
3190 self.assertEqual(1, len(adresses))
3191 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3192 self.assertEqual(adresses[0].twice_nat, 1)
3194 # remove interface address and check NAT address pool
3195 self.pg7.unconfig_ip4()
3196 adresses = self.vapi.nat44_address_dump()
3197 self.assertEqual(0, len(adresses))
3200 super(TestNAT44, self).tearDown()
3201 if not self.vpp_dead:
3202 self.logger.info(self.vapi.cli("show nat44 verbose"))
3203 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3204 self.vapi.cli("nat addr-port-assignment-alg default")
3208 class TestNAT44Out2InDPO(MethodHolder):
3209 """ NAT44 Test Cases using out2in DPO """
3212 def setUpConstants(cls):
3213 super(TestNAT44Out2InDPO, cls).setUpConstants()
3214 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
3217 def setUpClass(cls):
3218 super(TestNAT44Out2InDPO, cls).setUpClass()
3221 cls.tcp_port_in = 6303
3222 cls.tcp_port_out = 6303
3223 cls.udp_port_in = 6304
3224 cls.udp_port_out = 6304
3225 cls.icmp_id_in = 6305
3226 cls.icmp_id_out = 6305
3227 cls.nat_addr = '10.0.0.3'
3228 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3229 cls.dst_ip4 = '192.168.70.1'
3231 cls.create_pg_interfaces(range(2))
3234 cls.pg0.config_ip4()
3235 cls.pg0.resolve_arp()
3238 cls.pg1.config_ip6()
3239 cls.pg1.resolve_ndp()
3241 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
3242 dst_address_length=0,
3243 next_hop_address=cls.pg1.remote_ip6n,
3244 next_hop_sw_if_index=cls.pg1.sw_if_index)
3247 super(TestNAT44Out2InDPO, cls).tearDownClass()
3250 def configure_xlat(self):
3251 self.dst_ip6_pfx = '1:2:3::'
3252 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3254 self.dst_ip6_pfx_len = 96
3255 self.src_ip6_pfx = '4:5:6::'
3256 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3258 self.src_ip6_pfx_len = 96
3259 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3260 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3261 '\x00\x00\x00\x00', 0, is_translation=1,
3264 def test_464xlat_ce(self):
3265 """ Test 464XLAT CE with NAT44 """
3267 self.configure_xlat()
3269 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3270 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
3272 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3273 self.dst_ip6_pfx_len)
3274 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3275 self.src_ip6_pfx_len)
3278 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3279 self.pg0.add_stream(pkts)
3280 self.pg_enable_capture(self.pg_interfaces)
3282 capture = self.pg1.get_capture(len(pkts))
3283 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
3286 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3288 self.pg1.add_stream(pkts)
3289 self.pg_enable_capture(self.pg_interfaces)
3291 capture = self.pg0.get_capture(len(pkts))
3292 self.verify_capture_in(capture, self.pg0)
3294 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3296 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
3297 self.nat_addr_n, is_add=0)
3299 def test_464xlat_ce_no_nat(self):
3300 """ Test 464XLAT CE without NAT44 """
3302 self.configure_xlat()
3304 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3305 self.dst_ip6_pfx_len)
3306 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3307 self.src_ip6_pfx_len)
3309 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3310 self.pg0.add_stream(pkts)
3311 self.pg_enable_capture(self.pg_interfaces)
3313 capture = self.pg1.get_capture(len(pkts))
3314 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3315 nat_ip=out_dst_ip6, same_port=True)
3317 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3318 self.pg1.add_stream(pkts)
3319 self.pg_enable_capture(self.pg_interfaces)
3321 capture = self.pg0.get_capture(len(pkts))
3322 self.verify_capture_in(capture, self.pg0)
3325 class TestDeterministicNAT(MethodHolder):
3326 """ Deterministic NAT Test Cases """
3329 def setUpConstants(cls):
3330 super(TestDeterministicNAT, cls).setUpConstants()
3331 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
3334 def setUpClass(cls):
3335 super(TestDeterministicNAT, cls).setUpClass()
3338 cls.tcp_port_in = 6303
3339 cls.tcp_external_port = 6303
3340 cls.udp_port_in = 6304
3341 cls.udp_external_port = 6304
3342 cls.icmp_id_in = 6305
3343 cls.nat_addr = '10.0.0.3'
3345 cls.create_pg_interfaces(range(3))
3346 cls.interfaces = list(cls.pg_interfaces)
3348 for i in cls.interfaces:
3353 cls.pg0.generate_remote_hosts(2)
3354 cls.pg0.configure_ipv4_neighbors()
3357 super(TestDeterministicNAT, cls).tearDownClass()
3360 def create_stream_in(self, in_if, out_if, ttl=64):
3362 Create packet stream for inside network
3364 :param in_if: Inside interface
3365 :param out_if: Outside interface
3366 :param ttl: TTL of generated packets
3370 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3371 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3372 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3376 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3377 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3378 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3382 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3383 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3384 ICMP(id=self.icmp_id_in, type='echo-request'))
3389 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3391 Create packet stream for outside network
3393 :param out_if: Outside interface
3394 :param dst_ip: Destination IP address (Default use global NAT address)
3395 :param ttl: TTL of generated packets
3398 dst_ip = self.nat_addr
3401 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3402 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3403 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3407 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3408 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3409 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3413 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3414 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3415 ICMP(id=self.icmp_external_id, type='echo-reply'))
3420 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3422 Verify captured packets on outside network
3424 :param capture: Captured packets
3425 :param nat_ip: Translated IP address (Default use global NAT address)
3426 :param same_port: Sorce port number is not translated (Default False)
3427 :param packet_num: Expected number of packets (Default 3)
3430 nat_ip = self.nat_addr
3431 self.assertEqual(packet_num, len(capture))
3432 for packet in capture:
3434 self.assertEqual(packet[IP].src, nat_ip)
3435 if packet.haslayer(TCP):
3436 self.tcp_port_out = packet[TCP].sport
3437 elif packet.haslayer(UDP):
3438 self.udp_port_out = packet[UDP].sport
3440 self.icmp_external_id = packet[ICMP].id
3442 self.logger.error(ppp("Unexpected or invalid packet "
3443 "(outside network):", packet))
3446 def initiate_tcp_session(self, in_if, out_if):
3448 Initiates TCP session
3450 :param in_if: Inside interface
3451 :param out_if: Outside interface
3454 # SYN packet in->out
3455 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3456 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3457 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3460 self.pg_enable_capture(self.pg_interfaces)
3462 capture = out_if.get_capture(1)
3464 self.tcp_port_out = p[TCP].sport
3466 # SYN + ACK packet out->in
3467 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3468 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3469 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3471 out_if.add_stream(p)
3472 self.pg_enable_capture(self.pg_interfaces)
3474 in_if.get_capture(1)
3476 # ACK packet in->out
3477 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3478 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3479 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3482 self.pg_enable_capture(self.pg_interfaces)
3484 out_if.get_capture(1)
3487 self.logger.error("TCP 3 way handshake failed")
3490 def verify_ipfix_max_entries_per_user(self, data):
3492 Verify IPFIX maximum entries per user exceeded event
3494 :param data: Decoded IPFIX data records
3496 self.assertEqual(1, len(data))
3499 self.assertEqual(ord(record[230]), 13)
3500 # natQuotaExceededEvent
3501 self.assertEqual('\x03\x00\x00\x00', record[466])
3503 self.assertEqual(self.pg0.remote_ip4n, record[8])
3505 def test_deterministic_mode(self):
3506 """ NAT plugin run deterministic mode """
3507 in_addr = '172.16.255.0'
3508 out_addr = '172.17.255.50'
3509 in_addr_t = '172.16.255.20'
3510 in_addr_n = socket.inet_aton(in_addr)
3511 out_addr_n = socket.inet_aton(out_addr)
3512 in_addr_t_n = socket.inet_aton(in_addr_t)
3516 nat_config = self.vapi.nat_show_config()
3517 self.assertEqual(1, nat_config.deterministic)
3519 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
3521 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
3522 self.assertEqual(rep1.out_addr[:4], out_addr_n)
3523 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
3524 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
3526 deterministic_mappings = self.vapi.nat_det_map_dump()
3527 self.assertEqual(len(deterministic_mappings), 1)
3528 dsm = deterministic_mappings[0]
3529 self.assertEqual(in_addr_n, dsm.in_addr[:4])
3530 self.assertEqual(in_plen, dsm.in_plen)
3531 self.assertEqual(out_addr_n, dsm.out_addr[:4])
3532 self.assertEqual(out_plen, dsm.out_plen)
3534 self.clear_nat_det()
3535 deterministic_mappings = self.vapi.nat_det_map_dump()
3536 self.assertEqual(len(deterministic_mappings), 0)
3538 def test_set_timeouts(self):
3539 """ Set deterministic NAT timeouts """
3540 timeouts_before = self.vapi.nat_det_get_timeouts()
3542 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
3543 timeouts_before.tcp_established + 10,
3544 timeouts_before.tcp_transitory + 10,
3545 timeouts_before.icmp + 10)
3547 timeouts_after = self.vapi.nat_det_get_timeouts()
3549 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
3550 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
3551 self.assertNotEqual(timeouts_before.tcp_established,
3552 timeouts_after.tcp_established)
3553 self.assertNotEqual(timeouts_before.tcp_transitory,
3554 timeouts_after.tcp_transitory)
3556 def test_det_in(self):
3557 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3559 nat_ip = "10.0.0.10"
3561 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3563 socket.inet_aton(nat_ip),
3565 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3566 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3570 pkts = self.create_stream_in(self.pg0, self.pg1)
3571 self.pg0.add_stream(pkts)
3572 self.pg_enable_capture(self.pg_interfaces)
3574 capture = self.pg1.get_capture(len(pkts))
3575 self.verify_capture_out(capture, nat_ip)
3578 pkts = self.create_stream_out(self.pg1, nat_ip)
3579 self.pg1.add_stream(pkts)
3580 self.pg_enable_capture(self.pg_interfaces)
3582 capture = self.pg0.get_capture(len(pkts))
3583 self.verify_capture_in(capture, self.pg0)
3586 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3587 self.assertEqual(len(sessions), 3)
3591 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3592 self.assertEqual(s.in_port, self.tcp_port_in)
3593 self.assertEqual(s.out_port, self.tcp_port_out)
3594 self.assertEqual(s.ext_port, self.tcp_external_port)
3598 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3599 self.assertEqual(s.in_port, self.udp_port_in)
3600 self.assertEqual(s.out_port, self.udp_port_out)
3601 self.assertEqual(s.ext_port, self.udp_external_port)
3605 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3606 self.assertEqual(s.in_port, self.icmp_id_in)
3607 self.assertEqual(s.out_port, self.icmp_external_id)
3609 def test_multiple_users(self):
3610 """ Deterministic NAT multiple users """
3612 nat_ip = "10.0.0.10"
3614 external_port = 6303
3616 host0 = self.pg0.remote_hosts[0]
3617 host1 = self.pg0.remote_hosts[1]
3619 self.vapi.nat_det_add_del_map(host0.ip4n,
3621 socket.inet_aton(nat_ip),
3623 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3624 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3628 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3629 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3630 TCP(sport=port_in, dport=external_port))
3631 self.pg0.add_stream(p)
3632 self.pg_enable_capture(self.pg_interfaces)
3634 capture = self.pg1.get_capture(1)
3639 self.assertEqual(ip.src, nat_ip)
3640 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3641 self.assertEqual(tcp.dport, external_port)
3642 port_out0 = tcp.sport
3644 self.logger.error(ppp("Unexpected or invalid packet:", p))
3648 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
3649 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
3650 TCP(sport=port_in, dport=external_port))
3651 self.pg0.add_stream(p)
3652 self.pg_enable_capture(self.pg_interfaces)
3654 capture = self.pg1.get_capture(1)
3659 self.assertEqual(ip.src, nat_ip)
3660 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3661 self.assertEqual(tcp.dport, external_port)
3662 port_out1 = tcp.sport
3664 self.logger.error(ppp("Unexpected or invalid packet:", p))
3667 dms = self.vapi.nat_det_map_dump()
3668 self.assertEqual(1, len(dms))
3669 self.assertEqual(2, dms[0].ses_num)
3672 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3673 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3674 TCP(sport=external_port, dport=port_out0))
3675 self.pg1.add_stream(p)
3676 self.pg_enable_capture(self.pg_interfaces)
3678 capture = self.pg0.get_capture(1)
3683 self.assertEqual(ip.src, self.pg1.remote_ip4)
3684 self.assertEqual(ip.dst, host0.ip4)
3685 self.assertEqual(tcp.dport, port_in)
3686 self.assertEqual(tcp.sport, external_port)
3688 self.logger.error(ppp("Unexpected or invalid packet:", p))
3692 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3693 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3694 TCP(sport=external_port, dport=port_out1))
3695 self.pg1.add_stream(p)
3696 self.pg_enable_capture(self.pg_interfaces)
3698 capture = self.pg0.get_capture(1)
3703 self.assertEqual(ip.src, self.pg1.remote_ip4)
3704 self.assertEqual(ip.dst, host1.ip4)
3705 self.assertEqual(tcp.dport, port_in)
3706 self.assertEqual(tcp.sport, external_port)
3708 self.logger.error(ppp("Unexpected or invalid packet", p))
3711 # session close api test
3712 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
3714 self.pg1.remote_ip4n,
3716 dms = self.vapi.nat_det_map_dump()
3717 self.assertEqual(dms[0].ses_num, 1)
3719 self.vapi.nat_det_close_session_in(host0.ip4n,
3721 self.pg1.remote_ip4n,
3723 dms = self.vapi.nat_det_map_dump()
3724 self.assertEqual(dms[0].ses_num, 0)
3726 def test_tcp_session_close_detection_in(self):
3727 """ Deterministic NAT TCP session close from inside network """
3728 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3730 socket.inet_aton(self.nat_addr),
3732 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3733 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3736 self.initiate_tcp_session(self.pg0, self.pg1)
3738 # close the session from inside
3740 # FIN packet in -> out
3741 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3742 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3743 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3745 self.pg0.add_stream(p)
3746 self.pg_enable_capture(self.pg_interfaces)
3748 self.pg1.get_capture(1)
3752 # ACK packet out -> in
3753 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3754 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3755 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3759 # FIN packet out -> in
3760 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3761 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3762 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3766 self.pg1.add_stream(pkts)
3767 self.pg_enable_capture(self.pg_interfaces)
3769 self.pg0.get_capture(2)
3771 # ACK packet in -> out
3772 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3773 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3774 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3776 self.pg0.add_stream(p)
3777 self.pg_enable_capture(self.pg_interfaces)
3779 self.pg1.get_capture(1)
3781 # Check if deterministic NAT44 closed the session
3782 dms = self.vapi.nat_det_map_dump()
3783 self.assertEqual(0, dms[0].ses_num)
3785 self.logger.error("TCP session termination failed")
3788 def test_tcp_session_close_detection_out(self):
3789 """ Deterministic NAT TCP session close from outside network """
3790 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3792 socket.inet_aton(self.nat_addr),
3794 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3795 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3798 self.initiate_tcp_session(self.pg0, self.pg1)
3800 # close the session from outside
3802 # FIN packet out -> in
3803 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3804 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3805 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3807 self.pg1.add_stream(p)
3808 self.pg_enable_capture(self.pg_interfaces)
3810 self.pg0.get_capture(1)
3814 # ACK packet in -> out
3815 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3816 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3817 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3821 # ACK packet in -> out
3822 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3823 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3824 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3828 self.pg0.add_stream(pkts)
3829 self.pg_enable_capture(self.pg_interfaces)
3831 self.pg1.get_capture(2)
3833 # ACK packet out -> in
3834 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3835 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3836 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3838 self.pg1.add_stream(p)
3839 self.pg_enable_capture(self.pg_interfaces)
3841 self.pg0.get_capture(1)
3843 # Check if deterministic NAT44 closed the session
3844 dms = self.vapi.nat_det_map_dump()
3845 self.assertEqual(0, dms[0].ses_num)
3847 self.logger.error("TCP session termination failed")
3850 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3851 def test_session_timeout(self):
3852 """ Deterministic NAT session timeouts """
3853 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3855 socket.inet_aton(self.nat_addr),
3857 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3858 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3861 self.initiate_tcp_session(self.pg0, self.pg1)
3862 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3863 pkts = self.create_stream_in(self.pg0, self.pg1)
3864 self.pg0.add_stream(pkts)
3865 self.pg_enable_capture(self.pg_interfaces)
3867 capture = self.pg1.get_capture(len(pkts))
3870 dms = self.vapi.nat_det_map_dump()
3871 self.assertEqual(0, dms[0].ses_num)
3873 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3874 def test_session_limit_per_user(self):
3875 """ Deterministic NAT maximum sessions per user limit """
3876 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3878 socket.inet_aton(self.nat_addr),
3880 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3881 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3883 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3884 src_address=self.pg2.local_ip4n,
3886 template_interval=10)
3887 self.vapi.nat_ipfix()
3890 for port in range(1025, 2025):
3891 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3892 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3893 UDP(sport=port, dport=port))
3896 self.pg0.add_stream(pkts)
3897 self.pg_enable_capture(self.pg_interfaces)
3899 capture = self.pg1.get_capture(len(pkts))
3901 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3902 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3903 UDP(sport=3001, dport=3002))
3904 self.pg0.add_stream(p)
3905 self.pg_enable_capture(self.pg_interfaces)
3907 capture = self.pg1.assert_nothing_captured()
3909 # verify ICMP error packet
3910 capture = self.pg0.get_capture(1)
3912 self.assertTrue(p.haslayer(ICMP))
3914 self.assertEqual(icmp.type, 3)
3915 self.assertEqual(icmp.code, 1)
3916 self.assertTrue(icmp.haslayer(IPerror))
3917 inner_ip = icmp[IPerror]
3918 self.assertEqual(inner_ip[UDPerror].sport, 3001)
3919 self.assertEqual(inner_ip[UDPerror].dport, 3002)
3921 dms = self.vapi.nat_det_map_dump()
3923 self.assertEqual(1000, dms[0].ses_num)
3925 # verify IPFIX logging
3926 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3928 capture = self.pg2.get_capture(2)
3929 ipfix = IPFIXDecoder()
3930 # first load template
3932 self.assertTrue(p.haslayer(IPFIX))
3933 if p.haslayer(Template):
3934 ipfix.add_template(p.getlayer(Template))
3935 # verify events in data set
3937 if p.haslayer(Data):
3938 data = ipfix.decode_data_set(p.getlayer(Set))
3939 self.verify_ipfix_max_entries_per_user(data)
3941 def clear_nat_det(self):
3943 Clear deterministic NAT configuration.
3945 self.vapi.nat_ipfix(enable=0)
3946 self.vapi.nat_det_set_timeouts()
3947 deterministic_mappings = self.vapi.nat_det_map_dump()
3948 for dsm in deterministic_mappings:
3949 self.vapi.nat_det_add_del_map(dsm.in_addr,
3955 interfaces = self.vapi.nat44_interface_dump()
3956 for intf in interfaces:
3957 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3962 super(TestDeterministicNAT, self).tearDown()
3963 if not self.vpp_dead:
3964 self.logger.info(self.vapi.cli("show nat44 detail"))
3965 self.clear_nat_det()
3968 class TestNAT64(MethodHolder):
3969 """ NAT64 Test Cases """
3972 def setUpClass(cls):
3973 super(TestNAT64, cls).setUpClass()
3976 cls.tcp_port_in = 6303
3977 cls.tcp_port_out = 6303
3978 cls.udp_port_in = 6304
3979 cls.udp_port_out = 6304
3980 cls.icmp_id_in = 6305
3981 cls.icmp_id_out = 6305
3982 cls.nat_addr = '10.0.0.3'
3983 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3985 cls.vrf1_nat_addr = '10.0.10.3'
3986 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3989 cls.create_pg_interfaces(range(5))
3990 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3991 cls.ip6_interfaces.append(cls.pg_interfaces[2])
3992 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3994 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3996 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3998 cls.pg0.generate_remote_hosts(2)
4000 for i in cls.ip6_interfaces:
4003 i.configure_ipv6_neighbors()
4005 for i in cls.ip4_interfaces:
4011 cls.pg3.config_ip4()
4012 cls.pg3.resolve_arp()
4013 cls.pg3.config_ip6()
4014 cls.pg3.configure_ipv6_neighbors()
4017 super(TestNAT64, cls).tearDownClass()
4020 def test_pool(self):
4021 """ Add/delete address to NAT64 pool """
4022 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4024 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4026 addresses = self.vapi.nat64_pool_addr_dump()
4027 self.assertEqual(len(addresses), 1)
4028 self.assertEqual(addresses[0].address, nat_addr)
4030 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4032 addresses = self.vapi.nat64_pool_addr_dump()
4033 self.assertEqual(len(addresses), 0)
4035 def test_interface(self):
4036 """ Enable/disable NAT64 feature on the interface """
4037 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4038 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4040 interfaces = self.vapi.nat64_interface_dump()
4041 self.assertEqual(len(interfaces), 2)
4044 for intf in interfaces:
4045 if intf.sw_if_index == self.pg0.sw_if_index:
4046 self.assertEqual(intf.is_inside, 1)
4048 elif intf.sw_if_index == self.pg1.sw_if_index:
4049 self.assertEqual(intf.is_inside, 0)
4051 self.assertTrue(pg0_found)
4052 self.assertTrue(pg1_found)
4054 features = self.vapi.cli("show interface features pg0")
4055 self.assertNotEqual(features.find('nat64-in2out'), -1)
4056 features = self.vapi.cli("show interface features pg1")
4057 self.assertNotEqual(features.find('nat64-out2in'), -1)
4059 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4060 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4062 interfaces = self.vapi.nat64_interface_dump()
4063 self.assertEqual(len(interfaces), 0)
4065 def test_static_bib(self):
4066 """ Add/delete static BIB entry """
4067 in_addr = socket.inet_pton(socket.AF_INET6,
4068 '2001:db8:85a3::8a2e:370:7334')
4069 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4072 proto = IP_PROTOS.tcp
4074 self.vapi.nat64_add_del_static_bib(in_addr,
4079 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4084 self.assertEqual(bibe.i_addr, in_addr)
4085 self.assertEqual(bibe.o_addr, out_addr)
4086 self.assertEqual(bibe.i_port, in_port)
4087 self.assertEqual(bibe.o_port, out_port)
4088 self.assertEqual(static_bib_num, 1)
4090 self.vapi.nat64_add_del_static_bib(in_addr,
4096 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4101 self.assertEqual(static_bib_num, 0)
4103 def test_set_timeouts(self):
4104 """ Set NAT64 timeouts """
4105 # verify default values
4106 timeouts = self.vapi.nat64_get_timeouts()
4107 self.assertEqual(timeouts.udp, 300)
4108 self.assertEqual(timeouts.icmp, 60)
4109 self.assertEqual(timeouts.tcp_trans, 240)
4110 self.assertEqual(timeouts.tcp_est, 7440)
4111 self.assertEqual(timeouts.tcp_incoming_syn, 6)
4113 # set and verify custom values
4114 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4115 tcp_est=7450, tcp_incoming_syn=10)
4116 timeouts = self.vapi.nat64_get_timeouts()
4117 self.assertEqual(timeouts.udp, 200)
4118 self.assertEqual(timeouts.icmp, 30)
4119 self.assertEqual(timeouts.tcp_trans, 250)
4120 self.assertEqual(timeouts.tcp_est, 7450)
4121 self.assertEqual(timeouts.tcp_incoming_syn, 10)
4123 def test_dynamic(self):
4124 """ NAT64 dynamic translation test """
4125 self.tcp_port_in = 6303
4126 self.udp_port_in = 6304
4127 self.icmp_id_in = 6305
4129 ses_num_start = self.nat64_get_ses_num()
4131 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4133 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4134 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4137 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4138 self.pg0.add_stream(pkts)
4139 self.pg_enable_capture(self.pg_interfaces)
4141 capture = self.pg1.get_capture(len(pkts))
4142 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4143 dst_ip=self.pg1.remote_ip4)
4146 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4147 self.pg1.add_stream(pkts)
4148 self.pg_enable_capture(self.pg_interfaces)
4150 capture = self.pg0.get_capture(len(pkts))
4151 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4152 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4155 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4156 self.pg0.add_stream(pkts)
4157 self.pg_enable_capture(self.pg_interfaces)
4159 capture = self.pg1.get_capture(len(pkts))
4160 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4161 dst_ip=self.pg1.remote_ip4)
4164 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4165 self.pg1.add_stream(pkts)
4166 self.pg_enable_capture(self.pg_interfaces)
4168 capture = self.pg0.get_capture(len(pkts))
4169 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4171 ses_num_end = self.nat64_get_ses_num()
4173 self.assertEqual(ses_num_end - ses_num_start, 3)
4175 # tenant with specific VRF
4176 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4177 self.vrf1_nat_addr_n,
4178 vrf_id=self.vrf1_id)
4179 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4181 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4182 self.pg2.add_stream(pkts)
4183 self.pg_enable_capture(self.pg_interfaces)
4185 capture = self.pg1.get_capture(len(pkts))
4186 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4187 dst_ip=self.pg1.remote_ip4)
4189 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4190 self.pg1.add_stream(pkts)
4191 self.pg_enable_capture(self.pg_interfaces)
4193 capture = self.pg2.get_capture(len(pkts))
4194 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4196 def test_static(self):
4197 """ NAT64 static translation test """
4198 self.tcp_port_in = 60303
4199 self.udp_port_in = 60304
4200 self.icmp_id_in = 60305
4201 self.tcp_port_out = 60303
4202 self.udp_port_out = 60304
4203 self.icmp_id_out = 60305
4205 ses_num_start = self.nat64_get_ses_num()
4207 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4209 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4210 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4212 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4217 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4222 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4229 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4230 self.pg0.add_stream(pkts)
4231 self.pg_enable_capture(self.pg_interfaces)
4233 capture = self.pg1.get_capture(len(pkts))
4234 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4235 dst_ip=self.pg1.remote_ip4, same_port=True)
4238 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4239 self.pg1.add_stream(pkts)
4240 self.pg_enable_capture(self.pg_interfaces)
4242 capture = self.pg0.get_capture(len(pkts))
4243 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4244 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4246 ses_num_end = self.nat64_get_ses_num()
4248 self.assertEqual(ses_num_end - ses_num_start, 3)
4250 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4251 def test_session_timeout(self):
4252 """ NAT64 session timeout """
4253 self.icmp_id_in = 1234
4254 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4256 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4257 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4258 self.vapi.nat64_set_timeouts(icmp=5)
4260 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4261 self.pg0.add_stream(pkts)
4262 self.pg_enable_capture(self.pg_interfaces)
4264 capture = self.pg1.get_capture(len(pkts))
4266 ses_num_before_timeout = self.nat64_get_ses_num()
4270 # ICMP session after timeout
4271 ses_num_after_timeout = self.nat64_get_ses_num()
4272 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4274 def test_icmp_error(self):
4275 """ NAT64 ICMP Error message translation """
4276 self.tcp_port_in = 6303
4277 self.udp_port_in = 6304
4278 self.icmp_id_in = 6305
4280 ses_num_start = self.nat64_get_ses_num()
4282 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4284 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4285 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4287 # send some packets to create sessions
4288 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4289 self.pg0.add_stream(pkts)
4290 self.pg_enable_capture(self.pg_interfaces)
4292 capture_ip4 = self.pg1.get_capture(len(pkts))
4293 self.verify_capture_out(capture_ip4,
4294 nat_ip=self.nat_addr,
4295 dst_ip=self.pg1.remote_ip4)
4297 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4298 self.pg1.add_stream(pkts)
4299 self.pg_enable_capture(self.pg_interfaces)
4301 capture_ip6 = self.pg0.get_capture(len(pkts))
4302 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4303 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4304 self.pg0.remote_ip6)
4307 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4308 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4309 ICMPv6DestUnreach(code=1) /
4310 packet[IPv6] for packet in capture_ip6]
4311 self.pg0.add_stream(pkts)
4312 self.pg_enable_capture(self.pg_interfaces)
4314 capture = self.pg1.get_capture(len(pkts))
4315 for packet in capture:
4317 self.assertEqual(packet[IP].src, self.nat_addr)
4318 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4319 self.assertEqual(packet[ICMP].type, 3)
4320 self.assertEqual(packet[ICMP].code, 13)
4321 inner = packet[IPerror]
4322 self.assertEqual(inner.src, self.pg1.remote_ip4)
4323 self.assertEqual(inner.dst, self.nat_addr)
4324 self.check_icmp_checksum(packet)
4325 if inner.haslayer(TCPerror):
4326 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4327 elif inner.haslayer(UDPerror):
4328 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4330 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4332 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4336 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4337 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4338 ICMP(type=3, code=13) /
4339 packet[IP] for packet in capture_ip4]
4340 self.pg1.add_stream(pkts)
4341 self.pg_enable_capture(self.pg_interfaces)
4343 capture = self.pg0.get_capture(len(pkts))
4344 for packet in capture:
4346 self.assertEqual(packet[IPv6].src, ip.src)
4347 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4348 icmp = packet[ICMPv6DestUnreach]
4349 self.assertEqual(icmp.code, 1)
4350 inner = icmp[IPerror6]
4351 self.assertEqual(inner.src, self.pg0.remote_ip6)
4352 self.assertEqual(inner.dst, ip.src)
4353 self.check_icmpv6_checksum(packet)
4354 if inner.haslayer(TCPerror):
4355 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4356 elif inner.haslayer(UDPerror):
4357 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4359 self.assertEqual(inner[ICMPv6EchoRequest].id,
4362 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4365 def test_hairpinning(self):
4366 """ NAT64 hairpinning """
4368 client = self.pg0.remote_hosts[0]
4369 server = self.pg0.remote_hosts[1]
4370 server_tcp_in_port = 22
4371 server_tcp_out_port = 4022
4372 server_udp_in_port = 23
4373 server_udp_out_port = 4023
4374 client_tcp_in_port = 1234
4375 client_udp_in_port = 1235
4376 client_tcp_out_port = 0
4377 client_udp_out_port = 0
4378 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4379 nat_addr_ip6 = ip.src
4381 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4383 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4384 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4386 self.vapi.nat64_add_del_static_bib(server.ip6n,
4389 server_tcp_out_port,
4391 self.vapi.nat64_add_del_static_bib(server.ip6n,
4394 server_udp_out_port,
4399 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4400 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4401 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4403 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4404 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4405 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4407 self.pg0.add_stream(pkts)
4408 self.pg_enable_capture(self.pg_interfaces)
4410 capture = self.pg0.get_capture(len(pkts))
4411 for packet in capture:
4413 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4414 self.assertEqual(packet[IPv6].dst, server.ip6)
4415 if packet.haslayer(TCP):
4416 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4417 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4418 self.check_tcp_checksum(packet)
4419 client_tcp_out_port = packet[TCP].sport
4421 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4422 self.assertEqual(packet[UDP].dport, server_udp_in_port)
4423 self.check_udp_checksum(packet)
4424 client_udp_out_port = packet[UDP].sport
4426 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4431 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4432 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4433 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4435 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4436 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4437 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4439 self.pg0.add_stream(pkts)
4440 self.pg_enable_capture(self.pg_interfaces)
4442 capture = self.pg0.get_capture(len(pkts))
4443 for packet in capture:
4445 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4446 self.assertEqual(packet[IPv6].dst, client.ip6)
4447 if packet.haslayer(TCP):
4448 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
4449 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
4450 self.check_tcp_checksum(packet)
4452 self.assertEqual(packet[UDP].sport, server_udp_out_port)
4453 self.assertEqual(packet[UDP].dport, client_udp_in_port)
4454 self.check_udp_checksum(packet)
4456 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4461 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4462 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4463 ICMPv6DestUnreach(code=1) /
4464 packet[IPv6] for packet in capture]
4465 self.pg0.add_stream(pkts)
4466 self.pg_enable_capture(self.pg_interfaces)
4468 capture = self.pg0.get_capture(len(pkts))
4469 for packet in capture:
4471 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4472 self.assertEqual(packet[IPv6].dst, server.ip6)
4473 icmp = packet[ICMPv6DestUnreach]
4474 self.assertEqual(icmp.code, 1)
4475 inner = icmp[IPerror6]
4476 self.assertEqual(inner.src, server.ip6)
4477 self.assertEqual(inner.dst, nat_addr_ip6)
4478 self.check_icmpv6_checksum(packet)
4479 if inner.haslayer(TCPerror):
4480 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
4481 self.assertEqual(inner[TCPerror].dport,
4482 client_tcp_out_port)
4484 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
4485 self.assertEqual(inner[UDPerror].dport,
4486 client_udp_out_port)
4488 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4491 def test_prefix(self):
4492 """ NAT64 Network-Specific Prefix """
4494 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4496 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4497 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4498 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4499 self.vrf1_nat_addr_n,
4500 vrf_id=self.vrf1_id)
4501 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4504 global_pref64 = "2001:db8::"
4505 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
4506 global_pref64_len = 32
4507 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
4509 prefix = self.vapi.nat64_prefix_dump()
4510 self.assertEqual(len(prefix), 1)
4511 self.assertEqual(prefix[0].prefix, global_pref64_n)
4512 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
4513 self.assertEqual(prefix[0].vrf_id, 0)
4515 # Add tenant specific prefix
4516 vrf1_pref64 = "2001:db8:122:300::"
4517 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
4518 vrf1_pref64_len = 56
4519 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
4521 vrf_id=self.vrf1_id)
4522 prefix = self.vapi.nat64_prefix_dump()
4523 self.assertEqual(len(prefix), 2)
4526 pkts = self.create_stream_in_ip6(self.pg0,
4529 plen=global_pref64_len)
4530 self.pg0.add_stream(pkts)
4531 self.pg_enable_capture(self.pg_interfaces)
4533 capture = self.pg1.get_capture(len(pkts))
4534 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4535 dst_ip=self.pg1.remote_ip4)
4537 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4538 self.pg1.add_stream(pkts)
4539 self.pg_enable_capture(self.pg_interfaces)
4541 capture = self.pg0.get_capture(len(pkts))
4542 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4545 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
4547 # Tenant specific prefix
4548 pkts = self.create_stream_in_ip6(self.pg2,
4551 plen=vrf1_pref64_len)
4552 self.pg2.add_stream(pkts)
4553 self.pg_enable_capture(self.pg_interfaces)
4555 capture = self.pg1.get_capture(len(pkts))
4556 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4557 dst_ip=self.pg1.remote_ip4)
4559 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4560 self.pg1.add_stream(pkts)
4561 self.pg_enable_capture(self.pg_interfaces)
4563 capture = self.pg2.get_capture(len(pkts))
4564 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4567 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
4569 def test_unknown_proto(self):
4570 """ NAT64 translate packet with unknown protocol """
4572 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4574 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4575 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4576 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4579 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4580 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4581 TCP(sport=self.tcp_port_in, dport=20))
4582 self.pg0.add_stream(p)
4583 self.pg_enable_capture(self.pg_interfaces)
4585 p = self.pg1.get_capture(1)
4587 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4588 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4590 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4591 TCP(sport=1234, dport=1234))
4592 self.pg0.add_stream(p)
4593 self.pg_enable_capture(self.pg_interfaces)
4595 p = self.pg1.get_capture(1)
4598 self.assertEqual(packet[IP].src, self.nat_addr)
4599 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4600 self.assertTrue(packet.haslayer(GRE))
4601 self.check_ip_checksum(packet)
4603 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4607 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4608 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4610 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4611 TCP(sport=1234, dport=1234))
4612 self.pg1.add_stream(p)
4613 self.pg_enable_capture(self.pg_interfaces)
4615 p = self.pg0.get_capture(1)
4618 self.assertEqual(packet[IPv6].src, remote_ip6)
4619 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4620 self.assertEqual(packet[IPv6].nh, 47)
4622 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4625 def test_hairpinning_unknown_proto(self):
4626 """ NAT64 translate packet with unknown protocol - hairpinning """
4628 client = self.pg0.remote_hosts[0]
4629 server = self.pg0.remote_hosts[1]
4630 server_tcp_in_port = 22
4631 server_tcp_out_port = 4022
4632 client_tcp_in_port = 1234
4633 client_tcp_out_port = 1235
4634 server_nat_ip = "10.0.0.100"
4635 client_nat_ip = "10.0.0.110"
4636 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
4637 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
4638 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
4639 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
4641 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
4643 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4644 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4646 self.vapi.nat64_add_del_static_bib(server.ip6n,
4649 server_tcp_out_port,
4652 self.vapi.nat64_add_del_static_bib(server.ip6n,
4658 self.vapi.nat64_add_del_static_bib(client.ip6n,
4661 client_tcp_out_port,
4665 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4666 IPv6(src=client.ip6, dst=server_nat_ip6) /
4667 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4668 self.pg0.add_stream(p)
4669 self.pg_enable_capture(self.pg_interfaces)
4671 p = self.pg0.get_capture(1)
4673 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4674 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
4676 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4677 TCP(sport=1234, dport=1234))
4678 self.pg0.add_stream(p)
4679 self.pg_enable_capture(self.pg_interfaces)
4681 p = self.pg0.get_capture(1)
4684 self.assertEqual(packet[IPv6].src, client_nat_ip6)
4685 self.assertEqual(packet[IPv6].dst, server.ip6)
4686 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4688 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4692 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4693 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
4695 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4696 TCP(sport=1234, dport=1234))
4697 self.pg0.add_stream(p)
4698 self.pg_enable_capture(self.pg_interfaces)
4700 p = self.pg0.get_capture(1)
4703 self.assertEqual(packet[IPv6].src, server_nat_ip6)
4704 self.assertEqual(packet[IPv6].dst, client.ip6)
4705 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4707 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4710 def test_one_armed_nat64(self):
4711 """ One armed NAT64 """
4713 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
4717 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4719 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
4720 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
4723 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4724 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
4725 TCP(sport=12345, dport=80))
4726 self.pg3.add_stream(p)
4727 self.pg_enable_capture(self.pg_interfaces)
4729 capture = self.pg3.get_capture(1)
4734 self.assertEqual(ip.src, self.nat_addr)
4735 self.assertEqual(ip.dst, self.pg3.remote_ip4)
4736 self.assertNotEqual(tcp.sport, 12345)
4737 external_port = tcp.sport
4738 self.assertEqual(tcp.dport, 80)
4739 self.check_tcp_checksum(p)
4740 self.check_ip_checksum(p)
4742 self.logger.error(ppp("Unexpected or invalid packet:", p))
4746 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4747 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
4748 TCP(sport=80, dport=external_port))
4749 self.pg3.add_stream(p)
4750 self.pg_enable_capture(self.pg_interfaces)
4752 capture = self.pg3.get_capture(1)
4757 self.assertEqual(ip.src, remote_host_ip6)
4758 self.assertEqual(ip.dst, self.pg3.remote_ip6)
4759 self.assertEqual(tcp.sport, 80)
4760 self.assertEqual(tcp.dport, 12345)
4761 self.check_tcp_checksum(p)
4763 self.logger.error(ppp("Unexpected or invalid packet:", p))
4766 def test_frag_in_order(self):
4767 """ NAT64 translate fragments arriving in order """
4768 self.tcp_port_in = random.randint(1025, 65535)
4770 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4772 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4773 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4775 reass = self.vapi.nat_reass_dump()
4776 reass_n_start = len(reass)
4780 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4781 self.tcp_port_in, 20, data)
4782 self.pg0.add_stream(pkts)
4783 self.pg_enable_capture(self.pg_interfaces)
4785 frags = self.pg1.get_capture(len(pkts))
4786 p = self.reass_frags_and_verify(frags,
4788 self.pg1.remote_ip4)
4789 self.assertEqual(p[TCP].dport, 20)
4790 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4791 self.tcp_port_out = p[TCP].sport
4792 self.assertEqual(data, p[Raw].load)
4795 data = "A" * 4 + "b" * 16 + "C" * 3
4796 pkts = self.create_stream_frag(self.pg1,
4801 self.pg1.add_stream(pkts)
4802 self.pg_enable_capture(self.pg_interfaces)
4804 frags = self.pg0.get_capture(len(pkts))
4805 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4806 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4807 self.assertEqual(p[TCP].sport, 20)
4808 self.assertEqual(p[TCP].dport, self.tcp_port_in)
4809 self.assertEqual(data, p[Raw].load)
4811 reass = self.vapi.nat_reass_dump()
4812 reass_n_end = len(reass)
4814 self.assertEqual(reass_n_end - reass_n_start, 2)
4816 def test_reass_hairpinning(self):
4817 """ NAT64 fragments hairpinning """
4819 client = self.pg0.remote_hosts[0]
4820 server = self.pg0.remote_hosts[1]
4821 server_in_port = random.randint(1025, 65535)
4822 server_out_port = random.randint(1025, 65535)
4823 client_in_port = random.randint(1025, 65535)
4824 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4825 nat_addr_ip6 = ip.src
4827 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4829 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4830 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4832 # add static BIB entry for server
4833 self.vapi.nat64_add_del_static_bib(server.ip6n,
4839 # send packet from host to server
4840 pkts = self.create_stream_frag_ip6(self.pg0,
4845 self.pg0.add_stream(pkts)
4846 self.pg_enable_capture(self.pg_interfaces)
4848 frags = self.pg0.get_capture(len(pkts))
4849 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
4850 self.assertNotEqual(p[TCP].sport, client_in_port)
4851 self.assertEqual(p[TCP].dport, server_in_port)
4852 self.assertEqual(data, p[Raw].load)
4854 def test_frag_out_of_order(self):
4855 """ NAT64 translate fragments arriving out of order """
4856 self.tcp_port_in = random.randint(1025, 65535)
4858 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4860 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4861 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4865 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4866 self.tcp_port_in, 20, data)
4868 self.pg0.add_stream(pkts)
4869 self.pg_enable_capture(self.pg_interfaces)
4871 frags = self.pg1.get_capture(len(pkts))
4872 p = self.reass_frags_and_verify(frags,
4874 self.pg1.remote_ip4)
4875 self.assertEqual(p[TCP].dport, 20)
4876 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4877 self.tcp_port_out = p[TCP].sport
4878 self.assertEqual(data, p[Raw].load)
4881 data = "A" * 4 + "B" * 16 + "C" * 3
4882 pkts = self.create_stream_frag(self.pg1,
4888 self.pg1.add_stream(pkts)
4889 self.pg_enable_capture(self.pg_interfaces)
4891 frags = self.pg0.get_capture(len(pkts))
4892 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4893 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4894 self.assertEqual(p[TCP].sport, 20)
4895 self.assertEqual(p[TCP].dport, self.tcp_port_in)
4896 self.assertEqual(data, p[Raw].load)
4898 def test_interface_addr(self):
4899 """ Acquire NAT64 pool addresses from interface """
4900 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
4902 # no address in NAT64 pool
4903 adresses = self.vapi.nat44_address_dump()
4904 self.assertEqual(0, len(adresses))
4906 # configure interface address and check NAT64 address pool
4907 self.pg4.config_ip4()
4908 addresses = self.vapi.nat64_pool_addr_dump()
4909 self.assertEqual(len(addresses), 1)
4910 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
4912 # remove interface address and check NAT64 address pool
4913 self.pg4.unconfig_ip4()
4914 addresses = self.vapi.nat64_pool_addr_dump()
4915 self.assertEqual(0, len(adresses))
4917 def nat64_get_ses_num(self):
4919 Return number of active NAT64 sessions.
4921 st = self.vapi.nat64_st_dump()
4924 def clear_nat64(self):
4926 Clear NAT64 configuration.
4928 self.vapi.nat64_set_timeouts()
4930 interfaces = self.vapi.nat64_interface_dump()
4931 for intf in interfaces:
4932 if intf.is_inside > 1:
4933 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4936 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4940 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4943 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4951 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
4954 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4962 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
4965 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4973 adresses = self.vapi.nat64_pool_addr_dump()
4974 for addr in adresses:
4975 self.vapi.nat64_add_del_pool_addr_range(addr.address,
4980 prefixes = self.vapi.nat64_prefix_dump()
4981 for prefix in prefixes:
4982 self.vapi.nat64_add_del_prefix(prefix.prefix,
4984 vrf_id=prefix.vrf_id,
4988 super(TestNAT64, self).tearDown()
4989 if not self.vpp_dead:
4990 self.logger.info(self.vapi.cli("show nat64 pool"))
4991 self.logger.info(self.vapi.cli("show nat64 interfaces"))
4992 self.logger.info(self.vapi.cli("show nat64 prefix"))
4993 self.logger.info(self.vapi.cli("show nat64 bib all"))
4994 self.logger.info(self.vapi.cli("show nat64 session table all"))
4995 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4999 class TestDSlite(MethodHolder):
5000 """ DS-Lite Test Cases """
5003 def setUpClass(cls):
5004 super(TestDSlite, cls).setUpClass()
5007 cls.nat_addr = '10.0.0.3'
5008 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5010 cls.create_pg_interfaces(range(2))
5012 cls.pg0.config_ip4()
5013 cls.pg0.resolve_arp()
5015 cls.pg1.config_ip6()
5016 cls.pg1.generate_remote_hosts(2)
5017 cls.pg1.configure_ipv6_neighbors()
5020 super(TestDSlite, cls).tearDownClass()
5023 def test_dslite(self):
5024 """ Test DS-Lite """
5025 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
5027 aftr_ip4 = '192.0.0.1'
5028 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5029 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5030 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5031 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
5034 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5035 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
5036 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5037 UDP(sport=20000, dport=10000))
5038 self.pg1.add_stream(p)
5039 self.pg_enable_capture(self.pg_interfaces)
5041 capture = self.pg0.get_capture(1)
5042 capture = capture[0]
5043 self.assertFalse(capture.haslayer(IPv6))
5044 self.assertEqual(capture[IP].src, self.nat_addr)
5045 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5046 self.assertNotEqual(capture[UDP].sport, 20000)
5047 self.assertEqual(capture[UDP].dport, 10000)
5048 self.check_ip_checksum(capture)
5049 out_port = capture[UDP].sport
5051 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5052 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5053 UDP(sport=10000, dport=out_port))
5054 self.pg0.add_stream(p)
5055 self.pg_enable_capture(self.pg_interfaces)
5057 capture = self.pg1.get_capture(1)
5058 capture = capture[0]
5059 self.assertEqual(capture[IPv6].src, aftr_ip6)
5060 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5061 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5062 self.assertEqual(capture[IP].dst, '192.168.1.1')
5063 self.assertEqual(capture[UDP].sport, 10000)
5064 self.assertEqual(capture[UDP].dport, 20000)
5065 self.check_ip_checksum(capture)
5068 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5069 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5070 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5071 TCP(sport=20001, dport=10001))
5072 self.pg1.add_stream(p)
5073 self.pg_enable_capture(self.pg_interfaces)
5075 capture = self.pg0.get_capture(1)
5076 capture = capture[0]
5077 self.assertFalse(capture.haslayer(IPv6))
5078 self.assertEqual(capture[IP].src, self.nat_addr)
5079 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5080 self.assertNotEqual(capture[TCP].sport, 20001)
5081 self.assertEqual(capture[TCP].dport, 10001)
5082 self.check_ip_checksum(capture)
5083 self.check_tcp_checksum(capture)
5084 out_port = capture[TCP].sport
5086 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5087 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5088 TCP(sport=10001, dport=out_port))
5089 self.pg0.add_stream(p)
5090 self.pg_enable_capture(self.pg_interfaces)
5092 capture = self.pg1.get_capture(1)
5093 capture = capture[0]
5094 self.assertEqual(capture[IPv6].src, aftr_ip6)
5095 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5096 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5097 self.assertEqual(capture[IP].dst, '192.168.1.1')
5098 self.assertEqual(capture[TCP].sport, 10001)
5099 self.assertEqual(capture[TCP].dport, 20001)
5100 self.check_ip_checksum(capture)
5101 self.check_tcp_checksum(capture)
5104 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5105 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5106 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5107 ICMP(id=4000, type='echo-request'))
5108 self.pg1.add_stream(p)
5109 self.pg_enable_capture(self.pg_interfaces)
5111 capture = self.pg0.get_capture(1)
5112 capture = capture[0]
5113 self.assertFalse(capture.haslayer(IPv6))
5114 self.assertEqual(capture[IP].src, self.nat_addr)
5115 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5116 self.assertNotEqual(capture[ICMP].id, 4000)
5117 self.check_ip_checksum(capture)
5118 self.check_icmp_checksum(capture)
5119 out_id = capture[ICMP].id
5121 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5122 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5123 ICMP(id=out_id, type='echo-reply'))
5124 self.pg0.add_stream(p)
5125 self.pg_enable_capture(self.pg_interfaces)
5127 capture = self.pg1.get_capture(1)
5128 capture = capture[0]
5129 self.assertEqual(capture[IPv6].src, aftr_ip6)
5130 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5131 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5132 self.assertEqual(capture[IP].dst, '192.168.1.1')
5133 self.assertEqual(capture[ICMP].id, 4000)
5134 self.check_ip_checksum(capture)
5135 self.check_icmp_checksum(capture)
5137 # ping DS-Lite AFTR tunnel endpoint address
5138 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5139 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
5140 ICMPv6EchoRequest())
5141 self.pg1.add_stream(p)
5142 self.pg_enable_capture(self.pg_interfaces)
5144 capture = self.pg1.get_capture(1)
5145 self.assertEqual(1, len(capture))
5146 capture = capture[0]
5147 self.assertEqual(capture[IPv6].src, aftr_ip6)
5148 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5149 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
5152 super(TestDSlite, self).tearDown()
5153 if not self.vpp_dead:
5154 self.logger.info(self.vapi.cli("show dslite pool"))
5156 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5157 self.logger.info(self.vapi.cli("show dslite sessions"))
5159 if __name__ == '__main__':
5160 unittest.main(testRunner=VppTestRunner)