9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is re-parsed from its serialized form, the IP checksum of
    the copy is dropped and the copy is serialized/parsed once more; the
    resulting checksum is compared with the one carried by ``pkt``.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the parsed value first (as check_icmp_checksum does); otherwise
    # the rebuilt packet simply carries the received checksum and the
    # assertion below can never fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is re-parsed from its serialized form, the TCP checksum of
    the copy is dropped and the copy is serialized/parsed once more; the
    resulting checksum is compared with the one carried by ``pkt``.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the parsed value first (as check_icmp_checksum does); otherwise
    # the rebuilt packet simply carries the received checksum and the
    # assertion below can never fail.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is re-parsed from its serialized form, the UDP checksum of
    the copy is dropped and the copy is serialized/parsed once more; the
    resulting checksum is compared with the one carried by ``pkt``.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the parsed value first (as check_icmp_checksum does); otherwise
    # the rebuilt packet simply carries the received checksum and the
    # assertion below can never fail.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    Every embedded header that is present (IPerror, TCPerror, UDPerror,
    ICMPerror) is verified on its own: a fresh copy of ``pkt`` is parsed,
    the checksum of that header is dropped, the copy is rebuilt and the
    resulting checksum is compared with the received one.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    if pkt.haslayer(IPerror):
        new = pkt.__class__(str(pkt))
        del new['IPerror'].chksum
        new = new.__class__(str(new))
        self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)

    if pkt.haslayer(TCPerror):
        new = pkt.__class__(str(pkt))
        del new['TCPerror'].chksum
        new = new.__class__(str(new))
        self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)

    if pkt.haslayer(UDPerror):
        # An embedded UDP checksum of 0 is not verified
        # (presumably because 0 means "no checksum" for UDP over IPv4).
        if pkt['UDPerror'].chksum != 0:
            new = pkt.__class__(str(pkt))
            del new['UDPerror'].chksum
            new = new.__class__(str(new))
            self.assertEqual(new['UDPerror'].chksum,
                             pkt['UDPerror'].chksum)

    if pkt.haslayer(ICMPerror):
        # Start from a fresh copy like the other branches do; relying on a
        # copy left over from an earlier branch raises NameError when none
        # of them ran.
        new = pkt.__class__(str(pkt))
        del new['ICMPerror'].chksum
        new = new.__class__(str(new))
        self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
def check_icmp_checksum(self, pkt):
    """
    Check ICMP checksum in IPv4 packet

    When the packet is an ICMP error (it carries an IPerror layer) the
    checksums of the embedded packet are verified as well.

    :param pkt: Packet to check ICMP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    del rebuilt['ICMP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    recomputed = rebuilt['ICMP'].chksum
    self.assertEqual(recomputed, pkt['ICMP'].chksum)
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Check ICMPv6 checksum of the packet

    Handles destination unreachable, echo request and echo reply
    messages. For destination unreachable the embedded packet is checked
    too.

    :param pkt: Packet to check ICMPv6 checksum
    """
    # (layer, whether the embedded packet has to be checked afterwards)
    handled = (
        (ICMPv6DestUnreach, True),
        (ICMPv6EchoRequest, False),
        (ICMPv6EchoReply, False),
    )
    new = pkt.__class__(str(pkt))
    for layer, has_embedded in handled:
        if not pkt.haslayer(layer):
            continue
        name = layer.__name__
        del new[name].cksum
        new = new.__class__(str(new))
        self.assertEqual(new[name].cksum,
                         pkt[name].cksum)
        if has_embedded:
            self.check_icmp_errror_embedded(pkt)
def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param dst_ip: Destination address (Default use remote IPv4 address
                   of the outside interface)
    :param ttl: TTL of generated packets
    :returns: List with one TCP, one UDP and one ICMP echo request packet
    """
    if dst_ip is None:
        dst_ip = out_if.remote_ip4

    pkts = []
    # TCP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         TCP(sport=self.tcp_port_in, dport=20))
    pkts.append(p)

    # UDP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         UDP(sport=self.udp_port_in, dport=20))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)

    return pkts
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are written into the prefix at the byte
    positions RFC 6052 defines for the given prefix length (octet 8 of
    the IPv6 address is never used). For any other prefix length the
    prefix is returned without an embedded address.

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96)
    :returns: IPv4-embedded IPv6 addresses
    """
    # IPv6 byte index of each IPv4 octet, per prefix length
    positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    # bytearray gives mutable integer octets for both str and bytes input
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    for index, octet in zip(positions.get(plen, ()), ip4_n):
        pref_n[index] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def extract_ip4(self, ip6, plen):
    """
    Extract IPv4 address embedded in IPv6 addresses

    Reads the four IPv4 octets from the byte positions RFC 6052 defines
    for the given prefix length (the inverse of compose_ip6).

    :param ip6: IPv6 address
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96)
    :returns: extracted IPv4 address
    :raises ValueError: if the prefix length is not one of the above
    """
    # IPv6 byte index of each IPv4 octet, per prefix length
    positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    if plen not in positions:
        raise ValueError("unsupported prefix length: %s" % plen)
    # bytearray gives integer octets for both str and bytes input
    ip6_n = bytearray(socket.inet_pton(socket.AF_INET6, ip6))
    ip4_n = bytearray(ip6_n[index] for index in positions[plen])
    return socket.inet_ntop(socket.AF_INET, bytes(ip4_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix (Default use 64:ff9b::)
    :param plen: NAT64 prefix length (only used when pref is given)
    :returns: List with one TCP, one UDP and one ICMPv6 echo request
              packet
    """
    if pref is None:
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
    else:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)

    pkts = []
    # TCP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
         TCP(sport=self.tcp_port_in, dport=20))
    pkts.append(p)

    # UDP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
         UDP(sport=self.udp_port_in, dport=20))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
         ICMPv6EchoRequest(id=self.icmp_id_in))
    pkts.append(p)

    return pkts
def create_stream_out(self, out_if, dst_ip=None, ttl=64,
                      use_inside_ports=False):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :param use_inside_ports: Use inside NAT ports as destination ports
           instead of outside ports
    :returns: List with one TCP, one UDP and one ICMP echo reply packet
    """
    if dst_ip is None:
        dst_ip = self.nat_addr
    if not use_inside_ports:
        tcp_port = self.tcp_port_out
        udp_port = self.udp_port_out
        icmp_id = self.icmp_id_out
    else:
        tcp_port = self.tcp_port_in
        udp_port = self.udp_port_in
        icmp_id = self.icmp_id_in

    pkts = []
    # TCP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         TCP(dport=tcp_port, sport=20))
    pkts.append(p)

    # UDP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         UDP(dport=udp_port, sport=20))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         ICMP(id=icmp_id, type='echo-reply'))
    pkts.append(p)

    return pkts
def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
    """
    Create IPv6 packet stream for outside network

    :param out_if: Outside interface
    :param src_ip: Source IPv6 address
    :param dst_ip: Destination IPv6 address
    :param hl: HL of generated packets
    :returns: List with one TCP, one UDP and one ICMPv6 echo reply packet
    """
    pkts = []
    # TCP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
         TCP(dport=self.tcp_port_out, sport=20))
    pkts.append(p)

    # UDP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
         UDP(dport=self.udp_port_out, sport=20))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
         ICMPv6EchoReply(id=self.icmp_id_out))
    pkts.append(p)

    return pkts
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                       packet_num=3, dst_ip=None, is_ip6=False):
    """
    Verify captured packets on outside network

    The translated TCP/UDP source port and ICMP identifier seen in the
    capture are stored in self.tcp_port_out, self.udp_port_out and
    self.icmp_id_out.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global NAT address)
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    :param is_ip6: If L3 protocol is IPv6 (Default False)
    """
    if is_ip6:
        IP46 = IPv6
        ICMP46 = ICMPv6EchoRequest
    else:
        IP46 = IP
        ICMP46 = ICMP
    if nat_ip is None:
        nat_ip = self.nat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            # IPv6 has no header checksum
            if not is_ip6:
                self.check_ip_checksum(packet)
            self.assertEqual(packet[IP46].src, nat_ip)
            if dst_ip is not None:
                self.assertEqual(packet[IP46].dst, dst_ip)
            if packet.haslayer(TCP):
                if same_port:
                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                else:
                    self.assertNotEqual(
                        packet[TCP].sport, self.tcp_port_in)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                if same_port:
                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
                else:
                    self.assertNotEqual(
                        packet[UDP].sport, self.udp_port_in)
                self.udp_port_out = packet[UDP].sport
            else:
                if same_port:
                    self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
                else:
                    self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP46].id
                if is_ip6:
                    self.check_icmpv6_checksum(packet)
                else:
                    self.check_icmp_checksum(packet)
        except Exception:
            # Dump the offending packet before the failure propagates
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
                           packet_num=3, dst_ip=None):
    """
    Verify captured IPv6 packets on outside network

    Thin wrapper around verify_capture_out with IPv6 selected.

    :param capture: Captured packets
    :param nat_ip: Translated IP address
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
                                   dst_ip, True)
def verify_capture_in(self, capture, in_if, packet_num=3):
    """
    Verify captured packets on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.check_ip_checksum(packet)
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.check_icmp_checksum(packet)
        except Exception:
            # Dump the offending packet before the failure propagates
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
    """
    Verify captured IPv6 packets on inside network

    :param capture: Captured packets
    :param src_ip: Source IP
    :param dst_ip: Destination IP address
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, src_ip)
            self.assertEqual(packet[IPv6].dst, dst_ip)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.check_udp_checksum(packet)
            else:
                self.assertEqual(packet[ICMPv6EchoReply].id,
                                 self.icmp_id_in)
                self.check_icmpv6_checksum(packet)
        except Exception:
            # Dump the offending packet before the failure propagates
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packet that don't have to be translated

    Addresses, source ports and the ICMP identifier must still be the
    inside values.

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
            self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            # Dump the offending packet before the failure propagates
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                        packet_num=3, icmp_type=11):
    """
    Verify captured packets with ICMP errors on outside network

    The packet embedded in each error must carry the outside
    (translated) destination port or ICMP identifier.

    :param capture: Captured packets
    :param src_ip: Translated IP address or IP address of VPP
                   (Default use global NAT address)
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    if src_ip is None:
        src_ip = self.nat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, src_ip)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].dport,
                                 self.tcp_port_out)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].dport,
                                 self.udp_port_out)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
        except Exception:
            # Dump the offending packet before the failure propagates
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                       icmp_type=11):
    """
    Verify captured packets with ICMP errors on inside network

    The packet embedded in each error must carry the inside source port
    or ICMP identifier.

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].sport,
                                 self.tcp_port_in)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].sport,
                                 self.udp_port_in)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
        except Exception:
            # Dump the offending packet before the failure propagates
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def create_stream_frag(self, src_if, dst, sport, dport, data):
    """
    Create fragmented packet stream

    The TCP segment is split by hand into three IPv4 fragments that
    share one random IP identification value.

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :returns: List of the three fragments
    """
    frag_id = random.randint(0, 65535)
    # Build the unfragmented packet once to learn the TCP checksum of the
    # whole segment; the first fragment has to carry exactly that value.
    p = (IP(src=src_if.remote_ip4, dst=dst) /
         TCP(sport=sport, dport=dport) /
         Raw(data))
    p = p.__class__(str(p))
    chksum = p['TCP'].chksum

    pkts = []
    # Fragment offsets are in units of 8 bytes:
    # first fragment = 20 byte TCP header + 4 bytes of data -> next at 3
    p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
         IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0,
            id=frag_id) /
         TCP(sport=sport, dport=dport, chksum=chksum) /
         Raw(data[0:4]))
    pkts.append(p)
    # second fragment = 16 bytes of data -> next at 5
    p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
         IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3,
            id=frag_id, proto=IP_PROTOS.tcp) /
         Raw(data[4:20]))
    pkts.append(p)
    # last fragment = the rest of the data, no MF flag
    p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
         IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
            id=frag_id) /
         Raw(data[20:]))
    pkts.append(p)
    return pkts
def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
                           pref=None, plen=0, frag_size=128):
    """
    Create fragmented IPv6 packet stream

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :param pref: NAT64 prefix (Default use 64:ff9b::)
    :param plen: NAT64 prefix length (only used when pref is given)
    :param frag_size: size of fragments
    :returns: Fragments produced by scapy's fragment6
    """
    if pref is None:
        dst_ip6 = ''.join(['64:ff9b::', dst])
    else:
        dst_ip6 = self.compose_ip6(dst, pref, plen)

    p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
         IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
         IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
         TCP(sport=sport, dport=dport) /
         Raw(data))

    return fragment6(p, frag_size)
def reass_frags_and_verify(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    Addresses and IP checksum of every fragment are verified, then the
    fragment payloads are written at their fragment offsets and parsed
    as one TCP or UDP segment.

    :param frags: Captured fragments
    :param src: Source IPv4 address to verify
    :param dst: Destination IPv4 address to verify

    :returns: Reassembled IPv4 packet
    """
    reass_buf = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IP].src, src)
        self.assertEqual(p[IP].dst, dst)
        self.check_ip_checksum(p)
        # fragment offset is in units of 8 bytes
        reass_buf.seek(p[IP].frag * 8)
        reass_buf.write(p[IP].payload)
    ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
            proto=frags[0][IP].proto)
    if ip.proto == IP_PROTOS.tcp:
        p = (ip / TCP(reass_buf.getvalue()))
        self.check_tcp_checksum(p)
    elif ip.proto == IP_PROTOS.udp:
        p = (ip / UDP(reass_buf.getvalue()))
    # NOTE(review): for any other protocol the last fragment is returned
    # as-is - confirm that callers only pass TCP/UDP.
    return p
def reass_frags_and_verify_ip6(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    Addresses of every fragment are verified, then the fragment payloads
    are written at their fragment offsets and parsed as one TCP or UDP
    segment.

    :param frags: Captured fragments
    :param src: Source IPv6 address to verify
    :param dst: Destination IPv6 address to verify

    :returns: Reassembled IPv6 packet
    """
    reass_buf = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IPv6].src, src)
        self.assertEqual(p[IPv6].dst, dst)
        # fragment offset is in units of 8 bytes
        reass_buf.seek(p[IPv6ExtHdrFragment].offset * 8)
        reass_buf.write(p[IPv6ExtHdrFragment].payload)
    ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
              nh=frags[0][IPv6ExtHdrFragment].nh)
    if ip.nh == IP_PROTOS.tcp:
        p = (ip / TCP(reass_buf.getvalue()))
        self.check_tcp_checksum(p)
    elif ip.nh == IP_PROTOS.udp:
        p = (ip / UDP(reass_buf.getvalue()))
    # NOTE(review): for any other next header the last fragment is
    # returned as-is - confirm that callers only pass TCP/UDP.
    return p
def verify_ipfix_nat44_ses(self, data):
    """
    Verify IPFIX NAT44 session create/delete event

    Expects six records: a create and a delete event for each of the
    ICMP, TCP and UDP sessions. Records are indexed by IPFIX information
    element number.

    :param data: Decoded IPFIX data records
    """
    nat44_ses_create_num = 0
    nat44_ses_delete_num = 0
    self.assertEqual(6, len(data))
    for record in data:
        # natEvent
        self.assertIn(ord(record[230]), [4, 5])
        if ord(record[230]) == 4:
            nat44_ses_create_num += 1
        else:
            nat44_ses_delete_num += 1
        # sourceIPv4Address
        self.assertEqual(self.pg0.remote_ip4n, record[8])
        # postNATSourceIPv4Address
        self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
                         record[225])
        # ingressVRFID
        self.assertEqual(struct.pack("!I", 0), record[234])
        # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
        if IP_PROTOS.icmp == ord(record[4]):
            self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
            self.assertEqual(struct.pack("!H", self.icmp_id_out),
                             record[227])
        elif IP_PROTOS.tcp == ord(record[4]):
            self.assertEqual(struct.pack("!H", self.tcp_port_in),
                             record[7])
            self.assertEqual(struct.pack("!H", self.tcp_port_out),
                             record[227])
        elif IP_PROTOS.udp == ord(record[4]):
            self.assertEqual(struct.pack("!H", self.udp_port_in),
                             record[7])
            self.assertEqual(struct.pack("!H", self.udp_port_out),
                             record[227])
        else:
            self.fail("Invalid protocol")
    self.assertEqual(3, nat44_ses_create_num)
    self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    Expects exactly one record, indexed by IPFIX information element
    number.

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 3)
    # natPoolID
    self.assertEqual(struct.pack("!I", 0), record[283])
def verify_ipfix_max_sessions(self, data, limit):
    """
    Verify IPFIX maximum session entries exceeded event

    Expects exactly one record, indexed by IPFIX information element
    number.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum session entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): packed with "I" (native byte order), unlike the "!I"
    # fields in the sibling verifiers - confirm against the exporter.
    self.assertEqual(struct.pack("I", 1), record[466])
    # maxSessionEntries
    self.assertEqual(struct.pack("I", limit), record[471])
def verify_ipfix_max_bibs(self, data, limit):
    """
    Verify IPFIX maximum BIB entries exceeded event

    Expects exactly one record, indexed by IPFIX information element
    number.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum BIB entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): packed with "I" (native byte order), unlike the "!I"
    # fields in the sibling verifiers - confirm against the exporter.
    self.assertEqual(struct.pack("I", 2), record[466])
    # maxBIBEntries
    self.assertEqual(struct.pack("I", limit), record[472])
def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event

    Expects exactly one record, indexed by IPFIX information element
    number.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): packed with "I" (native byte order), unlike the "!I"
    # fields in the sibling verifiers - confirm against the exporter.
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event

    Expects exactly one record, indexed by IPFIX information element
    number.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv4 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): packed with "I" (native byte order), unlike the "!I"
    # fields in the sibling verifiers - confirm against the exporter.
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv4Address
    self.assertEqual(src_addr, record[8])
def verify_ipfix_bib(self, data, is_create, src_addr):
    """
    Verify IPFIX NAT64 BIB create and delete events

    Expects exactly one record, indexed by IPFIX information element
    number, describing a TCP binding.

    :param data: Decoded IPFIX data records
    :param is_create: Create event if nonzero value otherwise delete event
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    if is_create:
        self.assertEqual(ord(record[230]), 10)
    else:
        self.assertEqual(ord(record[230]), 11)
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
    # postNATSourceIPv4Address
    self.assertEqual(self.nat_addr_n, record[225])
    # protocolIdentifier
    self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
    # ingressVRFID
    self.assertEqual(struct.pack("!I", 0), record[234])
    # sourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
    # postNAPTSourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
                           dst_port):
    """
    Verify IPFIX NAT64 session create and delete events

    Expects exactly one record, indexed by IPFIX information element
    number, describing a TCP session.

    :param data: Decoded IPFIX data records
    :param is_create: Create event if nonzero value otherwise delete event
    :param src_addr: IPv6 source address
    :param dst_addr: IPv4 destination address
    :param dst_port: destination TCP port
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    if is_create:
        self.assertEqual(ord(record[230]), 6)
    else:
        self.assertEqual(ord(record[230]), 7)
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
    # destinationIPv6Address
    # NOTE(review): assumes the session was created through the
    # 64:ff9b::/96 prefix used by default elsewhere in this class -
    # confirm against the tests calling this helper.
    self.assertEqual(socket.inet_pton(socket.AF_INET6,
                                      self.compose_ip6(dst_addr,
                                                       '64:ff9b::',
                                                       96)),
                     record[28])
    # postNATSourceIPv4Address
    self.assertEqual(self.nat_addr_n, record[225])
    # postNATDestinationIPv4Address
    self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
                     record[226])
    # protocolIdentifier
    self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
    # ingressVRFID
    self.assertEqual(struct.pack("!I", 0), record[234])
    # sourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
    # postNAPTSourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
    # destinationTransportPort
    self.assertEqual(struct.pack("!H", dst_port), record[11])
    # postNAPTDestinationTransportPort
    self.assertEqual(struct.pack("!H", dst_port), record[228])
895 class TestNAT44(MethodHolder):
896 """ NAT44 Test Cases """
900 super(TestNAT44, cls).setUpClass()
903 cls.tcp_port_in = 6303
904 cls.tcp_port_out = 6303
905 cls.udp_port_in = 6304
906 cls.udp_port_out = 6304
907 cls.icmp_id_in = 6305
908 cls.icmp_id_out = 6305
909 cls.nat_addr = '10.0.0.3'
910 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911 cls.ipfix_src_port = 4739
912 cls.ipfix_domain_id = 1
914 cls.create_pg_interfaces(range(10))
915 cls.interfaces = list(cls.pg_interfaces[0:4])
917 for i in cls.interfaces:
922 cls.pg0.generate_remote_hosts(3)
923 cls.pg0.configure_ipv4_neighbors()
925 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926 cls.vapi.ip_table_add_del(10, is_add=1)
927 cls.vapi.ip_table_add_del(20, is_add=1)
929 cls.pg4._local_ip4 = "172.16.255.1"
930 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932 cls.pg4.set_table_ip4(10)
933 cls.pg5._local_ip4 = "172.17.255.3"
934 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936 cls.pg5.set_table_ip4(10)
937 cls.pg6._local_ip4 = "172.16.255.1"
938 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940 cls.pg6.set_table_ip4(20)
941 for i in cls.overlapping_interfaces:
949 cls.pg9.generate_remote_hosts(2)
951 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
956 cls.pg9.resolve_arp()
957 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959 cls.pg9.resolve_arp()
962 super(TestNAT44, cls).tearDownClass()
965 def clear_nat44(self):
967 Clear NAT44 configuration.
969 # I found no elegant way to do this
970 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971 dst_address_length=32,
972 next_hop_address=self.pg7.remote_ip4n,
973 next_hop_sw_if_index=self.pg7.sw_if_index,
975 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976 dst_address_length=32,
977 next_hop_address=self.pg8.remote_ip4n,
978 next_hop_sw_if_index=self.pg8.sw_if_index,
981 for intf in [self.pg7, self.pg8]:
982 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
984 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
989 if self.pg7.has_ip4_config:
990 self.pg7.unconfig_ip4()
992 self.vapi.nat44_forwarding_enable_disable(0)
994 interfaces = self.vapi.nat44_interface_addr_dump()
995 for intf in interfaces:
996 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997 twice_nat=intf.twice_nat,
1000 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001 domain_id=self.ipfix_domain_id)
1002 self.ipfix_src_port = 4739
1003 self.ipfix_domain_id = 1
1005 interfaces = self.vapi.nat44_interface_dump()
1006 for intf in interfaces:
1007 if intf.is_inside > 1:
1008 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1011 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1015 interfaces = self.vapi.nat44_interface_output_feature_dump()
1016 for intf in interfaces:
1017 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1021 static_mappings = self.vapi.nat44_static_mapping_dump()
1022 for sm in static_mappings:
1023 self.vapi.nat44_add_del_static_mapping(
1024 sm.local_ip_address,
1025 sm.external_ip_address,
1026 local_port=sm.local_port,
1027 external_port=sm.external_port,
1028 addr_only=sm.addr_only,
1030 protocol=sm.protocol,
1031 twice_nat=sm.twice_nat,
1032 out2in_only=sm.out2in_only,
1036 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1037 for lb_sm in lb_static_mappings:
1038 self.vapi.nat44_add_del_lb_static_mapping(
1039 lb_sm.external_addr,
1040 lb_sm.external_port,
1042 vrf_id=lb_sm.vrf_id,
1043 twice_nat=lb_sm.twice_nat,
1044 out2in_only=lb_sm.out2in_only,
1050 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1051 for id_m in identity_mappings:
1052 self.vapi.nat44_add_del_identity_mapping(
1053 addr_only=id_m.addr_only,
1056 sw_if_index=id_m.sw_if_index,
1058 protocol=id_m.protocol,
1061 adresses = self.vapi.nat44_address_dump()
1062 for addr in adresses:
1063 self.vapi.nat44_add_del_address_range(addr.ip_address,
1065 twice_nat=addr.twice_nat,
1068 self.vapi.nat_set_reass()
1069 self.vapi.nat_set_reass(is_ip6=1)
1071 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1072 local_port=0, external_port=0, vrf_id=0,
1073 is_add=1, external_sw_if_index=0xFFFFFFFF,
1074 proto=0, twice_nat=0, out2in_only=0, tag=""):
1076 Add/delete NAT44 static mapping
1078 :param local_ip: Local IP address
1079 :param external_ip: External IP address
1080 :param local_port: Local port number (Optional)
1081 :param external_port: External port number (Optional)
1082 :param vrf_id: VRF ID (Default 0)
1083 :param is_add: 1 if add, 0 if delete (Default add)
1084 :param external_sw_if_index: External interface instead of IP address
1085 :param proto: IP protocol (Mandatory if port specified)
1086 :param twice_nat: 1 if translate external host address and port
1087 :param out2in_only: if 1 rule is matching only out2in direction
1088 :param tag: Opaque string tag
1091 if local_port and external_port:
1093 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1094 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1095 self.vapi.nat44_add_del_static_mapping(
1098 external_sw_if_index,
1109 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1111 Add/delete NAT44 address
1113 :param ip: IP address
1114 :param is_add: 1 if add, 0 if delete (Default add)
1115 :param twice_nat: twice NAT address for extenal hosts
1117 nat_addr = socket.inet_pton(socket.AF_INET, ip)
1118 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1120 twice_nat=twice_nat)
1122 def test_dynamic(self):
1123 """ NAT44 dynamic translation test """
1125 self.nat44_add_address(self.nat_addr)
1126 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1127 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1131 pkts = self.create_stream_in(self.pg0, self.pg1)
1132 self.pg0.add_stream(pkts)
1133 self.pg_enable_capture(self.pg_interfaces)
1135 capture = self.pg1.get_capture(len(pkts))
1136 self.verify_capture_out(capture)
1139 pkts = self.create_stream_out(self.pg1)
1140 self.pg1.add_stream(pkts)
1141 self.pg_enable_capture(self.pg_interfaces)
1143 capture = self.pg0.get_capture(len(pkts))
1144 self.verify_capture_in(capture, self.pg0)
1146 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1147 """ NAT44 handling of client packets with TTL=1 """
1149 self.nat44_add_address(self.nat_addr)
1150 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1151 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1154 # Client side - generate traffic
1155 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1156 self.pg0.add_stream(pkts)
1157 self.pg_enable_capture(self.pg_interfaces)
1160 # Client side - verify ICMP type 11 packets
1161 capture = self.pg0.get_capture(len(pkts))
1162 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1164 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1165 """ NAT44 handling of server packets with TTL=1 """
1167 self.nat44_add_address(self.nat_addr)
1168 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1169 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1172 # Client side - create sessions
1173 pkts = self.create_stream_in(self.pg0, self.pg1)
1174 self.pg0.add_stream(pkts)
1175 self.pg_enable_capture(self.pg_interfaces)
1178 # Server side - generate traffic
1179 capture = self.pg1.get_capture(len(pkts))
1180 self.verify_capture_out(capture)
1181 pkts = self.create_stream_out(self.pg1, ttl=1)
1182 self.pg1.add_stream(pkts)
1183 self.pg_enable_capture(self.pg_interfaces)
1186 # Server side - verify ICMP type 11 packets
1187 capture = self.pg1.get_capture(len(pkts))
1188 self.verify_capture_out_with_icmp_errors(capture,
1189 src_ip=self.pg1.local_ip4)
def test_dynamic_icmp_errors_in2out_ttl_2(self):
    """ NAT44 handling of error responses to client packets with TTL=2

    Client packets with TTL=2 are captured on pg1; for each one an ICMP
    type 11 message embedding the captured IP packet is sent back to the
    NAT address, and the result is verified on pg0.
    """
    # NOTE(review): the damaged source lacked the `is_inside=0` argument
    # and the `self.pg_start()` calls; both are restored by assumption --
    # confirm against version control.
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Client side - generate traffic
    pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - simulate ICMP type 11 response
    capture = self.pg1.get_capture(len(pkts))
    pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
            IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
            ICMP(type=11) / packet[IP] for packet in capture]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - verify ICMP type 11 packets
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_2(self):
    """ NAT44 handling of error responses to server packets with TTL=2

    After sessions are created from pg0, server packets with TTL=2 are
    sent from pg1 and captured on pg0; for each one an ICMP type 11
    message embedding the captured IP packet is sent from pg0, and the
    result is verified on pg1.
    """
    # NOTE(review): the damaged source lacked the `is_inside=0` argument
    # and the `self.pg_start()` calls; both are restored by assumption --
    # confirm against version control.
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Client side - create sessions
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - generate traffic
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    pkts = self.create_stream_out(self.pg1, ttl=2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - simulate ICMP type 11 response
    capture = self.pg0.get_capture(len(pkts))
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
            ICMP(type=11) / packet[IP] for packet in capture]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - verify ICMP type 11 packets
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out_with_icmp_errors(capture)
def test_ping_out_interface_from_outside(self):
    """ Ping NAT44 out interface from outside network

    Sends one ICMP echo request from pg1's remote host to pg1's own
    address and checks that exactly one echo reply comes back on pg1 with
    swapped addresses and the request's ICMP identifier.
    """
    # NOTE(review): the damaged source lacked `is_inside=0`,
    # `pkts = [p]`, `self.pg_start()`, `packet = capture[0]` and the
    # try/except/raise wrapper; they are restored by assumption -- confirm
    # against version control.
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
         ICMP(id=self.icmp_id_out, type='echo-request'))
    pkts = [p]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.assertEqual(1, len(capture))
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.local_ip4)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        # An echo reply carries the identifier of its request, and the
        # request above was sent with icmp_id_out. The original compared
        # against icmp_id_in, which is only right while both ids happen
        # to be equal.
        self.assertEqual(packet[ICMP].id, self.icmp_id_out)
        self.assertEqual(packet[ICMP].type, 0)  # echo reply
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet "
                              "(outside network):", packet))
        raise
def test_ping_internal_host_from_outside(self):
    """ Ping internal host from outside network

    With a static mapping pg0 host <-> nat_addr in place, an echo request
    sent from pg1 to nat_addr must reach pg0, and the echo reply sent from
    pg0 must reach pg1.
    """
    # NOTE(review): the damaged source lacked the `is_inside=0` argument
    # and the `self.pg_start()` calls; both are restored by assumption --
    # confirm against version control.
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # out2in: echo request towards the mapped external address
    pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
           IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
           ICMP(id=self.icmp_id_out, type='echo-request'))
    self.pg1.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    self.verify_capture_in(capture, self.pg0, packet_num=1)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)

    # in2out: echo reply from the internal host
    pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
           IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
           ICMP(id=self.icmp_id_in, type='echo-reply'))
    self.pg0.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.verify_capture_out(capture, same_port=True, packet_num=1)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
def test_forwarding(self):
    """ NAT44 forwarding test

    Enables NAT44 forwarding and a static mapping for pg0's first host,
    then exercises traffic in both directions for the mapped host and,
    after temporarily swapping remote_hosts[0], for a host without a
    static mapping. Forwarding and the mapping are removed at the end.
    """
    # NOTE(review): the damaged source lacked `is_inside=0`,
    # `same_port=True` on the last verify call, `is_add=0` on the final
    # mapping call, every `self.pg_start()` and the try/finally structure
    # around the cleanup statements; all are restored by assumption --
    # confirm against version control.
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_forwarding_enable_disable(1)

    real_ip = self.pg0.remote_ip4n
    alias_ip = self.nat_addr_n
    self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
                                           external_ip=alias_ip)

    try:
        # in2out - static mapping match

        pkts = self.create_stream_out(self.pg1)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, same_port=True)

        # in2out - no static mapping match

        # Temporarily make the second remote host the "default" one so
        # the stream helpers use an address without a static mapping.
        host0 = self.pg0.remote_hosts[0]
        self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
        try:
            pkts = self.create_stream_out(self.pg1,
                                          dst_ip=self.pg0.remote_ip4,
                                          use_inside_ports=True)
            self.pg1.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg0.get_capture(len(pkts))
            self.verify_capture_in(capture, self.pg0)

            pkts = self.create_stream_in(self.pg0, self.pg1)
            self.pg0.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg1.get_capture(len(pkts))
            self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
                                    same_port=True)
        finally:
            # Always put the original first host back.
            self.pg0.remote_hosts[0] = host0

    finally:
        self.vapi.nat44_forwarding_enable_disable(0)
        self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
                                               external_ip=alias_ip,
                                               is_add=0)
def test_static_in(self):
    """ 1:1 NAT initialized from inside network

    Adds an address-only static mapping, checks how it is reported by the
    static mapping dump (empty tag, zero protocol and ports), then sends
    traffic inside -> outside followed by outside -> inside.
    """
    # NOTE(review): the damaged source lacked the `is_inside=0` argument
    # and the `self.pg_start()` calls; both are restored by assumption --
    # confirm against version control.
    nat_ip = "10.0.0.10"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    sm = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(len(sm), 1)
    # The tag field is NUL padded; compare only the part before the
    # first NUL.
    self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
    self.assertEqual(sm[0].protocol, 0)
    self.assertEqual(sm[0].local_port, 0)
    self.assertEqual(sm[0].external_port, 0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_out(self):
    """ 1:1 NAT initialized from outside network

    Adds a tagged address-only static mapping, checks that the tag is
    reported by the static mapping dump, then sends traffic
    outside -> inside followed by inside -> outside.
    """
    # NOTE(review): the damaged source lacked the `tag = ...` assignment,
    # the `is_inside=0` argument and the `self.pg_start()` calls. They are
    # restored by assumption; in particular the tag text below is a
    # placeholder -- confirm against version control.
    nat_ip = "10.0.0.20"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    tag = "testTAG"
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    sm = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(len(sm), 1)
    # The tag field is NUL padded; compare only the part before the
    # first NUL.
    self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)
def test_static_with_port_in(self):
    """ 1:1 NAPT initialized from inside network

    Adds per-protocol (TCP, UDP, ICMP) static mappings with explicit
    inside/outside ports, then sends traffic inside -> outside followed
    by outside -> inside.
    """
    # NOTE(review): the damaged source lacked the `is_inside=0` argument
    # and the `self.pg_start()` calls; both are restored by assumption --
    # confirm against version control.
    self.tcp_port_out = 3606
    self.udp_port_out = 3607
    self.icmp_id_out = 3608

    self.nat44_add_address(self.nat_addr)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.tcp_port_in, self.tcp_port_out,
                                  proto=IP_PROTOS.tcp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.udp_port_in, self.udp_port_out,
                                  proto=IP_PROTOS.udp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.icmp_id_in, self.icmp_id_out,
                                  proto=IP_PROTOS.icmp)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_with_port_out(self):
    """ 1:1 NAPT initialized from outside network

    Adds per-protocol (TCP, UDP, ICMP) static mappings with explicit
    inside/outside ports, then sends traffic outside -> inside followed
    by inside -> outside.
    """
    # NOTE(review): the damaged source lacked the `is_inside=0` argument
    # and the `self.pg_start()` calls; both are restored by assumption --
    # confirm against version control.
    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    self.nat44_add_address(self.nat_addr)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.tcp_port_in, self.tcp_port_out,
                                  proto=IP_PROTOS.tcp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.udp_port_in, self.udp_port_out,
                                  proto=IP_PROTOS.udp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.icmp_id_in, self.icmp_id_out,
                                  proto=IP_PROTOS.icmp)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
def test_static_with_port_out2(self):
    """ 1:1 NAPT symmetrical rule

    With forwarding enabled and a TCP static mapping created with
    out2in_only=1, checks: client -> service translation, the ICMP error
    sent in response, service -> client translation, the ICMP error sent
    in response, and finally untranslated traffic addressed directly to
    the inside host in both directions.
    """
    # NOTE(review): the title says "symmetrical" although the mapping is
    # created with out2in_only=1 -- confirm the intended wording.
    # NOTE(review): the damaged source lacked the `external_port` /
    # `local_port` assignments, `is_inside=0`, every `self.pg_start()`,
    # `p = capture[0]`, `ip = p[IP]`, `tcp = p[TCP]`,
    # `inner = p[IPerror]` and the try/except/raise wrappers. They are
    # restored by assumption; the two port values are placeholders --
    # confirm against version control.
    external_port = 80
    local_port = 8080

    self.vapi.nat44_forwarding_enable_disable(1)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  local_port, external_port,
                                  proto=IP_PROTOS.tcp, out2in_only=1)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # from client to service
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         TCP(sport=12345, dport=external_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, self.pg0.remote_ip4)
        self.assertEqual(tcp.dport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # ICMP error from the service side, embedding the translated packet
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         ICMP(type=11) / capture[0][IP])
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        self.assertEqual(p[IP].src, self.nat_addr)
        inner = p[IPerror]
        self.assertEqual(inner.dst, self.nat_addr)
        self.assertEqual(inner[TCPerror].dport, external_port)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12345))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(tcp.sport, external_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # ICMP error from the client side, embedding the translated packet
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         ICMP(type=11) / capture[0][IP])
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
        inner = p[IPerror]
        self.assertEqual(inner.src, self.pg0.remote_ip4)
        self.assertEqual(inner[TCPerror].sport, local_port)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from client to server (no translation)
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
         TCP(sport=12346, dport=local_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, self.pg0.remote_ip4)
        self.assertEqual(tcp.dport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client (no translation)
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12346))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.pg0.remote_ip4)
        self.assertEqual(tcp.sport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_static_vrf_aware(self):
    """ 1:1 NAT VRF awareness

    Two static mappings are added, one for a pg4 host and one for a pg0
    host. Traffic from pg4 must be translated to nat_ip1 on pg3, while
    traffic from pg0 must not appear on pg3 at all.
    """
    # NOTE(review): the damaged source lacked the second line of both
    # nat44_add_static_mapping() calls, the `is_inside=0` argument and the
    # `self.pg_start()` calls. `vrf_id=10` is assumed for both mappings
    # (other tests in this file dump pg4 sessions with VRF 10) -- confirm
    # against version control.
    nat_ip1 = "10.0.0.30"
    nat_ip2 = "10.0.0.40"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
                                  vrf_id=10)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
                                  vrf_id=10)
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)

    # inside interface VRF match NAT44 static mapping VRF
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1, True)

    # inside interface VRF don't match NAT44 static mapping VRF (packets
    # are dropped)
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg3.assert_nothing_captured()
def test_dynamic_to_static(self):
    """ Switch from dynamic translation to 1:1NAT

    Creates dynamic sessions for the pg0 host, then adds a static mapping
    for the same host, checks that the session dump for that user is now
    empty, and verifies that new traffic is translated to the static
    address.
    """
    # NOTE(review): the damaged source lacked the `is_inside=0` argument
    # and the `self.pg_start()` calls; both are restored by assumption --
    # confirm against version control.
    nat_ip = "10.0.0.10"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # dynamic
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # 1:1NAT
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
    self.assertEqual(len(sessions), 0)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)
def test_identity_nat(self):
    """ Identity NAT

    With an identity mapping for the pg0 host, a TCP packet sent from pg1
    directly to that host must arrive on pg0 with addresses and ports
    unchanged and with valid checksums.
    """
    # NOTE(review): the damaged source lacked `is_inside=0`,
    # `self.pg_start()`, `p = capture[0]`, `ip = p[IP]`, `tcp = p[TCP]`
    # and the try/except/raise wrapper; they are restored by assumption --
    # confirm against version control.
    self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
         TCP(sport=12345, dport=56789))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, self.pg0.remote_ip4)
        self.assertEqual(ip.src, self.pg1.remote_ip4)
        self.assertEqual(tcp.dport, 56789)
        self.assertEqual(tcp.sport, 12345)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_static_lb(self):
    """ NAT44 local service load balancing

    One external address/port is mapped onto two servers behind pg0. The
    test checks both directions for a single client and then, over a
    range of client addresses, that server1 receives more flows than
    server2.
    """
    # NOTE(review): the damaged source lacked the `external_port` /
    # `local_port` assignments, the 'port' / 'probability' entries of the
    # server list, three arguments of the lb mapping call, `is_inside=0`,
    # every `self.pg_start()`, the packet unpacking lines, the
    # `server = ...` branches, the counters and the try/except/raise
    # wrappers. They are restored by assumption; the port numbers and the
    # 70/30 probabilities are placeholders chosen so that server1 is
    # preferred -- confirm against version control.
    external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
    external_port = 80
    local_port = 8080
    server1 = self.pg0.remote_hosts[0]
    server2 = self.pg0.remote_hosts[1]

    # Named `backends` rather than `locals` so the builtin is not
    # shadowed; the API keyword argument is still `locals`.
    backends = [{'addr': server1.ip4n,
                 'port': local_port,
                 'probability': 70},
                {'addr': server2.ip4n,
                 'port': local_port,
                 'probability': 30}]

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
                                              external_port,
                                              IP_PROTOS.tcp,
                                              local_num=len(backends),
                                              locals=backends)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # from client to service
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         TCP(sport=12345, dport=external_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    server = None
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertIn(ip.dst, [server1.ip4, server2.ip4])
        # Remember which server was picked so the reply comes from it.
        if ip.dst == server1.ip4:
            server = server1
        else:
            server = server2
        self.assertEqual(tcp.dport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12345))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(tcp.sport, external_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # multiple clients
    clients = ip4_range(self.pg1.remote_ip4, 10, 20)
    pkts = [Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
            IP(src=client, dst=self.nat_addr) /
            TCP(sport=12345, dport=external_port)
            for client in clients]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    # Every captured packet went to one of the two servers; count the
    # split.
    server1_n = sum(1 for p in capture if p[IP].dst == server1.ip4)
    server2_n = len(capture) - server1_n
    self.assertTrue(server1_n > server2_n)
def test_static_lb_2(self):
    """ NAT44 local service load balancing (asymmetrical rule)

    With forwarding enabled and a load-balanced static mapping over two
    servers behind pg0, checks client -> service and service -> client
    translation, then untranslated traffic addressed directly to server1
    in both directions.
    """
    # NOTE(review): the damaged source lacked the `external_port` /
    # `local_port` assignments, the 'port' / 'probability' entries of the
    # server list, several arguments of the lb mapping call (assumed here
    # to include `out2in_only=1`, matching the "asymmetrical" title),
    # `is_inside=0`, every `self.pg_start()`, the packet unpacking lines,
    # the `server = ...` branches and the try/except/raise wrappers. They
    # are restored by assumption; port numbers and probabilities are
    # placeholders -- confirm against version control.
    external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
    external_port = 80
    local_port = 8080
    server1 = self.pg0.remote_hosts[0]
    server2 = self.pg0.remote_hosts[1]

    # Named `backends` rather than `locals` so the builtin is not
    # shadowed; the API keyword argument is still `locals`.
    backends = [{'addr': server1.ip4n,
                 'port': local_port,
                 'probability': 70},
                {'addr': server2.ip4n,
                 'port': local_port,
                 'probability': 30}]

    self.vapi.nat44_forwarding_enable_disable(1)
    self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
                                              external_port,
                                              IP_PROTOS.tcp,
                                              out2in_only=1,
                                              local_num=len(backends),
                                              locals=backends)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # from client to service
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         TCP(sport=12345, dport=external_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    server = None
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertIn(ip.dst, [server1.ip4, server2.ip4])
        # Remember which server was picked so the reply comes from it.
        if ip.dst == server1.ip4:
            server = server1
        else:
            server = server2
        self.assertEqual(tcp.dport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12345))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(tcp.sport, external_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from client to server (no translation)
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
         TCP(sport=12346, dport=local_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, server1.ip4)
        self.assertEqual(tcp.dport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client (no translation)
    p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
         IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12346))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, server1.ip4)
        self.assertEqual(tcp.sport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_multiple_inside_interfaces(self):
    """ NAT44 multiple non-overlapping address space inside interfaces

    pg0 and pg1 are NAT44 inside interfaces, pg3 is the second-argument
    (outside) interface and pg2 has no NAT44 feature. Traffic between
    pg0/pg1 and from pg0 to pg2 must not be translated; traffic between
    each inside interface and pg3 must be translated in both directions.
    """
    # NOTE(review): the damaged source lacked the `is_inside=0` argument
    # and the `self.pg_start()` calls; both are restored by assumption --
    # confirm against version control.
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)

    # between two NAT44 inside interfaces (no translation)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg0, self.pg1)

    # from NAT44 inside to interface without NAT44 feature (no translation)
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg0, self.pg2)

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg1, self.pg3)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg1)
def test_inside_overlapping_interfaces(self):
    """ NAT44 multiple inside interfaces with overlapping address space

    pg4, pg5 and pg6 are inside interfaces and pg3 is the outside one; a
    static mapping exists for the pg6 host. The test covers untranslated
    traffic between pg4 and pg5, hairpinning from pg4 to the pg6 static
    address, translation in both directions for all three inside
    interfaces, and the contents of the user/session dumps.
    """
    # NOTE(review): the damaged source lacked `is_inside=0`, the second
    # line of the static mapping call (assumed `vrf_id=20`, the VRF used
    # below for the pg6 session dump), every `self.pg_start()`, the packet
    # unpacking lines, the try/except/raise wrapper, the
    # `for user in users:` loop header with the `user.vrf_id` argument,
    # and the tails of two list literals. They are restored by assumption
    # -- confirm against version control.
    static_nat_ip = "10.0.0.10"
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
    self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
                                  vrf_id=20)

    # between NAT44 inside interfaces with same VRF (no translation)
    pkts = self.create_stream_in(self.pg4, self.pg5)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg4, self.pg5)

    # between NAT44 inside interfaces with different VRF (hairpinning)
    p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
         IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
         TCP(sport=1234, dport=5678))
    self.pg4.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, self.pg6.remote_ip4)
        self.assertNotEqual(tcp.sport, 1234)
        self.assertEqual(tcp.dport, 5678)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg4.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg4)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg5, self.pg3)
    self.pg5.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg5)

    # pg5 session dump
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
    self.assertEqual(len(sessions), 3)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg5.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)
    self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
    self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
    self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
    self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
    self.assertEqual(sessions[1].inside_port, self.udp_port_in)
    self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
    self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
    self.assertEqual(sessions[1].outside_port, self.udp_port_out)
    self.assertEqual(sessions[2].outside_port, self.icmp_id_out)

    # in2out 3rd interface
    pkts = self.create_stream_in(self.pg6, self.pg3)
    self.pg6.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, static_nat_ip, True)

    # out2in 3rd interface
    pkts = self.create_stream_out(self.pg3, static_nat_ip)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg6)

    # general user and session dump verifications
    users = self.vapi.nat44_user_dump()
    self.assertGreaterEqual(len(users), 3)
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    for user in users:
        sessions = self.vapi.nat44_user_session_dump(user.ip_address,
                                                     user.vrf_id)
        for session in sessions:
            self.assertEqual(user.ip_address, session.inside_ip_address)
            self.assertTrue(session.total_bytes > session.total_pkts > 0)
            self.assertIn(session.protocol,
                          [IP_PROTOS.tcp, IP_PROTOS.udp,
                           IP_PROTOS.icmp])

    # pg4 session dump
    sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
    self.assertGreaterEqual(len(sessions), 4)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg4.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)

    # pg6 session dump
    static_nat_octets = [int(octet) for octet in static_nat_ip.split('.')]
    sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
    self.assertGreaterEqual(len(sessions), 3)
    for session in sessions:
        self.assertTrue(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg6.remote_ip4n)
        # Compare real lists of octets: the original compared two map()
        # results, which only works where map() returns a list.
        self.assertEqual(
            list(bytearray(session.outside_ip_address[0:4])),
            static_nat_octets)
        self.assertIn(session.inside_port,
                      [self.tcp_port_in, self.udp_port_in,
                       self.icmp_id_in])
def test_hairpinning(self):
    """ NAT44 hairpinning - 1:1 NAPT

    A host and a server both sit behind pg0; the server is reachable via
    a TCP static mapping on nat_addr. A packet from the host to the
    mapped address must come back out of pg0 towards the server with a
    translated source, and the server's reply must come back out of pg0
    towards the host with the mapped source port.
    """
    # NOTE(review): the damaged source lacked the `host_in_port`
    # assignment, `is_inside=0`, every `self.pg_start()`, the packet
    # unpacking lines and the try/except/raise wrappers. They are
    # restored by assumption; the host_in_port value is a placeholder --
    # confirm against version control.
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    host_in_port = 1234
    server_in_port = 5678
    server_out_port = 8765

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, self.nat_addr,
                                  server_in_port, server_out_port,
                                  proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.nat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.check_tcp_checksum(p)
        # The translated source port is where the server must reply to.
        host_out_port = tcp.sport
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.nat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.check_tcp_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
2209 def test_hairpinning2(self):
2210 """ NAT44 hairpinning - 1:1 NAT"""
# Hairpinning scenario: host, server1 and server2 are all remote hosts on
# pg0, so every stream below is injected on pg0 and captured on pg0 again.
# server1 and server2 get 1:1 static mappings; the host has none and is
# translated through the pool address self.nat_addr.
2212 server1_nat_ip = "10.0.0.10"
2213 server2_nat_ip = "10.0.0.11"
2214 host = self.pg0.remote_hosts[0]
2215 server1 = self.pg0.remote_hosts[1]
2216 server2 = self.pg0.remote_hosts[2]
2217 server_tcp_port = 22
2218 server_udp_port = 20
2220 self.nat44_add_address(self.nat_addr)
2221 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2222 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2225 # add static mapping for servers
2226 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2227 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# Phase 1: host -> server1's NAT address (TCP, UDP and ICMP echo request).
2231 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2232 IP(src=host.ip4, dst=server1_nat_ip) /
2233 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2235 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2236 IP(src=host.ip4, dst=server1_nat_ip) /
2237 UDP(sport=self.udp_port_in, dport=server_udp_port))
2239 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2240 IP(src=host.ip4, dst=server1_nat_ip) /
2241 ICMP(id=self.icmp_id_in, type='echo-request'))
2243 self.pg0.add_stream(pkts)
2244 self.pg_enable_capture(self.pg_interfaces)
# Expected: source rewritten to the pool address, destination rewritten to
# server1's real address, and the source port / ICMP id changed.  The
# translated values are stored so the replies can be addressed to them.
2246 capture = self.pg0.get_capture(len(pkts))
2247 for packet in capture:
2249 self.assertEqual(packet[IP].src, self.nat_addr)
2250 self.assertEqual(packet[IP].dst, server1.ip4)
2251 if packet.haslayer(TCP):
2252 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2253 self.assertEqual(packet[TCP].dport, server_tcp_port)
2254 self.tcp_port_out = packet[TCP].sport
2255 self.check_tcp_checksum(packet)
2256 elif packet.haslayer(UDP):
2257 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2258 self.assertEqual(packet[UDP].dport, server_udp_port)
2259 self.udp_port_out = packet[UDP].sport
2261 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2262 self.icmp_id_out = packet[ICMP].id
2264 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Phase 2: server1 answers the pool address on the translated ports / id.
2269 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2270 IP(src=server1.ip4, dst=self.nat_addr) /
2271 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2273 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2274 IP(src=server1.ip4, dst=self.nat_addr) /
2275 UDP(sport=server_udp_port, dport=self.udp_port_out))
2277 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2278 IP(src=server1.ip4, dst=self.nat_addr) /
2279 ICMP(id=self.icmp_id_out, type='echo-reply'))
2281 self.pg0.add_stream(pkts)
2282 self.pg_enable_capture(self.pg_interfaces)
# Expected: replies reach the host with server1's NAT address as source and
# the host's original ports / ICMP id restored.
2284 capture = self.pg0.get_capture(len(pkts))
2285 for packet in capture:
2287 self.assertEqual(packet[IP].src, server1_nat_ip)
2288 self.assertEqual(packet[IP].dst, host.ip4)
2289 if packet.haslayer(TCP):
2290 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2291 self.assertEqual(packet[TCP].sport, server_tcp_port)
2292 self.check_tcp_checksum(packet)
2293 elif packet.haslayer(UDP):
2294 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2295 self.assertEqual(packet[UDP].sport, server_udp_port)
2297 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2299 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2302 # server2 to server1
# Both endpoints have 1:1 mappings here: the source address must become
# server2's NAT address while source ports and ICMP id stay unchanged.
2304 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2305 IP(src=server2.ip4, dst=server1_nat_ip) /
2306 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2308 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2309 IP(src=server2.ip4, dst=server1_nat_ip) /
2310 UDP(sport=self.udp_port_in, dport=server_udp_port))
2312 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2313 IP(src=server2.ip4, dst=server1_nat_ip) /
2314 ICMP(id=self.icmp_id_in, type='echo-request'))
2316 self.pg0.add_stream(pkts)
2317 self.pg_enable_capture(self.pg_interfaces)
2319 capture = self.pg0.get_capture(len(pkts))
2320 for packet in capture:
2322 self.assertEqual(packet[IP].src, server2_nat_ip)
2323 self.assertEqual(packet[IP].dst, server1.ip4)
2324 if packet.haslayer(TCP):
2325 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2326 self.assertEqual(packet[TCP].dport, server_tcp_port)
2327 self.tcp_port_out = packet[TCP].sport
2328 self.check_tcp_checksum(packet)
2329 elif packet.haslayer(UDP):
2330 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2331 self.assertEqual(packet[UDP].dport, server_udp_port)
2332 self.udp_port_out = packet[UDP].sport
2334 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2335 self.icmp_id_out = packet[ICMP].id
2337 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2340 # server1 to server2
# Return direction: server1 answers server2's NAT address; the packets must
# arrive at server2's real address with server1's NAT address as source.
2342 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2343 IP(src=server1.ip4, dst=server2_nat_ip) /
2344 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2346 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2347 IP(src=server1.ip4, dst=server2_nat_ip) /
2348 UDP(sport=server_udp_port, dport=self.udp_port_out))
2350 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2351 IP(src=server1.ip4, dst=server2_nat_ip) /
2352 ICMP(id=self.icmp_id_out, type='echo-reply'))
2354 self.pg0.add_stream(pkts)
2355 self.pg_enable_capture(self.pg_interfaces)
2357 capture = self.pg0.get_capture(len(pkts))
2358 for packet in capture:
2360 self.assertEqual(packet[IP].src, server1_nat_ip)
2361 self.assertEqual(packet[IP].dst, server2.ip4)
2362 if packet.haslayer(TCP):
2363 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2364 self.assertEqual(packet[TCP].sport, server_tcp_port)
2365 self.check_tcp_checksum(packet)
2366 elif packet.haslayer(UDP):
2367 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2368 self.assertEqual(packet[UDP].sport, server_udp_port)
2370 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2372 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2375 def test_max_translations_per_user(self):
2376 """ MAX translations per user - recycle the least recently used """
# Opens more TCP flows from one inside address than the configured per-user
# translation limit and expects every packet to be forwarded anyway.
2378 self.nat44_add_address(self.nat_addr)
2379 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2380 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2383 # get maximum number of translations per user
2384 nat44_config = self.vapi.nat_show_config()
2386 # send more than maximum number of translations per user packets
2387 pkts_num = nat44_config.max_translations_per_user + 5
# One packet per flow: same source/destination address, distinct source
# port starting at 1025.
2389 for port in range(0, pkts_num):
2390 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2391 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2392 TCP(sport=1025 + port))
2394 self.pg0.add_stream(pkts)
2395 self.pg_enable_capture(self.pg_interfaces)
2398 # verify number of translated packet
2399 self.pg1.get_capture(pkts_num)
2401 def test_interface_addr(self):
2402 """ Acquire NAT44 addresses from interface """
# Registers pg7 as the source of NAT pool addresses, then checks that the
# dumped pool follows pg7's IPv4 address as it is configured and removed.
2403 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2405 # no address in NAT pool
2406 adresses = self.vapi.nat44_address_dump()
2407 self.assertEqual(0, len(adresses))
2409 # configure interface address and check NAT address pool
2410 self.pg7.config_ip4()
2411 adresses = self.vapi.nat44_address_dump()
2412 self.assertEqual(1, len(adresses))
# Only the first four bytes of the dumped address field are compared with
# pg7's packed local IPv4 address.
2413 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2415 # remove interface address and check NAT address pool
2416 self.pg7.unconfig_ip4()
2417 adresses = self.vapi.nat44_address_dump()
2418 self.assertEqual(0, len(adresses))
2420 def test_interface_addr_static_mapping(self):
2421 """ Static mapping with addresses from interface """
# Creates a static mapping whose external side is bound to interface pg7
# and checks the dumped mapping before pg7 has an address, after an address
# is configured, and after it is removed again.
2424 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2425 self.nat44_add_static_mapping(
2427 external_sw_if_index=self.pg7.sw_if_index,
2430 # static mappings with external interface
2431 static_mappings = self.vapi.nat44_static_mapping_dump()
2432 self.assertEqual(1, len(static_mappings))
2433 self.assertEqual(self.pg7.sw_if_index,
2434 static_mappings[0].external_sw_if_index)
# The dumped tag is cut at its first NUL byte before comparing with `tag`.
2435 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2437 # configure interface address and check static mappings
2438 self.pg7.config_ip4()
2439 static_mappings = self.vapi.nat44_static_mapping_dump()
2440 self.assertEqual(1, len(static_mappings))
2441 self.assertEqual(static_mappings[0].external_ip_address[0:4],
2442 self.pg7.local_ip4n)
# NOTE(review): 0xFFFFFFFF presumably means "no interface" once the mapping
# has been resolved to a concrete address - confirm against the NAT API.
2443 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2444 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2446 # remove interface address and check static mappings
2447 self.pg7.unconfig_ip4()
2448 static_mappings = self.vapi.nat44_static_mapping_dump()
2449 self.assertEqual(0, len(static_mappings))
2451 def test_interface_addr_identity_nat(self):
2452 """ Identity NAT with addresses from interface """
# Creates a TCP identity mapping bound to interface pg7 and checks the
# dumped mapping before pg7 has an address, after an address is configured,
# and after it is removed again.
2455 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2456 self.vapi.nat44_add_del_identity_mapping(
2457 sw_if_index=self.pg7.sw_if_index,
2459 protocol=IP_PROTOS.tcp,
2462 # identity mappings with external interface
2463 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2464 self.assertEqual(1, len(identity_mappings))
2465 self.assertEqual(self.pg7.sw_if_index,
2466 identity_mappings[0].sw_if_index)
2468 # configure interface address and check identity mappings
2469 self.pg7.config_ip4()
2470 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2471 self.assertEqual(1, len(identity_mappings))
2472 self.assertEqual(identity_mappings[0].ip_address,
2473 self.pg7.local_ip4n)
# NOTE(review): 0xFFFFFFFF presumably means "no interface" once the mapping
# has been resolved to a concrete address - confirm against the NAT API.
2474 self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2475 self.assertEqual(port, identity_mappings[0].port)
2476 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2478 # remove interface address and check identity mappings
2479 self.pg7.unconfig_ip4()
2480 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2481 self.assertEqual(0, len(identity_mappings))
2483 def test_ipfix_nat44_sess(self):
2484 """ IPFIX logging NAT44 session created/delted """
2485 self.ipfix_domain_id = 10
2486 self.ipfix_src_port = 20202
2487 colector_port = 30303
2488 bind_layers(UDP, IPFIX, dport=30303)
2489 self.nat44_add_address(self.nat_addr)
2490 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2491 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2493 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2494 src_address=self.pg3.local_ip4n,
2496 template_interval=10,
2497 collector_port=colector_port)
2498 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2499 src_port=self.ipfix_src_port)
2501 pkts = self.create_stream_in(self.pg0, self.pg1)
2502 self.pg0.add_stream(pkts)
2503 self.pg_enable_capture(self.pg_interfaces)
2505 capture = self.pg1.get_capture(len(pkts))
2506 self.verify_capture_out(capture)
2507 self.nat44_add_address(self.nat_addr, is_add=0)
2508 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2509 capture = self.pg3.get_capture(9)
2510 ipfix = IPFIXDecoder()
2511 # first load template
2513 self.assertTrue(p.haslayer(IPFIX))
2514 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2515 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2516 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2517 self.assertEqual(p[UDP].dport, colector_port)
2518 self.assertEqual(p[IPFIX].observationDomainID,
2519 self.ipfix_domain_id)
2520 if p.haslayer(Template):
2521 ipfix.add_template(p.getlayer(Template))
2522 # verify events in data set
2524 if p.haslayer(Data):
2525 data = ipfix.decode_data_set(p.getlayer(Set))
2526 self.verify_ipfix_nat44_ses(data)
2528 def test_ipfix_addr_exhausted(self):
2529 """ IPFIX logging NAT addresses exhausted """
# Sends one packet from pg0 towards pg1, expects that nothing is forwarded,
# then flushes the IPFIX exporter and checks the packets sent to the
# collector on pg3.
2530 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2531 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2533 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2534 src_address=self.pg3.local_ip4n,
2536 template_interval=10)
2537 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2538 src_port=self.ipfix_src_port)
2540 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2541 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2543 self.pg0.add_stream(p)
2544 self.pg_enable_capture(self.pg_interfaces)
# The packet must not be forwarded to pg1.
2546 capture = self.pg1.get_capture(0)
2547 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Nine IPFIX packets are expected on the collector interface.
2548 capture = self.pg3.get_capture(9)
2549 ipfix = IPFIXDecoder()
2550 # first load template
2552 self.assertTrue(p.haslayer(IPFIX))
2553 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2554 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2555 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# 4739 is the IANA-registered IPFIX port.
# NOTE(review): presumably also the exporter default when no collector
# port is configured - confirm.
2556 self.assertEqual(p[UDP].dport, 4739)
2557 self.assertEqual(p[IPFIX].observationDomainID,
2558 self.ipfix_domain_id)
2559 if p.haslayer(Template):
2560 ipfix.add_template(p.getlayer(Template))
2561 # verify events in data set
2563 if p.haslayer(Data):
2564 data = ipfix.decode_data_set(p.getlayer(Set))
2565 self.verify_ipfix_addr_exhausted(data)
2567 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2568 def test_ipfix_max_sessions(self):
2569 """ IPFIX logging maximum session entries exceeded """
# Runs only as part of the extended tests.  Fills the session table from
# many distinct inside addresses, then sends one more packet that must not
# be forwarded, and checks the IPFIX packets sent to the collector on pg3.
2570 self.nat44_add_address(self.nat_addr)
2571 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2572 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2575 nat44_config = self.vapi.nat_show_config()
# The session limit used by the test is ten times the bucket count
# reported by nat_show_config().
2576 max_sessions = 10 * nat44_config.translation_buckets
2579 for i in range(0, max_sessions):
# Source address 10.10.<high byte of i>.<low byte of i>.
2580 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2581 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2582 IP(src=src, dst=self.pg1.remote_ip4) /
2585 self.pg0.add_stream(pkts)
2586 self.pg_enable_capture(self.pg_interfaces)
2589 self.pg1.get_capture(max_sessions)
# IPFIX logging is enabled only after the table has been filled.
2590 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2591 src_address=self.pg3.local_ip4n,
2593 template_interval=10)
2594 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2595 src_port=self.ipfix_src_port)
2597 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2598 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2600 self.pg0.add_stream(p)
2601 self.pg_enable_capture(self.pg_interfaces)
# The extra packet must not be forwarded.
2603 self.pg1.get_capture(0)
2604 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2605 capture = self.pg3.get_capture(9)
2606 ipfix = IPFIXDecoder()
2607 # first load template
2609 self.assertTrue(p.haslayer(IPFIX))
2610 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2611 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2612 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# 4739 is the IANA-registered IPFIX port.
2613 self.assertEqual(p[UDP].dport, 4739)
2614 self.assertEqual(p[IPFIX].observationDomainID,
2615 self.ipfix_domain_id)
2616 if p.haslayer(Template):
2617 ipfix.add_template(p.getlayer(Template))
2618 # verify events in data set
2620 if p.haslayer(Data):
2621 data = ipfix.decode_data_set(p.getlayer(Set))
2622 self.verify_ipfix_max_sessions(data, max_sessions)
2624 def test_pool_addr_fib(self):
2625 """ NAT44 add pool addresses to FIB """
2626 static_addr = '10.0.0.10'
2627 self.nat44_add_address(self.nat_addr)
2628 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2629 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2631 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2634 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2635 ARP(op=ARP.who_has, pdst=self.nat_addr,
2636 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2637 self.pg1.add_stream(p)
2638 self.pg_enable_capture(self.pg_interfaces)
2640 capture = self.pg1.get_capture(1)
2641 self.assertTrue(capture[0].haslayer(ARP))
2642 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2645 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2646 ARP(op=ARP.who_has, pdst=static_addr,
2647 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2648 self.pg1.add_stream(p)
2649 self.pg_enable_capture(self.pg_interfaces)
2651 capture = self.pg1.get_capture(1)
2652 self.assertTrue(capture[0].haslayer(ARP))
2653 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2655 # send ARP to non-NAT44 interface
2656 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2657 ARP(op=ARP.who_has, pdst=self.nat_addr,
2658 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2659 self.pg2.add_stream(p)
2660 self.pg_enable_capture(self.pg_interfaces)
2662 capture = self.pg1.get_capture(0)
2664 # remove addresses and verify
2665 self.nat44_add_address(self.nat_addr, is_add=0)
2666 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2669 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2670 ARP(op=ARP.who_has, pdst=self.nat_addr,
2671 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2672 self.pg1.add_stream(p)
2673 self.pg_enable_capture(self.pg_interfaces)
2675 capture = self.pg1.get_capture(0)
2677 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2678 ARP(op=ARP.who_has, pdst=static_addr,
2679 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2680 self.pg1.add_stream(p)
2681 self.pg_enable_capture(self.pg_interfaces)
2683 capture = self.pg1.get_capture(0)
2685 def test_vrf_mode(self):
2686 """ NAT44 tenant VRF aware address pool mode """
# Places pg0 and pg1 in two separate IPv4 tables, gives each table its own
# pool address, and checks that traffic from each interface towards pg2 is
# translated to the address belonging to that interface's table.
2690 nat_ip1 = "10.0.0.10"
2691 nat_ip2 = "10.0.0.11"
# Addresses are removed before the interfaces are moved to their tables and
# configured again afterwards.
2693 self.pg0.unconfig_ip4()
2694 self.pg1.unconfig_ip4()
2695 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2696 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2697 self.pg0.set_table_ip4(vrf_id1)
2698 self.pg1.set_table_ip4(vrf_id2)
2699 self.pg0.config_ip4()
2700 self.pg1.config_ip4()
2702 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2703 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2704 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2705 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2706 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 (first table) -> pg2: expect nat_ip1 as translated source.
2710 pkts = self.create_stream_in(self.pg0, self.pg2)
2711 self.pg0.add_stream(pkts)
2712 self.pg_enable_capture(self.pg_interfaces)
2714 capture = self.pg2.get_capture(len(pkts))
2715 self.verify_capture_out(capture, nat_ip1)
# pg1 (second table) -> pg2: expect nat_ip2 as translated source.
2718 pkts = self.create_stream_in(self.pg1, self.pg2)
2719 self.pg1.add_stream(pkts)
2720 self.pg_enable_capture(self.pg_interfaces)
2722 capture = self.pg2.get_capture(len(pkts))
2723 self.verify_capture_out(capture, nat_ip2)
# Cleanup: move both interfaces back to table 0 and delete the two tables.
2725 self.pg0.unconfig_ip4()
2726 self.pg1.unconfig_ip4()
2727 self.pg0.set_table_ip4(0)
2728 self.pg1.set_table_ip4(0)
2729 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2730 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2732 def test_vrf_feature_independent(self):
2733 """ NAT44 tenant VRF independent address pool mode """
# nat_ip1 is added without a VRF and nat_ip2 with vrf_id=99.  Traffic from
# both pg0 and pg1 towards pg2 is expected to be translated to nat_ip1.
2735 nat_ip1 = "10.0.0.10"
2736 nat_ip2 = "10.0.0.11"
2738 self.nat44_add_address(nat_ip1)
2739 self.nat44_add_address(nat_ip2, vrf_id=99)
2740 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2741 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2742 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2
2746 pkts = self.create_stream_in(self.pg0, self.pg2)
2747 self.pg0.add_stream(pkts)
2748 self.pg_enable_capture(self.pg_interfaces)
2750 capture = self.pg2.get_capture(len(pkts))
2751 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2 (same expected NAT address as for pg0)
2754 pkts = self.create_stream_in(self.pg1, self.pg2)
2755 self.pg1.add_stream(pkts)
2756 self.pg_enable_capture(self.pg_interfaces)
2758 capture = self.pg2.get_capture(len(pkts))
2759 self.verify_capture_out(capture, nat_ip1)
2761 def test_dynamic_ipless_interfaces(self):
2762 """ NAT44 interfaces without configured IP address """
# pg7 and pg8 get no config_ip4() call in this test; reachability of their
# remote hosts is set up by hand with neighbor entries and /32 routes.
# Dynamic translation is then checked in both directions.
2764 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2765 mactobinary(self.pg7.remote_mac),
2766 self.pg7.remote_ip4n,
2768 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2769 mactobinary(self.pg8.remote_mac),
2770 self.pg8.remote_ip4n,
# Host routes towards each remote host via its own interface.
2773 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2774 dst_address_length=32,
2775 next_hop_address=self.pg7.remote_ip4n,
2776 next_hop_sw_if_index=self.pg7.sw_if_index)
2777 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2778 dst_address_length=32,
2779 next_hop_address=self.pg8.remote_ip4n,
2780 next_hop_sw_if_index=self.pg8.sw_if_index)
2782 self.nat44_add_address(self.nat_addr)
2783 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2784 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8, verified as translated outbound traffic.
2788 pkts = self.create_stream_in(self.pg7, self.pg8)
2789 self.pg7.add_stream(pkts)
2790 self.pg_enable_capture(self.pg_interfaces)
2792 capture = self.pg8.get_capture(len(pkts))
2793 self.verify_capture_out(capture)
# pg8 -> pool address, verified as traffic delivered to pg7.
2796 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2797 self.pg8.add_stream(pkts)
2798 self.pg_enable_capture(self.pg_interfaces)
2800 capture = self.pg7.get_capture(len(pkts))
2801 self.verify_capture_in(capture, self.pg7)
2803 def test_static_ipless_interfaces(self):
2804 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Same hand-built reachability as the dynamic variant (neighbor entries and
# /32 routes for pg7/pg8), but with a 1:1 static mapping of pg7's remote
# host to self.nat_addr instead of a dynamic pool.
2806 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2807 mactobinary(self.pg7.remote_mac),
2808 self.pg7.remote_ip4n,
2810 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2811 mactobinary(self.pg8.remote_mac),
2812 self.pg8.remote_ip4n,
2815 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2816 dst_address_length=32,
2817 next_hop_address=self.pg7.remote_ip4n,
2818 next_hop_sw_if_index=self.pg7.sw_if_index)
2819 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2820 dst_address_length=32,
2821 next_hop_address=self.pg8.remote_ip4n,
2822 next_hop_sw_if_index=self.pg8.sw_if_index)
2824 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2825 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2826 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# Traffic is started from pg8 first, before any traffic from pg7.
2830 pkts = self.create_stream_out(self.pg8)
2831 self.pg8.add_stream(pkts)
2832 self.pg_enable_capture(self.pg_interfaces)
2834 capture = self.pg7.get_capture(len(pkts))
2835 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8, verified against self.nat_addr.
# NOTE(review): the third positional argument (True) presumably means
# "ports are not rewritten" for 1:1 NAT - confirm in verify_capture_out.
2838 pkts = self.create_stream_in(self.pg7, self.pg8)
2839 self.pg7.add_stream(pkts)
2840 self.pg_enable_capture(self.pg_interfaces)
2842 capture = self.pg8.get_capture(len(pkts))
2843 self.verify_capture_out(capture, self.nat_addr, True)
2845 def test_static_with_port_ipless_interfaces(self):
2846 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Same hand-built reachability for pg7/pg8 as the other "ipless" tests, but
# with per-protocol static mappings that also fix the external port / id.
# Fixed external TCP port, UDP port and ICMP id used by the mappings below.
2848 self.tcp_port_out = 30606
2849 self.udp_port_out = 30607
2850 self.icmp_id_out = 30608
2852 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2853 mactobinary(self.pg7.remote_mac),
2854 self.pg7.remote_ip4n,
2856 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2857 mactobinary(self.pg8.remote_mac),
2858 self.pg8.remote_ip4n,
2861 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2862 dst_address_length=32,
2863 next_hop_address=self.pg7.remote_ip4n,
2864 next_hop_sw_if_index=self.pg7.sw_if_index)
2865 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2866 dst_address_length=32,
2867 next_hop_address=self.pg8.remote_ip4n,
2868 next_hop_sw_if_index=self.pg8.sw_if_index)
2870 self.nat44_add_address(self.nat_addr)
# One static mapping per protocol: (pg7 host, internal port/id) <->
# (self.nat_addr, external port/id).
2871 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2872 self.tcp_port_in, self.tcp_port_out,
2873 proto=IP_PROTOS.tcp)
2874 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2875 self.udp_port_in, self.udp_port_out,
2876 proto=IP_PROTOS.udp)
2877 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2878 self.icmp_id_in, self.icmp_id_out,
2879 proto=IP_PROTOS.icmp)
2880 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2881 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# Traffic is started from pg8 first.
2885 pkts = self.create_stream_out(self.pg8)
2886 self.pg8.add_stream(pkts)
2887 self.pg_enable_capture(self.pg_interfaces)
2889 capture = self.pg7.get_capture(len(pkts))
2890 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8
2893 pkts = self.create_stream_in(self.pg7, self.pg8)
2894 self.pg7.add_stream(pkts)
2895 self.pg_enable_capture(self.pg_interfaces)
2897 capture = self.pg8.get_capture(len(pkts))
2898 self.verify_capture_out(capture)
2900 def test_static_unknown_proto(self):
2901 """ 1:1 NAT translate packet with unknown protocol """
# With a 1:1 mapping of pg0's remote host to nat_ip, sends a tunnelled
# packet (outer IP carrying an inner IP/TCP packet) in each direction and
# checks that only the outer addresses are translated and that the captured
# packet still has a GRE layer and a valid IP checksum.
2902 nat_ip = "10.0.0.10"
2903 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2904 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2905 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1: outer source must become nat_ip.
2909 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2910 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2912 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2913 TCP(sport=1234, dport=1234))
2914 self.pg0.add_stream(p)
2915 self.pg_enable_capture(self.pg_interfaces)
2917 p = self.pg1.get_capture(1)
2920 self.assertEqual(packet[IP].src, nat_ip)
2921 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2922 self.assertTrue(packet.haslayer(GRE))
2923 self.check_ip_checksum(packet)
2925 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> nat_ip: outer destination must become pg0's remote host.
2929 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2930 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2932 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2933 TCP(sport=1234, dport=1234))
2934 self.pg1.add_stream(p)
2935 self.pg_enable_capture(self.pg_interfaces)
2937 p = self.pg0.get_capture(1)
2940 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2941 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2942 self.assertTrue(packet.haslayer(GRE))
2943 self.check_ip_checksum(packet)
2945 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2948 def test_hairpinning_static_unknown_proto(self):
2949 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# Host and server are both remote hosts on pg0 and both have 1:1 static
# mappings.  A tunnelled packet is sent in each direction via the peer's
# NAT address; it must come back out of pg0 with both outer addresses
# translated, a GRE layer present and a valid IP checksum.
2951 host = self.pg0.remote_hosts[0]
2952 server = self.pg0.remote_hosts[1]
2954 host_nat_ip = "10.0.0.10"
2955 server_nat_ip = "10.0.0.11"
2957 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2958 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2959 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2960 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server's NAT address
2964 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2965 IP(src=host.ip4, dst=server_nat_ip) /
2967 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2968 TCP(sport=1234, dport=1234))
2969 self.pg0.add_stream(p)
2970 self.pg_enable_capture(self.pg_interfaces)
2972 p = self.pg0.get_capture(1)
2975 self.assertEqual(packet[IP].src, host_nat_ip)
2976 self.assertEqual(packet[IP].dst, server.ip4)
2977 self.assertTrue(packet.haslayer(GRE))
2978 self.check_ip_checksum(packet)
2980 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host's NAT address
2984 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2985 IP(src=server.ip4, dst=host_nat_ip) /
2987 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2988 TCP(sport=1234, dport=1234))
2989 self.pg0.add_stream(p)
2990 self.pg_enable_capture(self.pg_interfaces)
2992 p = self.pg0.get_capture(1)
2995 self.assertEqual(packet[IP].src, server_nat_ip)
2996 self.assertEqual(packet[IP].dst, host.ip4)
2997 self.assertTrue(packet.haslayer(GRE))
2998 self.check_ip_checksum(packet)
3000 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3003 def test_unknown_proto(self):
3004 """ NAT44 translate packet with unknown protocol """
# Dynamic NAT variant: a TCP packet is sent first between the same two
# endpoints, then a tunnelled packet is sent in each direction and checked
# for translated outer addresses, a GRE layer and a valid IP checksum.
3005 self.nat44_add_address(self.nat_addr)
3006 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3007 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Initial TCP packet pg0 -> pg1.
# NOTE(review): presumably needed so that a session for this host pair
# exists before the unknown-protocol packet is sent - confirm.
3011 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3012 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3013 TCP(sport=self.tcp_port_in, dport=20))
3014 self.pg0.add_stream(p)
3015 self.pg_enable_capture(self.pg_interfaces)
3017 p = self.pg1.get_capture(1)
# pg0 -> pg1: outer source must become the pool address.
3019 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3020 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3022 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3023 TCP(sport=1234, dport=1234))
3024 self.pg0.add_stream(p)
3025 self.pg_enable_capture(self.pg_interfaces)
3027 p = self.pg1.get_capture(1)
3030 self.assertEqual(packet[IP].src, self.nat_addr)
3031 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3032 self.assertTrue(packet.haslayer(GRE))
3033 self.check_ip_checksum(packet)
3035 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pool address: outer destination must become pg0's remote host.
3039 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3040 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3042 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3043 TCP(sport=1234, dport=1234))
3044 self.pg1.add_stream(p)
3045 self.pg_enable_capture(self.pg_interfaces)
3047 p = self.pg0.get_capture(1)
3050 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3051 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3052 self.assertTrue(packet.haslayer(GRE))
3053 self.check_ip_checksum(packet)
3055 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3058 def test_hairpinning_unknown_proto(self):
3059 """ NAT44 translate packet with unknown protocol - hairpinning """
# Host and server are both remote hosts on pg0.  The server has a 1:1
# static mapping; the host is translated through the pool address.  After
# an initial TCP packet, a tunnelled packet is sent in each direction and
# must come back out of pg0 with both outer addresses translated.
3060 host = self.pg0.remote_hosts[0]
3061 server = self.pg0.remote_hosts[1]
3064 server_in_port = 5678
3065 server_out_port = 8765
3066 server_nat_ip = "10.0.0.11"
3068 self.nat44_add_address(self.nat_addr)
3069 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3070 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3073 # add static mapping for server
3074 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# Initial TCP packet host -> server's NAT address; only its arrival on pg0
# is checked.
3077 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3078 IP(src=host.ip4, dst=server_nat_ip) /
3079 TCP(sport=host_in_port, dport=server_out_port))
3080 self.pg0.add_stream(p)
3081 self.pg_enable_capture(self.pg_interfaces)
3083 capture = self.pg0.get_capture(1)
# host -> server's NAT address: source becomes the pool address,
# destination becomes the server's real address.
3085 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3086 IP(src=host.ip4, dst=server_nat_ip) /
3088 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3089 TCP(sport=1234, dport=1234))
3090 self.pg0.add_stream(p)
3091 self.pg_enable_capture(self.pg_interfaces)
3093 p = self.pg0.get_capture(1)
3096 self.assertEqual(packet[IP].src, self.nat_addr)
3097 self.assertEqual(packet[IP].dst, server.ip4)
3098 self.assertTrue(packet.haslayer(GRE))
3099 self.check_ip_checksum(packet)
3101 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> pool address: source becomes the server's NAT address,
# destination becomes the host's real address.
3105 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3106 IP(src=server.ip4, dst=self.nat_addr) /
3108 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3109 TCP(sport=1234, dport=1234))
3110 self.pg0.add_stream(p)
3111 self.pg_enable_capture(self.pg_interfaces)
3113 p = self.pg0.get_capture(1)
3116 self.assertEqual(packet[IP].src, server_nat_ip)
3117 self.assertEqual(packet[IP].dst, host.ip4)
3118 self.assertTrue(packet.haslayer(GRE))
3119 self.check_ip_checksum(packet)
3121 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3124 def test_output_feature(self):
3125 """ NAT44 interface output feature (in2out postrouting) """
# Enables the NAT44 output feature on pg0, pg1 and pg3, then checks
# pg0 -> pg3 translation, the pg3 -> pg0 return path, and that traffic from
# pg2 (no NAT call in this test) to pg0 passes without translation.
3126 self.nat44_add_address(self.nat_addr)
3127 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3128 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3129 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3
3133 pkts = self.create_stream_in(self.pg0, self.pg3)
3134 self.pg0.add_stream(pkts)
3135 self.pg_enable_capture(self.pg_interfaces)
3137 capture = self.pg3.get_capture(len(pkts))
3138 self.verify_capture_out(capture)
# pg3 -> pg0
3141 pkts = self.create_stream_out(self.pg3)
3142 self.pg3.add_stream(pkts)
3143 self.pg_enable_capture(self.pg_interfaces)
3145 capture = self.pg0.get_capture(len(pkts))
3146 self.verify_capture_in(capture, self.pg0)
3148 # from non-NAT interface to NAT inside interface
3149 pkts = self.create_stream_in(self.pg2, self.pg0)
3150 self.pg2.add_stream(pkts)
3151 self.pg_enable_capture(self.pg_interfaces)
3153 capture = self.pg0.get_capture(len(pkts))
3154 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3156 def test_output_feature_vrf_aware(self):
3157 """ NAT44 interface output feature VRF aware (in2out postrouting) """
# Uses one pool address for VRF 10 and one for VRF 20 and checks that
# traffic from pg4 is translated to the VRF 10 address and traffic from pg6
# to the VRF 20 address, in both directions via pg3.
# NOTE(review): pg4 and pg6 are presumably placed in VRF 10 and VRF 20 by
# the class setup - confirm.
3158 nat_ip_vrf10 = "10.0.0.10"
3159 nat_ip_vrf20 = "10.0.0.20"
# Two /32 routes towards pg3's remote host via pg3.
3161 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3162 dst_address_length=32,
3163 next_hop_address=self.pg3.remote_ip4n,
3164 next_hop_sw_if_index=self.pg3.sw_if_index,
3166 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3167 dst_address_length=32,
3168 next_hop_address=self.pg3.remote_ip4n,
3169 next_hop_sw_if_index=self.pg3.sw_if_index,
3172 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3173 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3174 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3175 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3176 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3: expect the VRF 10 pool address.
3180 pkts = self.create_stream_in(self.pg4, self.pg3)
3181 self.pg4.add_stream(pkts)
3182 self.pg_enable_capture(self.pg_interfaces)
3184 capture = self.pg3.get_capture(len(pkts))
3185 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> VRF 10 pool address: expect delivery on pg4.
3188 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3189 self.pg3.add_stream(pkts)
3190 self.pg_enable_capture(self.pg_interfaces)
3192 capture = self.pg4.get_capture(len(pkts))
3193 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3: expect the VRF 20 pool address.
3196 pkts = self.create_stream_in(self.pg6, self.pg3)
3197 self.pg6.add_stream(pkts)
3198 self.pg_enable_capture(self.pg_interfaces)
3200 capture = self.pg3.get_capture(len(pkts))
3201 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> VRF 20 pool address: expect delivery on pg6.
3204 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3205 self.pg3.add_stream(pkts)
3206 self.pg_enable_capture(self.pg_interfaces)
3208 capture = self.pg6.get_capture(len(pkts))
3209 self.verify_capture_in(capture, self.pg6)
# Test: hairpinning with the NAT44 output feature. Host and server are both
# remote hosts on pg0; the host reaches the server through the server's
# static mapping on the NAT address, so both directions are sent and
# captured on pg0.
3211 def test_output_feature_hairpinning(self):
3212 """ NAT44 interface output feature hairpinning (in2out postrouting) """
3213 host = self.pg0.remote_hosts[0]
3214 server = self.pg0.remote_hosts[1]
# NOTE(review): host_in_port is used below but its assignment is not visible
# in this listing.
3217 server_in_port = 5678
3218 server_out_port = 8765
3220 self.nat44_add_address(self.nat_addr)
3221 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3222 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3225 # add static mapping for server
3226 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3227 server_in_port, server_out_port,
3228 proto=IP_PROTOS.tcp)
3230 # send packet from host to server
3231 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3232 IP(src=host.ip4, dst=self.nat_addr) /
3233 TCP(sport=host_in_port, dport=server_out_port))
3234 self.pg0.add_stream(p)
3235 self.pg_enable_capture(self.pg_interfaces)
3237 capture = self.pg0.get_capture(1)
# Expected: source rewritten to the NAT address with a new source port,
# destination rewritten to the server's inside address and port.
3242 self.assertEqual(ip.src, self.nat_addr)
3243 self.assertEqual(ip.dst, server.ip4)
3244 self.assertNotEqual(tcp.sport, host_in_port)
3245 self.assertEqual(tcp.dport, server_in_port)
3246 self.check_tcp_checksum(p)
# Remember the translated host port; the server reply is addressed to it.
3247 host_out_port = tcp.sport
3249 self.logger.error(ppp("Unexpected or invalid packet:", p))
3252 # send reply from server to host
3253 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3254 IP(src=server.ip4, dst=self.nat_addr) /
3255 TCP(sport=server_in_port, dport=host_out_port))
3256 self.pg0.add_stream(p)
3257 self.pg_enable_capture(self.pg_interfaces)
3259 capture = self.pg0.get_capture(1)
# Expected: reply appears to come from the NAT address / external server port
# and is delivered to the host's original address and port.
3264 self.assertEqual(ip.src, self.nat_addr)
3265 self.assertEqual(ip.dst, host.ip4)
3266 self.assertEqual(tcp.sport, server_out_port)
3267 self.assertEqual(tcp.dport, host_in_port)
3268 self.check_tcp_checksum(p)
3270 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: NAT44 output feature on pg1 combined with an out2in-only static
# mapping (service) for the pg0 peer, with forwarding enabled. Covers four
# flows: client -> service, service -> client, local host -> external
# network, and the matching return traffic.
3273 def test_output_feature_and_service(self):
3274 """ NAT44 interface output feature and services """
3275 external_addr = '1.2.3.4'
# NOTE(review): external_port and local_port are used below but their
# assignments are not visible in this listing.
3279 self.vapi.nat44_forwarding_enable_disable(1)
3280 self.nat44_add_address(self.nat_addr)
3281 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3282 local_port, external_port,
3283 proto=IP_PROTOS.tcp, out2in_only=1)
3284 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3285 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3288 # from client to service
3289 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3290 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3291 TCP(sport=12345, dport=external_port))
3292 self.pg1.add_stream(p)
3293 self.pg_enable_capture(self.pg_interfaces)
3295 capture = self.pg0.get_capture(1)
# Destination must be rewritten to the service's local address and port.
3301 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3302 self.assertEqual(tcp.dport, local_port)
3303 self.check_tcp_checksum(p)
3304 self.check_ip_checksum(p)
3306 self.logger.error(ppp("Unexpected or invalid packet:", p))
3309 # from service back to client
3310 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3311 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3312 TCP(sport=local_port, dport=12345))
3313 self.pg0.add_stream(p)
3314 self.pg_enable_capture(self.pg_interfaces)
3316 capture = self.pg1.get_capture(1)
# Source must be rewritten back to the external address and port.
3321 self.assertEqual(ip.src, external_addr)
3322 self.assertEqual(tcp.sport, external_port)
3323 self.check_tcp_checksum(p)
3324 self.check_ip_checksum(p)
3326 self.logger.error(ppp("Unexpected or invalid packet:", p))
3329 # from local network host to external network
3331 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3332 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3333 TCP(sport=23456, dport=34567))
3334 self.pg0.add_stream(p)
3335 self.pg_enable_capture(self.pg_interfaces)
3337 capture = self.pg1.get_capture(1)
# A flow that does not match the static mapping is translated to the NAT pool
# address with a new source port, which is saved for the return packet.
3342 self.assertEqual(ip.src, self.nat_addr)
3343 self.assertNotEqual(tcp.sport, 23456)
3344 ext_port = tcp.sport
3345 self.check_tcp_checksum(p)
3346 self.check_ip_checksum(p)
3348 self.logger.error(ppp("Unexpected or invalid packet:", p))
3351 # from external network back to local network host
3352 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3353 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3354 TCP(sport=34567, dport=ext_port))
3355 self.pg1.add_stream(p)
3356 self.pg_enable_capture(self.pg_interfaces)
3358 capture = self.pg0.get_capture(1)
# Return traffic must be delivered to the host's original address and port.
3364 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3365 self.assertEqual(tcp.dport, 23456)
3366 self.check_tcp_checksum(p)
3367 self.check_ip_checksum(p)
3369 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: with forwarding enabled and the output feature on pg1, sessions
# initiated by the inside host are translated, while sessions initiated from
# the outside directly to the host's own address pass untranslated.
3372 def test_output_feature_and_service2(self):
3373 """ NAT44 interface output feature and service host direct access """
3374 self.vapi.nat44_forwarding_enable_disable(1)
3375 self.nat44_add_address(self.nat_addr)
3376 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3379 # session initiated from service host - translate
3380 pkts = self.create_stream_in(self.pg0, self.pg1)
3381 self.pg0.add_stream(pkts)
3382 self.pg_enable_capture(self.pg_interfaces)
3384 capture = self.pg1.get_capture(len(pkts))
3385 self.verify_capture_out(capture)
# Return traffic for the translated sessions.
3387 pkts = self.create_stream_out(self.pg1)
3388 self.pg1.add_stream(pkts)
3389 self.pg_enable_capture(self.pg_interfaces)
3391 capture = self.pg0.get_capture(len(pkts))
3392 self.verify_capture_in(capture, self.pg0)
# Snapshot of the outside ports/id recorded so far.
# NOTE(review): these locals are not used again in the visible lines;
# presumably they are restored on lines missing from this listing - confirm.
3394 tcp_port_out = self.tcp_port_out
3395 udp_port_out = self.udp_port_out
3396 icmp_id_out = self.icmp_id_out
3398 # session initiated from remote host - do not translate
3399 pkts = self.create_stream_out(self.pg1,
3400 self.pg0.remote_ip4,
3401 use_inside_ports=True)
3402 self.pg1.add_stream(pkts)
3403 self.pg_enable_capture(self.pg_interfaces)
3405 capture = self.pg0.get_capture(len(pkts))
3406 self.verify_capture_in(capture, self.pg0)
# Replies from the host are verified against the host's own address as the
# expected source, i.e. no address translation (remaining verify arguments
# are not visible in this listing).
3408 pkts = self.create_stream_in(self.pg0, self.pg1)
3409 self.pg0.add_stream(pkts)
3410 self.pg_enable_capture(self.pg_interfaces)
3412 capture = self.pg1.get_capture(len(pkts))
3413 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
# Test: one-armed NAT44 - pg9 carries both the inside and the outside
# traffic, so every packet is sent and captured on pg9.
3416 def test_one_armed_nat44(self):
3417 """ One armed NAT44 """
3418 remote_host = self.pg9.remote_hosts[0]
3419 local_host = self.pg9.remote_hosts[1]
3422 self.nat44_add_address(self.nat_addr)
# The NAT feature is configured twice on pg9 (the second call's extra
# argument is not visible here; presumably is_inside=0 - confirm).
3423 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3424 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# Local host -> remote host.
3428 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3429 IP(src=local_host.ip4, dst=remote_host.ip4) /
3430 TCP(sport=12345, dport=80))
3431 self.pg9.add_stream(p)
3432 self.pg_enable_capture(self.pg_interfaces)
3434 capture = self.pg9.get_capture(1)
# Source must become the NAT address with a new port; destination unchanged.
3439 self.assertEqual(ip.src, self.nat_addr)
3440 self.assertEqual(ip.dst, remote_host.ip4)
3441 self.assertNotEqual(tcp.sport, 12345)
3442 external_port = tcp.sport
3443 self.assertEqual(tcp.dport, 80)
3444 self.check_tcp_checksum(p)
3445 self.check_ip_checksum(p)
3447 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Remote host -> NAT address, using the translated port learned above.
3451 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3452 IP(src=remote_host.ip4, dst=self.nat_addr) /
3453 TCP(sport=80, dport=external_port))
3454 self.pg9.add_stream(p)
3455 self.pg_enable_capture(self.pg_interfaces)
3457 capture = self.pg9.get_capture(1)
# Destination must be restored to the local host's address and port.
3462 self.assertEqual(ip.src, remote_host.ip4)
3463 self.assertEqual(ip.dst, local_host.ip4)
3464 self.assertEqual(tcp.sport, 80)
3465 self.assertEqual(tcp.dport, 12345)
3466 self.check_tcp_checksum(p)
3467 self.check_ip_checksum(p)
3469 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: one-armed NAT44 on pg9 with an out2in-only static mapping for the
# local host and a twice-NAT pool address; both source and destination of the
# client's packet are expected to be rewritten.
3472 def test_one_armed_nat44_static(self):
3473 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3474 remote_host = self.pg9.remote_hosts[0]
3475 local_host = self.pg9.remote_hosts[1]
# NOTE(review): external_port and local_port are used below but their
# assignments are not visible in this listing.
3480 self.vapi.nat44_forwarding_enable_disable(1)
3481 self.nat44_add_address(self.nat_addr, twice_nat=1)
3482 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3483 local_port, external_port,
3484 proto=IP_PROTOS.tcp, out2in_only=1,
3486 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3487 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3490 # from client to service
3491 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3492 IP(src=remote_host.ip4, dst=self.nat_addr) /
3493 TCP(sport=12345, dport=external_port))
3494 self.pg9.add_stream(p)
3495 self.pg_enable_capture(self.pg_interfaces)
3497 capture = self.pg9.get_capture(1)
# Destination -> local host / local port; source -> NAT address with a new
# port, which is saved so the service can reply to it.
3503 self.assertEqual(ip.dst, local_host.ip4)
3504 self.assertEqual(ip.src, self.nat_addr)
3505 self.assertEqual(tcp.dport, local_port)
3506 self.assertNotEqual(tcp.sport, 12345)
3507 eh_port_in = tcp.sport
3508 self.check_tcp_checksum(p)
3509 self.check_ip_checksum(p)
3511 self.logger.error(ppp("Unexpected or invalid packet:", p))
3514 # from service back to client
3515 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3516 IP(src=local_host.ip4, dst=self.nat_addr) /
3517 TCP(sport=local_port, dport=eh_port_in))
3518 self.pg9.add_stream(p)
3519 self.pg_enable_capture(self.pg_interfaces)
3521 capture = self.pg9.get_capture(1)
# Reply must reach the client's original address/port from the NAT address
# and external port.
3526 self.assertEqual(ip.src, self.nat_addr)
3527 self.assertEqual(ip.dst, remote_host.ip4)
3528 self.assertEqual(tcp.sport, external_port)
3529 self.assertEqual(tcp.dport, 12345)
3530 self.check_tcp_checksum(p)
3531 self.check_ip_checksum(p)
3533 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: create sessions with an in2out stream, delete two of them through the
# API (one addressed by its inside tuple, one by its outside tuple) and check
# that the user's session count dropped by exactly two.
3536 def test_del_session(self):
3537 """ Delete NAT44 session """
3538 self.nat44_add_address(self.nat_addr)
3539 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3540 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Generate traffic so that sessions exist for the pg0 peer.
3543 pkts = self.create_stream_in(self.pg0, self.pg1)
3544 self.pg0.add_stream(pkts)
3545 self.pg_enable_capture(self.pg_interfaces)
3547 capture = self.pg1.get_capture(len(pkts))
3549 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3550 nsessions = len(sessions)
# Delete by inside address/port/protocol ...
3552 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3553 sessions[0].inside_port,
3554 sessions[0].protocol)
# ... and by outside address/port/protocol (the final argument of this call
# is not visible in this listing).
3555 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3556 sessions[1].outside_port,
3557 sessions[1].protocol,
3560 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3561 self.assertEqual(nsessions - len(sessions), 2)
# Test: round-trip of the virtual fragmentation reassembly settings - change
# timeout, max_reass and max_frag relative to the current values, read them
# back, then enable drop_frag and check that it is reported.
3563 def test_set_get_reass(self):
3564 """ NAT44 set/get virtual fragmentation reassembly """
3565 reas_cfg1 = self.vapi.nat_get_reass()
# New values are derived from the current ones so they are guaranteed to
# differ from them (as long as the current values are non-zero for the
# doubled fields).
3567 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3568 max_reass=reas_cfg1.ip4_max_reass * 2,
3569 max_frag=reas_cfg1.ip4_max_frag * 2)
3571 reas_cfg2 = self.vapi.nat_get_reass()
3573 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3574 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3575 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
3577 self.vapi.nat_set_reass(drop_frag=1)
3578 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
# Test: NAT44 translation of a fragmented TCP packet whose fragments arrive
# in order, in both directions, and a check that two additional reassembly
# entries exist afterwards.
3580 def test_frag_in_order(self):
3581 """ NAT44 translate fragments arriving in order """
3582 self.nat44_add_address(self.nat_addr)
3583 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3584 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# 23-byte payload and a random inside source port for this run.
3587 data = "A" * 4 + "B" * 16 + "C" * 3
3588 self.tcp_port_in = random.randint(1025, 65535)
# Baseline number of reassembly entries before any traffic is sent.
3590 reass = self.vapi.nat_reass_dump()
3591 reass_n_start = len(reass)
# pg0 -> pg1 (some create_stream_frag / reass_frags_and_verify arguments are
# not visible in this listing).
3594 pkts = self.create_stream_frag(self.pg0,
3595 self.pg1.remote_ip4,
3599 self.pg0.add_stream(pkts)
3600 self.pg_enable_capture(self.pg_interfaces)
3602 frags = self.pg1.get_capture(len(pkts))
3603 p = self.reass_frags_and_verify(frags,
3605 self.pg1.remote_ip4)
3606 self.assertEqual(p[TCP].dport, 20)
3607 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Remember the translated source port for the return direction.
3608 self.tcp_port_out = p[TCP].sport
3609 self.assertEqual(data, p[Raw].load)
# pg1 -> pg0.
3612 pkts = self.create_stream_frag(self.pg1,
3617 self.pg1.add_stream(pkts)
3618 self.pg_enable_capture(self.pg_interfaces)
3620 frags = self.pg0.get_capture(len(pkts))
3621 p = self.reass_frags_and_verify(frags,
3622 self.pg1.remote_ip4,
3623 self.pg0.remote_ip4)
3624 self.assertEqual(p[TCP].sport, 20)
3625 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3626 self.assertEqual(data, p[Raw].load)
# One new reassembly entry is expected per direction.
3628 reass = self.vapi.nat_reass_dump()
3629 reass_n_end = len(reass)
3631 self.assertEqual(reass_n_end - reass_n_start, 2)
# Test: hairpinning of a fragmented TCP packet - host and server are both on
# pg0 and the host reaches the server through its static mapping on the NAT
# address, so the fragments are sent and captured on pg0.
3633 def test_reass_hairpinning(self):
3634 """ NAT44 fragments hairpinning """
3635 host = self.pg0.remote_hosts[0]
3636 server = self.pg0.remote_hosts[1]
# Ports are picked at random for every run.
3637 host_in_port = random.randint(1025, 65535)
3639 server_in_port = random.randint(1025, 65535)
3640 server_out_port = random.randint(1025, 65535)
3641 data = "A" * 4 + "B" * 16 + "C" * 3
3643 self.nat44_add_address(self.nat_addr)
3644 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3645 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3647 # add static mapping for server
3648 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3649 server_in_port, server_out_port,
3650 proto=IP_PROTOS.tcp)
3652 # send packet from host to server
3653 pkts = self.create_stream_frag(self.pg0,
3658 self.pg0.add_stream(pkts)
3659 self.pg_enable_capture(self.pg_interfaces)
3661 frags = self.pg0.get_capture(len(pkts))
3662 p = self.reass_frags_and_verify(frags,
# Source port must be translated, destination port must be the server's
# inside port, and the payload must survive reassembly unchanged.
3665 self.assertNotEqual(p[TCP].sport, host_in_port)
3666 self.assertEqual(p[TCP].dport, server_in_port)
3667 self.assertEqual(data, p[Raw].load)
3669 def test_frag_out_of_order(self):
3670 """ NAT44 translate fragments arriving out of order """
3671 self.nat44_add_address(self.nat_addr)
3672 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3673 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3676 data = "A" * 4 + "B" * 16 + "C" * 3
3677 random.randint(1025, 65535)
3680 pkts = self.create_stream_frag(self.pg0,
3681 self.pg1.remote_ip4,
3686 self.pg0.add_stream(pkts)
3687 self.pg_enable_capture(self.pg_interfaces)
3689 frags = self.pg1.get_capture(len(pkts))
3690 p = self.reass_frags_and_verify(frags,
3692 self.pg1.remote_ip4)
3693 self.assertEqual(p[TCP].dport, 20)
3694 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3695 self.tcp_port_out = p[TCP].sport
3696 self.assertEqual(data, p[Raw].load)
3699 pkts = self.create_stream_frag(self.pg1,
3705 self.pg1.add_stream(pkts)
3706 self.pg_enable_capture(self.pg_interfaces)
3708 frags = self.pg0.get_capture(len(pkts))
3709 p = self.reass_frags_and_verify(frags,
3710 self.pg1.remote_ip4,
3711 self.pg0.remote_ip4)
3712 self.assertEqual(p[TCP].sport, 20)
3713 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3714 self.assertEqual(data, p[Raw].load)
# Test: port-restricted NAT44 using the MAP-E address/port assignment
# algorithm (psid 10, psid-offset 6, psid-len 6); the translated source port
# must carry the configured PSID.
3716 def test_port_restricted(self):
3717 """ Port restricted NAT44 (MAP-E CE) """
3718 self.nat44_add_address(self.nat_addr)
3719 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3720 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3722 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3723 "psid-offset 6 psid-len 6")
3725 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3726 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3727 TCP(sport=4567, dport=22))
3728 self.pg0.add_stream(p)
3729 self.pg_enable_capture(self.pg_interfaces)
3731 capture = self.pg1.get_capture(1)
3736 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3737 self.assertEqual(ip.src, self.nat_addr)
3738 self.assertEqual(tcp.dport, 22)
3739 self.assertNotEqual(tcp.sport, 4567)
# Extract the 6-bit field at bits 6..11 of the translated port (mask 63 after
# shifting right by 6) and require it to equal the configured psid of 10.
3740 self.assertEqual((tcp.sport >> 6) & 63, 10)
3741 self.check_tcp_checksum(p)
3742 self.check_ip_checksum(p)
3744 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: twice NAT44 - a packet from the outside to a static mapping has its
# destination rewritten to the inside host and its source rewritten to the
# twice-NAT address; the reply must be translated back on both fields.
3747 def test_twice_nat(self):
3749 twice_nat_addr = '10.0.1.3'
# NOTE(review): port_in, port_out and eh_port_out are used below but their
# assignments are not visible in this listing.
3754 self.nat44_add_address(self.nat_addr)
3755 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3756 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3757 port_in, port_out, proto=IP_PROTOS.tcp,
3759 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3760 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside peer -> NAT address (static mapping's outside port).
3763 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3764 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3765 TCP(sport=eh_port_out, dport=port_out))
3766 self.pg1.add_stream(p)
3767 self.pg_enable_capture(self.pg_interfaces)
3769 capture = self.pg0.get_capture(1)
# Both ends translated: dst -> inside host/port, src -> twice-NAT address
# with a new port that is saved for the reply.
3774 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3775 self.assertEqual(ip.src, twice_nat_addr)
3776 self.assertEqual(tcp.dport, port_in)
3777 self.assertNotEqual(tcp.sport, eh_port_out)
3778 eh_port_in = tcp.sport
3779 self.check_tcp_checksum(p)
3780 self.check_ip_checksum(p)
3782 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Inside host -> twice-NAT address, replying to the translated port.
3785 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3786 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3787 TCP(sport=port_in, dport=eh_port_in))
3788 self.pg0.add_stream(p)
3789 self.pg_enable_capture(self.pg_interfaces)
3791 capture = self.pg1.get_capture(1)
# Reply must reach the outside peer's original address/port, sourced from the
# NAT address and the mapping's outside port.
3796 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3797 self.assertEqual(ip.src, self.nat_addr)
3798 self.assertEqual(tcp.dport, eh_port_out)
3799 self.assertEqual(tcp.sport, port_out)
3800 self.check_tcp_checksum(p)
3801 self.check_ip_checksum(p)
3803 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: twice NAT44 combined with a load-balanced static mapping across two
# local servers on pg0; whichever server is selected, source and destination
# must both be translated, and the reply translated back.
3806 def test_twice_nat_lb(self):
3807 """ Twice NAT44 local service load balancing """
3808 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3809 twice_nat_addr = '10.0.1.3'
# NOTE(review): local_port, external_port and eh_port_out are used below but
# their assignments are not visible in this listing.
3814 server1 = self.pg0.remote_hosts[0]
3815 server2 = self.pg0.remote_hosts[1]
# Back-end list for the load-balanced mapping (only the 'addr' keys are
# visible in this listing).
3817 locals = [{'addr': server1.ip4n,
3820 {'addr': server2.ip4n,
3824 self.nat44_add_address(self.nat_addr)
3825 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3827 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3831 local_num=len(locals),
3833 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3834 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside peer -> NAT address / external port.
3837 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3838 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3839 TCP(sport=eh_port_out, dport=external_port))
3840 self.pg1.add_stream(p)
3841 self.pg_enable_capture(self.pg_interfaces)
3843 capture = self.pg0.get_capture(1)
# Source -> twice-NAT address; destination may be either back-end.
3849 self.assertEqual(ip.src, twice_nat_addr)
3850 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
# The branch bodies are not visible here; presumably they record which server
# was chosen in `server`, which is used for the reply below - confirm.
3851 if ip.dst == server1.ip4:
3855 self.assertNotEqual(tcp.sport, eh_port_out)
3856 eh_port_in = tcp.sport
3857 self.assertEqual(tcp.dport, local_port)
3858 self.check_tcp_checksum(p)
3859 self.check_ip_checksum(p)
3861 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Selected server -> twice-NAT address, replying to the translated port.
3864 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3865 IP(src=server.ip4, dst=twice_nat_addr) /
3866 TCP(sport=local_port, dport=eh_port_in))
3867 self.pg0.add_stream(p)
3868 self.pg_enable_capture(self.pg_interfaces)
3870 capture = self.pg1.get_capture(1)
3875 self.assertEqual(ip.src, self.nat_addr)
3876 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3877 self.assertEqual(tcp.sport, external_port)
3878 self.assertEqual(tcp.dport, eh_port_out)
3879 self.check_tcp_checksum(p)
3880 self.check_ip_checksum(p)
3882 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: a twice-NAT pool address taken from interface pg7 must appear in the
# NAT address dump only while pg7 has an IPv4 address configured.
3885 def test_twice_nat_interface_addr(self):
3886 """ Acquire twice NAT44 addresses from interface """
3887 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3889 # no address in NAT pool
3890 adresses = self.vapi.nat44_address_dump()
3891 self.assertEqual(0, len(adresses))
3893 # configure interface address and check NAT address pool
3894 self.pg7.config_ip4()
3895 adresses = self.vapi.nat44_address_dump()
3896 self.assertEqual(1, len(adresses))
# Only the first four bytes of the dumped address field are compared with the
# interface's packed IPv4 address; the entry must be flagged twice_nat.
3897 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3898 self.assertEqual(adresses[0].twice_nat, 1)
3900 # remove interface address and check NAT address pool
3901 self.pg7.unconfig_ip4()
3902 adresses = self.vapi.nat44_address_dump()
3903 self.assertEqual(0, len(adresses))
# Test: with max_frag set to 0, sending a single (last) fragment must produce
# no forwarded packet on pg1 and must be reported through IPFIX to the
# collector reachable via pg3.
3905 def test_ipfix_max_frags(self):
3906 """ IPFIX logging maximum fragments pending reassembly exceeded """
3907 self.nat44_add_address(self.nat_addr)
3908 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3909 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Allow zero pending fragments, then point the IPFIX exporter at the pg3 peer
# and enable NAT IPFIX logging.
3911 self.vapi.nat_set_reass(max_frag=0)
3912 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3913 src_address=self.pg3.local_ip4n,
3915 template_interval=10)
3916 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3917 src_port=self.ipfix_src_port)
3919 data = "A" * 4 + "B" * 16 + "C" * 3
3920 self.tcp_port_in = random.randint(1025, 65535)
3921 pkts = self.create_stream_frag(self.pg0,
3922 self.pg1.remote_ip4,
# Only the last fragment of the stream is injected.
3926 self.pg0.add_stream(pkts[-1])
3927 self.pg_enable_capture(self.pg_interfaces)
# Nothing may be forwarded to pg1; nine IPFIX packets are expected on pg3
# after the flush.
3929 frags = self.pg1.get_capture(0)
3930 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3931 capture = self.pg3.get_capture(9)
3932 ipfix = IPFIXDecoder()
3933 # first load template
3935 self.assertTrue(p.haslayer(IPFIX))
3936 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3937 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3938 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3939 self.assertEqual(p[UDP].dport, 4739)
3940 self.assertEqual(p[IPFIX].observationDomainID,
3941 self.ipfix_domain_id)
3942 if p.haslayer(Template):
3943 ipfix.add_template(p.getlayer(Template))
3944 # verify events in data set
3946 if p.haslayer(Data):
# Note: `data` is rebound here from the payload string to the decoded records.
3947 data = ipfix.decode_data_set(p.getlayer(Set))
3948 self.verify_ipfix_max_fragments_ip4(data, 0,
3949 self.pg0.remote_ip4n)
# Per-test tear-down for TestNAT44 (the method's def line is not present in
# this listing). After the parent tear-down, and only while VPP is still
# alive, the NAT state is written to the log and the address/port assignment
# algorithm is reset to its default so a MAP-E setting made by one test does
# not carry over.
3952 super(TestNAT44, self).tearDown()
3953 if not self.vpp_dead:
3954 self.logger.info(self.vapi.cli("show nat44 addresses"))
3955 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3956 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3957 self.logger.info(self.vapi.cli("show nat44 interface address"))
3958 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3959 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3960 self.vapi.cli("nat addr-port-assignment-alg default")
# Test class for NAT44 started with the "out2in dpo" start-up option, used
# together with a MAP translation domain (464XLAT CE): IPv4 on pg0, IPv6 on
# pg1.
3964 class TestNAT44Out2InDPO(MethodHolder):
3965 """ NAT44 Test Cases using out2in DPO """
# setUpConstants: append "nat { out2in dpo }" to the VPP command line.
3968 def setUpConstants(cls):
3969 super(TestNAT44Out2InDPO, cls).setUpConstants()
3970 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
# setUpClass: fixed ports/ids (inside and outside values are identical), the
# NAT address in text and packed form, and the IPv4 destination used by the
# tests.
3973 def setUpClass(cls):
3974 super(TestNAT44Out2InDPO, cls).setUpClass()
3977 cls.tcp_port_in = 6303
3978 cls.tcp_port_out = 6303
3979 cls.udp_port_in = 6304
3980 cls.udp_port_out = 6304
3981 cls.icmp_id_in = 6305
3982 cls.icmp_id_out = 6305
3983 cls.nat_addr = '10.0.0.3'
3984 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3985 cls.dst_ip4 = '192.168.70.1'
# Two packet-generator interfaces: pg0 is IPv4 (ARP resolved), pg1 is IPv6
# (NDP resolved).
3987 cls.create_pg_interfaces(range(2))
3990 cls.pg0.config_ip4()
3991 cls.pg0.resolve_arp()
3994 cls.pg1.config_ip6()
3995 cls.pg1.resolve_ndp()
# IPv6 default route (all-zero prefix, length 0) via the pg1 peer.
3997 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
3998 dst_address_length=0,
3999 next_hop_address=cls.pg1.remote_ip6n,
4000 next_hop_sw_if_index=cls.pg1.sw_if_index)
4003 super(TestNAT44Out2InDPO, cls).tearDownClass()
# configure_xlat: store the /96 destination and source IPv6 prefixes on the
# instance and create a MAP domain in translation mode from them (the second
# argument of each inet_pton call and the tail of map_add_domain are not
# visible in this listing).
4006 def configure_xlat(self):
4007 self.dst_ip6_pfx = '1:2:3::'
4008 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4010 self.dst_ip6_pfx_len = 96
4011 self.src_ip6_pfx = '4:5:6::'
4012 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4014 self.src_ip6_pfx_len = 96
4015 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4016 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4017 '\x00\x00\x00\x00', 0, is_translation=1,
# test_464xlat_ce: IPv4 traffic from pg0 is NATed and must leave pg1 as IPv6;
# IPv6 traffic from pg1 must arrive on pg0 as IPv4. The NAT feature and the
# address range are removed again at the end of the test.
4020 def test_464xlat_ce(self):
4021 """ Test 464XLAT CE with NAT44 """
4023 self.configure_xlat()
4025 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4026 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
# Expected IPv6 addresses on the pg1 side, built by compose_ip6 from an IPv4
# address and a prefix (presumably embedding the IPv4 address in the prefix -
# confirm against compose_ip6).
4028 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4029 self.dst_ip6_pfx_len)
4030 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4031 self.src_ip6_pfx_len)
4034 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4035 self.pg0.add_stream(pkts)
4036 self.pg_enable_capture(self.pg_interfaces)
4038 capture = self.pg1.get_capture(len(pkts))
4039 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
4042 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4044 self.pg1.add_stream(pkts)
4045 self.pg_enable_capture(self.pg_interfaces)
4047 capture = self.pg0.get_capture(len(pkts))
4048 self.verify_capture_in(capture, self.pg0)
# Clean up the NAT configuration added by this test.
4050 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4052 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4053 self.nat_addr_n, is_add=0)
# test_464xlat_ce_no_nat: same traffic pattern without any NAT44
# configuration; the expected IPv6 source is derived from the pg0 peer's own
# IPv4 address and ports must be unchanged (same_port=True).
4055 def test_464xlat_ce_no_nat(self):
4056 """ Test 464XLAT CE without NAT44 """
4058 self.configure_xlat()
4060 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4061 self.dst_ip6_pfx_len)
4062 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4063 self.src_ip6_pfx_len)
4065 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4066 self.pg0.add_stream(pkts)
4067 self.pg_enable_capture(self.pg_interfaces)
4069 capture = self.pg1.get_capture(len(pkts))
4070 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4071 nat_ip=out_dst_ip6, same_port=True)
4073 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4074 self.pg1.add_stream(pkts)
4075 self.pg_enable_capture(self.pg_interfaces)
4077 capture = self.pg0.get_capture(len(pkts))
4078 self.verify_capture_in(capture, self.pg0)
4081 class TestDeterministicNAT(MethodHolder):
4082 """ Deterministic NAT Test Cases """
# Extend the VPP command line with "nat { deterministic }" so the NAT plugin
# starts in deterministic mode for this test class.
4085 def setUpConstants(cls):
4086 super(TestDeterministicNAT, cls).setUpConstants()
4087 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
# Class-level fixture for the deterministic NAT tests: fixed inside and
# external ports/ids, the default NAT address, three packet-generator
# interfaces, and two remote hosts with IPv4 neighbours on pg0.
4090 def setUpClass(cls):
4091 super(TestDeterministicNAT, cls).setUpClass()
4094 cls.tcp_port_in = 6303
4095 cls.tcp_external_port = 6303
4096 cls.udp_port_in = 6304
4097 cls.udp_external_port = 6304
4098 cls.icmp_id_in = 6305
4099 cls.nat_addr = '10.0.0.3'
4101 cls.create_pg_interfaces(range(3))
4102 cls.interfaces = list(cls.pg_interfaces)
# The loop body is not visible in this listing (presumably per-interface
# bring-up and address configuration - confirm).
4104 for i in cls.interfaces:
4109 cls.pg0.generate_remote_hosts(2)
4110 cls.pg0.configure_ipv4_neighbors()
# Delegates to the parent class tear-down. The surrounding lines of this
# block are not present in this listing.
4113 super(TestDeterministicNAT, cls).tearDownClass()
# Build one TCP, one UDP and one ICMP echo-request packet from the inside
# interface's peer towards the outside interface's peer, using the class's
# inside and external port/id values.
# NOTE(review): how the packets are collected and returned is not visible in
# this listing; callers use the result as a packet list.
4116 def create_stream_in(self, in_if, out_if, ttl=64):
4118 Create packet stream for inside network
4120 :param in_if: Inside interface
4121 :param out_if: Outside interface
4122 :param ttl: TTL of generated packets
# TCP
4126 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4127 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4128 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
# UDP
4132 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4133 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4134 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
# ICMP echo request
4138 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4139 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4140 ICMP(id=self.icmp_id_in, type='echo-request'))
# Build one TCP, one UDP and one ICMP echo-reply packet from the outside
# interface's peer towards dst_ip. The destination ports and the ICMP id are
# the translated values previously recorded on the instance
# (tcp_port_out, udp_port_out, icmp_external_id), so an in2out capture must
# have been verified first.
4145 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
4147 Create packet stream for outside network
4149 :param out_if: Outside interface
4150 :param dst_ip: Destination IP address (Default use global NAT address)
4151 :param ttl: TTL of generated packets
# Default destination is the class-wide NAT address (the guarding condition
# is not visible in this listing).
4154 dst_ip = self.nat_addr
# TCP
4157 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4158 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4159 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
# UDP
4163 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4164 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4165 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
# ICMP echo reply
4169 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4170 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4171 ICMP(id=self.icmp_external_id, type='echo-reply'))
# Check packets captured on the outside network: the capture must contain
# exactly packet_num packets and each must carry nat_ip as its source
# address. As a side effect the translated TCP/UDP source port and ICMP id of
# the captured packets are stored on the instance for later out2in streams.
# NOTE(review): the ":param same_port:" line below describes a parameter that
# this signature does not have.
4176 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4178 Verify captured packets on outside network
4180 :param capture: Captured packets
4181 :param nat_ip: Translated IP address (Default use global NAT address)
4182 :param same_port: Sorce port number is not translated (Default False)
4183 :param packet_num: Expected number of packets (Default 3)
# Default expected source is the class-wide NAT address (the guarding
# condition is not visible in this listing).
4186 nat_ip = self.nat_addr
4187 self.assertEqual(packet_num, len(capture))
4188 for packet in capture:
4190 self.assertEqual(packet[IP].src, nat_ip)
4191 if packet.haslayer(TCP):
4192 self.tcp_port_out = packet[TCP].sport
4193 elif packet.haslayer(UDP):
4194 self.udp_port_out = packet[UDP].sport
4196 self.icmp_external_id = packet[ICMP].id
4198 self.logger.error(ppp("Unexpected or invalid packet "
4199 "(outside network):", packet))
# Drive a TCP three-way handshake through the NAT: SYN in->out, SYN+ACK
# out->in (addressed to self.nat_addr and the translated port), ACK in->out.
# The translated source port seen on the outside is stored in
# self.tcp_port_out. A failure is logged as "TCP 3 way handshake failed".
# NOTE(review): the TCP flag arguments, two add_stream calls and the
# try/except framing are not visible in this listing.
4202 def initiate_tcp_session(self, in_if, out_if):
4204 Initiates TCP session
4206 :param in_if: Inside interface
4207 :param out_if: Outside interface
4210 # SYN packet in->out
4211 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4212 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4213 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4216 self.pg_enable_capture(self.pg_interfaces)
4218 capture = out_if.get_capture(1)
# Record the translated source port for the out->in direction.
4220 self.tcp_port_out = p[TCP].sport
4222 # SYN + ACK packet out->in
4223 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
4224 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
4225 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4227 out_if.add_stream(p)
4228 self.pg_enable_capture(self.pg_interfaces)
4230 in_if.get_capture(1)
4232 # ACK packet in->out
4233 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4234 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4235 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4238 self.pg_enable_capture(self.pg_interfaces)
4240 out_if.get_capture(1)
4243 self.logger.error("TCP 3 way handshake failed")
# Check a decoded IPFIX data set for the "maximum entries per user exceeded"
# event: exactly one record, whose fields are looked up by numeric key
# (presumably IPFIX information element ids - confirm against the decoder).
4246 def verify_ipfix_max_entries_per_user(self, data):
4248 Verify IPFIX maximum entries per user exceeded event
4250 :param data: Decoded IPFIX data records
4252 self.assertEqual(1, len(data))
# Field 230 must be 13 (the original label for this check is not visible in
# this listing).
4255 self.assertEqual(ord(record[230]), 13)
4256 # natQuotaExceededEvent
4257 self.assertEqual('\x03\x00\x00\x00', record[466])
# Field 473 must be the bytes e8 03 00 00, i.e. 1000 when read as a
# little-endian 32-bit integer (presumably the per-user limit - confirm).
4259 self.assertEqual('\xe8\x03\x00\x00', record[473])
# Field 8 must be the packed IPv4 address of the pg0 peer.
4261 self.assertEqual(self.pg0.remote_ip4n, record[8])
# Test: the plugin reports deterministic mode, a deterministic mapping can be
# added, forward and reverse lookups agree with each other, the mapping shows
# up in the dump with the configured values, and clear_nat_det() removes it.
4263 def test_deterministic_mode(self):
4264 """ NAT plugin run deterministic mode """
4265 in_addr = '172.16.255.0'
4266 out_addr = '172.17.255.50'
4267 in_addr_t = '172.16.255.20'
4268 in_addr_n = socket.inet_aton(in_addr)
4269 out_addr_n = socket.inet_aton(out_addr)
4270 in_addr_t_n = socket.inet_aton(in_addr_t)
# NOTE(review): in_plen and out_plen are used below but their assignments are
# not visible in this listing.
4274 nat_config = self.vapi.nat_show_config()
4275 self.assertEqual(1, nat_config.deterministic)
4277 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Forward lookup of an inside address must give the outside address; the
# reverse lookup of that outside address and the upper port of the returned
# range must give the same inside address back.
4279 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4280 self.assertEqual(rep1.out_addr[:4], out_addr_n)
4281 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4282 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
# The dump must contain exactly the mapping configured above.
4284 deterministic_mappings = self.vapi.nat_det_map_dump()
4285 self.assertEqual(len(deterministic_mappings), 1)
4286 dsm = deterministic_mappings[0]
4287 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4288 self.assertEqual(in_plen, dsm.in_plen)
4289 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4290 self.assertEqual(out_plen, dsm.out_plen)
# After clearing, no mappings may remain.
4292 self.clear_nat_det()
4293 deterministic_mappings = self.vapi.nat_det_map_dump()
4294 self.assertEqual(len(deterministic_mappings), 0)
# Test: raise each deterministic NAT timeout (udp, tcp established, tcp
# transitory, icmp) by 10 and check that every value read back differs from
# the one read before the change.
4296 def test_set_timeouts(self):
4297 """ Set deterministic NAT timeouts """
4298 timeouts_before = self.vapi.nat_det_get_timeouts()
4300 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
4301 timeouts_before.tcp_established + 10,
4302 timeouts_before.tcp_transitory + 10,
4303 timeouts_before.icmp + 10)
4305 timeouts_after = self.vapi.nat_det_get_timeouts()
# Only inequality with the old values is asserted, not the exact new values.
4307 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
4308 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
4309 self.assertNotEqual(timeouts_before.tcp_established,
4310 timeouts_after.tcp_established)
4311 self.assertNotEqual(timeouts_before.tcp_transitory,
4312 timeouts_after.tcp_transitory)
4314 def test_det_in(self):
4315 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4317 nat_ip = "10.0.0.10"
4319 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4321 socket.inet_aton(nat_ip),
4323 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4324 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4328 pkts = self.create_stream_in(self.pg0, self.pg1)
4329 self.pg0.add_stream(pkts)
4330 self.pg_enable_capture(self.pg_interfaces)
4332 capture = self.pg1.get_capture(len(pkts))
4333 self.verify_capture_out(capture, nat_ip)
4336 pkts = self.create_stream_out(self.pg1, nat_ip)
4337 self.pg1.add_stream(pkts)
4338 self.pg_enable_capture(self.pg_interfaces)
4340 capture = self.pg0.get_capture(len(pkts))
4341 self.verify_capture_in(capture, self.pg0)
4344 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4345 self.assertEqual(len(sessions), 3)
4349 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4350 self.assertEqual(s.in_port, self.tcp_port_in)
4351 self.assertEqual(s.out_port, self.tcp_port_out)
4352 self.assertEqual(s.ext_port, self.tcp_external_port)
4356 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4357 self.assertEqual(s.in_port, self.udp_port_in)
4358 self.assertEqual(s.out_port, self.udp_port_out)
4359 self.assertEqual(s.ext_port, self.udp_external_port)
4363 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4364 self.assertEqual(s.in_port, self.icmp_id_in)
4365 self.assertEqual(s.out_port, self.icmp_external_id)
4367 def test_multiple_users(self):
4368 """ Deterministic NAT multiple users """
4370 nat_ip = "10.0.0.10"
4372 external_port = 6303
4374 host0 = self.pg0.remote_hosts[0]
4375 host1 = self.pg0.remote_hosts[1]
4377 self.vapi.nat_det_add_del_map(host0.ip4n,
4379 socket.inet_aton(nat_ip),
4381 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4382 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4386 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4387 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4388 TCP(sport=port_in, dport=external_port))
4389 self.pg0.add_stream(p)
4390 self.pg_enable_capture(self.pg_interfaces)
4392 capture = self.pg1.get_capture(1)
4397 self.assertEqual(ip.src, nat_ip)
4398 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4399 self.assertEqual(tcp.dport, external_port)
4400 port_out0 = tcp.sport
4402 self.logger.error(ppp("Unexpected or invalid packet:", p))
4406 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4407 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4408 TCP(sport=port_in, dport=external_port))
4409 self.pg0.add_stream(p)
4410 self.pg_enable_capture(self.pg_interfaces)
4412 capture = self.pg1.get_capture(1)
4417 self.assertEqual(ip.src, nat_ip)
4418 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4419 self.assertEqual(tcp.dport, external_port)
4420 port_out1 = tcp.sport
4422 self.logger.error(ppp("Unexpected or invalid packet:", p))
4425 dms = self.vapi.nat_det_map_dump()
4426 self.assertEqual(1, len(dms))
4427 self.assertEqual(2, dms[0].ses_num)
4430 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4431 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4432 TCP(sport=external_port, dport=port_out0))
4433 self.pg1.add_stream(p)
4434 self.pg_enable_capture(self.pg_interfaces)
4436 capture = self.pg0.get_capture(1)
4441 self.assertEqual(ip.src, self.pg1.remote_ip4)
4442 self.assertEqual(ip.dst, host0.ip4)
4443 self.assertEqual(tcp.dport, port_in)
4444 self.assertEqual(tcp.sport, external_port)
4446 self.logger.error(ppp("Unexpected or invalid packet:", p))
4450 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4451 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4452 TCP(sport=external_port, dport=port_out1))
4453 self.pg1.add_stream(p)
4454 self.pg_enable_capture(self.pg_interfaces)
4456 capture = self.pg0.get_capture(1)
4461 self.assertEqual(ip.src, self.pg1.remote_ip4)
4462 self.assertEqual(ip.dst, host1.ip4)
4463 self.assertEqual(tcp.dport, port_in)
4464 self.assertEqual(tcp.sport, external_port)
4466 self.logger.error(ppp("Unexpected or invalid packet", p))
4469 # session close api test
4470 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4472 self.pg1.remote_ip4n,
4474 dms = self.vapi.nat_det_map_dump()
4475 self.assertEqual(dms[0].ses_num, 1)
4477 self.vapi.nat_det_close_session_in(host0.ip4n,
4479 self.pg1.remote_ip4n,
4481 dms = self.vapi.nat_det_map_dump()
4482 self.assertEqual(dms[0].ses_num, 0)
4484 def test_tcp_session_close_detection_in(self):
4485 """ Deterministic NAT TCP session close from inside network """
4486 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4488 socket.inet_aton(self.nat_addr),
4490 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4491 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4494 self.initiate_tcp_session(self.pg0, self.pg1)
4496 # close the session from inside
4498 # FIN packet in -> out
4499 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4500 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4501 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4503 self.pg0.add_stream(p)
4504 self.pg_enable_capture(self.pg_interfaces)
4506 self.pg1.get_capture(1)
4510 # ACK packet out -> in
4511 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4512 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4513 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4517 # FIN packet out -> in
4518 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4519 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4520 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4524 self.pg1.add_stream(pkts)
4525 self.pg_enable_capture(self.pg_interfaces)
4527 self.pg0.get_capture(2)
4529 # ACK packet in -> out
4530 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4531 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4532 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4534 self.pg0.add_stream(p)
4535 self.pg_enable_capture(self.pg_interfaces)
4537 self.pg1.get_capture(1)
4539 # Check if deterministic NAT44 closed the session
4540 dms = self.vapi.nat_det_map_dump()
4541 self.assertEqual(0, dms[0].ses_num)
4543 self.logger.error("TCP session termination failed")
4546 def test_tcp_session_close_detection_out(self):
4547 """ Deterministic NAT TCP session close from outside network """
4548 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4550 socket.inet_aton(self.nat_addr),
4552 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4553 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4556 self.initiate_tcp_session(self.pg0, self.pg1)
4558 # close the session from outside
4560 # FIN packet out -> in
4561 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4562 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4563 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4565 self.pg1.add_stream(p)
4566 self.pg_enable_capture(self.pg_interfaces)
4568 self.pg0.get_capture(1)
4572 # ACK packet in -> out
4573 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4574 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4575 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4579 # ACK packet in -> out
4580 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4581 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4582 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4586 self.pg0.add_stream(pkts)
4587 self.pg_enable_capture(self.pg_interfaces)
4589 self.pg1.get_capture(2)
4591 # ACK packet out -> in
4592 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4593 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4594 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4596 self.pg1.add_stream(p)
4597 self.pg_enable_capture(self.pg_interfaces)
4599 self.pg0.get_capture(1)
4601 # Check if deterministic NAT44 closed the session
4602 dms = self.vapi.nat_det_map_dump()
4603 self.assertEqual(0, dms[0].ses_num)
4605 self.logger.error("TCP session termination failed")
4608 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4609 def test_session_timeout(self):
4610 """ Deterministic NAT session timeouts """
4611 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4613 socket.inet_aton(self.nat_addr),
4615 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4616 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4619 self.initiate_tcp_session(self.pg0, self.pg1)
4620 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4621 pkts = self.create_stream_in(self.pg0, self.pg1)
4622 self.pg0.add_stream(pkts)
4623 self.pg_enable_capture(self.pg_interfaces)
4625 capture = self.pg1.get_capture(len(pkts))
4628 dms = self.vapi.nat_det_map_dump()
4629 self.assertEqual(0, dms[0].ses_num)
4631 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4632 def test_session_limit_per_user(self):
4633 """ Deterministic NAT maximum sessions per user limit """
4634 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4636 socket.inet_aton(self.nat_addr),
4638 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4639 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4641 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4642 src_address=self.pg2.local_ip4n,
4644 template_interval=10)
4645 self.vapi.nat_ipfix()
4648 for port in range(1025, 2025):
4649 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4650 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4651 UDP(sport=port, dport=port))
4654 self.pg0.add_stream(pkts)
4655 self.pg_enable_capture(self.pg_interfaces)
4657 capture = self.pg1.get_capture(len(pkts))
4659 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4660 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4661 UDP(sport=3001, dport=3002))
4662 self.pg0.add_stream(p)
4663 self.pg_enable_capture(self.pg_interfaces)
4665 capture = self.pg1.assert_nothing_captured()
4667 # verify ICMP error packet
4668 capture = self.pg0.get_capture(1)
4670 self.assertTrue(p.haslayer(ICMP))
4672 self.assertEqual(icmp.type, 3)
4673 self.assertEqual(icmp.code, 1)
4674 self.assertTrue(icmp.haslayer(IPerror))
4675 inner_ip = icmp[IPerror]
4676 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4677 self.assertEqual(inner_ip[UDPerror].dport, 3002)
4679 dms = self.vapi.nat_det_map_dump()
4681 self.assertEqual(1000, dms[0].ses_num)
4683 # verify IPFIX logging
4684 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4686 capture = self.pg2.get_capture(2)
4687 ipfix = IPFIXDecoder()
4688 # first load template
4690 self.assertTrue(p.haslayer(IPFIX))
4691 if p.haslayer(Template):
4692 ipfix.add_template(p.getlayer(Template))
4693 # verify events in data set
4695 if p.haslayer(Data):
4696 data = ipfix.decode_data_set(p.getlayer(Set))
4697 self.verify_ipfix_max_entries_per_user(data)
4699 def clear_nat_det(self):
4701 Clear deterministic NAT configuration.
4703 self.vapi.nat_ipfix(enable=0)
4704 self.vapi.nat_det_set_timeouts()
4705 deterministic_mappings = self.vapi.nat_det_map_dump()
4706 for dsm in deterministic_mappings:
4707 self.vapi.nat_det_add_del_map(dsm.in_addr,
4713 interfaces = self.vapi.nat44_interface_dump()
4714 for intf in interfaces:
4715 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4720 super(TestDeterministicNAT, self).tearDown()
4721 if not self.vpp_dead:
4722 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4724 self.vapi.cli("show nat44 deterministic mappings"))
4726 self.vapi.cli("show nat44 deterministic timeouts"))
4728 self.vapi.cli("show nat44 deterministic sessions"))
4729 self.clear_nat_det()
4732 class TestNAT64(MethodHolder):
4733 """ NAT64 Test Cases """
def setUpConstants(cls):
    # NOTE(review): takes `cls`, so presumably decorated as a classmethod
    # on a line outside this view - confirm before relying on it.
    super(TestNAT64, cls).setUpConstants()
    # Extra "nat { ... }" section appended to the VPP command line.
    nat64_section = [
        "nat", "{",
        "nat64 bib hash buckets 128",
        "nat64 st hash buckets 256",
        "}",
    ]
    cls.vpp_cmdline.extend(nat64_section)
4742 def setUpClass(cls):
4743 super(TestNAT64, cls).setUpClass()
4746 cls.tcp_port_in = 6303
4747 cls.tcp_port_out = 6303
4748 cls.udp_port_in = 6304
4749 cls.udp_port_out = 6304
4750 cls.icmp_id_in = 6305
4751 cls.icmp_id_out = 6305
4752 cls.nat_addr = '10.0.0.3'
4753 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4755 cls.vrf1_nat_addr = '10.0.10.3'
4756 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4758 cls.ipfix_src_port = 4739
4759 cls.ipfix_domain_id = 1
4761 cls.create_pg_interfaces(range(5))
4762 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4763 cls.ip6_interfaces.append(cls.pg_interfaces[2])
4764 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4766 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4768 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4770 cls.pg0.generate_remote_hosts(2)
4772 for i in cls.ip6_interfaces:
4775 i.configure_ipv6_neighbors()
4777 for i in cls.ip4_interfaces:
4783 cls.pg3.config_ip4()
4784 cls.pg3.resolve_arp()
4785 cls.pg3.config_ip6()
4786 cls.pg3.configure_ipv6_neighbors()
4789 super(TestNAT64, cls).tearDownClass()
def test_pool(self):
    """ Add/delete address to NAT64 pool """
    pool_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')

    # Add a range whose start and end are the same address.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr)

    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    self.assertEqual(dumped[0].address, pool_addr)

    # Remove the same range again; the dump must then be empty.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr, is_add=0)

    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
4807 def test_interface(self):
4808 """ Enable/disable NAT64 feature on the interface """
4809 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4810 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4812 interfaces = self.vapi.nat64_interface_dump()
4813 self.assertEqual(len(interfaces), 2)
4816 for intf in interfaces:
4817 if intf.sw_if_index == self.pg0.sw_if_index:
4818 self.assertEqual(intf.is_inside, 1)
4820 elif intf.sw_if_index == self.pg1.sw_if_index:
4821 self.assertEqual(intf.is_inside, 0)
4823 self.assertTrue(pg0_found)
4824 self.assertTrue(pg1_found)
4826 features = self.vapi.cli("show interface features pg0")
4827 self.assertNotEqual(features.find('nat64-in2out'), -1)
4828 features = self.vapi.cli("show interface features pg1")
4829 self.assertNotEqual(features.find('nat64-out2in'), -1)
4831 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4832 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4834 interfaces = self.vapi.nat64_interface_dump()
4835 self.assertEqual(len(interfaces), 0)
4837 def test_static_bib(self):
4838 """ Add/delete static BIB entry """
4839 in_addr = socket.inet_pton(socket.AF_INET6,
4840 '2001:db8:85a3::8a2e:370:7334')
4841 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4844 proto = IP_PROTOS.tcp
4846 self.vapi.nat64_add_del_static_bib(in_addr,
4851 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4856 self.assertEqual(bibe.i_addr, in_addr)
4857 self.assertEqual(bibe.o_addr, out_addr)
4858 self.assertEqual(bibe.i_port, in_port)
4859 self.assertEqual(bibe.o_port, out_port)
4860 self.assertEqual(static_bib_num, 1)
4862 self.vapi.nat64_add_del_static_bib(in_addr,
4868 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4873 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    # Reply attributes, in the order they are checked.
    checked = ('udp', 'icmp', 'tcp_trans', 'tcp_est', 'tcp_incoming_syn')

    # verify default values
    defaults = {'udp': 300, 'icmp': 60, 'tcp_trans': 240,
                'tcp_est': 7440, 'tcp_incoming_syn': 6}
    reply = self.vapi.nat64_get_timeouts()
    for name in checked:
        self.assertEqual(getattr(reply, name), defaults[name])

    # set and verify custom values
    custom = {'udp': 200, 'icmp': 30, 'tcp_trans': 250,
              'tcp_est': 7450, 'tcp_incoming_syn': 10}
    self.vapi.nat64_set_timeouts(**custom)
    reply = self.vapi.nat64_get_timeouts()
    for name in checked:
        self.assertEqual(getattr(reply, name), custom[name])
4895 def test_dynamic(self):
4896 """ NAT64 dynamic translation test """
4897 self.tcp_port_in = 6303
4898 self.udp_port_in = 6304
4899 self.icmp_id_in = 6305
4901 ses_num_start = self.nat64_get_ses_num()
4903 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4905 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4906 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4909 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4910 self.pg0.add_stream(pkts)
4911 self.pg_enable_capture(self.pg_interfaces)
4913 capture = self.pg1.get_capture(len(pkts))
4914 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4915 dst_ip=self.pg1.remote_ip4)
4918 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4919 self.pg1.add_stream(pkts)
4920 self.pg_enable_capture(self.pg_interfaces)
4922 capture = self.pg0.get_capture(len(pkts))
4923 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4924 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4927 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4928 self.pg0.add_stream(pkts)
4929 self.pg_enable_capture(self.pg_interfaces)
4931 capture = self.pg1.get_capture(len(pkts))
4932 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4933 dst_ip=self.pg1.remote_ip4)
4936 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4937 self.pg1.add_stream(pkts)
4938 self.pg_enable_capture(self.pg_interfaces)
4940 capture = self.pg0.get_capture(len(pkts))
4941 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4943 ses_num_end = self.nat64_get_ses_num()
4945 self.assertEqual(ses_num_end - ses_num_start, 3)
4947 # tenant with specific VRF
4948 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4949 self.vrf1_nat_addr_n,
4950 vrf_id=self.vrf1_id)
4951 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4953 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4954 self.pg2.add_stream(pkts)
4955 self.pg_enable_capture(self.pg_interfaces)
4957 capture = self.pg1.get_capture(len(pkts))
4958 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4959 dst_ip=self.pg1.remote_ip4)
4961 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4962 self.pg1.add_stream(pkts)
4963 self.pg_enable_capture(self.pg_interfaces)
4965 capture = self.pg2.get_capture(len(pkts))
4966 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4968 def test_static(self):
4969 """ NAT64 static translation test """
4970 self.tcp_port_in = 60303
4971 self.udp_port_in = 60304
4972 self.icmp_id_in = 60305
4973 self.tcp_port_out = 60303
4974 self.udp_port_out = 60304
4975 self.icmp_id_out = 60305
4977 ses_num_start = self.nat64_get_ses_num()
4979 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4981 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4982 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4984 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4989 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4994 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5001 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5002 self.pg0.add_stream(pkts)
5003 self.pg_enable_capture(self.pg_interfaces)
5005 capture = self.pg1.get_capture(len(pkts))
5006 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5007 dst_ip=self.pg1.remote_ip4, same_port=True)
5010 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5011 self.pg1.add_stream(pkts)
5012 self.pg_enable_capture(self.pg_interfaces)
5014 capture = self.pg0.get_capture(len(pkts))
5015 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5016 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5018 ses_num_end = self.nat64_get_ses_num()
5020 self.assertEqual(ses_num_end - ses_num_start, 3)
5022 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5023 def test_session_timeout(self):
5024 """ NAT64 session timeout """
5025 self.icmp_id_in = 1234
5026 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5028 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5029 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5030 self.vapi.nat64_set_timeouts(icmp=5)
5032 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5033 self.pg0.add_stream(pkts)
5034 self.pg_enable_capture(self.pg_interfaces)
5036 capture = self.pg1.get_capture(len(pkts))
5038 ses_num_before_timeout = self.nat64_get_ses_num()
5042 # ICMP session after timeout
5043 ses_num_after_timeout = self.nat64_get_ses_num()
5044 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
5046 def test_icmp_error(self):
5047 """ NAT64 ICMP Error message translation """
5048 self.tcp_port_in = 6303
5049 self.udp_port_in = 6304
5050 self.icmp_id_in = 6305
5052 ses_num_start = self.nat64_get_ses_num()
5054 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5056 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5057 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5059 # send some packets to create sessions
5060 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5061 self.pg0.add_stream(pkts)
5062 self.pg_enable_capture(self.pg_interfaces)
5064 capture_ip4 = self.pg1.get_capture(len(pkts))
5065 self.verify_capture_out(capture_ip4,
5066 nat_ip=self.nat_addr,
5067 dst_ip=self.pg1.remote_ip4)
5069 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5070 self.pg1.add_stream(pkts)
5071 self.pg_enable_capture(self.pg_interfaces)
5073 capture_ip6 = self.pg0.get_capture(len(pkts))
5074 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5075 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5076 self.pg0.remote_ip6)
5079 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5080 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5081 ICMPv6DestUnreach(code=1) /
5082 packet[IPv6] for packet in capture_ip6]
5083 self.pg0.add_stream(pkts)
5084 self.pg_enable_capture(self.pg_interfaces)
5086 capture = self.pg1.get_capture(len(pkts))
5087 for packet in capture:
5089 self.assertEqual(packet[IP].src, self.nat_addr)
5090 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5091 self.assertEqual(packet[ICMP].type, 3)
5092 self.assertEqual(packet[ICMP].code, 13)
5093 inner = packet[IPerror]
5094 self.assertEqual(inner.src, self.pg1.remote_ip4)
5095 self.assertEqual(inner.dst, self.nat_addr)
5096 self.check_icmp_checksum(packet)
5097 if inner.haslayer(TCPerror):
5098 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
5099 elif inner.haslayer(UDPerror):
5100 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
5102 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
5104 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5108 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5109 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5110 ICMP(type=3, code=13) /
5111 packet[IP] for packet in capture_ip4]
5112 self.pg1.add_stream(pkts)
5113 self.pg_enable_capture(self.pg_interfaces)
5115 capture = self.pg0.get_capture(len(pkts))
5116 for packet in capture:
5118 self.assertEqual(packet[IPv6].src, ip.src)
5119 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5120 icmp = packet[ICMPv6DestUnreach]
5121 self.assertEqual(icmp.code, 1)
5122 inner = icmp[IPerror6]
5123 self.assertEqual(inner.src, self.pg0.remote_ip6)
5124 self.assertEqual(inner.dst, ip.src)
5125 self.check_icmpv6_checksum(packet)
5126 if inner.haslayer(TCPerror):
5127 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
5128 elif inner.haslayer(UDPerror):
5129 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
5131 self.assertEqual(inner[ICMPv6EchoRequest].id,
5134 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5137 def test_hairpinning(self):
5138 """ NAT64 hairpinning """
5140 client = self.pg0.remote_hosts[0]
5141 server = self.pg0.remote_hosts[1]
5142 server_tcp_in_port = 22
5143 server_tcp_out_port = 4022
5144 server_udp_in_port = 23
5145 server_udp_out_port = 4023
5146 client_tcp_in_port = 1234
5147 client_udp_in_port = 1235
5148 client_tcp_out_port = 0
5149 client_udp_out_port = 0
5150 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5151 nat_addr_ip6 = ip.src
5153 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5155 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5156 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5158 self.vapi.nat64_add_del_static_bib(server.ip6n,
5161 server_tcp_out_port,
5163 self.vapi.nat64_add_del_static_bib(server.ip6n,
5166 server_udp_out_port,
5171 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5172 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5173 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5175 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5176 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5177 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5179 self.pg0.add_stream(pkts)
5180 self.pg_enable_capture(self.pg_interfaces)
5182 capture = self.pg0.get_capture(len(pkts))
5183 for packet in capture:
5185 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5186 self.assertEqual(packet[IPv6].dst, server.ip6)
5187 if packet.haslayer(TCP):
5188 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5189 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5190 self.check_tcp_checksum(packet)
5191 client_tcp_out_port = packet[TCP].sport
5193 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5194 self.assertEqual(packet[UDP].dport, server_udp_in_port)
5195 self.check_udp_checksum(packet)
5196 client_udp_out_port = packet[UDP].sport
5198 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5203 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5204 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5205 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5207 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5208 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5209 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5211 self.pg0.add_stream(pkts)
5212 self.pg_enable_capture(self.pg_interfaces)
5214 capture = self.pg0.get_capture(len(pkts))
5215 for packet in capture:
5217 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5218 self.assertEqual(packet[IPv6].dst, client.ip6)
5219 if packet.haslayer(TCP):
5220 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5221 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5222 self.check_tcp_checksum(packet)
5224 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5225 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5226 self.check_udp_checksum(packet)
5228 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5233 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5234 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5235 ICMPv6DestUnreach(code=1) /
5236 packet[IPv6] for packet in capture]
5237 self.pg0.add_stream(pkts)
5238 self.pg_enable_capture(self.pg_interfaces)
5240 capture = self.pg0.get_capture(len(pkts))
5241 for packet in capture:
5243 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5244 self.assertEqual(packet[IPv6].dst, server.ip6)
5245 icmp = packet[ICMPv6DestUnreach]
5246 self.assertEqual(icmp.code, 1)
5247 inner = icmp[IPerror6]
5248 self.assertEqual(inner.src, server.ip6)
5249 self.assertEqual(inner.dst, nat_addr_ip6)
5250 self.check_icmpv6_checksum(packet)
5251 if inner.haslayer(TCPerror):
5252 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5253 self.assertEqual(inner[TCPerror].dport,
5254 client_tcp_out_port)
5256 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5257 self.assertEqual(inner[UDPerror].dport,
5258 client_udp_out_port)
5260 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5263 def test_prefix(self):
5264 """ NAT64 Network-Specific Prefix """
5266 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5268 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5269 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5270 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5271 self.vrf1_nat_addr_n,
5272 vrf_id=self.vrf1_id)
5273 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5276 global_pref64 = "2001:db8::"
5277 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5278 global_pref64_len = 32
5279 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5281 prefix = self.vapi.nat64_prefix_dump()
5282 self.assertEqual(len(prefix), 1)
5283 self.assertEqual(prefix[0].prefix, global_pref64_n)
5284 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5285 self.assertEqual(prefix[0].vrf_id, 0)
5287 # Add tenant specific prefix
5288 vrf1_pref64 = "2001:db8:122:300::"
5289 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5290 vrf1_pref64_len = 56
5291 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5293 vrf_id=self.vrf1_id)
5294 prefix = self.vapi.nat64_prefix_dump()
5295 self.assertEqual(len(prefix), 2)
5298 pkts = self.create_stream_in_ip6(self.pg0,
5301 plen=global_pref64_len)
5302 self.pg0.add_stream(pkts)
5303 self.pg_enable_capture(self.pg_interfaces)
5305 capture = self.pg1.get_capture(len(pkts))
5306 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5307 dst_ip=self.pg1.remote_ip4)
5309 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5310 self.pg1.add_stream(pkts)
5311 self.pg_enable_capture(self.pg_interfaces)
5313 capture = self.pg0.get_capture(len(pkts))
5314 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5317 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5319 # Tenant specific prefix
5320 pkts = self.create_stream_in_ip6(self.pg2,
5323 plen=vrf1_pref64_len)
5324 self.pg2.add_stream(pkts)
5325 self.pg_enable_capture(self.pg_interfaces)
5327 capture = self.pg1.get_capture(len(pkts))
5328 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5329 dst_ip=self.pg1.remote_ip4)
5331 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5332 self.pg1.add_stream(pkts)
5333 self.pg_enable_capture(self.pg_interfaces)
5335 capture = self.pg2.get_capture(len(pkts))
5336 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5339 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
# Checks NAT64 handling of a TCP packet followed by packets sent with
# next-header/protocol 47 (the outbound capture is asserted to carry GRE),
# in both directions between pg0 and pg1.
# NOTE(review): embedded line numbers skip here, so some original lines are
# not shown; the comments describe only the visible statements.
5341 def test_unknown_proto(self):
5342 """ NAT64 translate packet with unknown protocol """
# NAT64 setup: pool address range, then pg0 and pg1 registered as NAT64
# interfaces (pg1 explicitly with is_inside=0).
5344 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5346 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5347 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPv6 destination composed from pg1's remote IPv4 address, '64:ff9b::', 96.
5348 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
# TCP packet sent in on pg0; exactly one packet is expected on pg1.
5351 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5352 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5353 TCP(sport=self.tcp_port_in, dport=20))
5354 self.pg0.add_stream(p)
5355 self.pg_enable_capture(self.pg_interfaces)
5357 p = self.pg1.get_capture(1)
# IPv6 packet with nh=47 sent in on pg0; the packet captured on pg1 must have
# the NAT address as source, contain a GRE layer and a valid IP checksum.
5359 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5360 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5362 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5363 TCP(sport=1234, dport=1234))
5364 self.pg0.add_stream(p)
5365 self.pg_enable_capture(self.pg_interfaces)
5367 p = self.pg1.get_capture(1)
5370 self.assertEqual(packet[IP].src, self.nat_addr)
5371 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5372 self.assertTrue(packet.haslayer(GRE))
5373 self.check_ip_checksum(packet)
5375 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Reverse direction: IPv4 packet addressed to the NAT address sent in on pg1;
# the packet captured on pg0 must be IPv6 from remote_ip6 with nh == 47.
5379 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5380 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5382 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5383 TCP(sport=1234, dport=1234))
5384 self.pg1.add_stream(p)
5385 self.pg_enable_capture(self.pg_interfaces)
5387 p = self.pg0.get_capture(1)
5390 self.assertEqual(packet[IPv6].src, remote_ip6)
5391 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5392 self.assertEqual(packet[IPv6].nh, 47)
5394 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning variant of the unknown-protocol test: client and server are
# both remote hosts on pg0, so every packet is sent in on pg0 and captured
# on pg0 again.
# NOTE(review): embedded line numbers skip here, so some original lines are
# not shown; the comments describe only the visible statements.
5397 def test_hairpinning_unknown_proto(self):
5398 """ NAT64 translate packet with unknown protocol - hairpinning """
5400 client = self.pg0.remote_hosts[0]
5401 server = self.pg0.remote_hosts[1]
5402 server_tcp_in_port = 22
5403 server_tcp_out_port = 4022
5404 client_tcp_in_port = 1234
5405 client_tcp_out_port = 1235
5406 server_nat_ip = "10.0.0.100"
5407 client_nat_ip = "10.0.0.110"
# Packed binary forms of the NAT addresses, and IPv6 addresses composed from
# them with '64:ff9b::' and 96.
5408 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5409 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5410 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5411 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
# NAT64 setup: pool range starting at the server NAT address, pg0 and pg1
# registered as NAT64 interfaces (pg1 explicitly with is_inside=0).
5413 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5415 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5416 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static BIB entries: two calls for the server and one for the client
# (their full argument lists are not visible here).
5418 self.vapi.nat64_add_del_static_bib(server.ip6n,
5421 server_tcp_out_port,
5424 self.vapi.nat64_add_del_static_bib(server.ip6n,
5430 self.vapi.nat64_add_del_static_bib(client.ip6n,
5433 client_tcp_out_port,
# TCP packet from the client to the server's NAT64 address.
5437 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5438 IPv6(src=client.ip6, dst=server_nat_ip6) /
5439 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5440 self.pg0.add_stream(p)
5441 self.pg_enable_capture(self.pg_interfaces)
5443 p = self.pg0.get_capture(1)
# Client -> server packet with nh=GRE; the captured packet must come from the
# client's NAT64 address, go to the server's real IPv6 address and keep nh.
5445 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5446 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5448 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5449 TCP(sport=1234, dport=1234))
5450 self.pg0.add_stream(p)
5451 self.pg_enable_capture(self.pg_interfaces)
5453 p = self.pg0.get_capture(1)
5456 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5457 self.assertEqual(packet[IPv6].dst, server.ip6)
5458 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5460 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Server -> client packet with nh=GRE; the captured packet must come from the
# server's NAT64 address and go to the client's real IPv6 address.
5464 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5465 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5467 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5468 TCP(sport=1234, dport=1234))
5469 self.pg0.add_stream(p)
5470 self.pg_enable_capture(self.pg_interfaces)
5472 p = self.pg0.get_capture(1)
5475 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5476 self.assertEqual(packet[IPv6].dst, client.ip6)
5477 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5479 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# One-armed NAT64: the same interface (pg3) is registered twice as a NAT64
# interface (once with the default arguments, once with is_inside=0), and
# packets are both sent and captured on pg3.
# NOTE(review): embedded line numbers skip here, so some original lines are
# not shown; the comments describe only the visible statements.
5482 def test_one_armed_nat64(self):
5483 """ One armed NAT64 """
5485 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5489 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5491 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5492 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
# IPv6 TCP packet (sport 12345, dport 80) sent in on pg3.
5495 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5496 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5497 TCP(sport=12345, dport=80))
5498 self.pg3.add_stream(p)
5499 self.pg_enable_capture(self.pg_interfaces)
5501 capture = self.pg3.get_capture(1)
# Translated packet: source is the NAT address, the source port must have
# changed and is remembered as external_port for the return packet.
5506 self.assertEqual(ip.src, self.nat_addr)
5507 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5508 self.assertNotEqual(tcp.sport, 12345)
5509 external_port = tcp.sport
5510 self.assertEqual(tcp.dport, 80)
5511 self.check_tcp_checksum(p)
5512 self.check_ip_checksum(p)
5514 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Return direction: IPv4 TCP packet to the NAT address and external_port.
5518 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5519 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5520 TCP(sport=80, dport=external_port))
5521 self.pg3.add_stream(p)
5522 self.pg_enable_capture(self.pg_interfaces)
5524 capture = self.pg3.get_capture(1)
# The captured packet must be addressed back to pg3's remote IPv6 address
# with the original port 12345 restored.
5529 self.assertEqual(ip.src, remote_host_ip6)
5530 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5531 self.assertEqual(tcp.sport, 80)
5532 self.assertEqual(tcp.dport, 12345)
5533 self.check_tcp_checksum(p)
5535 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Sends a fragmented stream through NAT64 in each direction, reassembles the
# captured fragments and checks ports and payload; finally checks that the
# number of entries returned by nat_reass_dump() grew by 2.
# NOTE(review): embedded line numbers skip here, so some original lines are
# not shown (e.g. the first assignment of `data`).
5538 def test_frag_in_order(self):
5539 """ NAT64 translate fragments arriving in order """
# Random inside port for this run.
5540 self.tcp_port_in = random.randint(1025, 65535)
5542 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5544 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5545 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Baseline count of reassembly entries before any fragment is sent.
5547 reass = self.vapi.nat_reass_dump()
5548 reass_n_start = len(reass)
# IPv6 fragments sent in on pg0 and captured on pg1.
5552 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5553 self.tcp_port_in, 20, data)
5554 self.pg0.add_stream(pkts)
5555 self.pg_enable_capture(self.pg_interfaces)
5557 frags = self.pg1.get_capture(len(pkts))
5558 p = self.reass_frags_and_verify(frags,
5560 self.pg1.remote_ip4)
5561 self.assertEqual(p[TCP].dport, 20)
5562 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Remember the translated source port seen on the outside.
5563 self.tcp_port_out = p[TCP].sport
5564 self.assertEqual(data, p[Raw].load)
# Reverse direction: fragments sent in on pg1 and captured on pg0.
5567 data = "A" * 4 + "b" * 16 + "C" * 3
5568 pkts = self.create_stream_frag(self.pg1,
5573 self.pg1.add_stream(pkts)
5574 self.pg_enable_capture(self.pg_interfaces)
5576 frags = self.pg0.get_capture(len(pkts))
5577 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5578 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5579 self.assertEqual(p[TCP].sport, 20)
5580 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5581 self.assertEqual(data, p[Raw].load)
# Exactly two more reassembly entries than at the start are expected.
5583 reass = self.vapi.nat_reass_dump()
5584 reass_n_end = len(reass)
5586 self.assertEqual(reass_n_end - reass_n_start, 2)
# Fragmented hairpinning: client and server are both remote hosts on pg0;
# fragments are sent in on pg0, captured on pg0 and reassembled.
# NOTE(review): embedded line numbers skip here, so some original lines are
# not shown (e.g. the assignment of `data` and several call arguments).
5589 def test_reass_hairpinning(self):
5590 """ NAT64 fragments hairpinning """
5592 client = self.pg0.remote_hosts[0]
5592 server = self.pg0.remote_hosts[1]
5593 server_in_port = random.randint(1025, 65535)
5594 server_out_port = random.randint(1025, 65535)
5595 client_in_port = random.randint(1025, 65535)
# Build an IPv6 object from '64:ff9b::' + the NAT address and read back its
# src field as the NAT address in IPv6 form.
5596 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5597 nat_addr_ip6 = ip.src
5599 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5601 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5602 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5604 # add static BIB entry for server
5605 self.vapi.nat64_add_del_static_bib(server.ip6n,
5611 # send packet from host to server
5612 pkts = self.create_stream_frag_ip6(self.pg0,
5617 self.pg0.add_stream(pkts)
5618 self.pg_enable_capture(self.pg_interfaces)
5620 frags = self.pg0.get_capture(len(pkts))
# Reassembled packet must come from the NAT IPv6 address, go to the server,
# use a translated source port and carry the original payload.
5621 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5622 self.assertNotEqual(p[TCP].sport, client_in_port)
5623 self.assertEqual(p[TCP].dport, server_in_port)
5624 self.assertEqual(data, p[Raw].load)
# Same flow as the in-order fragment test: a fragmented stream in each
# direction, reassembled and checked for ports and payload.
# NOTE(review): embedded line numbers skip here, so some original lines are
# not shown; the step that reorders the fragments is presumably among them.
5626 def test_frag_out_of_order(self):
5627 """ NAT64 translate fragments arriving out of order """
# Random inside port for this run.
5628 self.tcp_port_in = random.randint(1025, 65535)
5630 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5632 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5633 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPv6 fragments sent in on pg0 and captured on pg1.
5637 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5638 self.tcp_port_in, 20, data)
5640 self.pg0.add_stream(pkts)
5641 self.pg_enable_capture(self.pg_interfaces)
5643 frags = self.pg1.get_capture(len(pkts))
5644 p = self.reass_frags_and_verify(frags,
5646 self.pg1.remote_ip4)
5647 self.assertEqual(p[TCP].dport, 20)
5648 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Remember the translated source port seen on the outside.
5649 self.tcp_port_out = p[TCP].sport
5650 self.assertEqual(data, p[Raw].load)
# Reverse direction: fragments sent in on pg1 and captured on pg0.
5653 data = "A" * 4 + "B" * 16 + "C" * 3
5654 pkts = self.create_stream_frag(self.pg1,
5660 self.pg1.add_stream(pkts)
5661 self.pg_enable_capture(self.pg_interfaces)
5663 frags = self.pg0.get_capture(len(pkts))
5664 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5665 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5666 self.assertEqual(p[TCP].sport, 20)
5667 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5668 self.assertEqual(data, p[Raw].load)
5670 def test_interface_addr(self):
5671 """ Acquire NAT64 pool addresses from interface """
5672 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5674 # no address in NAT64 pool
5675 adresses = self.vapi.nat44_address_dump()
5676 self.assertEqual(0, len(adresses))
5678 # configure interface address and check NAT64 address pool
5679 self.pg4.config_ip4()
5680 addresses = self.vapi.nat64_pool_addr_dump()
5681 self.assertEqual(len(addresses), 1)
5682 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5684 # remove interface address and check NAT64 address pool
5685 self.pg4.unconfig_ip4()
5686 addresses = self.vapi.nat64_pool_addr_dump()
5687 self.assertEqual(0, len(adresses))
# Extended-only test: creates sessions from many distinct IPv6 sources, then
# sends further packets that are expected NOT to be forwarded
# (pg1.get_capture(0)) and checks the IPFIX packets exported to pg3 with
# verify_ipfix_max_sessions / verify_ipfix_max_bibs.
# NOTE(review): embedded line numbers skip here, so some original lines are
# not shown (e.g. where max_bibs, max_sessions and pkts are defined).
5689 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5690 def test_ipfix_max_bibs_sessions(self):
5691 """ IPFIX logging maximum session and BIB entries exceeded """
5694 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5698 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5700 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5701 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Per source address fd01:aa::<i> two TCP packets are built (dport 80, 22).
5705 for i in range(0, max_bibs):
5706 src = "fd01:aa::%x" % (i)
5707 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5708 IPv6(src=src, dst=remote_host_ip6) /
5709 TCP(sport=12345, dport=80))
5711 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5712 IPv6(src=src, dst=remote_host_ip6) /
5713 TCP(sport=12345, dport=22))
5715 self.pg0.add_stream(pkts)
5716 self.pg_enable_capture(self.pg_interfaces)
5718 self.pg1.get_capture(max_sessions)
# Configure the IPFIX exporter towards pg3 and enable NAT IPFIX logging.
5720 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5721 src_address=self.pg3.local_ip4n,
5723 template_interval=10)
5724 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5725 src_port=self.ipfix_src_port)
# One more packet (dport 25) from the last loop source: nothing may be
# forwarded to pg1, and 9 IPFIX packets are expected on pg3 after the flush.
5727 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5728 IPv6(src=src, dst=remote_host_ip6) /
5729 TCP(sport=12345, dport=25))
5730 self.pg0.add_stream(p)
5731 self.pg_enable_capture(self.pg_interfaces)
5733 self.pg1.get_capture(0)
5734 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5735 capture = self.pg3.get_capture(9)
5736 ipfix = IPFIXDecoder()
5737 # first load template
5739 self.assertTrue(p.haslayer(IPFIX))
5740 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5741 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5742 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5743 self.assertEqual(p[UDP].dport, 4739)
5744 self.assertEqual(p[IPFIX].observationDomainID,
5745 self.ipfix_domain_id)
5746 if p.haslayer(Template):
5747 ipfix.add_template(p.getlayer(Template))
5748 # verify events in data set
5750 if p.haslayer(Data):
5751 data = ipfix.decode_data_set(p.getlayer(Set))
5752 self.verify_ipfix_max_sessions(data, max_sessions)
# Packet from a source not used in the loop (pg0.remote_ip6): again nothing
# may reach pg1, and one IPFIX packet is expected on pg3.
5754 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5755 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5756 TCP(sport=12345, dport=80))
5757 self.pg0.add_stream(p)
5758 self.pg_enable_capture(self.pg_interfaces)
5760 self.pg1.get_capture(0)
5761 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5762 capture = self.pg3.get_capture(1)
5763 # verify events in data set
5765 self.assertTrue(p.haslayer(IPFIX))
5766 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5767 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5768 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5769 self.assertEqual(p[UDP].dport, 4739)
5770 self.assertEqual(p[IPFIX].observationDomainID,
5771 self.ipfix_domain_id)
5772 if p.haslayer(Data):
5773 data = ipfix.decode_data_set(p.getlayer(Set))
5774 self.verify_ipfix_max_bibs(data, max_bibs)
# Sets the IPv6 reassembly limit to max_frag=0, sends a single fragment and
# checks that nothing is forwarded to pg1 while IPFIX packets arrive on pg3
# and pass verify_ipfix_max_fragments_ip6.
# NOTE(review): embedded line numbers skip here, so some original lines are
# not shown (e.g. the assignment of `data` and the capture loops).
5776 def test_ipfix_max_frags(self):
5777 """ IPFIX logging maximum fragments pending reassembly exceeded """
5778 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5780 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5781 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5782 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
# Configure the IPFIX exporter towards pg3 and enable NAT IPFIX logging.
5783 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5784 src_address=self.pg3.local_ip4n,
5786 template_interval=10)
5787 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5788 src_port=self.ipfix_src_port)
# Only the last fragment of the generated stream is sent.
5791 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5792 self.tcp_port_in, 20, data)
5793 self.pg0.add_stream(pkts[-1])
5794 self.pg_enable_capture(self.pg_interfaces)
5796 self.pg1.get_capture(0)
5797 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5798 capture = self.pg3.get_capture(9)
5799 ipfix = IPFIXDecoder()
5800 # first load template
5802 self.assertTrue(p.haslayer(IPFIX))
5803 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5804 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5805 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5806 self.assertEqual(p[UDP].dport, 4739)
5807 self.assertEqual(p[IPFIX].observationDomainID,
5808 self.ipfix_domain_id)
5809 if p.haslayer(Template):
5810 ipfix.add_template(p.getlayer(Template))
5811 # verify events in data set
5813 if p.haslayer(Data):
5814 data = ipfix.decode_data_set(p.getlayer(Set))
5815 self.verify_ipfix_max_fragments_ip6(data, 0,
5816 self.pg0.remote_ip6n)
# Creates one NAT64 session with IPFIX logging enabled and checks the
# exported records, then changes the pool configuration again and checks the
# second batch of exported records.
# NOTE(review): embedded line numbers skip here, so some original lines are
# not shown (e.g. the capture loops and remaining call arguments).
5819 def test_ipfix_bib_ses(self):
5820 """ IPFIX logging NAT64 BIB/session create and delete events """
# Random inside port for this run.
5821 self.tcp_port_in = random.randint(1025, 65535)
5821 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5825 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5827 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5828 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Configure the IPFIX exporter towards pg3 and enable NAT IPFIX logging.
5829 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5830 src_address=self.pg3.local_ip4n,
5832 template_interval=10)
5833 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5834 src_port=self.ipfix_src_port)
# One TCP packet through NAT64; its translated source port is stored in
# self.tcp_port_out.
5837 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5838 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5839 TCP(sport=self.tcp_port_in, dport=25))
5840 self.pg0.add_stream(p)
5841 self.pg_enable_capture(self.pg_interfaces)
5843 p = self.pg1.get_capture(1)
5844 self.tcp_port_out = p[0][TCP].sport
5845 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5846 capture = self.pg3.get_capture(10)
5847 ipfix = IPFIXDecoder()
5848 # first load template
5850 self.assertTrue(p.haslayer(IPFIX))
5851 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5852 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5853 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5854 self.assertEqual(p[UDP].dport, 4739)
5855 self.assertEqual(p[IPFIX].observationDomainID,
5856 self.ipfix_domain_id)
5857 if p.haslayer(Template):
5858 ipfix.add_template(p.getlayer(Template))
5859 # verify events in data set
# Element 230 of the first record selects the verifier: 10 -> BIB check,
# 6 -> session check (presumably the NAT event type - TODO confirm).
5861 if p.haslayer(Data):
5862 data = ipfix.decode_data_set(p.getlayer(Set))
5863 if ord(data[0][230]) == 10:
5864 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5865 elif ord(data[0][230]) == 6:
5866 self.verify_ipfix_nat64_ses(data,
5868 self.pg0.remote_ip6n,
5869 self.pg1.remote_ip4,
5872 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Second phase: another pool address range call (arguments not visible
# here), then two IPFIX packets are expected on pg3.
5875 self.pg_enable_capture(self.pg_interfaces)
5876 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5879 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5880 capture = self.pg3.get_capture(2)
5881 # verify events in data set
5883 self.assertTrue(p.haslayer(IPFIX))
5884 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5885 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5886 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5887 self.assertEqual(p[UDP].dport, 4739)
5888 self.assertEqual(p[IPFIX].observationDomainID,
5889 self.ipfix_domain_id)
# Same dispatch as above with values 11 -> BIB check, 7 -> session check.
5890 if p.haslayer(Data):
5891 data = ipfix.decode_data_set(p.getlayer(Set))
5892 if ord(data[0][230]) == 11:
5893 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5894 elif ord(data[0][230]) == 7:
5895 self.verify_ipfix_nat64_ses(data,
5897 self.pg0.remote_ip6n,
5898 self.pg1.remote_ip4,
5901 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Helper that dumps the NAT64 session table via the API.
# NOTE(review): the return statement is not visible in this listing;
# presumably it returns the number of dumped entries - confirm.
5903 def nat64_get_ses_num(self):
5905 Return number of active NAT64 sessions.
5907 st = self.vapi.nat64_st_dump()
# Cleanup helper: disables NAT IPFIX logging, resets the IPFIX port/domain
# attributes and NAT64 timeouts, then walks the dumped interfaces, BIB
# entries, pool addresses and prefixes and issues add/del calls for each.
# NOTE(review): embedded line numbers skip here, so most call arguments
# (including the presumed is_add=0) are not shown.
5910 def clear_nat64(self):
5912 Clear NAT64 configuration.
5914 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5915 domain_id=self.ipfix_domain_id)
# Reset the IPFIX attributes to fixed values (4739 and 1).
5916 self.ipfix_src_port = 4739
5917 self.ipfix_domain_id = 1
# Called without arguments, i.e. with the API wrapper's defaults.
5919 self.vapi.nat64_set_timeouts()
# Interfaces whose is_inside value is greater than 1 get an additional
# add/del call before the common one.
5921 interfaces = self.vapi.nat64_interface_dump()
5922 for intf in interfaces:
5923 if intf.is_inside > 1:
5924 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5927 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5931 bib = self.vapi.nat64_bib_dump(255)
5934 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
5942 adresses = self.vapi.nat64_pool_addr_dump()
5943 for addr in adresses:
5944 self.vapi.nat64_add_del_pool_addr_range(addr.address,
5949 prefixes = self.vapi.nat64_prefix_dump()
5950 for prefix in prefixes:
5951 self.vapi.nat64_add_del_prefix(prefix.prefix,
5953 vrf_id=prefix.vrf_id,
# Per-test teardown for TestNAT64: calls the parent tearDown and, while VPP
# is still alive, logs the output of several NAT64 "show" CLI commands.
# NOTE(review): the enclosing `def` line and any statements after the last
# log call are not visible in this listing.
5957 super(TestNAT64, self).tearDown()
5958 if not self.vpp_dead:
5959 self.logger.info(self.vapi.cli("show nat64 pool"))
5960 self.logger.info(self.vapi.cli("show nat64 interfaces"))
5961 self.logger.info(self.vapi.cli("show nat64 prefix"))
5962 self.logger.info(self.vapi.cli("show nat64 bib all"))
5963 self.logger.info(self.vapi.cli("show nat64 session table all"))
5964 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# DS-Lite test class. Class setup creates two pg interfaces: pg0 is
# configured for IPv4, pg1 for IPv6 with two generated remote hosts.
# The single test sends IPv4-in-IPv6 UDP, TCP and ICMP flows from pg1 to pg0
# and back, then pings the AFTR IPv6 address.
# NOTE(review): embedded line numbers skip here, so some original lines are
# not shown (decorators, try/except scaffolding, some call arguments).
5968 class TestDSlite(MethodHolder):
5969 """ DS-Lite Test Cases """
5972 def setUpClass(cls):
5973 super(TestDSlite, cls).setUpClass()
5976 cls.nat_addr = '10.0.0.3'
5977 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5979 cls.create_pg_interfaces(range(2))
5981 cls.pg0.config_ip4()
5982 cls.pg0.resolve_arp()
5984 cls.pg1.config_ip6()
5985 cls.pg1.generate_remote_hosts(2)
5986 cls.pg1.configure_ipv6_neighbors()
5989 super(TestDSlite, cls).tearDownClass()
5992 def test_dslite(self):
5993 """ Test DS-Lite """
5994 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
# AFTR addresses in text and packed binary form, handed to VPP.
5996 aftr_ip4 = '192.0.0.1'
5997 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5998 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5999 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6000 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# UDP: IPv4-in-IPv6 packet from remote host 0 on pg1; the packet captured on
# pg0 must be plain IPv4 from the NAT address with a changed source port.
6003 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6004 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6005 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6006 UDP(sport=20000, dport=10000))
6007 self.pg1.add_stream(p)
6008 self.pg_enable_capture(self.pg_interfaces)
6010 capture = self.pg0.get_capture(1)
6011 capture = capture[0]
6012 self.assertFalse(capture.haslayer(IPv6))
6013 self.assertEqual(capture[IP].src, self.nat_addr)
6014 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6015 self.assertNotEqual(capture[UDP].sport, 20000)
6016 self.assertEqual(capture[UDP].dport, 10000)
6017 self.check_ip_checksum(capture)
6018 out_port = capture[UDP].sport
# UDP reply to the NAT address/out_port; the packet captured on pg1 must be
# tunnelled from the AFTR address to host 0 with the original inner
# address and port restored.
6020 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6021 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6022 UDP(sport=10000, dport=out_port))
6023 self.pg0.add_stream(p)
6024 self.pg_enable_capture(self.pg_interfaces)
6026 capture = self.pg1.get_capture(1)
6027 capture = capture[0]
6028 self.assertEqual(capture[IPv6].src, aftr_ip6)
6029 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6030 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6031 self.assertEqual(capture[IP].dst, '192.168.1.1')
6032 self.assertEqual(capture[UDP].sport, 10000)
6033 self.assertEqual(capture[UDP].dport, 20000)
6034 self.check_ip_checksum(capture)
# TCP: same pattern from remote host 1, which uses the same inner IPv4
# source address (192.168.1.1) as host 0.
6037 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6038 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6039 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6040 TCP(sport=20001, dport=10001))
6041 self.pg1.add_stream(p)
6042 self.pg_enable_capture(self.pg_interfaces)
6044 capture = self.pg0.get_capture(1)
6045 capture = capture[0]
6046 self.assertFalse(capture.haslayer(IPv6))
6047 self.assertEqual(capture[IP].src, self.nat_addr)
6048 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6049 self.assertNotEqual(capture[TCP].sport, 20001)
6050 self.assertEqual(capture[TCP].dport, 10001)
6051 self.check_ip_checksum(capture)
6052 self.check_tcp_checksum(capture)
6053 out_port = capture[TCP].sport
# TCP reply to the NAT address/out_port, expected tunnelled back to host 1.
6055 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6056 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6057 TCP(sport=10001, dport=out_port))
6058 self.pg0.add_stream(p)
6059 self.pg_enable_capture(self.pg_interfaces)
6061 capture = self.pg1.get_capture(1)
6062 capture = capture[0]
6063 self.assertEqual(capture[IPv6].src, aftr_ip6)
6064 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6065 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6066 self.assertEqual(capture[IP].dst, '192.168.1.1')
6067 self.assertEqual(capture[TCP].sport, 10001)
6068 self.assertEqual(capture[TCP].dport, 20001)
6069 self.check_ip_checksum(capture)
6070 self.check_tcp_checksum(capture)
# ICMP: echo request with id 4000 from host 1; the id must be rewritten on
# the way out and is remembered as out_id.
6073 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6074 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6075 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6076 ICMP(id=4000, type='echo-request'))
6077 self.pg1.add_stream(p)
6078 self.pg_enable_capture(self.pg_interfaces)
6080 capture = self.pg0.get_capture(1)
6081 capture = capture[0]
6082 self.assertFalse(capture.haslayer(IPv6))
6083 self.assertEqual(capture[IP].src, self.nat_addr)
6084 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6085 self.assertNotEqual(capture[ICMP].id, 4000)
6086 self.check_ip_checksum(capture)
6087 self.check_icmp_checksum(capture)
6088 out_id = capture[ICMP].id
# ICMP echo reply carrying out_id; it must come back tunnelled to host 1
# with the original id 4000.
6090 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6091 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6092 ICMP(id=out_id, type='echo-reply'))
6093 self.pg0.add_stream(p)
6094 self.pg_enable_capture(self.pg_interfaces)
6096 capture = self.pg1.get_capture(1)
6097 capture = capture[0]
6098 self.assertEqual(capture[IPv6].src, aftr_ip6)
6099 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6100 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6101 self.assertEqual(capture[IP].dst, '192.168.1.1')
6102 self.assertEqual(capture[ICMP].id, 4000)
6103 self.check_ip_checksum(capture)
6104 self.check_icmp_checksum(capture)
6106 # ping DS-Lite AFTR tunnel endpoint address
6107 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6108 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
6109 ICMPv6EchoRequest())
6110 self.pg1.add_stream(p)
6111 self.pg_enable_capture(self.pg_interfaces)
6113 capture = self.pg1.get_capture(1)
6114 self.assertEqual(1, len(capture))
6115 capture = capture[0]
6116 self.assertEqual(capture[IPv6].src, aftr_ip6)
6117 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6118 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Teardown: parent tearDown, then DS-Lite "show" CLI output while VPP is
# still alive.
6121 super(TestDSlite, self).tearDown()
6122 if not self.vpp_dead:
6123 self.logger.info(self.vapi.cli("show dslite pool"))
6125 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6126 self.logger.info(self.vapi.cli("show dslite sessions"))
# DS-Lite CE test class. The VPP command line is extended with
# ["nat", "{", "dslite ce", "}"]; class setup creates two pg interfaces
# (pg0 IPv4, pg1 IPv6 with one generated remote host). The single test
# checks encapsulation pg0 -> pg1, decapsulation pg1 -> pg0 and an ICMPv6
# echo to the B4 address.
# NOTE(review): embedded line numbers skip here, so some original lines are
# not shown (decorators, try/except scaffolding, some call arguments).
6129 class TestDSliteCE(MethodHolder):
6130 """ DS-Lite CE Test Cases """
6133 def setUpConstants(cls):
6134 super(TestDSliteCE, cls).setUpConstants()
6135 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
6138 def setUpClass(cls):
6139 super(TestDSliteCE, cls).setUpClass()
6142 cls.create_pg_interfaces(range(2))
6144 cls.pg0.config_ip4()
6145 cls.pg0.resolve_arp()
6147 cls.pg1.config_ip6()
6148 cls.pg1.generate_remote_hosts(1)
6149 cls.pg1.configure_ipv6_neighbors()
6152 super(TestDSliteCE, cls).tearDownClass()
6155 def test_dslite_ce(self):
6156 """ Test DS-Lite CE """
# B4 addresses in text and packed binary form, handed to VPP.
6158 b4_ip4 = '192.0.0.2'
6159 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
6160 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
6161 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
6162 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
# AFTR addresses in text and packed binary form, handed to VPP.
6164 aftr_ip4 = '192.0.0.1'
6165 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6166 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6167 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6168 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# /128 route for the AFTR IPv6 address via pg1's remote IPv6 address.
6170 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
6171 dst_address_length=128,
6172 next_hop_address=self.pg1.remote_ip6n,
6173 next_hop_sw_if_index=self.pg1.sw_if_index,
# IPv4 UDP packet in on pg0; the packet captured on pg1 must be wrapped in
# IPv6 from the B4 address to the AFTR address with the inner packet intact.
6177 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6178 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6179 UDP(sport=10000, dport=20000))
6180 self.pg0.add_stream(p)
6181 self.pg_enable_capture(self.pg_interfaces)
6183 capture = self.pg1.get_capture(1)
6184 capture = capture[0]
6185 self.assertEqual(capture[IPv6].src, b4_ip6)
6186 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6187 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6188 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6189 self.assertEqual(capture[UDP].sport, 10000)
6190 self.assertEqual(capture[UDP].dport, 20000)
6191 self.check_ip_checksum(capture)
# IPv4-in-IPv6 packet from the AFTR to the B4 address in on pg1; the packet
# captured on pg0 must be the bare inner IPv4 packet.
6194 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6195 IPv6(dst=b4_ip6, src=aftr_ip6) /
6196 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6197 UDP(sport=20000, dport=10000))
6198 self.pg1.add_stream(p)
6199 self.pg_enable_capture(self.pg_interfaces)
6201 capture = self.pg0.get_capture(1)
6202 capture = capture[0]
6203 self.assertFalse(capture.haslayer(IPv6))
6204 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6205 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6206 self.assertEqual(capture[UDP].sport, 20000)
6207 self.assertEqual(capture[UDP].dport, 10000)
6208 self.check_ip_checksum(capture)
6210 # ping DS-Lite B4 tunnel endpoint address
6211 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6212 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6213 ICMPv6EchoRequest())
6214 self.pg1.add_stream(p)
6215 self.pg_enable_capture(self.pg_interfaces)
6217 capture = self.pg1.get_capture(1)
6218 self.assertEqual(1, len(capture))
6219 capture = capture[0]
6220 self.assertEqual(capture[IPv6].src, b4_ip6)
6221 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6222 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Teardown: parent tearDown, then DS-Lite tunnel endpoint "show" CLI
# commands while VPP is still alive.
6225 super(TestDSliteCE, self).tearDown()
6226 if not self.vpp_dead:
6228 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6230 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
# NAT66 test class. Class setup creates two pg interfaces and configures
# IPv6 neighbors on each. The single test installs a static mapping for
# pg0's remote IPv6 address, sends four packets in each direction and
# expects the mapping's packet counter to read 8.
# NOTE(review): embedded line numbers skip here, so some original lines are
# not shown (decorators, packet payload lines, list appends, try/except).
6233 class TestNAT66(MethodHolder):
6234 """ NAT66 Test Cases """
6237 def setUpClass(cls):
6238 super(TestNAT66, cls).setUpClass()
6241 cls.nat_addr = 'fd01:ff::2'
6242 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
6244 cls.create_pg_interfaces(range(2))
6245 cls.interfaces = list(cls.pg_interfaces)
6247 for i in cls.interfaces:
6250 i.configure_ipv6_neighbors()
6253 super(TestNAT66, cls).tearDownClass()
6256 def test_static(self):
6257 """ 1:1 NAT66 test """
# pg0 and pg1 registered as NAT66 interfaces (pg1 explicitly with
# is_inside=0), then the static mapping for pg0's remote address.
6258 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6259 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6260 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
# Four packets from pg0 towards pg1's remote address; the visible payloads
# are an ICMPv6 echo request and GRE/IP/TCP (the first two are not shown).
6265 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6266 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6269 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6270 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6273 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6274 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6275 ICMPv6EchoRequest())
6277 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6278 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6279 GRE() / IP() / TCP())
6281 self.pg0.add_stream(pkts)
6282 self.pg_enable_capture(self.pg_interfaces)
6284 capture = self.pg1.get_capture(len(pkts))
# Every captured packet must have the NAT address as source; the checksum
# of TCP, UDP and ICMPv6 echo request payloads is verified.
6285 for packet in capture:
6287 self.assertEqual(packet[IPv6].src, self.nat_addr)
6288 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6289 if packet.haslayer(TCP):
6290 self.check_tcp_checksum(packet)
6291 elif packet.haslayer(UDP):
6292 self.check_udp_checksum(packet)
6293 elif packet.haslayer(ICMPv6EchoRequest):
6294 self.check_icmpv6_checksum(packet)
6296 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Reverse direction: four packets from pg1 addressed to the NAT address.
6301 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6302 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6305 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6306 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6309 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6310 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6313 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6314 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6315 GRE() / IP() / TCP())
6317 self.pg1.add_stream(pkts)
6318 self.pg_enable_capture(self.pg_interfaces)
6320 capture = self.pg0.get_capture(len(pkts))
# Every captured packet must be addressed to pg0's remote IPv6 address.
6321 for packet in capture:
6323 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
6324 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6325 if packet.haslayer(TCP):
6326 self.check_tcp_checksum(packet)
6327 elif packet.haslayer(UDP):
6328 self.check_udp_checksum(packet)
6329 elif packet.haslayer(ICMPv6EchoReply):
6330 self.check_icmpv6_checksum(packet)
6332 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Exactly one static mapping must exist and its total_pkts must equal 8.
6335 sm = self.vapi.nat66_static_mapping_dump()
6336 self.assertEqual(len(sm), 1)
6337 self.assertEqual(sm[0].total_pkts, 8)
# Cleanup helper: issues add/del calls for every dumped NAT66 interface and
# static mapping (remaining arguments, presumably is_add=0, are not shown).
6339 def clear_nat66(self):
6341 Clear NAT66 configuration.
6343 interfaces = self.vapi.nat66_interface_dump()
6344 for intf in interfaces:
6345 self.vapi.nat66_add_del_interface(intf.sw_if_index,
6349 static_mappings = self.vapi.nat66_static_mapping_dump()
6350 for sm in static_mappings:
6351 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
6352 sm.external_ip_address,
# Teardown: parent tearDown, then NAT66 "show" CLI output while VPP is
# still alive.
6357 super(TestNAT66, self).tearDown()
6358 if not self.vpp_dead:
6359 self.logger.info(self.vapi.cli("show nat66 interfaces"))
6360 self.logger.info(self.vapi.cli("show nat66 static mappings"))
# Script entry point: when this file is executed directly, hand the test
# cases of this module to unittest using the VPP test runner.
if "__main__" == __name__:
    unittest.main(
        testRunner=VppTestRunner,
    )