9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
class MethodHolder(VppTestCase):
    """ NAT create capture and verify method holder """

    @classmethod
    def setUpClass(cls):
        # Nothing class-specific to prepare; delegate to the parent class.
        super(MethodHolder, cls).setUpClass()

    def tearDown(self):
        # Nothing test-specific to clean up; delegate to the parent class.
        super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is re-parsed from its serialized form, the IP checksum
    field is cleared and the packet is serialized/parsed once more; the
    resulting checksum is compared with the one carried by ``pkt``.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Without clearing the field the rebuilt packet just echoes the
    # original value and the assertion below can never fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is re-parsed from its serialized form, the TCP checksum
    field is cleared and the packet is serialized/parsed once more; the
    resulting checksum is compared with the one carried by ``pkt``.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Without clearing the field the rebuilt packet just echoes the
    # original value and the assertion below can never fail.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is re-parsed from its serialized form, the UDP checksum
    field is cleared and the packet is serialized/parsed once more; the
    resulting checksum is compared with the one carried by ``pkt``.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Without clearing the field the rebuilt packet just echoes the
    # original value and the assertion below can never fail.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    For every embedded (``*error``) layer present in ``pkt`` the checksum
    field is cleared on a fresh copy, the copy is rebuilt and the
    resulting checksum is compared with the one carried by ``pkt``.
    An embedded UDP checksum of 0 is not verified.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    if pkt.haslayer(IPerror):
        new = pkt.__class__(str(pkt))
        del new['IPerror'].chksum
        new = new.__class__(str(new))
        self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)

    if pkt.haslayer(TCPerror):
        new = pkt.__class__(str(pkt))
        del new['TCPerror'].chksum
        new = new.__class__(str(new))
        self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)

    if pkt.haslayer(UDPerror):
        if pkt['UDPerror'].chksum != 0:
            new = pkt.__class__(str(pkt))
            del new['UDPerror'].chksum
            new = new.__class__(str(new))
            self.assertEqual(new['UDPerror'].chksum,
                             pkt['UDPerror'].chksum)

    if pkt.haslayer(ICMPerror):
        # Build a fresh copy here as well: relying on `new` left over from
        # an earlier branch fails with an unbound local when none of the
        # branches above ran.
        new = pkt.__class__(str(pkt))
        del new['ICMPerror'].chksum
        new = new.__class__(str(new))
        self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
def check_icmp_checksum(self, pkt):
    """
    Check ICMP checksum in IPv4 packet

    When the packet also carries an embedded ``IPerror`` layer, the
    checksums of the embedded layers are verified too.

    :param pkt: Packet to check ICMP checksum
    """
    new = pkt.__class__(str(pkt))
    del new['ICMP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
    if pkt.haslayer(IPerror):
        self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Check ICMPv6 checksum in IPv6 packet

    Handles destination-unreachable, echo-request and echo-reply
    messages. For destination-unreachable the embedded layers are
    verified as well.

    :param pkt: Packet to check ICMPv6 checksum
    """
    new = pkt.__class__(str(pkt))
    if pkt.haslayer(ICMPv6DestUnreach):
        del new['ICMPv6DestUnreach'].cksum
        new = new.__class__(str(new))
        self.assertEqual(new['ICMPv6DestUnreach'].cksum,
                         pkt['ICMPv6DestUnreach'].cksum)
        self.check_icmp_errror_embedded(pkt)
    if pkt.haslayer(ICMPv6EchoRequest):
        del new['ICMPv6EchoRequest'].cksum
        new = new.__class__(str(new))
        self.assertEqual(new['ICMPv6EchoRequest'].cksum,
                         pkt['ICMPv6EchoRequest'].cksum)
    if pkt.haslayer(ICMPv6EchoReply):
        del new['ICMPv6EchoReply'].cksum
        new = new.__class__(str(new))
        self.assertEqual(new['ICMPv6EchoReply'].cksum,
                         pkt['ICMPv6EchoReply'].cksum)
def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for inside network

    One TCP, one UDP and one ICMP echo-request packet are generated,
    in that order.

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param dst_ip: Destination address (Default: remote address of out_if)
    :param ttl: TTL of generated packets
    :returns: List of generated packets
    """
    if dst_ip is None:
        dst_ip = out_if.remote_ip4

    pkts = []
    # TCP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         TCP(sport=self.tcp_port_in, dport=20))
    pkts.append(p)

    # UDP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         UDP(sport=self.udp_port_in, dport=20))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)

    return pkts
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are written into the prefix at the positions
    used for the given prefix length; octet 8 of the IPv6 address is
    never written. For a prefix length other than 32, 40, 48, 56, 64 or
    96 the prefix is returned unchanged.

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length
    :returns: IPv4-embedded IPv6 addresses
    """
    # IPv6 octet index receiving IPv4 octets 0..3, per prefix length.
    positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    # bytearray keeps the octet manipulation valid on Python 2 and 3.
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    for pos, octet in zip(positions.get(plen, ()), ip4_n):
        pref_n[pos] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def extract_ip4(self, ip6, plen):
    """
    Extract IPv4 address embedded in IPv6 addresses

    NOTE(review): the octet-selection lines are missing from the reviewed
    listing; they are restored as the inverse of compose_ip6 - confirm.

    :param ip6: IPv6 address
    :param plen: IPv6 prefix length
    :returns: extracted IPv4 address
    :raises ValueError: if plen is not one of 32, 40, 48, 56, 64, 96
    """
    # IPv6 octet index holding IPv4 octets 0..3, per prefix length.
    positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    if plen not in positions:
        raise ValueError("unsupported IPv6 prefix length: %r" % (plen,))
    # bytearray keeps the octet manipulation valid on Python 2 and 3.
    ip6_n = bytearray(socket.inet_pton(socket.AF_INET6, ip6))
    ip4_n = bytearray(ip6_n[pos] for pos in positions[plen])
    return socket.inet_ntop(socket.AF_INET, bytes(ip4_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    One TCP, one UDP and one ICMPv6 echo-request packet are generated,
    in that order. Without a prefix the destination is the outside
    host's IPv4 address appended to '64:ff9b::'.

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix
    :param plen: NAT64 prefix length
    :returns: List of generated packets
    """
    pkts = []
    if pref is None:
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
    else:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)

    # TCP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
         TCP(sport=self.tcp_port_in, dport=20))
    pkts.append(p)

    # UDP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
         UDP(sport=self.udp_port_in, dport=20))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
         IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
         ICMPv6EchoRequest(id=self.icmp_id_in))
    pkts.append(p)

    return pkts
def create_stream_out(self, out_if, dst_ip=None, ttl=64,
                      use_inside_ports=False):
    """
    Create packet stream for outside network

    One TCP, one UDP and one ICMP echo-reply packet are generated,
    in that order.

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :param use_inside_ports: Use inside NAT ports as destination ports
           instead of outside ports
    :returns: List of generated packets
    """
    if dst_ip is None:
        dst_ip = self.nat_addr
    if not use_inside_ports:
        tcp_port = self.tcp_port_out
        udp_port = self.udp_port_out
        icmp_id = self.icmp_id_out
    else:
        tcp_port = self.tcp_port_in
        udp_port = self.udp_port_in
        icmp_id = self.icmp_id_in
    pkts = []
    # TCP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         TCP(dport=tcp_port, sport=20))
    pkts.append(p)

    # UDP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         UDP(dport=udp_port, sport=20))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
         ICMP(id=icmp_id, type='echo-reply'))
    pkts.append(p)

    return pkts
def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
    """
    Create IPv6 packet stream for outside network

    One TCP, one UDP and one ICMPv6 echo-reply packet are generated,
    in that order, addressed to the outside ports / ICMP id.

    :param out_if: Outside interface
    :param src_ip: Source IPv6 address
    :param dst_ip: Destination IPv6 address
    :param hl: HL of generated packets
    :returns: List of generated packets
    """
    pkts = []
    # TCP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
         TCP(dport=self.tcp_port_out, sport=20))
    pkts.append(p)

    # UDP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
         UDP(dport=self.udp_port_out, sport=20))
    pkts.append(p)

    # ICMP
    p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
         IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
         ICMPv6EchoReply(id=self.icmp_id_out))
    pkts.append(p)

    return pkts
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                       packet_num=3, dst_ip=None, is_ip6=False):
    """
    Verify captured packets on outside network

    Besides verifying, the translated source ports / ICMP id seen in the
    capture are stored in tcp_port_out, udp_port_out and icmp_id_out.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global NAT address)
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    :param is_ip6: If L3 protocol is IPv6 (Default False)
    """
    if is_ip6:
        IP46 = IPv6
        ICMP46 = ICMPv6EchoRequest
    else:
        IP46 = IP
        ICMP46 = ICMP
    if nat_ip is None:
        nat_ip = self.nat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            if not is_ip6:
                # The IPv4 header checksum check is skipped for IPv6.
                self.check_ip_checksum(packet)
            self.assertEqual(packet[IP46].src, nat_ip)
            if dst_ip is not None:
                self.assertEqual(packet[IP46].dst, dst_ip)
            if packet.haslayer(TCP):
                if same_port:
                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                else:
                    self.assertNotEqual(
                        packet[TCP].sport, self.tcp_port_in)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                if same_port:
                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
                else:
                    self.assertNotEqual(
                        packet[UDP].sport, self.udp_port_in)
                self.udp_port_out = packet[UDP].sport
            else:
                if same_port:
                    self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
                else:
                    self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP46].id
                if is_ip6:
                    self.check_icmpv6_checksum(packet)
                else:
                    self.check_icmp_checksum(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
                           packet_num=3, dst_ip=None):
    """
    Verify captured packets on outside network

    Thin IPv6 wrapper around verify_capture_out.

    :param capture: Captured packets
    :param nat_ip: Translated IP address
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
                                   dst_ip, is_ip6=True)
def verify_capture_in(self, capture, in_if, packet_num=3):
    """
    Verify captured packets on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.check_ip_checksum(packet)
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.check_icmp_checksum(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
    """
    Verify captured IPv6 packets on inside network

    :param capture: Captured packets
    :param src_ip: Source IP
    :param dst_ip: Destination IP address
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, src_ip)
            self.assertEqual(packet[IPv6].dst, dst_ip)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.check_udp_checksum(packet)
            else:
                self.assertEqual(packet[ICMPv6EchoReply].id,
                                 self.icmp_id_in)
                self.check_icmpv6_checksum(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packet that don't have to be translated

    Addresses must still be those of the ingress/egress hosts and the
    source ports / ICMP id must still be the inside ones.

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
            self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                        packet_num=3, icmp_type=11):
    """
    Verify captured packets with ICMP errors on outside network

    The packet embedded in each ICMP error must carry the outside
    (translated) destination port / ICMP id.

    :param capture: Captured packets
    :param src_ip: Translated IP address or IP address of VPP
                   (Default use global NAT address)
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    if src_ip is None:
        src_ip = self.nat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, src_ip)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].dport,
                                 self.tcp_port_out)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].dport,
                                 self.udp_port_out)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                       icmp_type=11):
    """
    Verify captured packets with ICMP errors on inside network

    The packet embedded in each ICMP error must carry the inside source
    port / ICMP id.

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].sport,
                                 self.tcp_port_in)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].sport,
                                 self.udp_port_in)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def create_stream_frag(self, src_if, dst, sport, dport, data):
    """
    Create fragmented packet stream

    Three IPv4 fragments sharing one random identification are produced
    at fragment offsets 0, 3 and 5 (units of 8 bytes).

    NOTE(review): the payload lines are missing from the reviewed
    listing; the slices below are derived from the visible offsets
    assuming a 20-byte TCP header (no options) - confirm.

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :returns: Fragments
    """
    frag_id = random.randint(0, 65535)
    # Build the unfragmented packet once, only to learn the TCP checksum
    # that the first fragment has to carry.
    p = (IP(src=src_if.remote_ip4, dst=dst) /
         TCP(sport=sport, dport=dport) /
         Raw(data))
    p = p.__class__(str(p))
    chksum = p['TCP'].chksum
    pkts = []
    # Offset 0: TCP header (20 bytes) + 4 payload bytes = 24 bytes.
    p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
         IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0,
            id=frag_id) /
         TCP(sport=sport, dport=dport, chksum=chksum) /
         Raw(data[0:4]))
    pkts.append(p)
    # Offset 3 * 8 = 24: the next 16 payload bytes.
    p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
         IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3,
            id=frag_id, proto=IP_PROTOS.tcp) /
         Raw(data[4:20]))
    pkts.append(p)
    # Offset 5 * 8 = 40: the remaining payload, last fragment (no MF).
    p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
         IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
            id=frag_id) /
         Raw(data[20:]))
    pkts.append(p)
    return pkts
def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
                           pref=None, plen=0, frag_size=128):
    """
    Create fragmented packet stream

    Without a prefix the destination is the IPv4 address appended to
    '64:ff9b::'; fragmentation itself is done by scapy's fragment6.

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :param pref: NAT64 prefix
    :param plen: NAT64 prefix length
    :param frag_size: size of fragments
    :returns: Fragments
    """
    if pref is None:
        dst_ip6 = ''.join(['64:ff9b::', dst])
    else:
        dst_ip6 = self.compose_ip6(dst, pref, plen)

    p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
         IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
         IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
         TCP(sport=sport, dport=dport) /
         Raw(data))

    return fragment6(p, frag_size)
def reass_frags_and_verify(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    Each fragment's addresses and IP checksum are verified and its
    payload is written at ``frag * 8`` into a reassembly buffer. For TCP
    the checksum of the reassembled packet is verified as well.

    :param frags: Captured fragments
    :param src: Source IPv4 address to verify
    :param dst: Destination IPv4 address to verify

    :returns: Reassembled IPv4 packet (for a protocol other than TCP/UDP
              the last fragment is returned as is)
    """
    reass = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IP].src, src)
        self.assertEqual(p[IP].dst, dst)
        self.check_ip_checksum(p)
        reass.seek(p[IP].frag * 8)
        reass.write(p[IP].payload)
    # New IP header taken from the first fragment, without fragment info.
    ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
            proto=frags[0][IP].proto)
    if ip.proto == IP_PROTOS.tcp:
        p = (ip / TCP(reass.getvalue()))
        self.check_tcp_checksum(p)
    elif ip.proto == IP_PROTOS.udp:
        p = (ip / UDP(reass.getvalue()))
    return p
def reass_frags_and_verify_ip6(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    Each fragment's addresses are verified and its payload is written at
    ``offset * 8`` into a reassembly buffer. For TCP the checksum of the
    reassembled packet is verified as well.

    :param frags: Captured fragments
    :param src: Source IPv6 address to verify
    :param dst: Destination IPv6 address to verify

    :returns: Reassembled IPv6 packet (for a protocol other than TCP/UDP
              the last fragment is returned as is)
    """
    reass = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IPv6].src, src)
        self.assertEqual(p[IPv6].dst, dst)
        reass.seek(p[IPv6ExtHdrFragment].offset * 8)
        reass.write(p[IPv6ExtHdrFragment].payload)
    # New IPv6 header taken from the first fragment; the next header is
    # the one announced by its fragment extension header.
    ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
              nh=frags[0][IPv6ExtHdrFragment].nh)
    if ip.nh == IP_PROTOS.tcp:
        p = (ip / TCP(reass.getvalue()))
        self.check_tcp_checksum(p)
    elif ip.nh == IP_PROTOS.udp:
        p = (ip / UDP(reass.getvalue()))
    return p
def verify_ipfix_nat44_ses(self, data):
    """
    Verify IPFIX NAT44 session create/delete event

    Exactly six records are expected: three create (natEvent 4) and
    three delete (natEvent 5) events.

    NOTE(review): the `record[...]` continuation arguments are missing
    from the reviewed listing; the element ids used are the ones the
    sibling verify_ipfix_* helpers use for the same fields - confirm.

    :param data: Decoded IPFIX data records
    """
    nat44_ses_create_num = 0
    nat44_ses_delete_num = 0
    self.assertEqual(6, len(data))
    for record in data:
        # natEvent
        self.assertIn(ord(record[230]), [4, 5])
        if ord(record[230]) == 4:
            nat44_ses_create_num += 1
        else:
            nat44_ses_delete_num += 1
        # sourceIPv4Address
        self.assertEqual(self.pg0.remote_ip4n, record[8])
        # postNATSourceIPv4Address
        self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
                         record[225])
        # ingressVRFID
        self.assertEqual(struct.pack("!I", 0), record[234])
        # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
        if IP_PROTOS.icmp == ord(record[4]):
            self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
            self.assertEqual(struct.pack("!H", self.icmp_id_out),
                             record[227])
        elif IP_PROTOS.tcp == ord(record[4]):
            self.assertEqual(struct.pack("!H", self.tcp_port_in),
                             record[7])
            self.assertEqual(struct.pack("!H", self.tcp_port_out),
                             record[227])
        elif IP_PROTOS.udp == ord(record[4]):
            self.assertEqual(struct.pack("!H", self.udp_port_in),
                             record[7])
            self.assertEqual(struct.pack("!H", self.udp_port_out),
                             record[227])
        else:
            self.fail("Invalid protocol")
    self.assertEqual(3, nat44_ses_create_num)
    self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    Exactly one record with natEvent 3 and pool id 0 is expected.

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 3)
    # natPoolID
    self.assertEqual(struct.pack("!I", 0), record[283])
def verify_ipfix_max_sessions(self, data, limit):
    """
    Verify IPFIX maximum session entries exceeded event

    Exactly one record with natEvent 13 and quota event 1 is expected.
    NOTE(review): the quota fields are compared in native byte order
    ("I"), unlike the "!I" used for other 32-bit fields - confirm this
    matches what the exporter emits.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum session entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    self.assertEqual(struct.pack("I", 1), record[466])
    # maxSessionEntries
    self.assertEqual(struct.pack("I", limit), record[471])
def verify_ipfix_max_bibs(self, data, limit):
    """
    Verify IPFIX maximum BIB entries exceeded event

    Exactly one record with natEvent 13 and quota event 2 is expected.
    NOTE(review): the quota fields are compared in native byte order
    ("I"), unlike the "!I" used for other 32-bit fields - confirm this
    matches what the exporter emits.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum BIB entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    self.assertEqual(struct.pack("I", 2), record[466])
    # maxBIBEntries
    self.assertEqual(struct.pack("I", limit), record[472])
def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event

    Exactly one record with natEvent 13 and quota event 5 is expected.
    NOTE(review): the quota fields are compared in native byte order
    ("I"), unlike the "!I" used for other 32-bit fields - confirm this
    matches what the exporter emits.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event

    Exactly one record with natEvent 13 and quota event 5 is expected.
    NOTE(review): the quota fields are compared in native byte order
    ("I"), unlike the "!I" used for other 32-bit fields - confirm this
    matches what the exporter emits.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv4 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv4Address
    self.assertEqual(src_addr, record[8])
def verify_ipfix_bib(self, data, is_create, src_addr):
    """
    Verify IPFIX NAT64 BIB create and delete events

    Exactly one TCP record is expected, with natEvent 10 for create and
    11 for delete.

    :param data: Decoded IPFIX data records
    :param is_create: Create event if nonzero value otherwise delete event
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    if is_create:
        self.assertEqual(ord(record[230]), 10)
    else:
        self.assertEqual(ord(record[230]), 11)
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
    # postNATSourceIPv4Address
    self.assertEqual(self.nat_addr_n, record[225])
    # protocolIdentifier
    self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
    # ingressVRFID
    self.assertEqual(struct.pack("!I", 0), record[234])
    # sourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
    # postNAPTSourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
                           dst_port):
    """
    Verify IPFIX NAT64 session create and delete events

    Exactly one TCP record is expected, with natEvent 6 for create and
    7 for delete.

    NOTE(review): the tail of the destinationIPv6Address and
    postNATDestinationIPv4Address assertions is missing from the
    reviewed listing; the '64:ff9b::'/96 prefix and element ids 28/226
    are restored by analogy with the rest of the file - confirm.

    :param data: Decoded IPFIX data records
    :param is_create: Create event if nonzero value otherwise delete event
    :param src_addr: IPv6 source address
    :param dst_addr: IPv4 destination address
    :param dst_port: destination TCP port
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    if is_create:
        self.assertEqual(ord(record[230]), 6)
    else:
        self.assertEqual(ord(record[230]), 7)
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
    # destinationIPv6Address
    self.assertEqual(socket.inet_pton(socket.AF_INET6,
                                      self.compose_ip6(dst_addr,
                                                       '64:ff9b::',
                                                       96)),
                     record[28])
    # postNATSourceIPv4Address
    self.assertEqual(self.nat_addr_n, record[225])
    # postNATDestinationIPv4Address
    self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
                     record[226])
    # protocolIdentifier
    self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
    # ingressVRFID
    self.assertEqual(struct.pack("!I", 0), record[234])
    # sourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
    # postNAPTSourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
    # destinationTransportPort
    self.assertEqual(struct.pack("!H", dst_port), record[11])
    # postNAPTDestinationTransportPort
    self.assertEqual(struct.pack("!H", dst_port), record[228])
class TestNAT44(MethodHolder):
    """ NAT44 Test Cases """

    @classmethod
    def setUpClass(cls):
        """
        Create the packet-generator interfaces and addressing shared by
        all NAT44 tests; on any failure the class is torn down and the
        error re-raised.

        NOTE(review): several statements (the bodies of both interface
        loops, the pg7/pg8/pg9 bring-up and the tail of the
        sw_interface_add_del_address call) are missing from the reviewed
        listing and were restored from context - confirm each one
        marked below.
        """
        super(TestNAT44, cls).setUpClass()

        try:
            cls.tcp_port_in = 6303
            cls.tcp_port_out = 6303
            cls.udp_port_in = 6304
            cls.udp_port_out = 6304
            cls.icmp_id_in = 6305
            cls.icmp_id_out = 6305
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
            cls.ipfix_src_port = 4739
            cls.ipfix_domain_id = 1

            cls.create_pg_interfaces(range(10))
            cls.interfaces = list(cls.pg_interfaces[0:4])

            for i in cls.interfaces:
                # NOTE(review): loop body restored - confirm
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()

            cls.pg0.generate_remote_hosts(3)
            cls.pg0.configure_ipv4_neighbors()

            cls.overlapping_interfaces = list(cls.pg_interfaces[4:7])
            cls.vapi.ip_table_add_del(10, is_add=1)
            cls.vapi.ip_table_add_del(20, is_add=1)

            # pg4 and pg6 deliberately share addresses but live in
            # different tables. The binary local address is derived from
            # the address assigned here, not from a stale loop variable.
            overlapping = ((cls.pg4, "172.16.255.1", "172.16.255.2", 10),
                           (cls.pg5, "172.17.255.3", "172.17.255.4", 10),
                           (cls.pg6, "172.16.255.1", "172.16.255.2", 20))
            for intf, local_ip4, remote_ip4, table_id in overlapping:
                intf._local_ip4 = local_ip4
                intf._local_ip4n = socket.inet_pton(socket.AF_INET,
                                                    local_ip4)
                intf._remote_hosts[0]._ip4 = remote_ip4
                intf.set_table_ip4(table_id)
            for i in cls.overlapping_interfaces:
                # NOTE(review): loop body restored - confirm
                i.config_ip4()
                i.admin_up()
                i.resolve_arp()

            # NOTE(review): restored - confirm
            cls.pg7.admin_up()
            cls.pg8.admin_up()

            cls.pg9.generate_remote_hosts(2)
            # NOTE(review): restored - confirm
            cls.pg9.config_ip4()
            ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
            # NOTE(review): address and prefix length arguments restored
            cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
                                                  ip_addr_n,
                                                  24)
            # NOTE(review): restored - confirm
            cls.pg9.admin_up()
            cls.pg9.resolve_arp()
            cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
            cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
            cls.pg9.resolve_arp()

        except Exception:
            super(TestNAT44, cls).tearDownClass()
            raise
def clear_nat44(self):
    """
    Clear NAT44 configuration.

    Removes, in order: the pg7/pg8 host routes and neighbors, the pg7
    IPv4 config, forwarding, interface addresses, IPFIX logging, NAT
    interface features, output features, static / load-balancing /
    identity mappings and the address pool, then resets reassembly
    settings.

    NOTE(review): the trailing arguments of most calls (mainly the
    `is_add=0` delete flags and a few dump-record fields) are missing
    from the reviewed listing and were restored from context - confirm
    against the vapi wrappers.
    """
    # I found no elegant way to do this
    self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg7.remote_ip4n,
                               next_hop_sw_if_index=self.pg7.sw_if_index,
                               is_add=0)
    self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg8.remote_ip4n,
                               next_hop_sw_if_index=self.pg8.sw_if_index,
                               is_add=0)

    for intf in [self.pg7, self.pg8]:
        neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
        for n in neighbors:
            self.vapi.ip_neighbor_add_del(intf.sw_if_index,
                                          n.mac_address,
                                          n.ip_address,
                                          is_add=0)

    if self.pg7.has_ip4_config:
        self.pg7.unconfig_ip4()

    self.vapi.nat44_forwarding_enable_disable(0)

    interfaces = self.vapi.nat44_interface_addr_dump()
    for intf in interfaces:
        self.vapi.nat44_add_interface_addr(intf.sw_if_index,
                                           twice_nat=intf.twice_nat,
                                           is_add=0)

    self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
                        domain_id=self.ipfix_domain_id)
    self.ipfix_src_port = 4739
    self.ipfix_domain_id = 1

    interfaces = self.vapi.nat44_interface_dump()
    for intf in interfaces:
        if intf.is_inside > 1:
            # An is_inside value above 1 gets an extra removal with 0
            # (presumably the outside role) before the one below.
            self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
                                                      0,
                                                      is_add=0)
        self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
                                                  intf.is_inside,
                                                  is_add=0)

    interfaces = self.vapi.nat44_interface_output_feature_dump()
    for intf in interfaces:
        self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
                                                         intf.is_inside,
                                                         is_add=0)

    static_mappings = self.vapi.nat44_static_mapping_dump()
    for sm in static_mappings:
        self.vapi.nat44_add_del_static_mapping(
            sm.local_ip_address,
            sm.external_ip_address,
            local_port=sm.local_port,
            external_port=sm.external_port,
            addr_only=sm.addr_only,
            vrf_id=sm.vrf_id,
            protocol=sm.protocol,
            twice_nat=sm.twice_nat,
            out2in_only=sm.out2in_only,
            tag=sm.tag,
            is_add=0)

    lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
    for lb_sm in lb_static_mappings:
        self.vapi.nat44_add_del_lb_static_mapping(
            lb_sm.external_addr,
            lb_sm.external_port,
            lb_sm.protocol,
            vrf_id=lb_sm.vrf_id,
            twice_nat=lb_sm.twice_nat,
            out2in_only=lb_sm.out2in_only,
            tag=lb_sm.tag,
            is_add=0,
            local_num=0,
            locals=[])

    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    for id_m in identity_mappings:
        self.vapi.nat44_add_del_identity_mapping(
            addr_only=id_m.addr_only,
            ip=id_m.ip_address,
            port=id_m.port,
            sw_if_index=id_m.sw_if_index,
            vrf_id=id_m.vrf_id,
            protocol=id_m.protocol,
            is_add=0)

    addresses = self.vapi.nat44_address_dump()
    for addr in addresses:
        self.vapi.nat44_add_del_address_range(addr.ip_address,
                                              addr.ip_address,
                                              twice_nat=addr.twice_nat,
                                              is_add=0)

    self.vapi.nat_set_reass()
    self.vapi.nat_set_reass(is_ip6=1)
def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
                             local_port=0, external_port=0, vrf_id=0,
                             is_add=1, external_sw_if_index=0xFFFFFFFF,
                             proto=0, twice_nat=0, out2in_only=0, tag=""):
    """
    Add/delete NAT44 static mapping

    The mapping is address-only unless both a local and an external
    port are given.

    NOTE(review): most arguments of the vapi call are missing from the
    reviewed listing; they are passed by keyword here, using the keyword
    names visible in clear_nat44 where available - confirm `vrf_id`,
    `tag` and `is_add` against the vapi wrapper.

    :param local_ip: Local IP address
    :param external_ip: External IP address
    :param local_port: Local port number (Optional)
    :param external_port: External port number (Optional)
    :param vrf_id: VRF ID (Default 0)
    :param is_add: 1 if add, 0 if delete (Default add)
    :param external_sw_if_index: External interface instead of IP address
    :param proto: IP protocol (Mandatory if port specified)
    :param twice_nat: 1 if translate external host address and port
    :param out2in_only: if 1 rule is matching only out2in direction
    :param tag: Opaque string tag
    """
    addr_only = 1
    if local_port and external_port:
        addr_only = 0
    l_ip = socket.inet_pton(socket.AF_INET, local_ip)
    e_ip = socket.inet_pton(socket.AF_INET, external_ip)
    self.vapi.nat44_add_del_static_mapping(
        l_ip,
        e_ip,
        external_sw_if_index,
        local_port=local_port,
        external_port=external_port,
        addr_only=addr_only,
        vrf_id=vrf_id,
        protocol=proto,
        twice_nat=twice_nat,
        out2in_only=out2in_only,
        tag=tag,
        is_add=is_add)
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
    """
    Add/delete NAT44 address

    The address is passed as a one-address range (same first and last).

    NOTE(review): the `vrf_id` argument line of the vapi call is missing
    from the reviewed listing; restored as a keyword - confirm.

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID (Default 0xFFFFFFFF)
    :param twice_nat: twice NAT address for external hosts
    """
    nat_addr = socket.inet_pton(socket.AF_INET, ip)
    self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
                                          vrf_id=vrf_id,
                                          twice_nat=twice_nat)
def test_dynamic(self):
    """ NAT44 dynamic translation test """
    # NOTE(review): `is_inside=0` and the `pg_start()` calls sit on lines
    # missing from the reviewed listing; restored from context - confirm.

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_dynamic_icmp_errors_in2out_ttl_1(self):
    """ NAT44 handling of client packets with TTL=1 """
    # NOTE(review): `is_inside=0` and the `pg_start()` call sit on lines
    # missing from the reviewed listing; restored from context - confirm.

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Client side - generate traffic
    pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - verify ICMP type 11 packets
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_1(self):
    """ NAT44 handling of server packets with TTL=1 """
    # NOTE(review): `is_inside=0` and the `pg_start()` calls sit on lines
    # missing from the reviewed listing; restored from context - confirm.

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Client side - create sessions
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - generate traffic
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    pkts = self.create_stream_out(self.pg1, ttl=1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - verify ICMP type 11 packets
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out_with_icmp_errors(capture,
                                             src_ip=self.pg1.local_ip4)
def test_dynamic_icmp_errors_in2out_ttl_2(self):
    """ NAT44 handling of error responses to client packets with TTL=2 """
    # NOTE(review): `is_inside=0` and the `pg_start()` calls sit on lines
    # missing from the reviewed listing; restored from context - confirm.

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Client side - generate traffic
    pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - simulate ICMP type 11 response
    capture = self.pg1.get_capture(len(pkts))
    pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
            IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
            ICMP(type=11) / packet[IP] for packet in capture]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - verify ICMP type 11 packets
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1218 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1219 """ NAT44 handling of error responses to server packets with TTL=2 """
# Scenario: sessions are created with a pg0 -> pg1 stream, then packets
# created with ttl=2 are injected on pg1 and captured on pg0; for each of
# them an ICMP type 11 message embedding its IP layer is sent from pg0
# towards pg1.remote_ip4 and the result is expected on pg1.
# NOTE(review): the embedded numbering has gaps, so some statements of
# this method (e.g. the tail of the second feature call) are not visible
# in this listing.
1221 self.nat44_add_address(self.nat_addr)
1222 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1223 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1226 # Client side - create sessions
1227 pkts = self.create_stream_in(self.pg0, self.pg1)
1228 self.pg0.add_stream(pkts)
1229 self.pg_enable_capture(self.pg_interfaces)
1232 # Server side - generate traffic
1233 capture = self.pg1.get_capture(len(pkts))
1234 self.verify_capture_out(capture)
1235 pkts = self.create_stream_out(self.pg1, ttl=2)
1236 self.pg1.add_stream(pkts)
1237 self.pg_enable_capture(self.pg_interfaces)
1240 # Client side - simulate ICMP type 11 response
1241 capture = self.pg0.get_capture(len(pkts))
# One error packet per captured packet; the captured IP layer becomes the
# ICMP payload.
1242 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1243 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1244 ICMP(type=11) / packet[IP] for packet in capture]
1245 self.pg0.add_stream(pkts)
1246 self.pg_enable_capture(self.pg_interfaces)
1249 # Server side - verify ICMP type 11 packets
1250 capture = self.pg1.get_capture(len(pkts))
1251 self.verify_capture_out_with_icmp_errors(capture)
1253 def test_ping_out_interface_from_outside(self):
1254 """ Ping NAT44 out interface from outside network """
# Scenario: an ICMP echo request addressed to pg1.local_ip4 is sent in on
# pg1 and exactly one packet is expected back on pg1 with source and
# destination addresses swapped and ICMP type 0.
# NOTE(review): the embedded numbering has gaps; the statements that
# build 'pkts', bind 'packet' and wrap the assertions in error handling
# are not visible in this listing.
1256 self.nat44_add_address(self.nat_addr)
1257 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1258 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1261 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1262 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1263 ICMP(id=self.icmp_id_out, type='echo-request'))
1265 self.pg1.add_stream(pkts)
1266 self.pg_enable_capture(self.pg_interfaces)
1268 capture = self.pg1.get_capture(len(pkts))
1269 self.assertEqual(1, len(capture))
1272 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1273 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
# NOTE(review): the request above is built with icmp_id_out, while the
# reply id is compared with icmp_id_in - confirm this is intended.
1274 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1275 self.assertEqual(packet[ICMP].type, 0) # echo reply
1277 self.logger.error(ppp("Unexpected or invalid packet "
1278 "(outside network):", packet))
1281 def test_ping_internal_host_from_outside(self):
1282 """ Ping internal host from outside network """
# Scenario: a static mapping pg0.remote_ip4 <-> nat_addr is added; an
# echo request sent on pg1 to nat_addr is expected on pg0, and the echo
# reply sent on pg0 is expected on pg1. Both captures are checked to
# carry the ICMP protocol number.
# NOTE(review): the embedded numbering has gaps, so some statements of
# this method (e.g. the tail of the second feature call) are not visible
# in this listing.
1284 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1285 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1286 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside -> inside: echo request addressed to the mapped address.
1290 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1291 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1292 ICMP(id=self.icmp_id_out, type='echo-request'))
1293 self.pg1.add_stream(pkt)
1294 self.pg_enable_capture(self.pg_interfaces)
1296 capture = self.pg0.get_capture(1)
1297 self.verify_capture_in(capture, self.pg0, packet_num=1)
1298 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Inside -> outside: echo reply from the internal host.
1301 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1302 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1303 ICMP(id=self.icmp_id_in, type='echo-reply'))
1304 self.pg0.add_stream(pkt)
1305 self.pg_enable_capture(self.pg_interfaces)
1307 capture = self.pg1.get_capture(1)
1308 self.verify_capture_out(capture, same_port=True, packet_num=1)
1309 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1311 def test_forwarding(self):
1312 """ NAT44 forwarding test """
# Scenario: forwarding is enabled and a static mapping between
# pg0.remote_ip4n and nat_addr_n is added. Traffic is exchanged in both
# directions twice: once as-is and once after remote_hosts[0] of pg0 has
# been temporarily replaced by remote_hosts[1].
# NOTE(review): the embedded numbering has gaps, so some statements of
# this method (call continuations, error-handling keywords) are not
# visible in this listing.
1314 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1315 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1317 self.vapi.nat44_forwarding_enable_disable(1)
1319 real_ip = self.pg0.remote_ip4n
1320 alias_ip = self.nat_addr_n
1321 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1322 external_ip=alias_ip)
1325 # in2out - static mapping match
# Note: the first stream below is injected on pg1 and captured on pg0;
# the pg0 -> pg1 direction follows it.
1327 pkts = self.create_stream_out(self.pg1)
1328 self.pg1.add_stream(pkts)
1329 self.pg_enable_capture(self.pg_interfaces)
1331 capture = self.pg0.get_capture(len(pkts))
1332 self.verify_capture_in(capture, self.pg0)
1334 pkts = self.create_stream_in(self.pg0, self.pg1)
1335 self.pg0.add_stream(pkts)
1336 self.pg_enable_capture(self.pg_interfaces)
1338 capture = self.pg1.get_capture(len(pkts))
1339 self.verify_capture_out(capture, same_port=True)
1341 # in2out - no static mapping match
# Save remote_hosts[0] and put remote_hosts[1] in its place; the saved
# entry is restored further down. Presumably this makes pg0.remote_ip4
# refer to a host without a static mapping - confirm.
1343 host0 = self.pg0.remote_hosts[0]
1344 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1346 pkts = self.create_stream_out(self.pg1,
1347 dst_ip=self.pg0.remote_ip4,
1348 use_inside_ports=True)
1349 self.pg1.add_stream(pkts)
1350 self.pg_enable_capture(self.pg_interfaces)
1352 capture = self.pg0.get_capture(len(pkts))
1353 self.verify_capture_in(capture, self.pg0)
1355 pkts = self.create_stream_in(self.pg0, self.pg1)
1356 self.pg0.add_stream(pkts)
1357 self.pg_enable_capture(self.pg_interfaces)
1359 capture = self.pg1.get_capture(len(pkts))
# The expected source address is the host's own address.
1360 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
# Restore the host entry that was swapped out above.
1363 self.pg0.remote_hosts[0] = host0
# Cleanup: forwarding is switched off again. The last argument of the
# static-mapping call is not visible here; presumably it removes the
# mapping - confirm.
1366 self.vapi.nat44_forwarding_enable_disable(0)
1367 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1368 external_ip=alias_ip,
1371 def test_static_in(self):
1372 """ 1:1 NAT initialized from inside network """
# Scenario: a static mapping pg0.remote_ip4 <-> 10.0.0.10 is added and
# its dump entry is checked (empty tag, protocol and ports all 0). A
# pg0 -> pg1 stream is sent first, then a pg1 -> pg0 stream addressed to
# the mapped address.
# NOTE(review): the embedded numbering has gaps, so some statements of
# this method (e.g. the tail of the second feature call) are not visible
# in this listing.
1374 nat_ip = "10.0.0.10"
# Expected outside ports/id, stored on the instance (presumably read by
# the stream/verify helpers - confirm).
1375 self.tcp_port_out = 6303
1376 self.udp_port_out = 6304
1377 self.icmp_id_out = 6305
1379 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1380 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1381 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1383 sm = self.vapi.nat44_static_mapping_dump()
1384 self.assertEqual(len(sm), 1)
# Only the part of the tag before the first NUL character is compared.
1385 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1386 self.assertEqual(sm[0].protocol, 0)
1387 self.assertEqual(sm[0].local_port, 0)
1388 self.assertEqual(sm[0].external_port, 0)
# Inside -> outside.
1391 pkts = self.create_stream_in(self.pg0, self.pg1)
1392 self.pg0.add_stream(pkts)
1393 self.pg_enable_capture(self.pg_interfaces)
1395 capture = self.pg1.get_capture(len(pkts))
1396 self.verify_capture_out(capture, nat_ip, True)
# Outside -> inside, addressed to the mapped address.
1399 pkts = self.create_stream_out(self.pg1, nat_ip)
1400 self.pg1.add_stream(pkts)
1401 self.pg_enable_capture(self.pg_interfaces)
1403 capture = self.pg0.get_capture(len(pkts))
1404 self.verify_capture_in(capture, self.pg0)
1406 def test_static_out(self):
1407 """ 1:1 NAT initialized from outside network """
# Scenario: a tagged static mapping pg0.remote_ip4 <-> 10.0.0.20 is added
# and its tag is checked in the dump. A pg1 -> pg0 stream addressed to
# the mapped address is sent first, then a pg0 -> pg1 stream.
# NOTE(review): the embedded numbering has gaps; the assignment of 'tag'
# and the tail of the second feature call are not visible in this
# listing.
1409 nat_ip = "10.0.0.20"
1410 self.tcp_port_out = 6303
1411 self.udp_port_out = 6304
1412 self.icmp_id_out = 6305
1415 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1416 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1417 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1419 sm = self.vapi.nat44_static_mapping_dump()
1420 self.assertEqual(len(sm), 1)
# Only the part of the dumped tag before the first NUL character is
# compared with the tag that was configured.
1421 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
# Outside -> inside, addressed to the mapped address.
1424 pkts = self.create_stream_out(self.pg1, nat_ip)
1425 self.pg1.add_stream(pkts)
1426 self.pg_enable_capture(self.pg_interfaces)
1428 capture = self.pg0.get_capture(len(pkts))
1429 self.verify_capture_in(capture, self.pg0)
# Inside -> outside.
1432 pkts = self.create_stream_in(self.pg0, self.pg1)
1433 self.pg0.add_stream(pkts)
1434 self.pg_enable_capture(self.pg_interfaces)
1436 capture = self.pg1.get_capture(len(pkts))
1437 self.verify_capture_out(capture, nat_ip, True)
1439 def test_static_with_port_in(self):
1440 """ 1:1 NAPT initialized from inside network """
# Scenario: three port-level static mappings (TCP, UDP, ICMP) are added
# between pg0.remote_ip4 and nat_addr; a pg0 -> pg1 stream is sent first,
# then a pg1 -> pg0 stream.
# NOTE(review): the embedded numbering has gaps, so some statements of
# this method (e.g. the tail of the second feature call) are not visible
# in this listing.
1442 self.tcp_port_out = 3606
1443 self.udp_port_out = 3607
1444 self.icmp_id_out = 3608
1446 self.nat44_add_address(self.nat_addr)
# One mapping per protocol: inside port/id -> outside port/id.
1447 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1448 self.tcp_port_in, self.tcp_port_out,
1449 proto=IP_PROTOS.tcp)
1450 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1451 self.udp_port_in, self.udp_port_out,
1452 proto=IP_PROTOS.udp)
1453 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1454 self.icmp_id_in, self.icmp_id_out,
1455 proto=IP_PROTOS.icmp)
1456 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1457 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Inside -> outside.
1461 pkts = self.create_stream_in(self.pg0, self.pg1)
1462 self.pg0.add_stream(pkts)
1463 self.pg_enable_capture(self.pg_interfaces)
1465 capture = self.pg1.get_capture(len(pkts))
1466 self.verify_capture_out(capture)
# Outside -> inside.
1469 pkts = self.create_stream_out(self.pg1)
1470 self.pg1.add_stream(pkts)
1471 self.pg_enable_capture(self.pg_interfaces)
1473 capture = self.pg0.get_capture(len(pkts))
1474 self.verify_capture_in(capture, self.pg0)
1476 def test_static_with_port_out(self):
1477 """ 1:1 NAPT initialized from outside network """
# Scenario: three port-level static mappings (TCP, UDP, ICMP) are added
# between pg0.remote_ip4 and nat_addr; a pg1 -> pg0 stream is sent first,
# then a pg0 -> pg1 stream.
# NOTE(review): the embedded numbering has gaps, so some statements of
# this method (e.g. the tail of the second feature call) are not visible
# in this listing.
1479 self.tcp_port_out = 30606
1480 self.udp_port_out = 30607
1481 self.icmp_id_out = 30608
1483 self.nat44_add_address(self.nat_addr)
# One mapping per protocol: inside port/id -> outside port/id.
1484 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1485 self.tcp_port_in, self.tcp_port_out,
1486 proto=IP_PROTOS.tcp)
1487 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1488 self.udp_port_in, self.udp_port_out,
1489 proto=IP_PROTOS.udp)
1490 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1491 self.icmp_id_in, self.icmp_id_out,
1492 proto=IP_PROTOS.icmp)
1493 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1494 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside -> inside.
1498 pkts = self.create_stream_out(self.pg1)
1499 self.pg1.add_stream(pkts)
1500 self.pg_enable_capture(self.pg_interfaces)
1502 capture = self.pg0.get_capture(len(pkts))
1503 self.verify_capture_in(capture, self.pg0)
# Inside -> outside.
1506 pkts = self.create_stream_in(self.pg0, self.pg1)
1507 self.pg0.add_stream(pkts)
1508 self.pg_enable_capture(self.pg_interfaces)
1510 capture = self.pg1.get_capture(len(pkts))
1511 self.verify_capture_out(capture)
1513 def test_static_with_port_out2(self):
1514 """ 1:1 NAPT symmetrical rule """
# Scenario: with forwarding enabled, a TCP static mapping created with
# out2in_only=1 is exercised in six steps: client -> service, an ICMP
# type 11 error for that packet, service -> client, an ICMP type 11 error
# for that packet, and finally a client <-> server exchange addressed
# directly to pg0.remote_ip4 that is expected to pass unchanged.
# NOTE(review): the mapping is created with out2in_only=1 while the
# docstring says "symmetrical" - confirm the wording.
# NOTE(review): the embedded numbering has gaps; the assignments of
# local_port, external_port, ip, tcp and inner, and the error-handling
# keywords around the assertions, are not visible in this listing.
1519 self.vapi.nat44_forwarding_enable_disable(1)
1520 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1521 local_port, external_port,
1522 proto=IP_PROTOS.tcp, out2in_only=1)
1523 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1524 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1527 # from client to service
1528 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1529 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1530 TCP(sport=12345, dport=external_port))
1531 self.pg1.add_stream(p)
1532 self.pg_enable_capture(self.pg_interfaces)
1534 capture = self.pg0.get_capture(1)
# Destination address and port must be the local (inside) ones.
1540 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1541 self.assertEqual(tcp.dport, local_port)
1542 self.check_tcp_checksum(p)
1543 self.check_ip_checksum(p)
1545 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 error sent from the inside, embedding the packet that was
# just captured on pg0.
1549 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1550 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1551 ICMP(type=11) / capture[0][IP])
1552 self.pg0.add_stream(p)
1553 self.pg_enable_capture(self.pg_interfaces)
1555 capture = self.pg1.get_capture(1)
# Outer source and embedded destination/port must be the external ones.
1558 self.assertEqual(p[IP].src, self.nat_addr)
1560 self.assertEqual(inner.dst, self.nat_addr)
1561 self.assertEqual(inner[TCPerror].dport, external_port)
1563 self.logger.error(ppp("Unexpected or invalid packet:", p))
1566 # from service back to client
1567 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1568 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1569 TCP(sport=local_port, dport=12345))
1570 self.pg0.add_stream(p)
1571 self.pg_enable_capture(self.pg_interfaces)
1573 capture = self.pg1.get_capture(1)
1578 self.assertEqual(ip.src, self.nat_addr)
1579 self.assertEqual(tcp.sport, external_port)
1580 self.check_tcp_checksum(p)
1581 self.check_ip_checksum(p)
1583 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 error sent from the outside, embedding the packet that was
# just captured on pg1.
1587 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1588 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1589 ICMP(type=11) / capture[0][IP])
1590 self.pg1.add_stream(p)
1591 self.pg_enable_capture(self.pg_interfaces)
1593 capture = self.pg0.get_capture(1)
# Outer destination and embedded source/port must be the local ones.
1596 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1598 self.assertEqual(inner.src, self.pg0.remote_ip4)
1599 self.assertEqual(inner[TCPerror].sport, local_port)
1601 self.logger.error(ppp("Unexpected or invalid packet:", p))
1604 # from client to server (no translation)
1605 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1606 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1607 TCP(sport=12346, dport=local_port))
1608 self.pg1.add_stream(p)
1609 self.pg_enable_capture(self.pg_interfaces)
1611 capture = self.pg0.get_capture(1)
1617 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1618 self.assertEqual(tcp.dport, local_port)
1619 self.check_tcp_checksum(p)
1620 self.check_ip_checksum(p)
1622 self.logger.error(ppp("Unexpected or invalid packet:", p))
1625 # from service back to client (no translation)
1626 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1627 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1628 TCP(sport=local_port, dport=12346))
1629 self.pg0.add_stream(p)
1630 self.pg_enable_capture(self.pg_interfaces)
1632 capture = self.pg1.get_capture(1)
# Source address and port must still be the local ones.
1637 self.assertEqual(ip.src, self.pg0.remote_ip4)
1638 self.assertEqual(tcp.sport, local_port)
1639 self.check_tcp_checksum(p)
1640 self.check_ip_checksum(p)
1642 self.logger.error(ppp("Unexpected or invalid packet:", p))
1645 def test_static_vrf_aware(self):
1646 """ 1:1 NAT VRF awareness """
# Scenario: two static mappings are added (pg4.remote_ip4 <-> 10.0.0.30
# and pg0.remote_ip4 <-> 10.0.0.40). A pg4 -> pg3 stream is expected on
# pg3 translated to nat_ip1; a pg0 -> pg3 stream is expected to produce
# nothing on pg3.
# NOTE(review): the embedded numbering has gaps; the trailing arguments
# of both static-mapping calls and of the pg3 feature call are not
# visible in this listing (presumably VRF ids and the outside flag -
# confirm).
1648 nat_ip1 = "10.0.0.30"
1649 nat_ip2 = "10.0.0.40"
1650 self.tcp_port_out = 6303
1651 self.udp_port_out = 6304
1652 self.icmp_id_out = 6305
1654 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1656 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1658 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1660 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1661 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1663 # inside interface VRF match NAT44 static mapping VRF
1664 pkts = self.create_stream_in(self.pg4, self.pg3)
1665 self.pg4.add_stream(pkts)
1666 self.pg_enable_capture(self.pg_interfaces)
1668 capture = self.pg3.get_capture(len(pkts))
1669 self.verify_capture_out(capture, nat_ip1, True)
1671 # inside interface VRF don't match NAT44 static mapping VRF (packets
1673 pkts = self.create_stream_in(self.pg0, self.pg3)
1674 self.pg0.add_stream(pkts)
1675 self.pg_enable_capture(self.pg_interfaces)
# Nothing may show up on pg3 for this stream.
1677 self.pg3.assert_nothing_captured()
1679 def test_dynamic_to_static(self):
1680 """ Switch from dynamic translation to 1:1NAT """
# Scenario: a pg0 -> pg1 stream is first verified with only the NAT
# address configured; then a static mapping pg0.remote_ip4 <-> 10.0.0.10
# is added, the session dump for that host is expected to be empty, and
# a second pg0 -> pg1 stream is expected to use the static address.
# NOTE(review): the embedded numbering has gaps, so some statements of
# this method (e.g. the tail of the second feature call) are not visible
# in this listing.
1681 nat_ip = "10.0.0.10"
1682 self.tcp_port_out = 6303
1683 self.udp_port_out = 6304
1684 self.icmp_id_out = 6305
1686 self.nat44_add_address(self.nat_addr)
1687 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1688 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Dynamic translation.
1692 pkts = self.create_stream_in(self.pg0, self.pg1)
1693 self.pg0.add_stream(pkts)
1694 self.pg_enable_capture(self.pg_interfaces)
1696 capture = self.pg1.get_capture(len(pkts))
1697 self.verify_capture_out(capture)
# Switch to 1:1 NAT: after adding the static mapping no sessions are
# expected in the dump for this host.
1700 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1701 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1702 self.assertEqual(len(sessions), 0)
1703 pkts = self.create_stream_in(self.pg0, self.pg1)
1704 self.pg0.add_stream(pkts)
1705 self.pg_enable_capture(self.pg_interfaces)
1707 capture = self.pg1.get_capture(len(pkts))
1708 self.verify_capture_out(capture, nat_ip, True)
1710 def test_identity_nat(self):
1711 """ Identity NAT """
# Scenario: an identity mapping is added for pg0.remote_ip4n; a TCP
# packet sent on pg1 straight to pg0.remote_ip4 is expected on pg0 with
# addresses and ports unchanged.
# NOTE(review): the embedded numbering has gaps; the statements binding
# p, ip and tcp from the capture and the error-handling keywords are not
# visible in this listing.
1713 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1714 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1715 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1718 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1719 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1720 TCP(sport=12345, dport=56789))
1721 self.pg1.add_stream(p)
1722 self.pg_enable_capture(self.pg_interfaces)
1724 capture = self.pg0.get_capture(1)
# All four address/port fields must equal what was sent.
1729 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1730 self.assertEqual(ip.src, self.pg1.remote_ip4)
1731 self.assertEqual(tcp.dport, 56789)
1732 self.assertEqual(tcp.sport, 12345)
1733 self.check_tcp_checksum(p)
1734 self.check_ip_checksum(p)
1736 self.logger.error(ppp("Unexpected or invalid packet:", p))
1739 def test_static_lb(self):
1740 """ NAT44 local service load balancing """
# Scenario: a load-balancing static mapping with two local servers
# (remote_hosts[0] and remote_hosts[1] of pg0) is added behind nat_addr.
# A client packet sent on pg1 must reach one of the two servers on pg0,
# and the reply from that server must appear on pg1 with the external
# address and port as source.
# NOTE(review): the embedded numbering has gaps; the assignments of
# external_port, local_port, server, ip and tcp, most fields of 'locals'
# and the remaining arguments of the mapping call are not visible in
# this listing.
# The NAT address in packed binary form, as passed to the mapping call.
1741 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1744 server1 = self.pg0.remote_hosts[0]
1745 server2 = self.pg0.remote_hosts[1]
1747 locals = [{'addr': server1.ip4n,
1750 {'addr': server2.ip4n,
1754 self.nat44_add_address(self.nat_addr)
1755 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1758 local_num=len(locals),
1760 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1761 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1764 # from client to service
1765 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1766 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1767 TCP(sport=12345, dport=external_port))
1768 self.pg1.add_stream(p)
1769 self.pg_enable_capture(self.pg_interfaces)
1771 capture = self.pg0.get_capture(1)
# Either server is an acceptable destination.
1777 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
# Presumably the chosen server is remembered here as 'server' for the
# reply below - the branch bodies are not visible; confirm.
1778 if ip.dst == server1.ip4:
1782 self.assertEqual(tcp.dport, local_port)
1783 self.check_tcp_checksum(p)
1784 self.check_ip_checksum(p)
1786 self.logger.error(ppp("Unexpected or invalid packet:", p))
1789 # from service back to client
1790 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1791 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1792 TCP(sport=local_port, dport=12345))
1793 self.pg0.add_stream(p)
1794 self.pg_enable_capture(self.pg_interfaces)
1796 capture = self.pg1.get_capture(1)
1801 self.assertEqual(ip.src, self.nat_addr)
1802 self.assertEqual(tcp.sport, external_port)
1803 self.check_tcp_checksum(p)
1804 self.check_ip_checksum(p)
1806 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Skipped unless running_extended_tests() returns a true value.
1809 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1810 def test_static_lb_multi_clients(self):
1811 """ NAT44 local service load balancing - multiple clients"""
# Scenario: a load-balancing static mapping with two local servers is
# added, one TCP packet per client address is sent on pg1 to nat_addr,
# and the test finally requires server1_n to be greater than server2_n.
# NOTE(review): the embedded numbering has gaps; the assignments of
# external_port, pkts, server1_n and server2_n, most fields of 'locals'
# (presumably including the per-server weighting), the capture loop
# header and its counter updates are not visible in this listing.
1813 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1816 server1 = self.pg0.remote_hosts[0]
1817 server2 = self.pg0.remote_hosts[1]
1819 locals = [{'addr': server1.ip4n,
1822 {'addr': server2.ip4n,
1826 self.nat44_add_address(self.nat_addr)
1827 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1830 local_num=len(locals),
1832 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1833 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Client source addresses are produced by ip4_range() from pg1.remote_ip4.
1838 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
1840 for client in clients:
1841 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1842 IP(src=client, dst=self.nat_addr) /
1843 TCP(sport=12345, dport=external_port))
1845 self.pg1.add_stream(pkts)
1846 self.pg_enable_capture(self.pg_interfaces)
1848 capture = self.pg0.get_capture(len(pkts))
# Captured packets are classified by destination server.
1850 if p[IP].dst == server1.ip4:
1854 self.assertTrue(server1_n > server2_n)
1856 def test_static_lb_2(self):
1857 """ NAT44 local service load balancing (asymmetrical rule) """
# Scenario: with forwarding enabled, a load-balancing static mapping with
# two local servers is exercised in four steps: client -> service,
# service -> client, and then a client <-> server1 exchange addressed
# directly to server1 that is expected to pass unchanged.
# NOTE(review): the embedded numbering has gaps; the assignments of
# external_port, local_port, server, ip and tcp, most fields of 'locals'
# and the remaining arguments of the mapping call are not visible in
# this listing.
1858 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1861 server1 = self.pg0.remote_hosts[0]
1862 server2 = self.pg0.remote_hosts[1]
1864 locals = [{'addr': server1.ip4n,
1867 {'addr': server2.ip4n,
1871 self.vapi.nat44_forwarding_enable_disable(1)
1872 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1876 local_num=len(locals),
1878 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1879 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1882 # from client to service
1883 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1884 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1885 TCP(sport=12345, dport=external_port))
1886 self.pg1.add_stream(p)
1887 self.pg_enable_capture(self.pg_interfaces)
1889 capture = self.pg0.get_capture(1)
# Either server is an acceptable destination.
1895 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
# Presumably the chosen server is remembered here as 'server' for the
# reply below - the branch bodies are not visible; confirm.
1896 if ip.dst == server1.ip4:
1900 self.assertEqual(tcp.dport, local_port)
1901 self.check_tcp_checksum(p)
1902 self.check_ip_checksum(p)
1904 self.logger.error(ppp("Unexpected or invalid packet:", p))
1907 # from service back to client
1908 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1909 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1910 TCP(sport=local_port, dport=12345))
1911 self.pg0.add_stream(p)
1912 self.pg_enable_capture(self.pg_interfaces)
1914 capture = self.pg1.get_capture(1)
1919 self.assertEqual(ip.src, self.nat_addr)
1920 self.assertEqual(tcp.sport, external_port)
1921 self.check_tcp_checksum(p)
1922 self.check_ip_checksum(p)
1924 self.logger.error(ppp("Unexpected or invalid packet:", p))
1927 # from client to server (no translation)
1928 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1929 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1930 TCP(sport=12346, dport=local_port))
1931 self.pg1.add_stream(p)
1932 self.pg_enable_capture(self.pg_interfaces)
1934 capture = self.pg0.get_capture(1)
1940 self.assertEqual(ip.dst, server1.ip4)
1941 self.assertEqual(tcp.dport, local_port)
1942 self.check_tcp_checksum(p)
1943 self.check_ip_checksum(p)
1945 self.logger.error(ppp("Unexpected or invalid packet:", p))
1948 # from service back to client (no translation)
1949 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1950 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1951 TCP(sport=local_port, dport=12346))
1952 self.pg0.add_stream(p)
1953 self.pg_enable_capture(self.pg_interfaces)
1955 capture = self.pg1.get_capture(1)
# Source address and port must still be server1's own.
1960 self.assertEqual(ip.src, server1.ip4)
1961 self.assertEqual(tcp.sport, local_port)
1962 self.check_tcp_checksum(p)
1963 self.check_ip_checksum(p)
1965 self.logger.error(ppp("Unexpected or invalid packet:", p))
1968 def test_multiple_inside_interfaces(self):
1969 """ NAT44 multiple non-overlapping address space inside interfaces """
# Scenario: the NAT feature is enabled on pg0, pg1 and pg3. Traffic
# pg0 -> pg1 and pg0 -> pg2 is expected untranslated; traffic from pg0
# and from pg1 towards pg3 is expected translated, and the matching
# return streams from pg3 are expected on pg0 and pg1 respectively.
# NOTE(review): the embedded numbering has gaps; the trailing argument of
# the pg3 feature call is not visible in this listing (presumably it
# marks pg3 as the outside interface - confirm).
1971 self.nat44_add_address(self.nat_addr)
1972 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1973 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1974 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1977 # between two NAT44 inside interfaces (no translation)
1978 pkts = self.create_stream_in(self.pg0, self.pg1)
1979 self.pg0.add_stream(pkts)
1980 self.pg_enable_capture(self.pg_interfaces)
1982 capture = self.pg1.get_capture(len(pkts))
1983 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1985 # from NAT44 inside to interface without NAT44 feature (no translation)
1986 pkts = self.create_stream_in(self.pg0, self.pg2)
1987 self.pg0.add_stream(pkts)
1988 self.pg_enable_capture(self.pg_interfaces)
1990 capture = self.pg2.get_capture(len(pkts))
1991 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1993 # in2out 1st interface
1994 pkts = self.create_stream_in(self.pg0, self.pg3)
1995 self.pg0.add_stream(pkts)
1996 self.pg_enable_capture(self.pg_interfaces)
1998 capture = self.pg3.get_capture(len(pkts))
1999 self.verify_capture_out(capture)
2001 # out2in 1st interface
2002 pkts = self.create_stream_out(self.pg3)
2003 self.pg3.add_stream(pkts)
2004 self.pg_enable_capture(self.pg_interfaces)
2006 capture = self.pg0.get_capture(len(pkts))
2007 self.verify_capture_in(capture, self.pg0)
2009 # in2out 2nd interface
2010 pkts = self.create_stream_in(self.pg1, self.pg3)
2011 self.pg1.add_stream(pkts)
2012 self.pg_enable_capture(self.pg_interfaces)
2014 capture = self.pg3.get_capture(len(pkts))
2015 self.verify_capture_out(capture)
2017 # out2in 2nd interface
2018 pkts = self.create_stream_out(self.pg3)
2019 self.pg3.add_stream(pkts)
2020 self.pg_enable_capture(self.pg_interfaces)
2022 capture = self.pg1.get_capture(len(pkts))
2023 self.verify_capture_in(capture, self.pg1)
2025 def test_inside_overlapping_interfaces(self):
2026 """ NAT44 multiple inside interfaces with overlapping address space """
# Scenario: the NAT feature is enabled on pg3, pg4, pg5 and pg6, and a
# static mapping pg6.remote_ip4 <-> 10.0.0.10 is added. The test covers
# untranslated pg4 -> pg5 traffic, a hairpinned pg4 -> pg6 TCP packet,
# translated traffic between each of pg4/pg5/pg6 and pg3 in both
# directions, and checks of the address, user and session dumps.
# NOTE(review): the embedded numbering has gaps; trailing call arguments,
# the bindings of p, ip and tcp, the loop header over 'users' and the
# error-handling keywords are not visible in this listing.
2028 static_nat_ip = "10.0.0.10"
2029 self.nat44_add_address(self.nat_addr)
2030 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
2032 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
2033 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
2034 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
2035 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
2038 # between NAT44 inside interfaces with same VRF (no translation)
2039 pkts = self.create_stream_in(self.pg4, self.pg5)
2040 self.pg4.add_stream(pkts)
2041 self.pg_enable_capture(self.pg_interfaces)
2043 capture = self.pg5.get_capture(len(pkts))
2044 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
2046 # between NAT44 inside interfaces with different VRF (hairpinning)
2047 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
2048 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
2049 TCP(sport=1234, dport=5678))
2050 self.pg4.add_stream(p)
2051 self.pg_enable_capture(self.pg_interfaces)
2053 capture = self.pg6.get_capture(1)
# Expected on pg6: source rewritten to nat_addr with a different source
# port, destination rewritten to the pg6 host, destination port kept.
2058 self.assertEqual(ip.src, self.nat_addr)
2059 self.assertEqual(ip.dst, self.pg6.remote_ip4)
2060 self.assertNotEqual(tcp.sport, 1234)
2061 self.assertEqual(tcp.dport, 5678)
2063 self.logger.error(ppp("Unexpected or invalid packet:", p))
2066 # in2out 1st interface
2067 pkts = self.create_stream_in(self.pg4, self.pg3)
2068 self.pg4.add_stream(pkts)
2069 self.pg_enable_capture(self.pg_interfaces)
2071 capture = self.pg3.get_capture(len(pkts))
2072 self.verify_capture_out(capture)
2074 # out2in 1st interface
2075 pkts = self.create_stream_out(self.pg3)
2076 self.pg3.add_stream(pkts)
2077 self.pg_enable_capture(self.pg_interfaces)
2079 capture = self.pg4.get_capture(len(pkts))
2080 self.verify_capture_in(capture, self.pg4)
2082 # in2out 2nd interface
2083 pkts = self.create_stream_in(self.pg5, self.pg3)
2084 self.pg5.add_stream(pkts)
2085 self.pg_enable_capture(self.pg_interfaces)
2087 capture = self.pg3.get_capture(len(pkts))
2088 self.verify_capture_out(capture)
2090 # out2in 2nd interface
2091 pkts = self.create_stream_out(self.pg3)
2092 self.pg3.add_stream(pkts)
2093 self.pg_enable_capture(self.pg_interfaces)
2095 capture = self.pg5.get_capture(len(pkts))
2096 self.verify_capture_in(capture, self.pg5)
# Session dump for the pg5 host (second argument 10): exactly three
# non-static sessions are expected, in the order TCP, UDP, ICMP.
2099 addresses = self.vapi.nat44_address_dump()
2100 self.assertEqual(len(addresses), 1)
2101 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
2102 self.assertEqual(len(sessions), 3)
2103 for session in sessions:
2104 self.assertFalse(session.is_static)
# Only the first four bytes of the dumped inside address are compared.
2105 self.assertEqual(session.inside_ip_address[0:4],
2106 self.pg5.remote_ip4n)
2107 self.assertEqual(session.outside_ip_address,
2108 addresses[0].ip_address)
2109 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2110 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2111 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2112 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2113 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2114 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2115 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2116 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2117 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2119 # in2out 3rd interface
2120 pkts = self.create_stream_in(self.pg6, self.pg3)
2121 self.pg6.add_stream(pkts)
2122 self.pg_enable_capture(self.pg_interfaces)
2124 capture = self.pg3.get_capture(len(pkts))
2125 self.verify_capture_out(capture, static_nat_ip, True)
2127 # out2in 3rd interface
2128 pkts = self.create_stream_out(self.pg3, static_nat_ip)
2129 self.pg3.add_stream(pkts)
2130 self.pg_enable_capture(self.pg_interfaces)
2132 capture = self.pg6.get_capture(len(pkts))
2133 self.verify_capture_in(capture, self.pg6)
2135 # general user and session dump verifications
2136 users = self.vapi.nat44_user_dump()
2137 self.assertTrue(len(users) >= 3)
2138 addresses = self.vapi.nat44_address_dump()
2139 self.assertEqual(len(addresses), 1)
# 'user' is presumably bound by a loop over 'users' whose header is not
# visible here - confirm.
2141 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2143 for session in sessions:
2144 self.assertEqual(user.ip_address, session.inside_ip_address)
# Chained comparison: total_pkts > 0 and total_bytes > total_pkts.
2145 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2146 self.assertTrue(session.protocol in
2147 [IP_PROTOS.tcp, IP_PROTOS.udp,
# Session dump for the pg4 host (second argument 10): at least four
# non-static sessions, all using the single NAT address.
2151 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2152 self.assertTrue(len(sessions) >= 4)
2153 for session in sessions:
2154 self.assertFalse(session.is_static)
2155 self.assertEqual(session.inside_ip_address[0:4],
2156 self.pg4.remote_ip4n)
2157 self.assertEqual(session.outside_ip_address,
2158 addresses[0].ip_address)
# Session dump for the pg6 host (second argument 20): at least three
# static sessions whose outside address is the static NAT address.
2161 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2162 self.assertTrue(len(sessions) >= 3)
2163 for session in sessions:
2164 self.assertTrue(session.is_static)
2165 self.assertEqual(session.inside_ip_address[0:4],
2166 self.pg6.remote_ip4n)
# NOTE(review): comparing two map() results by value relies on map()
# returning lists, i.e. Python 2 behaviour.
2167 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2168 map(int, static_nat_ip.split('.')))
2169 self.assertTrue(session.inside_port in
2170 [self.tcp_port_in, self.udp_port_in,
2173 def test_hairpinning(self):
2174 """ NAT44 hairpinning - 1:1 NAPT """
# Scenario: host and server are both remote hosts of pg0. A TCP static
# mapping exposes the server as nat_addr:server_out_port. The host sends
# to that external address on pg0 and the packet is expected back on pg0
# towards the server; the server's reply to nat_addr is then expected on
# pg0 towards the host.
# NOTE(review): the embedded numbering has gaps; the assignment of
# host_in_port, the bindings of p, ip and tcp from the captures and the
# error-handling keywords are not visible in this listing.
2176 host = self.pg0.remote_hosts[0]
2177 server = self.pg0.remote_hosts[1]
2180 server_in_port = 5678
2181 server_out_port = 8765
2183 self.nat44_add_address(self.nat_addr)
2184 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2185 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2187 # add static mapping for server
2188 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2189 server_in_port, server_out_port,
2190 proto=IP_PROTOS.tcp)
2192 # send packet from host to server
2193 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2194 IP(src=host.ip4, dst=self.nat_addr) /
2195 TCP(sport=host_in_port, dport=server_out_port))
2196 self.pg0.add_stream(p)
2197 self.pg_enable_capture(self.pg_interfaces)
2199 capture = self.pg0.get_capture(1)
# Expected: source rewritten to nat_addr with a new source port,
# destination rewritten to the server's inside address and port.
2204 self.assertEqual(ip.src, self.nat_addr)
2205 self.assertEqual(ip.dst, server.ip4)
2206 self.assertNotEqual(tcp.sport, host_in_port)
2207 self.assertEqual(tcp.dport, server_in_port)
2208 self.check_tcp_checksum(p)
# Remember the translated source port; the server's reply is sent to it.
2209 host_out_port = tcp.sport
2211 self.logger.error(ppp("Unexpected or invalid packet:", p))
2214 # send reply from server to host
2215 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2216 IP(src=server.ip4, dst=self.nat_addr) /
2217 TCP(sport=server_in_port, dport=host_out_port))
2218 self.pg0.add_stream(p)
2219 self.pg_enable_capture(self.pg_interfaces)
2221 capture = self.pg0.get_capture(1)
# Expected: source is nat_addr:server_out_port, destination is the host's
# original address and port.
2226 self.assertEqual(ip.src, self.nat_addr)
2227 self.assertEqual(ip.dst, host.ip4)
2228 self.assertEqual(tcp.sport, server_out_port)
2229 self.assertEqual(tcp.dport, host_in_port)
2230 self.check_tcp_checksum(p)
2232 self.logger.error(ppp("Unexpected or invalid packet:", p))
2235 def test_hairpinning2(self):
2236 """ NAT44 hairpinning - 1:1 NAT"""
2238 server1_nat_ip = "10.0.0.10"
2239 server2_nat_ip = "10.0.0.11"
2240 host = self.pg0.remote_hosts[0]
2241 server1 = self.pg0.remote_hosts[1]
2242 server2 = self.pg0.remote_hosts[2]
2243 server_tcp_port = 22
2244 server_udp_port = 20
2246 self.nat44_add_address(self.nat_addr)
2247 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2248 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2251 # add static mapping for servers
2252 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2253 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2257 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2258 IP(src=host.ip4, dst=server1_nat_ip) /
2259 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2261 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2262 IP(src=host.ip4, dst=server1_nat_ip) /
2263 UDP(sport=self.udp_port_in, dport=server_udp_port))
2265 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2266 IP(src=host.ip4, dst=server1_nat_ip) /
2267 ICMP(id=self.icmp_id_in, type='echo-request'))
2269 self.pg0.add_stream(pkts)
2270 self.pg_enable_capture(self.pg_interfaces)
2272 capture = self.pg0.get_capture(len(pkts))
2273 for packet in capture:
2275 self.assertEqual(packet[IP].src, self.nat_addr)
2276 self.assertEqual(packet[IP].dst, server1.ip4)
2277 if packet.haslayer(TCP):
2278 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2279 self.assertEqual(packet[TCP].dport, server_tcp_port)
2280 self.tcp_port_out = packet[TCP].sport
2281 self.check_tcp_checksum(packet)
2282 elif packet.haslayer(UDP):
2283 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2284 self.assertEqual(packet[UDP].dport, server_udp_port)
2285 self.udp_port_out = packet[UDP].sport
2287 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2288 self.icmp_id_out = packet[ICMP].id
2290 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2295 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2296 IP(src=server1.ip4, dst=self.nat_addr) /
2297 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2299 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2300 IP(src=server1.ip4, dst=self.nat_addr) /
2301 UDP(sport=server_udp_port, dport=self.udp_port_out))
2303 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2304 IP(src=server1.ip4, dst=self.nat_addr) /
2305 ICMP(id=self.icmp_id_out, type='echo-reply'))
2307 self.pg0.add_stream(pkts)
2308 self.pg_enable_capture(self.pg_interfaces)
2310 capture = self.pg0.get_capture(len(pkts))
2311 for packet in capture:
2313 self.assertEqual(packet[IP].src, server1_nat_ip)
2314 self.assertEqual(packet[IP].dst, host.ip4)
2315 if packet.haslayer(TCP):
2316 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2317 self.assertEqual(packet[TCP].sport, server_tcp_port)
2318 self.check_tcp_checksum(packet)
2319 elif packet.haslayer(UDP):
2320 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2321 self.assertEqual(packet[UDP].sport, server_udp_port)
2323 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2325 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2328 # server2 to server1
2330 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2331 IP(src=server2.ip4, dst=server1_nat_ip) /
2332 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2334 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2335 IP(src=server2.ip4, dst=server1_nat_ip) /
2336 UDP(sport=self.udp_port_in, dport=server_udp_port))
2338 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2339 IP(src=server2.ip4, dst=server1_nat_ip) /
2340 ICMP(id=self.icmp_id_in, type='echo-request'))
2342 self.pg0.add_stream(pkts)
2343 self.pg_enable_capture(self.pg_interfaces)
2345 capture = self.pg0.get_capture(len(pkts))
2346 for packet in capture:
2348 self.assertEqual(packet[IP].src, server2_nat_ip)
2349 self.assertEqual(packet[IP].dst, server1.ip4)
2350 if packet.haslayer(TCP):
2351 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2352 self.assertEqual(packet[TCP].dport, server_tcp_port)
2353 self.tcp_port_out = packet[TCP].sport
2354 self.check_tcp_checksum(packet)
2355 elif packet.haslayer(UDP):
2356 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2357 self.assertEqual(packet[UDP].dport, server_udp_port)
2358 self.udp_port_out = packet[UDP].sport
2360 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2361 self.icmp_id_out = packet[ICMP].id
2363 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2366 # server1 to server2
2368 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2369 IP(src=server1.ip4, dst=server2_nat_ip) /
2370 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2372 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2373 IP(src=server1.ip4, dst=server2_nat_ip) /
2374 UDP(sport=server_udp_port, dport=self.udp_port_out))
2376 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2377 IP(src=server1.ip4, dst=server2_nat_ip) /
2378 ICMP(id=self.icmp_id_out, type='echo-reply'))
2380 self.pg0.add_stream(pkts)
2381 self.pg_enable_capture(self.pg_interfaces)
2383 capture = self.pg0.get_capture(len(pkts))
2384 for packet in capture:
2386 self.assertEqual(packet[IP].src, server1_nat_ip)
2387 self.assertEqual(packet[IP].dst, server2.ip4)
2388 if packet.haslayer(TCP):
2389 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2390 self.assertEqual(packet[TCP].sport, server_tcp_port)
2391 self.check_tcp_checksum(packet)
2392 elif packet.haslayer(UDP):
2393 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2394 self.assertEqual(packet[UDP].sport, server_udp_port)
2396 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2398 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2401 def test_max_translations_per_user(self):
2402 """ MAX translations per user - recycle the least recently used """
2404 self.nat44_add_address(self.nat_addr)
2405 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2406 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2409 # get maximum number of translations per user
2410 nat44_config = self.vapi.nat_show_config()
2412 # send more than maximum number of translations per user packets
2413 pkts_num = nat44_config.max_translations_per_user + 5
2415 for port in range(0, pkts_num):
2416 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2417 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2418 TCP(sport=1025 + port))
2420 self.pg0.add_stream(pkts)
2421 self.pg_enable_capture(self.pg_interfaces)
2424 # verify number of translated packet
2425 self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire NAT44 addresses from interface

    Registers pg7 as the interface the NAT44 address pool is taken from,
    then checks the pool dump before the interface address is configured,
    after it is configured, and after it is removed again.
    """
    nat_if = self.pg7
    self.vapi.nat44_add_interface_addr(nat_if.sw_if_index)

    # before config_ip4() is called here the pool dump must be empty
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))

    # once the interface has an address, exactly one pool entry is expected
    nat_if.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    # only the first four elements of ip_address are compared against the
    # interface's local_ip4n value
    first_entry = pool[0]
    self.assertEqual(first_entry.ip_address[:4], nat_if.local_ip4n)

    # dropping the interface address must empty the pool again
    nat_if.unconfig_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))
2446 def test_interface_addr_static_mapping(self):
2447 """ Static mapping with addresses from interface """
2450 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2451 self.nat44_add_static_mapping(
2453 external_sw_if_index=self.pg7.sw_if_index,
2456 # static mappings with external interface
2457 static_mappings = self.vapi.nat44_static_mapping_dump()
2458 self.assertEqual(1, len(static_mappings))
2459 self.assertEqual(self.pg7.sw_if_index,
2460 static_mappings[0].external_sw_if_index)
2461 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2463 # configure interface address and check static mappings
2464 self.pg7.config_ip4()
2465 static_mappings = self.vapi.nat44_static_mapping_dump()
2466 self.assertEqual(1, len(static_mappings))
2467 self.assertEqual(static_mappings[0].external_ip_address[0:4],
2468 self.pg7.local_ip4n)
2469 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2470 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2472 # remove interface address and check static mappings
2473 self.pg7.unconfig_ip4()
2474 static_mappings = self.vapi.nat44_static_mapping_dump()
2475 self.assertEqual(0, len(static_mappings))
2477 def test_interface_addr_identity_nat(self):
2478 """ Identity NAT with addresses from interface """
2481 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2482 self.vapi.nat44_add_del_identity_mapping(
2483 sw_if_index=self.pg7.sw_if_index,
2485 protocol=IP_PROTOS.tcp,
2488 # identity mappings with external interface
2489 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2490 self.assertEqual(1, len(identity_mappings))
2491 self.assertEqual(self.pg7.sw_if_index,
2492 identity_mappings[0].sw_if_index)
2494 # configure interface address and check identity mappings
2495 self.pg7.config_ip4()
2496 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2497 self.assertEqual(1, len(identity_mappings))
2498 self.assertEqual(identity_mappings[0].ip_address,
2499 self.pg7.local_ip4n)
2500 self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2501 self.assertEqual(port, identity_mappings[0].port)
2502 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2504 # remove interface address and check identity mappings
2505 self.pg7.unconfig_ip4()
2506 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2507 self.assertEqual(0, len(identity_mappings))
2509 def test_ipfix_nat44_sess(self):
2510 """ IPFIX logging NAT44 session created/deleted """
2511 self.ipfix_domain_id = 10
2512 self.ipfix_src_port = 20202
2513 colector_port = 30303
2514 bind_layers(UDP, IPFIX, dport=30303)
2515 self.nat44_add_address(self.nat_addr)
2516 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2517 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2519 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2520 src_address=self.pg3.local_ip4n,
2522 template_interval=10,
2523 collector_port=colector_port)
2524 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2525 src_port=self.ipfix_src_port)
2527 pkts = self.create_stream_in(self.pg0, self.pg1)
2528 self.pg0.add_stream(pkts)
2529 self.pg_enable_capture(self.pg_interfaces)
2531 capture = self.pg1.get_capture(len(pkts))
2532 self.verify_capture_out(capture)
2533 self.nat44_add_address(self.nat_addr, is_add=0)
2534 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2535 capture = self.pg3.get_capture(9)
2536 ipfix = IPFIXDecoder()
2537 # first load template
2539 self.assertTrue(p.haslayer(IPFIX))
2540 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2541 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2542 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2543 self.assertEqual(p[UDP].dport, colector_port)
2544 self.assertEqual(p[IPFIX].observationDomainID,
2545 self.ipfix_domain_id)
2546 if p.haslayer(Template):
2547 ipfix.add_template(p.getlayer(Template))
2548 # verify events in data set
2550 if p.haslayer(Data):
2551 data = ipfix.decode_data_set(p.getlayer(Set))
2552 self.verify_ipfix_nat44_ses(data)
2554 def test_ipfix_addr_exhausted(self):
2555 """ IPFIX logging NAT addresses exhausted """
2556 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2557 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2559 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2560 src_address=self.pg3.local_ip4n,
2562 template_interval=10)
2563 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2564 src_port=self.ipfix_src_port)
2566 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2567 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2569 self.pg0.add_stream(p)
2570 self.pg_enable_capture(self.pg_interfaces)
2572 capture = self.pg1.get_capture(0)
2573 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2574 capture = self.pg3.get_capture(9)
2575 ipfix = IPFIXDecoder()
2576 # first load template
2578 self.assertTrue(p.haslayer(IPFIX))
2579 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2580 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2581 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2582 self.assertEqual(p[UDP].dport, 4739)
2583 self.assertEqual(p[IPFIX].observationDomainID,
2584 self.ipfix_domain_id)
2585 if p.haslayer(Template):
2586 ipfix.add_template(p.getlayer(Template))
2587 # verify events in data set
2589 if p.haslayer(Data):
2590 data = ipfix.decode_data_set(p.getlayer(Set))
2591 self.verify_ipfix_addr_exhausted(data)
2593 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2594 def test_ipfix_max_sessions(self):
2595 """ IPFIX logging maximum session entries exceeded """
2596 self.nat44_add_address(self.nat_addr)
2597 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2598 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2601 nat44_config = self.vapi.nat_show_config()
2602 max_sessions = 10 * nat44_config.translation_buckets
2605 for i in range(0, max_sessions):
2606 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2607 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2608 IP(src=src, dst=self.pg1.remote_ip4) /
2611 self.pg0.add_stream(pkts)
2612 self.pg_enable_capture(self.pg_interfaces)
2615 self.pg1.get_capture(max_sessions)
2616 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2617 src_address=self.pg3.local_ip4n,
2619 template_interval=10)
2620 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2621 src_port=self.ipfix_src_port)
2623 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2624 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2626 self.pg0.add_stream(p)
2627 self.pg_enable_capture(self.pg_interfaces)
2629 self.pg1.get_capture(0)
2630 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2631 capture = self.pg3.get_capture(9)
2632 ipfix = IPFIXDecoder()
2633 # first load template
2635 self.assertTrue(p.haslayer(IPFIX))
2636 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2637 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2638 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2639 self.assertEqual(p[UDP].dport, 4739)
2640 self.assertEqual(p[IPFIX].observationDomainID,
2641 self.ipfix_domain_id)
2642 if p.haslayer(Template):
2643 ipfix.add_template(p.getlayer(Template))
2644 # verify events in data set
2646 if p.haslayer(Data):
2647 data = ipfix.decode_data_set(p.getlayer(Set))
2648 self.verify_ipfix_max_sessions(data, max_sessions)
2650 def test_pool_addr_fib(self):
2651 """ NAT44 add pool addresses to FIB """
2652 static_addr = '10.0.0.10'
2653 self.nat44_add_address(self.nat_addr)
2654 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2655 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2657 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2660 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2661 ARP(op=ARP.who_has, pdst=self.nat_addr,
2662 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2663 self.pg1.add_stream(p)
2664 self.pg_enable_capture(self.pg_interfaces)
2666 capture = self.pg1.get_capture(1)
2667 self.assertTrue(capture[0].haslayer(ARP))
2668 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2671 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2672 ARP(op=ARP.who_has, pdst=static_addr,
2673 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2674 self.pg1.add_stream(p)
2675 self.pg_enable_capture(self.pg_interfaces)
2677 capture = self.pg1.get_capture(1)
2678 self.assertTrue(capture[0].haslayer(ARP))
2679 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2681 # send ARP to non-NAT44 interface
2682 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2683 ARP(op=ARP.who_has, pdst=self.nat_addr,
2684 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2685 self.pg2.add_stream(p)
2686 self.pg_enable_capture(self.pg_interfaces)
2688 capture = self.pg1.get_capture(0)
2690 # remove addresses and verify
2691 self.nat44_add_address(self.nat_addr, is_add=0)
2692 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2695 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2696 ARP(op=ARP.who_has, pdst=self.nat_addr,
2697 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2698 self.pg1.add_stream(p)
2699 self.pg_enable_capture(self.pg_interfaces)
2701 capture = self.pg1.get_capture(0)
2703 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2704 ARP(op=ARP.who_has, pdst=static_addr,
2705 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2706 self.pg1.add_stream(p)
2707 self.pg_enable_capture(self.pg_interfaces)
2709 capture = self.pg1.get_capture(0)
2711 def test_vrf_mode(self):
2712 """ NAT44 tenant VRF aware address pool mode """
2716 nat_ip1 = "10.0.0.10"
2717 nat_ip2 = "10.0.0.11"
2719 self.pg0.unconfig_ip4()
2720 self.pg1.unconfig_ip4()
2721 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2722 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2723 self.pg0.set_table_ip4(vrf_id1)
2724 self.pg1.set_table_ip4(vrf_id2)
2725 self.pg0.config_ip4()
2726 self.pg1.config_ip4()
2728 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2729 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2730 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2731 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2732 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2736 pkts = self.create_stream_in(self.pg0, self.pg2)
2737 self.pg0.add_stream(pkts)
2738 self.pg_enable_capture(self.pg_interfaces)
2740 capture = self.pg2.get_capture(len(pkts))
2741 self.verify_capture_out(capture, nat_ip1)
2744 pkts = self.create_stream_in(self.pg1, self.pg2)
2745 self.pg1.add_stream(pkts)
2746 self.pg_enable_capture(self.pg_interfaces)
2748 capture = self.pg2.get_capture(len(pkts))
2749 self.verify_capture_out(capture, nat_ip2)
2751 self.pg0.unconfig_ip4()
2752 self.pg1.unconfig_ip4()
2753 self.pg0.set_table_ip4(0)
2754 self.pg1.set_table_ip4(0)
2755 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2756 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2758 def test_vrf_feature_independent(self):
2759 """ NAT44 tenant VRF independent address pool mode """
2761 nat_ip1 = "10.0.0.10"
2762 nat_ip2 = "10.0.0.11"
2764 self.nat44_add_address(nat_ip1)
2765 self.nat44_add_address(nat_ip2, vrf_id=99)
2766 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2767 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2768 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2772 pkts = self.create_stream_in(self.pg0, self.pg2)
2773 self.pg0.add_stream(pkts)
2774 self.pg_enable_capture(self.pg_interfaces)
2776 capture = self.pg2.get_capture(len(pkts))
2777 self.verify_capture_out(capture, nat_ip1)
2780 pkts = self.create_stream_in(self.pg1, self.pg2)
2781 self.pg1.add_stream(pkts)
2782 self.pg_enable_capture(self.pg_interfaces)
2784 capture = self.pg2.get_capture(len(pkts))
2785 self.verify_capture_out(capture, nat_ip1)
2787 def test_dynamic_ipless_interfaces(self):
2788 """ NAT44 interfaces without configured IP address """
2790 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2791 mactobinary(self.pg7.remote_mac),
2792 self.pg7.remote_ip4n,
2794 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2795 mactobinary(self.pg8.remote_mac),
2796 self.pg8.remote_ip4n,
2799 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2800 dst_address_length=32,
2801 next_hop_address=self.pg7.remote_ip4n,
2802 next_hop_sw_if_index=self.pg7.sw_if_index)
2803 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2804 dst_address_length=32,
2805 next_hop_address=self.pg8.remote_ip4n,
2806 next_hop_sw_if_index=self.pg8.sw_if_index)
2808 self.nat44_add_address(self.nat_addr)
2809 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2810 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2814 pkts = self.create_stream_in(self.pg7, self.pg8)
2815 self.pg7.add_stream(pkts)
2816 self.pg_enable_capture(self.pg_interfaces)
2818 capture = self.pg8.get_capture(len(pkts))
2819 self.verify_capture_out(capture)
2822 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2823 self.pg8.add_stream(pkts)
2824 self.pg_enable_capture(self.pg_interfaces)
2826 capture = self.pg7.get_capture(len(pkts))
2827 self.verify_capture_in(capture, self.pg7)
2829 def test_static_ipless_interfaces(self):
2830 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2832 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2833 mactobinary(self.pg7.remote_mac),
2834 self.pg7.remote_ip4n,
2836 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2837 mactobinary(self.pg8.remote_mac),
2838 self.pg8.remote_ip4n,
2841 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2842 dst_address_length=32,
2843 next_hop_address=self.pg7.remote_ip4n,
2844 next_hop_sw_if_index=self.pg7.sw_if_index)
2845 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2846 dst_address_length=32,
2847 next_hop_address=self.pg8.remote_ip4n,
2848 next_hop_sw_if_index=self.pg8.sw_if_index)
2850 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2851 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2852 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2856 pkts = self.create_stream_out(self.pg8)
2857 self.pg8.add_stream(pkts)
2858 self.pg_enable_capture(self.pg_interfaces)
2860 capture = self.pg7.get_capture(len(pkts))
2861 self.verify_capture_in(capture, self.pg7)
2864 pkts = self.create_stream_in(self.pg7, self.pg8)
2865 self.pg7.add_stream(pkts)
2866 self.pg_enable_capture(self.pg_interfaces)
2868 capture = self.pg8.get_capture(len(pkts))
2869 self.verify_capture_out(capture, self.nat_addr, True)
2871 def test_static_with_port_ipless_interfaces(self):
2872 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2874 self.tcp_port_out = 30606
2875 self.udp_port_out = 30607
2876 self.icmp_id_out = 30608
2878 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2879 mactobinary(self.pg7.remote_mac),
2880 self.pg7.remote_ip4n,
2882 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2883 mactobinary(self.pg8.remote_mac),
2884 self.pg8.remote_ip4n,
2887 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2888 dst_address_length=32,
2889 next_hop_address=self.pg7.remote_ip4n,
2890 next_hop_sw_if_index=self.pg7.sw_if_index)
2891 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2892 dst_address_length=32,
2893 next_hop_address=self.pg8.remote_ip4n,
2894 next_hop_sw_if_index=self.pg8.sw_if_index)
2896 self.nat44_add_address(self.nat_addr)
2897 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2898 self.tcp_port_in, self.tcp_port_out,
2899 proto=IP_PROTOS.tcp)
2900 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2901 self.udp_port_in, self.udp_port_out,
2902 proto=IP_PROTOS.udp)
2903 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2904 self.icmp_id_in, self.icmp_id_out,
2905 proto=IP_PROTOS.icmp)
2906 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2907 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2911 pkts = self.create_stream_out(self.pg8)
2912 self.pg8.add_stream(pkts)
2913 self.pg_enable_capture(self.pg_interfaces)
2915 capture = self.pg7.get_capture(len(pkts))
2916 self.verify_capture_in(capture, self.pg7)
2919 pkts = self.create_stream_in(self.pg7, self.pg8)
2920 self.pg7.add_stream(pkts)
2921 self.pg_enable_capture(self.pg_interfaces)
2923 capture = self.pg8.get_capture(len(pkts))
2924 self.verify_capture_out(capture)
2926 def test_static_unknown_proto(self):
2927 """ 1:1 NAT translate packet with unknown protocol """
2928 nat_ip = "10.0.0.10"
2929 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2930 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2931 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2935 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2936 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2938 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2939 TCP(sport=1234, dport=1234))
2940 self.pg0.add_stream(p)
2941 self.pg_enable_capture(self.pg_interfaces)
2943 p = self.pg1.get_capture(1)
2946 self.assertEqual(packet[IP].src, nat_ip)
2947 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2948 self.assertTrue(packet.haslayer(GRE))
2949 self.check_ip_checksum(packet)
2951 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2955 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2956 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2958 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2959 TCP(sport=1234, dport=1234))
2960 self.pg1.add_stream(p)
2961 self.pg_enable_capture(self.pg_interfaces)
2963 p = self.pg0.get_capture(1)
2966 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2967 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2968 self.assertTrue(packet.haslayer(GRE))
2969 self.check_ip_checksum(packet)
2971 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2974 def test_hairpinning_static_unknown_proto(self):
2975 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2977 host = self.pg0.remote_hosts[0]
2978 server = self.pg0.remote_hosts[1]
2980 host_nat_ip = "10.0.0.10"
2981 server_nat_ip = "10.0.0.11"
2983 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2984 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2985 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2986 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2990 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2991 IP(src=host.ip4, dst=server_nat_ip) /
2993 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2994 TCP(sport=1234, dport=1234))
2995 self.pg0.add_stream(p)
2996 self.pg_enable_capture(self.pg_interfaces)
2998 p = self.pg0.get_capture(1)
3001 self.assertEqual(packet[IP].src, host_nat_ip)
3002 self.assertEqual(packet[IP].dst, server.ip4)
3003 self.assertTrue(packet.haslayer(GRE))
3004 self.check_ip_checksum(packet)
3006 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3010 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3011 IP(src=server.ip4, dst=host_nat_ip) /
3013 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3014 TCP(sport=1234, dport=1234))
3015 self.pg0.add_stream(p)
3016 self.pg_enable_capture(self.pg_interfaces)
3018 p = self.pg0.get_capture(1)
3021 self.assertEqual(packet[IP].src, server_nat_ip)
3022 self.assertEqual(packet[IP].dst, host.ip4)
3023 self.assertTrue(packet.haslayer(GRE))
3024 self.check_ip_checksum(packet)
3026 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3029 def test_unknown_proto(self):
3030 """ NAT44 translate packet with unknown protocol """
3031 self.nat44_add_address(self.nat_addr)
3032 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3033 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3037 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3038 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3039 TCP(sport=self.tcp_port_in, dport=20))
3040 self.pg0.add_stream(p)
3041 self.pg_enable_capture(self.pg_interfaces)
3043 p = self.pg1.get_capture(1)
3045 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3046 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3048 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3049 TCP(sport=1234, dport=1234))
3050 self.pg0.add_stream(p)
3051 self.pg_enable_capture(self.pg_interfaces)
3053 p = self.pg1.get_capture(1)
3056 self.assertEqual(packet[IP].src, self.nat_addr)
3057 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3058 self.assertTrue(packet.haslayer(GRE))
3059 self.check_ip_checksum(packet)
3061 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3065 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3066 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3068 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3069 TCP(sport=1234, dport=1234))
3070 self.pg1.add_stream(p)
3071 self.pg_enable_capture(self.pg_interfaces)
3073 p = self.pg0.get_capture(1)
3076 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3077 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3078 self.assertTrue(packet.haslayer(GRE))
3079 self.check_ip_checksum(packet)
3081 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3084 def test_hairpinning_unknown_proto(self):
3085 """ NAT44 translate packet with unknown protocol - hairpinning """
3086 host = self.pg0.remote_hosts[0]
3087 server = self.pg0.remote_hosts[1]
3090 server_in_port = 5678
3091 server_out_port = 8765
3092 server_nat_ip = "10.0.0.11"
3094 self.nat44_add_address(self.nat_addr)
3095 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3096 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3099 # add static mapping for server
3100 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3103 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3104 IP(src=host.ip4, dst=server_nat_ip) /
3105 TCP(sport=host_in_port, dport=server_out_port))
3106 self.pg0.add_stream(p)
3107 self.pg_enable_capture(self.pg_interfaces)
3109 capture = self.pg0.get_capture(1)
3111 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3112 IP(src=host.ip4, dst=server_nat_ip) /
3114 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3115 TCP(sport=1234, dport=1234))
3116 self.pg0.add_stream(p)
3117 self.pg_enable_capture(self.pg_interfaces)
3119 p = self.pg0.get_capture(1)
3122 self.assertEqual(packet[IP].src, self.nat_addr)
3123 self.assertEqual(packet[IP].dst, server.ip4)
3124 self.assertTrue(packet.haslayer(GRE))
3125 self.check_ip_checksum(packet)
3127 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3131 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3132 IP(src=server.ip4, dst=self.nat_addr) /
3134 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3135 TCP(sport=1234, dport=1234))
3136 self.pg0.add_stream(p)
3137 self.pg_enable_capture(self.pg_interfaces)
3139 p = self.pg0.get_capture(1)
3142 self.assertEqual(packet[IP].src, server_nat_ip)
3143 self.assertEqual(packet[IP].dst, host.ip4)
3144 self.assertTrue(packet.haslayer(GRE))
3145 self.check_ip_checksum(packet)
3147 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3150 def test_output_feature(self):
3151 """ NAT44 interface output feature (in2out postrouting) """
3152 self.nat44_add_address(self.nat_addr)
3153 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3154 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3155 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3159 pkts = self.create_stream_in(self.pg0, self.pg3)
3160 self.pg0.add_stream(pkts)
3161 self.pg_enable_capture(self.pg_interfaces)
3163 capture = self.pg3.get_capture(len(pkts))
3164 self.verify_capture_out(capture)
3167 pkts = self.create_stream_out(self.pg3)
3168 self.pg3.add_stream(pkts)
3169 self.pg_enable_capture(self.pg_interfaces)
3171 capture = self.pg0.get_capture(len(pkts))
3172 self.verify_capture_in(capture, self.pg0)
3174 # from non-NAT interface to NAT inside interface
3175 pkts = self.create_stream_in(self.pg2, self.pg0)
3176 self.pg2.add_stream(pkts)
3177 self.pg_enable_capture(self.pg_interfaces)
3179 capture = self.pg0.get_capture(len(pkts))
3180 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3182 def test_output_feature_vrf_aware(self):
3183 """ NAT44 interface output feature VRF aware (in2out postrouting) """
# Adds one NAT pool address per VRF (vrf_id 10 and 20), enables the output
# feature on pg4, pg6 and pg3, then sends traffic pg4 <-> pg3 and
# pg6 <-> pg3, expecting nat_ip_vrf10 and nat_ip_vrf20 respectively.
3184 nat_ip_vrf10 = "10.0.0.10"
3185 nat_ip_vrf20 = "10.0.0.20"
# Two /32 routes to pg3's remote address via pg3.
# NOTE(review): both calls are cut off in this listing, so the argument
# that distinguishes them (presumably a table/VRF id) is not visible.
3187 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3188 dst_address_length=32,
3189 next_hop_address=self.pg3.remote_ip4n,
3190 next_hop_sw_if_index=self.pg3.sw_if_index,
3192 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3193 dst_address_length=32,
3194 next_hop_address=self.pg3.remote_ip4n,
3195 next_hop_sw_if_index=self.pg3.sw_if_index,
3198 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3199 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3200 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3201 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3202 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3: the capture must carry the VRF 10 pool address.
3206 pkts = self.create_stream_in(self.pg4, self.pg3)
3207 self.pg4.add_stream(pkts)
3208 self.pg_enable_capture(self.pg_interfaces)
3210 capture = self.pg3.get_capture(len(pkts))
3211 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> pg4: return traffic addressed to the VRF 10 pool address.
3214 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3215 self.pg3.add_stream(pkts)
3216 self.pg_enable_capture(self.pg_interfaces)
3218 capture = self.pg4.get_capture(len(pkts))
3219 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3: the capture must carry the VRF 20 pool address.
3222 pkts = self.create_stream_in(self.pg6, self.pg3)
3223 self.pg6.add_stream(pkts)
3224 self.pg_enable_capture(self.pg_interfaces)
3226 capture = self.pg3.get_capture(len(pkts))
3227 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> pg6: return traffic addressed to the VRF 20 pool address.
3230 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3231 self.pg3.add_stream(pkts)
3232 self.pg_enable_capture(self.pg_interfaces)
3234 capture = self.pg6.get_capture(len(pkts))
3235 self.verify_capture_in(capture, self.pg6)
3237 def test_output_feature_hairpinning(self):
3238 """ NAT44 interface output feature hairpinning (in2out postrouting) """
# host and server are both remote hosts of pg0; the host reaches the server
# through the NAT address and the reply comes back the same way, so every
# packet is sent and captured on pg0.
# NOTE(review): host_in_port, ip and tcp are used below but their
# assignments are not visible in this listing.
3239 host = self.pg0.remote_hosts[0]
3240 server = self.pg0.remote_hosts[1]
3243 server_in_port = 5678
3244 server_out_port = 8765
3246 self.nat44_add_address(self.nat_addr)
3247 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3248 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3251 # add static mapping for server
3252 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3253 server_in_port, server_out_port,
3254 proto=IP_PROTOS.tcp)
3256 # send packet from host to server
3257 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3258 IP(src=host.ip4, dst=self.nat_addr) /
3259 TCP(sport=host_in_port, dport=server_out_port))
3260 self.pg0.add_stream(p)
3261 self.pg_enable_capture(self.pg_interfaces)
3263 capture = self.pg0.get_capture(1)
# Expected: source rewritten to the NAT address with a new source port,
# destination rewritten to the server's own address and inside port.
3268 self.assertEqual(ip.src, self.nat_addr)
3269 self.assertEqual(ip.dst, server.ip4)
3270 self.assertNotEqual(tcp.sport, host_in_port)
3271 self.assertEqual(tcp.dport, server_in_port)
3272 self.check_tcp_checksum(p)
# Remember the allocated source port; the server replies to it below.
3273 host_out_port = tcp.sport
3275 self.logger.error(ppp("Unexpected or invalid packet:", p))
3278 # send reply from server to host
3279 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3280 IP(src=server.ip4, dst=self.nat_addr) /
3281 TCP(sport=server_in_port, dport=host_out_port))
3282 self.pg0.add_stream(p)
3283 self.pg_enable_capture(self.pg_interfaces)
3285 capture = self.pg0.get_capture(1)
# Expected: reply appears to come from the NAT address / outside port and
# is delivered to the host's original address and port.
3290 self.assertEqual(ip.src, self.nat_addr)
3291 self.assertEqual(ip.dst, host.ip4)
3292 self.assertEqual(tcp.sport, server_out_port)
3293 self.assertEqual(tcp.dport, host_in_port)
3294 self.check_tcp_checksum(p)
3296 self.logger.error(ppp("Unexpected or invalid packet:", p))
3299 def test_output_feature_and_service(self):
3300 """ NAT44 interface output feature and services """
# Combines a TCP static mapping (added with out2in_only=1) for
# pg0.remote_ip4 behind external_addr with dynamic translation through
# self.nat_addr; pg0 gets the regular NAT44 feature and pg1 the output
# feature.
# NOTE(review): local_port, external_port, ip and tcp are used below but
# their assignments are not visible in this listing.
3301 external_addr = '1.2.3.4'
3305 self.vapi.nat44_forwarding_enable_disable(1)
3306 self.nat44_add_address(self.nat_addr)
3307 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3308 local_port, external_port,
3309 proto=IP_PROTOS.tcp, out2in_only=1)
3310 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3311 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3314 # from client to service
3315 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3316 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3317 TCP(sport=12345, dport=external_port))
3318 self.pg1.add_stream(p)
3319 self.pg_enable_capture(self.pg_interfaces)
3321 capture = self.pg0.get_capture(1)
# Destination must be rewritten to the service's local address and port.
3327 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3328 self.assertEqual(tcp.dport, local_port)
3329 self.check_tcp_checksum(p)
3330 self.check_ip_checksum(p)
3332 self.logger.error(ppp("Unexpected or invalid packet:", p))
3335 # from service back to client
3336 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3337 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3338 TCP(sport=local_port, dport=12345))
3339 self.pg0.add_stream(p)
3340 self.pg_enable_capture(self.pg_interfaces)
3342 capture = self.pg1.get_capture(1)
# Source must be rewritten back to the external address and port.
3347 self.assertEqual(ip.src, external_addr)
3348 self.assertEqual(tcp.sport, external_port)
3349 self.check_tcp_checksum(p)
3350 self.check_ip_checksum(p)
3352 self.logger.error(ppp("Unexpected or invalid packet:", p))
3355 # from local network host to external network
3356 pkts = self.create_stream_in(self.pg0, self.pg1)
3357 self.pg0.add_stream(pkts)
3358 self.pg_enable_capture(self.pg_interfaces)
3360 capture = self.pg1.get_capture(len(pkts))
3361 self.verify_capture_out(capture)
# The same pg0 -> pg1 stream is sent and verified a second time.
3362 pkts = self.create_stream_in(self.pg0, self.pg1)
3363 self.pg0.add_stream(pkts)
3364 self.pg_enable_capture(self.pg_interfaces)
3366 capture = self.pg1.get_capture(len(pkts))
3367 self.verify_capture_out(capture)
3369 # from external network back to local network host
3370 pkts = self.create_stream_out(self.pg1)
3371 self.pg1.add_stream(pkts)
3372 self.pg_enable_capture(self.pg_interfaces)
3374 capture = self.pg0.get_capture(len(pkts))
3375 self.verify_capture_in(capture, self.pg0)
3377 def test_output_feature_and_service2(self):
3378 """ NAT44 interface output feature and service host direct access """
# Forwarding is switched on via nat44_forwarding_enable_disable(1) and only
# pg1 gets the output feature. The test first opens sessions from pg0
# (translated), then sends traffic from pg1 straight to pg0's real address
# (not translated).
3379 self.vapi.nat44_forwarding_enable_disable(1)
3380 self.nat44_add_address(self.nat_addr)
3381 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3384 # session initiaded from service host - translate
3385 pkts = self.create_stream_in(self.pg0, self.pg1)
3386 self.pg0.add_stream(pkts)
3387 self.pg_enable_capture(self.pg_interfaces)
3389 capture = self.pg1.get_capture(len(pkts))
3390 self.verify_capture_out(capture)
3392 pkts = self.create_stream_out(self.pg1)
3393 self.pg1.add_stream(pkts)
3394 self.pg_enable_capture(self.pg_interfaces)
3396 capture = self.pg0.get_capture(len(pkts))
3397 self.verify_capture_in(capture, self.pg0)
# Snapshot the current outside port / ICMP id attributes into locals.
# NOTE(review): no use of these locals is visible in this listing.
3399 tcp_port_out = self.tcp_port_out
3400 udp_port_out = self.udp_port_out
3401 icmp_id_out = self.icmp_id_out
3403 # session initiaded from remote host - do not translate
# Destination is pg0's real address and use_inside_ports=True is passed.
3404 pkts = self.create_stream_out(self.pg1,
3405 self.pg0.remote_ip4,
3406 use_inside_ports=True)
3407 self.pg1.add_stream(pkts)
3408 self.pg_enable_capture(self.pg_interfaces)
3410 capture = self.pg0.get_capture(len(pkts))
3411 self.verify_capture_in(capture, self.pg0)
# Reply direction: the expected source address is pg0's own address.
# NOTE(review): the final verify_capture_out() call is cut off here.
3413 pkts = self.create_stream_in(self.pg0, self.pg1)
3414 self.pg0.add_stream(pkts)
3415 self.pg_enable_capture(self.pg_interfaces)
3417 capture = self.pg1.get_capture(len(pkts))
3418 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3421 def test_one_armed_nat44(self):
3422 """ One armed NAT44 """
# Both hosts are remote hosts of pg9 and the NAT44 feature is configured on
# pg9 twice (the second call is cut off in this listing), so every packet
# enters and leaves through pg9.
# NOTE(review): ip and tcp are used below but their assignments are not
# visible in this listing.
3423 remote_host = self.pg9.remote_hosts[0]
3424 local_host = self.pg9.remote_hosts[1]
3427 self.nat44_add_address(self.nat_addr)
3428 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3429 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local_host -> remote_host, TCP 12345 -> 80.
3433 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3434 IP(src=local_host.ip4, dst=remote_host.ip4) /
3435 TCP(sport=12345, dport=80))
3436 self.pg9.add_stream(p)
3437 self.pg_enable_capture(self.pg_interfaces)
3439 capture = self.pg9.get_capture(1)
# Source must become the NAT address with a different source port.
3444 self.assertEqual(ip.src, self.nat_addr)
3445 self.assertEqual(ip.dst, remote_host.ip4)
3446 self.assertNotEqual(tcp.sport, 12345)
# Keep the allocated port; the reply below is addressed to it.
3447 external_port = tcp.sport
3448 self.assertEqual(tcp.dport, 80)
3449 self.check_tcp_checksum(p)
3450 self.check_ip_checksum(p)
3452 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host -> NAT address, TCP 80 -> external_port.
3456 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3457 IP(src=remote_host.ip4, dst=self.nat_addr) /
3458 TCP(sport=80, dport=external_port))
3459 self.pg9.add_stream(p)
3460 self.pg_enable_capture(self.pg_interfaces)
3462 capture = self.pg9.get_capture(1)
# Destination must be restored to local_host and its original port.
3467 self.assertEqual(ip.src, remote_host.ip4)
3468 self.assertEqual(ip.dst, local_host.ip4)
3469 self.assertEqual(tcp.sport, 80)
3470 self.assertEqual(tcp.dport, 12345)
3471 self.check_tcp_checksum(p)
3472 self.check_ip_checksum(p)
3474 self.logger.error(ppp("Unexpected or invalid packet:", p))
3477 def test_one_armed_nat44_static(self):
3478 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
# Single-interface (pg9) setup: the NAT address is added with twice_nat=1
# and a TCP static mapping (out2in_only=1) publishes local_host behind it.
# NOTE(review): local_port, external_port, ip and tcp are used below but
# their assignments are not visible in this listing; the static-mapping
# call and the second feature call are also cut off.
3479 remote_host = self.pg9.remote_hosts[0]
3480 local_host = self.pg9.remote_hosts[1]
3485 self.vapi.nat44_forwarding_enable_disable(1)
3486 self.nat44_add_address(self.nat_addr, twice_nat=1)
3487 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3488 local_port, external_port,
3489 proto=IP_PROTOS.tcp, out2in_only=1,
3491 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3492 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3495 # from client to service
3496 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3497 IP(src=remote_host.ip4, dst=self.nat_addr) /
3498 TCP(sport=12345, dport=external_port))
3499 self.pg9.add_stream(p)
3500 self.pg_enable_capture(self.pg_interfaces)
3502 capture = self.pg9.get_capture(1)
# Both ends are rewritten: destination to the local service, source to the
# NAT address with a newly allocated port.
3508 self.assertEqual(ip.dst, local_host.ip4)
3509 self.assertEqual(ip.src, self.nat_addr)
3510 self.assertEqual(tcp.dport, local_port)
3511 self.assertNotEqual(tcp.sport, 12345)
# Keep the allocated source port; the service replies to it below.
3512 eh_port_in = tcp.sport
3513 self.check_tcp_checksum(p)
3514 self.check_ip_checksum(p)
3516 self.logger.error(ppp("Unexpected or invalid packet:", p))
3519 # from service back to client
3520 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3521 IP(src=local_host.ip4, dst=self.nat_addr) /
3522 TCP(sport=local_port, dport=eh_port_in))
3523 self.pg9.add_stream(p)
3524 self.pg_enable_capture(self.pg_interfaces)
3526 capture = self.pg9.get_capture(1)
# The reply must look like it comes from NAT address / external port and
# reach the client's original address and port.
3531 self.assertEqual(ip.src, self.nat_addr)
3532 self.assertEqual(ip.dst, remote_host.ip4)
3533 self.assertEqual(tcp.sport, external_port)
3534 self.assertEqual(tcp.dport, 12345)
3535 self.check_tcp_checksum(p)
3536 self.check_ip_checksum(p)
3538 self.logger.error(ppp("Unexpected or invalid packet:", p))
3541 def test_del_session(self):
3542 """ Delete NAT44 session """
# Creates sessions by sending an inside stream pg0 -> pg1, deletes two of
# them through the API and checks that the session count dropped by two.
3543 self.nat44_add_address(self.nat_addr)
3544 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3545 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3548 pkts = self.create_stream_in(self.pg0, self.pg1)
3549 self.pg0.add_stream(pkts)
3550 self.pg_enable_capture(self.pg_interfaces)
# The capture only drains pg1; its contents are not inspected here.
3552 capture = self.pg1.get_capture(len(pkts))
3554 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3555 nsessions = len(sessions)
# First session is deleted by its inside address/port, the second by its
# outside address/port.
# NOTE(review): the second call is cut off in this listing.
3557 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3558 sessions[0].inside_port,
3559 sessions[0].protocol)
3560 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3561 sessions[1].outside_port,
3562 sessions[1].protocol,
3565 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3566 self.assertEqual(nsessions - len(sessions), 2)
3568 def test_set_get_reass(self):
3569 """ NAT44 set/get virtual fragmentation reassembly """
# Reads the current reassembly settings, writes modified values and checks
# that a second read returns exactly what was written.
3570 reas_cfg1 = self.vapi.nat_get_reass()
# New values are derived from the current ones (+5 timeout, doubled limits)
# so they always differ from the starting configuration.
3572 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3573 max_reass=reas_cfg1.ip4_max_reass * 2,
3574 max_frag=reas_cfg1.ip4_max_frag * 2)
3576 reas_cfg2 = self.vapi.nat_get_reass()
3578 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3579 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3580 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
# Setting drop_frag=1 must be reflected as a truthy ip4_drop_frag.
3582 self.vapi.nat_set_reass(drop_frag=1)
3583 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3585 def test_frag_in_order(self):
3586 """ NAT44 translate fragments arriving in order """
# Sends a fragmented TCP payload pg0 -> pg1 and a fragmented reply
# pg1 -> pg0, reassembles each capture and checks ports and payload.
# NOTE(review): the create_stream_frag() and first
# reass_frags_and_verify() calls are cut off in this listing.
3587 self.nat44_add_address(self.nat_addr)
3588 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3589 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# 23-byte payload; a random inside source port is picked for this run.
3592 data = "A" * 4 + "B" * 16 + "C" * 3
3593 self.tcp_port_in = random.randint(1025, 65535)
# Baseline number of reassembly entries, compared at the end of the test.
3595 reass = self.vapi.nat_reass_dump()
3596 reass_n_start = len(reass)
3599 pkts = self.create_stream_frag(self.pg0,
3600 self.pg1.remote_ip4,
3604 self.pg0.add_stream(pkts)
3605 self.pg_enable_capture(self.pg_interfaces)
3607 frags = self.pg1.get_capture(len(pkts))
3608 p = self.reass_frags_and_verify(frags,
3610 self.pg1.remote_ip4)
3611 self.assertEqual(p[TCP].dport, 20)
3612 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Record the translated source port from the reassembled packet.
3613 self.tcp_port_out = p[TCP].sport
3614 self.assertEqual(data, p[Raw].load)
3617 pkts = self.create_stream_frag(self.pg1,
3622 self.pg1.add_stream(pkts)
3623 self.pg_enable_capture(self.pg_interfaces)
3625 frags = self.pg0.get_capture(len(pkts))
3626 p = self.reass_frags_and_verify(frags,
3627 self.pg1.remote_ip4,
3628 self.pg0.remote_ip4)
3629 self.assertEqual(p[TCP].sport, 20)
3630 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3631 self.assertEqual(data, p[Raw].load)
# Exactly two more reassembly entries than at the start are expected.
3633 reass = self.vapi.nat_reass_dump()
3634 reass_n_end = len(reass)
3636 self.assertEqual(reass_n_end - reass_n_start, 2)
3638 def test_reass_hairpinning(self):
3639 """ NAT44 fragments hairpinning """
# host and server are both remote hosts of pg0; a fragmented TCP stream is
# sent on pg0 and the translated fragments are captured on pg0 as well.
# NOTE(review): the create_stream_frag() and reass_frags_and_verify()
# calls are cut off in this listing.
3640 host = self.pg0.remote_hosts[0]
3641 server = self.pg0.remote_hosts[1]
# Ports are randomised for every run.
3642 host_in_port = random.randint(1025, 65535)
3644 server_in_port = random.randint(1025, 65535)
3645 server_out_port = random.randint(1025, 65535)
3646 data = "A" * 4 + "B" * 16 + "C" * 3
3648 self.nat44_add_address(self.nat_addr)
3649 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3650 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3652 # add static mapping for server
3653 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3654 server_in_port, server_out_port,
3655 proto=IP_PROTOS.tcp)
3657 # send packet from host to server
3658 pkts = self.create_stream_frag(self.pg0,
3663 self.pg0.add_stream(pkts)
3664 self.pg_enable_capture(self.pg_interfaces)
3666 frags = self.pg0.get_capture(len(pkts))
3667 p = self.reass_frags_and_verify(frags,
# Source port must have changed, destination port must be the server's
# inside port, and the payload must be intact.
3670 self.assertNotEqual(p[TCP].sport, host_in_port)
3671 self.assertEqual(p[TCP].dport, server_in_port)
3672 self.assertEqual(data, p[Raw].load)
3674 def test_frag_out_of_order(self):
3675 """ NAT44 translate fragments arriving out of order """
3676 self.nat44_add_address(self.nat_addr)
3677 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3678 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3681 data = "A" * 4 + "B" * 16 + "C" * 3
3682 random.randint(1025, 65535)
3685 pkts = self.create_stream_frag(self.pg0,
3686 self.pg1.remote_ip4,
3691 self.pg0.add_stream(pkts)
3692 self.pg_enable_capture(self.pg_interfaces)
3694 frags = self.pg1.get_capture(len(pkts))
3695 p = self.reass_frags_and_verify(frags,
3697 self.pg1.remote_ip4)
3698 self.assertEqual(p[TCP].dport, 20)
3699 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3700 self.tcp_port_out = p[TCP].sport
3701 self.assertEqual(data, p[Raw].load)
3704 pkts = self.create_stream_frag(self.pg1,
3710 self.pg1.add_stream(pkts)
3711 self.pg_enable_capture(self.pg_interfaces)
3713 frags = self.pg0.get_capture(len(pkts))
3714 p = self.reass_frags_and_verify(frags,
3715 self.pg1.remote_ip4,
3716 self.pg0.remote_ip4)
3717 self.assertEqual(p[TCP].sport, 20)
3718 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3719 self.assertEqual(data, p[Raw].load)
3721 def test_port_restricted(self):
3722 """ Port restricted NAT44 (MAP-E CE) """
# Switches the address/port assignment algorithm to "map-e" with PSID 10
# (offset 6, length 6) through the CLI and checks that the translated
# source port carries that PSID.
# NOTE(review): ip and tcp are used below but their assignments are not
# visible in this listing.
3723 self.nat44_add_address(self.nat_addr)
3724 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3725 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3727 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3728 "psid-offset 6 psid-len 6")
3730 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3731 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3732 TCP(sport=4567, dport=22))
3733 self.pg0.add_stream(p)
3734 self.pg_enable_capture(self.pg_interfaces)
3736 capture = self.pg1.get_capture(1)
3741 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3742 self.assertEqual(ip.src, self.nat_addr)
3743 self.assertEqual(tcp.dport, 22)
3744 self.assertNotEqual(tcp.sport, 4567)
# Shift right by 6 and mask with 63 (six bits): bits 6..11 of the
# allocated port must equal the configured PSID value 10.
3745 self.assertEqual((tcp.sport >> 6) & 63, 10)
3746 self.check_tcp_checksum(p)
3747 self.check_ip_checksum(p)
3749 self.logger.error(ppp("Unexpected or invalid packet:", p))
3752 def test_twice_nat(self):
# Two pool addresses are added: self.nat_addr (regular) and twice_nat_addr
# (added with twice_nat=1). A TCP static mapping publishes pg0.remote_ip4
# behind self.nat_addr; packets from pg1 are then expected to have both
# source and destination rewritten.
# NOTE(review): port_in, port_out, eh_port_out, ip and tcp are used below
# but their assignments are not visible in this listing; the static-mapping
# call is also cut off.
3754 twice_nat_addr = '10.0.1.3'
3759 self.nat44_add_address(self.nat_addr)
3760 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3761 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3762 port_in, port_out, proto=IP_PROTOS.tcp,
3764 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3765 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg1 -> NAT address: external host to the published service.
3768 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3769 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3770 TCP(sport=eh_port_out, dport=port_out))
3771 self.pg1.add_stream(p)
3772 self.pg_enable_capture(self.pg_interfaces)
3774 capture = self.pg0.get_capture(1)
# Destination becomes the local host, source becomes twice_nat_addr with a
# newly allocated port.
3779 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3780 self.assertEqual(ip.src, twice_nat_addr)
3781 self.assertEqual(tcp.dport, port_in)
3782 self.assertNotEqual(tcp.sport, eh_port_out)
# Keep the allocated source port; the reply below is addressed to it.
3783 eh_port_in = tcp.sport
3784 self.check_tcp_checksum(p)
3785 self.check_ip_checksum(p)
3787 self.logger.error(ppp("Unexpected or invalid packet:", p))
# pg0 -> twice_nat_addr: reply from the local host.
3790 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3791 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3792 TCP(sport=port_in, dport=eh_port_in))
3793 self.pg0.add_stream(p)
3794 self.pg_enable_capture(self.pg_interfaces)
3796 capture = self.pg1.get_capture(1)
# Both translations must be undone on the way back to pg1.
3801 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3802 self.assertEqual(ip.src, self.nat_addr)
3803 self.assertEqual(tcp.dport, eh_port_out)
3804 self.assertEqual(tcp.sport, port_out)
3805 self.check_tcp_checksum(p)
3806 self.check_ip_checksum(p)
3808 self.logger.error(ppp("Unexpected or invalid packet:", p))
3811 def test_twice_nat_lb(self):
3812 """ Twice NAT44 local service load balancing """
# Two servers behind pg0 are published through a load-balancing static
# mapping on self.nat_addr, with twice_nat_addr added as a twice-NAT pool
# address. Either server may receive the request.
# NOTE(review): external_port, local_port, eh_port_out, server, ip and tcp
# are used below but their assignments are not visible in this listing; the
# locals list and the lb-mapping call are also cut off.
3813 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3814 twice_nat_addr = '10.0.1.3'
3819 server1 = self.pg0.remote_hosts[0]
3820 server2 = self.pg0.remote_hosts[1]
3822 locals = [{'addr': server1.ip4n,
3825 {'addr': server2.ip4n,
3829 self.nat44_add_address(self.nat_addr)
3830 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3832 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3836 local_num=len(locals),
3838 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3839 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg1 -> NAT address: external host to the load-balanced service.
3842 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3843 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3844 TCP(sport=eh_port_out, dport=external_port))
3845 self.pg1.add_stream(p)
3846 self.pg_enable_capture(self.pg_interfaces)
3848 capture = self.pg0.get_capture(1)
# Source must be twice_nat_addr; destination may be either server.
3854 self.assertEqual(ip.src, twice_nat_addr)
3855 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
# NOTE(review): the body of this branch is not visible; presumably it
# records which server was selected for the reply below. Confirm.
3856 if ip.dst == server1.ip4:
3860 self.assertNotEqual(tcp.sport, eh_port_out)
3861 eh_port_in = tcp.sport
3862 self.assertEqual(tcp.dport, local_port)
3863 self.check_tcp_checksum(p)
3864 self.check_ip_checksum(p)
3866 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Reply from the selected server back to twice_nat_addr.
3869 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3870 IP(src=server.ip4, dst=twice_nat_addr) /
3871 TCP(sport=local_port, dport=eh_port_in))
3872 self.pg0.add_stream(p)
3873 self.pg_enable_capture(self.pg_interfaces)
3875 capture = self.pg1.get_capture(1)
# Both translations must be undone on the way back to pg1.
3880 self.assertEqual(ip.src, self.nat_addr)
3881 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3882 self.assertEqual(tcp.sport, external_port)
3883 self.assertEqual(tcp.dport, eh_port_out)
3884 self.check_tcp_checksum(p)
3885 self.check_ip_checksum(p)
3887 self.logger.error(ppp("Unexpected or invalid packet:", p))
3890 def test_twice_nat_interface_addr(self):
3891 """ Acquire twice NAT44 addresses from interface """
# Registers pg7 as an address source with twice_nat=1 and checks that the
# NAT address dump follows pg7's IPv4 configuration: empty before
# config_ip4(), one entry after it, empty again after unconfig_ip4().
3892 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3894 # no address in NAT pool
3895 adresses = self.vapi.nat44_address_dump()
3896 self.assertEqual(0, len(adresses))
3898 # configure interface address and check NAT address pool
3899 self.pg7.config_ip4()
3900 adresses = self.vapi.nat44_address_dump()
3901 self.assertEqual(1, len(adresses))
# Only the first four bytes of ip_address are compared with the binary
# IPv4 address of pg7.
3902 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3903 self.assertEqual(adresses[0].twice_nat, 1)
3905 # remove interface address and check NAT address pool
3906 self.pg7.unconfig_ip4()
3907 adresses = self.vapi.nat44_address_dump()
3908 self.assertEqual(0, len(adresses))
3910 def test_ipfix_max_frags(self):
3911 """ IPFIX logging maximum fragments pending reassembly exceeded """
# Sets max_frag to 0, points the IPFIX exporter at pg3, sends a single
# fragment and checks the IPFIX packets captured on pg3.
# NOTE(review): the loops that bind p over the capture are not visible in
# this listing; several calls are also cut off.
3912 self.nat44_add_address(self.nat_addr)
3913 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3914 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3916 self.vapi.nat_set_reass(max_frag=0)
3917 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3918 src_address=self.pg3.local_ip4n,
3920 template_interval=10)
3921 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3922 src_port=self.ipfix_src_port)
3924 data = "A" * 4 + "B" * 16 + "C" * 3
3925 self.tcp_port_in = random.randint(1025, 65535)
3926 pkts = self.create_stream_frag(self.pg0,
3927 self.pg1.remote_ip4,
# Only the last packet of the generated stream is sent.
3931 self.pg0.add_stream(pkts[-1])
3932 self.pg_enable_capture(self.pg_interfaces)
# Nothing is expected on pg1; nine packets are expected on pg3.
3934 frags = self.pg1.get_capture(0)
3935 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3936 capture = self.pg3.get_capture(9)
3937 ipfix = IPFIXDecoder()
3938 # first load template
3940 self.assertTrue(p.haslayer(IPFIX))
3941 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3942 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3943 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3944 self.assertEqual(p[UDP].dport, 4739)
3945 self.assertEqual(p[IPFIX].observationDomainID,
3946 self.ipfix_domain_id)
3947 if p.haslayer(Template):
3948 ipfix.add_template(p.getlayer(Template))
3949 # verify events in data set
3951 if p.haslayer(Data):
# Note: this rebinds data, which held the payload string above.
3952 data = ipfix.decode_data_set(p.getlayer(Set))
3953 self.verify_ipfix_max_fragments_ip4(data, 0,
3954 self.pg0.remote_ip4n)
# Body of a TestNAT44 teardown: delegates to the parent tearDown(), then
# logs NAT44 state through CLI "show" commands and resets the address/port
# assignment algorithm to "default".
# NOTE(review): the def line of this method and the indentation of the
# statements after the vpp_dead check are not visible in this listing.
3957 super(TestNAT44, self).tearDown()
3958 if not self.vpp_dead:
3959 self.logger.info(self.vapi.cli("show nat44 addresses"))
3960 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3961 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3962 self.logger.info(self.vapi.cli("show nat44 interface address"))
3963 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3964 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3965 self.vapi.cli("nat addr-port-assignment-alg default")
3969 class TestNAT44Out2InDPO(MethodHolder):
3970 """ NAT44 Test Cases using out2in DPO """
# NOTE(review): short lines (decorators, try/except, pg_start() calls,
# tails of multi-line calls) appear to be missing from this listing.
# Appends a "nat { out2in dpo }" section to the VPP command line.
3973 def setUpConstants(cls):
3974 super(TestNAT44Out2InDPO, cls).setUpConstants()
3975 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
# Class fixture: inside and outside port/id values are set equal, two pg
# interfaces are created, pg0 is configured for IPv4 and pg1 for IPv6, and
# an IPv6 default route (all-zero prefix, length 0) is added via pg1.
3978 def setUpClass(cls):
3979 super(TestNAT44Out2InDPO, cls).setUpClass()
3982 cls.tcp_port_in = 6303
3983 cls.tcp_port_out = 6303
3984 cls.udp_port_in = 6304
3985 cls.udp_port_out = 6304
3986 cls.icmp_id_in = 6305
3987 cls.icmp_id_out = 6305
3988 cls.nat_addr = '10.0.0.3'
3989 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3990 cls.dst_ip4 = '192.168.70.1'
3992 cls.create_pg_interfaces(range(2))
3995 cls.pg0.config_ip4()
3996 cls.pg0.resolve_arp()
3999 cls.pg1.config_ip6()
4000 cls.pg1.resolve_ndp()
4002 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
4003 dst_address_length=0,
4004 next_hop_address=cls.pg1.remote_ip6n,
4005 next_hop_sw_if_index=cls.pg1.sw_if_index)
# NOTE(review): presumably the error path of setUpClass (its except clause
# is not visible here) - confirm.
4008 super(TestNAT44Out2InDPO, cls).tearDownClass()
# Stores the /96 destination ('1:2:3::') and source ('4:5:6::') IPv6
# prefixes on self and registers a MAP domain with is_translation=1.
4011 def configure_xlat(self):
4012 self.dst_ip6_pfx = '1:2:3::'
4013 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4015 self.dst_ip6_pfx_len = 96
4016 self.src_ip6_pfx = '4:5:6::'
4017 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4019 self.src_ip6_pfx_len = 96
4020 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4021 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4022 '\x00\x00\x00\x00', 0, is_translation=1,
4025 def test_464xlat_ce(self):
4026 """ Test 464XLAT CE with NAT44 """
4028 self.configure_xlat()
4030 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4031 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
# Expected IPv6 addresses on the pg1 side: the IPv4 destination embedded in
# the destination prefix and the NAT address embedded in the source prefix.
4033 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4034 self.dst_ip6_pfx_len)
4035 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4036 self.src_ip6_pfx_len)
# IPv4 in on pg0, IPv6 expected out on pg1.
4039 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4040 self.pg0.add_stream(pkts)
4041 self.pg_enable_capture(self.pg_interfaces)
4043 capture = self.pg1.get_capture(len(pkts))
4044 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
# IPv6 in on pg1, IPv4 expected out on pg0.
4047 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4049 self.pg1.add_stream(pkts)
4050 self.pg_enable_capture(self.pg_interfaces)
4052 capture = self.pg0.get_capture(len(pkts))
4053 self.verify_capture_in(capture, self.pg0)
# Undo the NAT44 configuration made at the start of this test; the address
# range is removed with is_add=0.
4055 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4057 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4058 self.nat_addr_n, is_add=0)
4060 def test_464xlat_ce_no_nat(self):
4061 """ Test 464XLAT CE without NAT44 """
4063 self.configure_xlat()
# No NAT44 is configured here, so the expected IPv6 address on the pg1
# side embeds pg0's own IPv4 address rather than a NAT address.
4065 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4066 self.dst_ip6_pfx_len)
4067 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4068 self.src_ip6_pfx_len)
4070 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4071 self.pg0.add_stream(pkts)
4072 self.pg_enable_capture(self.pg_interfaces)
4074 capture = self.pg1.get_capture(len(pkts))
# same_port=True is passed because no port translation is expected.
4075 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4076 nat_ip=out_dst_ip6, same_port=True)
4078 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4079 self.pg1.add_stream(pkts)
4080 self.pg_enable_capture(self.pg_interfaces)
4082 capture = self.pg0.get_capture(len(pkts))
4083 self.verify_capture_in(capture, self.pg0)
4086 class TestDeterministicNAT(MethodHolder):
4087 """ Deterministic NAT Test Cases """
# Appends a "nat { deterministic }" section to the VPP command line after
# the parent class has prepared its own constants.
4090 def setUpConstants(cls):
4091 super(TestDeterministicNAT, cls).setUpConstants()
4092 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
# Class fixture: sets the default port/id values and NAT address, creates
# three pg interfaces and generates two remote hosts on pg0.
# NOTE(review): the body of the loop over cls.interfaces and the
# try/except around the setup are not visible in this listing.
4095 def setUpClass(cls):
4096 super(TestDeterministicNAT, cls).setUpClass()
4099 cls.tcp_port_in = 6303
4100 cls.tcp_external_port = 6303
4101 cls.udp_port_in = 6304
4102 cls.udp_external_port = 6304
4103 cls.icmp_id_in = 6305
4104 cls.nat_addr = '10.0.0.3'
4106 cls.create_pg_interfaces(range(3))
4107 cls.interfaces = list(cls.pg_interfaces)
4109 for i in cls.interfaces:
4114 cls.pg0.generate_remote_hosts(2)
4115 cls.pg0.configure_ipv4_neighbors()
# NOTE(review): presumably the error path of setUpClass - confirm.
4118 super(TestDeterministicNAT, cls).tearDownClass()
4121 def create_stream_in(self, in_if, out_if, ttl=64):
4123 Create packet stream for inside network
4125 :param in_if: Inside interface
4126 :param out_if: Outside interface
4127 :param ttl: TTL of generated packets
# Builds one TCP, one UDP and one ICMP echo-request packet from in_if's
# remote host to out_if's remote host, using the inside and external
# port/id attributes stored on self.
# NOTE(review): the list handling and the return statement are not visible
# in this listing.
4131 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4132 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4133 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
4137 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4138 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4139 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
4143 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4144 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4145 ICMP(id=self.icmp_id_in, type='echo-request'))
4150 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
4152 Create packet stream for outside network
4154 :param out_if: Outside interface
4155 :param dst_ip: Destination IP address (Default use global NAT address)
4156 :param ttl: TTL of generated packets
# Falls back to self.nat_addr for dst_ip (the guarding condition is not
# visible in this listing), then builds one TCP, one UDP and one ICMP
# echo-reply packet from out_if's remote host to dst_ip.
# The destination ports / ICMP id come from self.tcp_port_out,
# self.udp_port_out and self.icmp_external_id, which verify_capture_out()
# assigns from captured packets.
4159 dst_ip = self.nat_addr
4162 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4163 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4164 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
4168 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4169 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4170 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
4174 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4175 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4176 ICMP(id=self.icmp_external_id, type='echo-reply'))
4181 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4183 Verify captured packets on outside network
4185 :param capture: Captured packets
4186 :param nat_ip: Translated IP address (Default use global NAT address)
4187 :param same_port: Sorce port number is not translated (Default False)
4188 :param packet_num: Expected number of packets (Default 3)
# NOTE(review): the docstring above lists a same_port parameter that is
# not part of this method's signature.
# Falls back to self.nat_addr for nat_ip (the guarding condition is not
# visible in this listing), checks the capture length, and for every
# packet checks the source address and records the translated TCP/UDP
# source port or ICMP id on self for later use.
4191 nat_ip = self.nat_addr
4192 self.assertEqual(packet_num, len(capture))
4193 for packet in capture:
4195 self.assertEqual(packet[IP].src, nat_ip)
4196 if packet.haslayer(TCP):
4197 self.tcp_port_out = packet[TCP].sport
4198 elif packet.haslayer(UDP):
4199 self.udp_port_out = packet[UDP].sport
4201 self.icmp_external_id = packet[ICMP].id
4203 self.logger.error(ppp("Unexpected or invalid packet "
4204 "(outside network):", packet))
4207 def initiate_tcp_session(self, in_if, out_if):
4209 Initiates TCP session
4211 :param in_if: Inside interface
4212 :param out_if: Outside interface
# Drives the three packets described by the comments below (SYN, SYN + ACK,
# ACK) through the NAT and records the translated source port in
# self.tcp_port_out after the first one.
# NOTE(review): the TCP flags arguments, two add_stream() calls and the
# try/except around the exchange are not visible in this listing.
4215 # SYN packet in->out
4216 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4217 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4218 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4221 self.pg_enable_capture(self.pg_interfaces)
4223 capture = out_if.get_capture(1)
4225 self.tcp_port_out = p[TCP].sport
4227 # SYN + ACK packet out->in
4228 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
4229 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
4230 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4232 out_if.add_stream(p)
4233 self.pg_enable_capture(self.pg_interfaces)
4235 in_if.get_capture(1)
4237 # ACK packet in->out
4238 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4239 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4240 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4243 self.pg_enable_capture(self.pg_interfaces)
4245 out_if.get_capture(1)
4248 self.logger.error("TCP 3 way handshake failed")
4251 def verify_ipfix_max_entries_per_user(self, data):
4253 Verify IPFIX maximum entries per user exceeded event
4255 :param data: Decoded IPFIX data records
# Expects exactly one record and checks four of its fields.
# NOTE(review): the assignment of record is not visible in this listing;
# the integer keys (230, 466, 473, 8) are presumably IPFIX information
# element ids - confirm against the IPFIX decoder.
4257 self.assertEqual(1, len(data))
4260 self.assertEqual(ord(record[230]), 13)
4261 # natQuotaExceededEvent
4262 self.assertEqual('\x03\x00\x00\x00', record[466])
# '\xe8\x03\x00\x00' is 1000 when read as a little-endian 32-bit value.
4264 self.assertEqual('\xe8\x03\x00\x00', record[473])
# Field 8 must match pg0's remote IPv4 address in binary form.
4266 self.assertEqual(self.pg0.remote_ip4n, record[8])
4268 def test_deterministic_mode(self):
4269 """ NAT plugin run deterministic mode """
# Adds a deterministic mapping through the API, checks forward and reverse
# lookups and the mapping dump, then clears it and checks the dump is empty.
# NOTE(review): in_plen and out_plen are used below but their assignments
# are not visible in this listing.
4270 in_addr = '172.16.255.0'
4271 out_addr = '172.17.255.50'
4272 in_addr_t = '172.16.255.20'
4273 in_addr_n = socket.inet_aton(in_addr)
4274 out_addr_n = socket.inet_aton(out_addr)
4275 in_addr_t_n = socket.inet_aton(in_addr_t)
# The plugin must report that it is running in deterministic mode.
4279 nat_config = self.vapi.nat_show_config()
4280 self.assertEqual(1, nat_config.deterministic)
4282 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Forward lookup of in_addr_t must give out_addr; the reverse lookup with
# the reported out_port_hi must give in_addr_t back.
4284 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4285 self.assertEqual(rep1.out_addr[:4], out_addr_n)
4286 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4287 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
# The dump must contain exactly the mapping that was added.
4289 deterministic_mappings = self.vapi.nat_det_map_dump()
4290 self.assertEqual(len(deterministic_mappings), 1)
4291 dsm = deterministic_mappings[0]
4292 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4293 self.assertEqual(in_plen, dsm.in_plen)
4294 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4295 self.assertEqual(out_plen, dsm.out_plen)
4297 self.clear_nat_det()
4298 deterministic_mappings = self.vapi.nat_det_map_dump()
4299 self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts """
    before = self.vapi.nat_det_get_timeouts()

    # Raise every timer by the same amount so each new value is
    # guaranteed to differ from the one read above.
    bump = 10
    self.vapi.nat_det_set_timeouts(before.udp + bump,
                                   before.tcp_established + bump,
                                   before.tcp_transitory + bump,
                                   before.icmp + bump)

    after = self.vapi.nat_det_get_timeouts()

    # Same comparison order as the individual checks: udp, icmp,
    # tcp_established, tcp_transitory.
    for field in ('udp', 'icmp', 'tcp_established', 'tcp_transitory'):
        self.assertNotEqual(getattr(before, field), getattr(after, field))
# Sends the standard inside and outside streams through a deterministic NAT
# mapping for pg0's peer and then checks the resulting session dump.
# NOTE(review): as shown, the nat_det_add_del_map(...) and second
# nat44_interface_add_del_feature(...) calls are not terminated, and `s` is
# read without a visible assignment -- some lines of this method are not
# visible here; confirm against the complete file.
4319 def test_det_in(self):
4320 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4322 nat_ip = "10.0.0.10"
4324 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4326 socket.inet_aton(nat_ip),
4328 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4329 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# inside -> outside: packets injected on pg0 are captured on pg1 and verified
# against nat_ip
4333 pkts = self.create_stream_in(self.pg0, self.pg1)
4334 self.pg0.add_stream(pkts)
4335 self.pg_enable_capture(self.pg_interfaces)
4337 capture = self.pg1.get_capture(len(pkts))
4338 self.verify_capture_out(capture, nat_ip)
# outside -> inside: packets addressed to nat_ip are injected on pg1 and
# captured on pg0
4341 pkts = self.create_stream_out(self.pg1, nat_ip)
4342 self.pg1.add_stream(pkts)
4343 self.pg_enable_capture(self.pg_interfaces)
4345 capture = self.pg0.get_capture(len(pkts))
4346 self.verify_capture_in(capture, self.pg0)
# three sessions are expected for pg0's peer
4349 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4350 self.assertEqual(len(sessions), 3)
# session compared against the TCP port attributes
4354 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4355 self.assertEqual(s.in_port, self.tcp_port_in)
4356 self.assertEqual(s.out_port, self.tcp_port_out)
4357 self.assertEqual(s.ext_port, self.tcp_external_port)
# session compared against the UDP port attributes
4361 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4362 self.assertEqual(s.in_port, self.udp_port_in)
4363 self.assertEqual(s.out_port, self.udp_port_out)
4364 self.assertEqual(s.ext_port, self.udp_external_port)
# session compared against the ICMP identifier attributes
4368 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4369 self.assertEqual(s.in_port, self.icmp_id_in)
4370 self.assertEqual(s.out_port, self.icmp_external_id)
# Two inside hosts on pg0 (host0, host1) open a TCP flow through the same
# deterministic NAT address; replies are sent back to each translated port
# and the sessions are finally closed through the API.
# NOTE(review): as shown, `port_in`, `ip` and `tcp` are read without a
# visible assignment and several calls are not terminated -- some lines of
# this method are not visible here; confirm against the complete file.
4372 def test_multiple_users(self):
4373 """ Deterministic NAT multiple users """
4375 nat_ip = "10.0.0.10"
4377 external_port = 6303
4379 host0 = self.pg0.remote_hosts[0]
4380 host1 = self.pg0.remote_hosts[1]
4382 self.vapi.nat_det_add_del_map(host0.ip4n,
4384 socket.inet_aton(nat_ip),
4386 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4387 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host0 -> outside; the translated source port is remembered as port_out0
4391 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4392 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4393 TCP(sport=port_in, dport=external_port))
4394 self.pg0.add_stream(p)
4395 self.pg_enable_capture(self.pg_interfaces)
4397 capture = self.pg1.get_capture(1)
4402 self.assertEqual(ip.src, nat_ip)
4403 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4404 self.assertEqual(tcp.dport, external_port)
4405 port_out0 = tcp.sport
4407 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1 -> outside; the translated source port is remembered as port_out1
4411 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4412 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4413 TCP(sport=port_in, dport=external_port))
4414 self.pg0.add_stream(p)
4415 self.pg_enable_capture(self.pg_interfaces)
4417 capture = self.pg1.get_capture(1)
4422 self.assertEqual(ip.src, nat_ip)
4423 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4424 self.assertEqual(tcp.dport, external_port)
4425 port_out1 = tcp.sport
4427 self.logger.error(ppp("Unexpected or invalid packet:", p))
# one mapping holding two sessions is expected
4430 dms = self.vapi.nat_det_map_dump()
4431 self.assertEqual(1, len(dms))
4432 self.assertEqual(2, dms[0].ses_num)
# outside -> port_out0 must be delivered to host0
4435 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4436 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4437 TCP(sport=external_port, dport=port_out0))
4438 self.pg1.add_stream(p)
4439 self.pg_enable_capture(self.pg_interfaces)
4441 capture = self.pg0.get_capture(1)
4446 self.assertEqual(ip.src, self.pg1.remote_ip4)
4447 self.assertEqual(ip.dst, host0.ip4)
4448 self.assertEqual(tcp.dport, port_in)
4449 self.assertEqual(tcp.sport, external_port)
4451 self.logger.error(ppp("Unexpected or invalid packet:", p))
# outside -> port_out1 must be delivered to host1
4455 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4456 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4457 TCP(sport=external_port, dport=port_out1))
4458 self.pg1.add_stream(p)
4459 self.pg_enable_capture(self.pg_interfaces)
4461 capture = self.pg0.get_capture(1)
4466 self.assertEqual(ip.src, self.pg1.remote_ip4)
4467 self.assertEqual(ip.dst, host1.ip4)
4468 self.assertEqual(tcp.dport, port_in)
4469 self.assertEqual(tcp.sport, external_port)
4471 self.logger.error(ppp("Unexpected or invalid packet", p))
# closing one session from the outside view must leave one session, closing
# the other from the inside view must leave none
4474 # session close api test
4475 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4477 self.pg1.remote_ip4n,
4479 dms = self.vapi.nat_det_map_dump()
4480 self.assertEqual(dms[0].ses_num, 1)
4482 self.vapi.nat_det_close_session_in(host0.ip4n,
4484 self.pg1.remote_ip4n,
4486 dms = self.vapi.nat_det_map_dump()
4487 self.assertEqual(dms[0].ses_num, 0)
# Opens a TCP session through deterministic NAT, then plays the close
# sequence started by the inside host and expects the session count of the
# mapping to drop to zero.
# NOTE(review): as shown, every TCP(...) layer is cut off after dport (the
# flags argument is not visible), `pkts` is used without a visible
# assignment, and the final logger.error has no visible enclosing handler --
# some lines of this method are not visible here.
4489 def test_tcp_session_close_detection_in(self):
4490 """ Deterministic NAT TCP session close from inside network """
4491 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4493 socket.inet_aton(self.nat_addr),
4495 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4496 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4499 self.initiate_tcp_session(self.pg0, self.pg1)
4501 # close the session from inside
4503 # FIN packet in -> out
4504 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4505 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4506 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4508 self.pg0.add_stream(p)
4509 self.pg_enable_capture(self.pg_interfaces)
4511 self.pg1.get_capture(1)
4515 # ACK packet out -> in
4516 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4517 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4518 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4522 # FIN packet out -> in
4523 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4524 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4525 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
# both outside packets are sent as one stream; two packets are expected on pg0
4529 self.pg1.add_stream(pkts)
4530 self.pg_enable_capture(self.pg_interfaces)
4532 self.pg0.get_capture(2)
4534 # ACK packet in -> out
4535 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4536 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4537 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4539 self.pg0.add_stream(p)
4540 self.pg_enable_capture(self.pg_interfaces)
4542 self.pg1.get_capture(1)
4544 # Check if deterministic NAT44 closed the session
4545 dms = self.vapi.nat_det_map_dump()
4546 self.assertEqual(0, dms[0].ses_num)
4548 self.logger.error("TCP session termination failed")
# Opens a TCP session through deterministic NAT, then plays the close
# sequence started by the outside host and expects the session count of the
# mapping to drop to zero.
# NOTE(review): as shown, every TCP(...) layer is cut off after dport (the
# flags argument is not visible), `pkts` is used without a visible
# assignment, and the final logger.error has no visible enclosing handler --
# some lines of this method are not visible here.
# NOTE(review): the two consecutive in -> out packets below both carry the
# comment "ACK packet in -> out"; with the flags not visible it cannot be
# told whether the second one is really an ACK -- confirm.
4551 def test_tcp_session_close_detection_out(self):
4552 """ Deterministic NAT TCP session close from outside network """
4553 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4555 socket.inet_aton(self.nat_addr),
4557 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4558 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4561 self.initiate_tcp_session(self.pg0, self.pg1)
4563 # close the session from outside
4565 # FIN packet out -> in
4566 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4567 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4568 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4570 self.pg1.add_stream(p)
4571 self.pg_enable_capture(self.pg_interfaces)
4573 self.pg0.get_capture(1)
4577 # ACK packet in -> out
4578 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4579 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4580 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4584 # ACK packet in -> out
4585 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4586 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4587 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# both inside packets are sent as one stream; two packets are expected on pg1
4591 self.pg0.add_stream(pkts)
4592 self.pg_enable_capture(self.pg_interfaces)
4594 self.pg1.get_capture(2)
4596 # ACK packet out -> in
4597 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4598 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4599 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4601 self.pg1.add_stream(p)
4602 self.pg_enable_capture(self.pg_interfaces)
4604 self.pg0.get_capture(1)
4606 # Check if deterministic NAT44 closed the session
4607 dms = self.vapi.nat_det_map_dump()
4608 self.assertEqual(0, dms[0].ses_num)
4610 self.logger.error("TCP session termination failed")
# Runs only as part of the extended tests.  Sets all four deterministic NAT
# timeouts to 5, creates sessions with the standard inside stream and expects
# the mapping to report zero sessions afterwards.
# NOTE(review): as shown, two calls are not terminated and nothing between
# the capture and the final dump waits for the timeout to elapse -- some
# lines of this method are not visible here; confirm against the complete
# file.
4613 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4614 def test_session_timeout(self):
4615 """ Deterministic NAT session timeouts """
4616 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4618 socket.inet_aton(self.nat_addr),
4620 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4621 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4624 self.initiate_tcp_session(self.pg0, self.pg1)
# positional order of these values is not documented here; in
# test_set_timeouts the same call is made with udp, tcp_established,
# tcp_transitory, icmp
4625 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4626 pkts = self.create_stream_in(self.pg0, self.pg1)
4627 self.pg0.add_stream(pkts)
4628 self.pg_enable_capture(self.pg_interfaces)
4630 capture = self.pg1.get_capture(len(pkts))
# every session must be gone from the mapping
4633 dms = self.vapi.nat_det_map_dump()
4634 self.assertEqual(0, dms[0].ses_num)
# Runs only as part of the extended tests.  Opens UDP flows on source ports
# 1025..2024 (1000 flows) from pg0's peer, then checks that one more flow is
# not forwarded, that an ICMP error is returned to the sender, that the
# mapping reports 1000 sessions and that an IPFIX event is exported on pg2.
# NOTE(review): as shown, `pkts` and `icmp` are used without a visible
# assignment, several calls are not terminated, and the statements that bind
# `p` to the captured packets are not visible -- some lines of this method
# are not visible here; confirm against the complete file.
4636 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4637 def test_session_limit_per_user(self):
4638 """ Deterministic NAT maximum sessions per user limit """
4639 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4641 socket.inet_aton(self.nat_addr),
4643 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4644 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# export IPFIX towards pg2's peer and turn on NAT IPFIX logging
4646 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4647 src_address=self.pg2.local_ip4n,
4649 template_interval=10)
4650 self.vapi.nat_ipfix()
# one UDP flow per port in 1025..2024
4653 for port in range(1025, 2025):
4654 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4655 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4656 UDP(sport=port, dport=port))
4659 self.pg0.add_stream(pkts)
4660 self.pg_enable_capture(self.pg_interfaces)
4662 capture = self.pg1.get_capture(len(pkts))
# one additional flow: nothing may be forwarded to pg1 for it
4664 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4665 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4666 UDP(sport=3001, dport=3002))
4667 self.pg0.add_stream(p)
4668 self.pg_enable_capture(self.pg_interfaces)
4670 capture = self.pg1.assert_nothing_captured()
# the sender must get ICMP type 3 / code 1 quoting the rejected UDP packet
4672 # verify ICMP error packet
4673 capture = self.pg0.get_capture(1)
4675 self.assertTrue(p.haslayer(ICMP))
4677 self.assertEqual(icmp.type, 3)
4678 self.assertEqual(icmp.code, 1)
4679 self.assertTrue(icmp.haslayer(IPerror))
4680 inner_ip = icmp[IPerror]
4681 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4682 self.assertEqual(inner_ip[UDPerror].dport, 3002)
4684 dms = self.vapi.nat_det_map_dump()
4686 self.assertEqual(1000, dms[0].ses_num)
# two IPFIX packets are expected on pg2: templates are loaded into the
# decoder first, then data sets are decoded and verified
4688 # verify IPFIX logging
4689 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4691 capture = self.pg2.get_capture(2)
4692 ipfix = IPFIXDecoder()
4693 # first load template
4695 self.assertTrue(p.haslayer(IPFIX))
4696 if p.haslayer(Template):
4697 ipfix.add_template(p.getlayer(Template))
4698 # verify events in data set
4700 if p.haslayer(Data):
4701 data = ipfix.decode_data_set(p.getlayer(Set))
4702 self.verify_ipfix_max_entries_per_user(data)
# Helper used to reset deterministic NAT state (test_deterministic_mode calls
# it and then expects an empty mapping dump).
# NOTE(review): as shown, the docstring delimiters and the remaining
# arguments of the two calls inside the loops are not visible -- confirm
# against the complete file.
4704 def clear_nat_det(self):
4706 Clear deterministic NAT configuration.
# switch NAT IPFIX logging off
4708 self.vapi.nat_ipfix(enable=0)
# called without arguments -- presumably restores the provider's default
# timeouts; confirm against nat_det_set_timeouts' signature
4709 self.vapi.nat_det_set_timeouts()
# one nat_det_add_del_map call per dumped mapping
4710 deterministic_mappings = self.vapi.nat_det_map_dump()
4711 for dsm in deterministic_mappings:
4712 self.vapi.nat_det_add_del_map(dsm.in_addr,
# one nat44_interface_add_del_feature call per dumped NAT44 interface
4718 interfaces = self.vapi.nat44_interface_dump()
4719 for intf in interfaces:
4720 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# Per-test cleanup of TestDeterministicNAT: chains to the parent tearDown,
# queries NAT state through the CLI while VPP is alive, and clears the
# deterministic NAT configuration.
# NOTE(review): as shown, the `def` line of this method and the openers of
# the last three CLI calls (each ends with an extra closing parenthesis) are
# not visible; with indentation lost it also cannot be told whether
# clear_nat_det() runs only when VPP is alive -- confirm against the
# complete file.
4725 super(TestDeterministicNAT, self).tearDown()
4726 if not self.vpp_dead:
4727 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4729 self.vapi.cli("show nat44 deterministic mappings"))
4731 self.vapi.cli("show nat44 deterministic timeouts"))
4733 self.vapi.cli("show nat44 deterministic sessions"))
4734 self.clear_nat_det()
4737 class TestNAT64(MethodHolder):
4738 """ NAT64 Test Cases """
@classmethod
def setUpConstants(cls):
    """
    Extend the VPP command line with NAT64 hash-table sizing.

    Runs the parent class' setUpConstants first and then appends a
    ``nat { ... }`` section to ``cls.vpp_cmdline`` that sets the NAT64
    BIB hash to 128 buckets and the session-table hash to 256 buckets.
    """
    super(TestNAT64, cls).setUpConstants()
    nat64_section = ["nat", "{", "nat64 bib hash buckets 128",
                     "nat64 st hash buckets 256", "}"]
    cls.vpp_cmdline.extend(nat64_section)
# Class-level fixture for TestNAT64: fixes the ports/identifiers and NAT pool
# addresses used by the tests, creates five pg interfaces, and splits them
# into IPv6-side (pg0, pg2) and IPv4-side (pg1) lists; pg2 is bound to a
# separate IPv6 table and pg3 gets both IPv4 and IPv6 configuration.
# NOTE(review): as shown, `cls.vrf1_id` is used without a visible
# assignment, the vrf1_nat_addr_n call is not terminated, most of the two
# per-interface loop bodies are not visible, and the trailing
# tearDownClass() call has no visible enclosing handler -- some lines of
# this method are not visible here; confirm against the complete file.
4747 def setUpClass(cls):
4748 super(TestNAT64, cls).setUpClass()
4751 cls.tcp_port_in = 6303
4752 cls.tcp_port_out = 6303
4753 cls.udp_port_in = 6304
4754 cls.udp_port_out = 6304
4755 cls.icmp_id_in = 6305
4756 cls.icmp_id_out = 6305
4757 cls.nat_addr = '10.0.0.3'
4758 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4760 cls.vrf1_nat_addr = '10.0.10.3'
4761 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4763 cls.ipfix_src_port = 4739
4764 cls.ipfix_domain_id = 1
# pg0 and pg2 form the IPv6 side, pg1 the IPv4 side
4766 cls.create_pg_interfaces(range(5))
4767 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4768 cls.ip6_interfaces.append(cls.pg_interfaces[2])
4769 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
# create the IPv6 table for vrf1 and move pg2 into it
4771 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4773 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4775 cls.pg0.generate_remote_hosts(2)
4777 for i in cls.ip6_interfaces:
4780 i.configure_ipv6_neighbors()
4782 for i in cls.ip4_interfaces:
# pg3 is dual-stack
4788 cls.pg3.config_ip4()
4789 cls.pg3.resolve_arp()
4790 cls.pg3.config_ip6()
4791 cls.pg3.configure_ipv6_neighbors()
4794 super(TestNAT64, cls).tearDownClass()
def test_pool(self):
    """ Add/delete address to NAT64 pool """
    pool_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')

    # A range whose start and end are the same address adds one entry.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr)

    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    self.assertEqual(dumped[0].address, pool_addr)

    # Removing the same range must leave the pool empty again.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr, is_add=0)

    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
def test_interface(self):
    """ Enable/disable NAT64 feature on the interface

    pg0 is enabled as the inside interface and pg1 as the outside one.
    The test checks the interface dump and the per-interface feature
    listing, then disables both and expects an empty dump.
    """
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    interfaces = self.vapi.nat64_interface_dump()
    self.assertEqual(len(interfaces), 2)

    # Track which of the two interfaces actually showed up in the dump;
    # both flags must be initialised before the loop so that the final
    # assertions fail cleanly instead of raising NameError.
    pg0_found = False
    pg1_found = False
    for intf in interfaces:
        if intf.sw_if_index == self.pg0.sw_if_index:
            self.assertEqual(intf.is_inside, 1)
            pg0_found = True
        elif intf.sw_if_index == self.pg1.sw_if_index:
            self.assertEqual(intf.is_inside, 0)
            pg1_found = True
    self.assertTrue(pg0_found)
    self.assertTrue(pg1_found)

    # The matching graph-node feature must be listed on each interface.
    features = self.vapi.cli("show interface features pg0")
    self.assertIn('nat64-in2out', features)
    features = self.vapi.cli("show interface features pg1")
    self.assertIn('nat64-out2in', features)

    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)

    interfaces = self.vapi.nat64_interface_dump()
    self.assertEqual(len(interfaces), 0)
# Adds a static NAT64 BIB entry for TCP, looks for it in the BIB dump, then
# removes it and checks that no static entry is left.
# NOTE(review): as shown, `in_port`, `out_port`, `bibe` and `static_bib_num`
# are used without a visible assignment and both
# nat64_add_del_static_bib(...) calls are not terminated -- some lines of
# this method are not visible here; confirm against the complete file.
4842 def test_static_bib(self):
4843 """ Add/delete static BIB entry """
4844 in_addr = socket.inet_pton(socket.AF_INET6,
4845 '2001:db8:85a3::8a2e:370:7334')
4846 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
# NOTE(review): `proto` is assigned here, yet both dumps below pass
# IP_PROTOS.tcp directly
4849 proto = IP_PROTOS.tcp
4851 self.vapi.nat64_add_del_static_bib(in_addr,
# exactly one static entry with the configured addresses/ports is expected
4856 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4861 self.assertEqual(bibe.i_addr, in_addr)
4862 self.assertEqual(bibe.o_addr, out_addr)
4863 self.assertEqual(bibe.i_port, in_port)
4864 self.assertEqual(bibe.o_port, out_port)
4865 self.assertEqual(static_bib_num, 1)
4867 self.vapi.nat64_add_del_static_bib(in_addr,
# after the second call no static entry may remain
4873 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4878 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    field_order = ('udp', 'icmp', 'tcp_trans', 'tcp_est',
                   'tcp_incoming_syn')

    def expect_timeouts(wanted):
        # Read the timeouts once and compare field by field, in a fixed
        # order, against the wanted values.
        current = self.vapi.nat64_get_timeouts()
        for field in field_order:
            self.assertEqual(getattr(current, field), wanted[field])

    # verify default values
    expect_timeouts({'udp': 300, 'icmp': 60, 'tcp_trans': 240,
                     'tcp_est': 7440, 'tcp_incoming_syn': 6})

    # set and verify custom values
    custom = {'udp': 200, 'icmp': 30, 'tcp_trans': 250,
              'tcp_est': 7450, 'tcp_incoming_syn': 10}
    self.vapi.nat64_set_timeouts(**custom)
    expect_timeouts(custom)
# NAT64 dynamic translation: IPv6 traffic entering on pg0 must leave pg1 as
# IPv4 sourced from nat_addr, and IPv4 replies must come back as IPv6.  The
# exchange is run twice and must add exactly three sessions; it is then
# repeated for pg2 with the VRF-specific pool address.
# NOTE(review): as shown, the first nat64_add_del_pool_addr_range(...) call
# is not terminated -- some lines of this method are not visible here;
# confirm against the complete file.
4900 def test_dynamic(self):
4901 """ NAT64 dynamic translation test """
4902 self.tcp_port_in = 6303
4903 self.udp_port_in = 6304
4904 self.icmp_id_in = 6305
4906 ses_num_start = self.nat64_get_ses_num()
4908 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4910 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4911 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPv6 -> IPv4, first pass
4914 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4915 self.pg0.add_stream(pkts)
4916 self.pg_enable_capture(self.pg_interfaces)
4918 capture = self.pg1.get_capture(len(pkts))
4919 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4920 dst_ip=self.pg1.remote_ip4)
# IPv4 -> IPv6, first pass; the expected IPv6 source is pg1's IPv4 address
# appended to the 64:ff9b:: prefix
4923 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4924 self.pg1.add_stream(pkts)
4925 self.pg_enable_capture(self.pg_interfaces)
4927 capture = self.pg0.get_capture(len(pkts))
4928 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4929 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# IPv6 -> IPv4, second pass
4932 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4933 self.pg0.add_stream(pkts)
4934 self.pg_enable_capture(self.pg_interfaces)
4936 capture = self.pg1.get_capture(len(pkts))
4937 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4938 dst_ip=self.pg1.remote_ip4)
# IPv4 -> IPv6, second pass
4941 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4942 self.pg1.add_stream(pkts)
4943 self.pg_enable_capture(self.pg_interfaces)
4945 capture = self.pg0.get_capture(len(pkts))
4946 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# both passes together must have added exactly three sessions
4948 ses_num_end = self.nat64_get_ses_num()
4950 self.assertEqual(ses_num_end - ses_num_start, 3)
4952 # tenant with specific VRF
4953 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4954 self.vrf1_nat_addr_n,
4955 vrf_id=self.vrf1_id)
4956 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4958 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4959 self.pg2.add_stream(pkts)
4960 self.pg_enable_capture(self.pg_interfaces)
4962 capture = self.pg1.get_capture(len(pkts))
4963 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4964 dst_ip=self.pg1.remote_ip4)
4966 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4967 self.pg1.add_stream(pkts)
4968 self.pg_enable_capture(self.pg_interfaces)
4970 capture = self.pg2.get_capture(len(pkts))
4971 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
# NAT64 static translation: three static BIB entries are added for pg0's
# peer, traffic is sent in both directions, the outside ports are checked to
# be preserved (same_port=True) and exactly three sessions must be added.
# NOTE(review): as shown, the pool call and all three
# nat64_add_del_static_bib(...) calls are not terminated, so the ports and
# protocols of the static entries are not visible -- confirm against the
# complete file.
4973 def test_static(self):
4974 """ NAT64 static translation test """
4975 self.tcp_port_in = 60303
4976 self.udp_port_in = 60304
4977 self.icmp_id_in = 60305
4978 self.tcp_port_out = 60303
4979 self.udp_port_out = 60304
4980 self.icmp_id_out = 60305
4982 ses_num_start = self.nat64_get_ses_num()
4984 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4986 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4987 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4989 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4994 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4999 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
# IPv6 -> IPv4
5006 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5007 self.pg0.add_stream(pkts)
5008 self.pg_enable_capture(self.pg_interfaces)
5010 capture = self.pg1.get_capture(len(pkts))
5011 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5012 dst_ip=self.pg1.remote_ip4, same_port=True)
# IPv4 -> IPv6; the expected IPv6 source is pg1's IPv4 address appended to
# the 64:ff9b:: prefix
5015 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5016 self.pg1.add_stream(pkts)
5017 self.pg_enable_capture(self.pg_interfaces)
5019 capture = self.pg0.get_capture(len(pkts))
5020 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5021 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5023 ses_num_end = self.nat64_get_ses_num()
5025 self.assertEqual(ses_num_end - ses_num_start, 3)
# Runs only as part of the extended tests.  Lowers the NAT64 ICMP timeout to
# 5, creates sessions with the standard IPv6 stream and expects the session
# count read afterwards to differ from the one read right after the traffic.
# NOTE(review): as shown, the pool call is not terminated and nothing
# between the two session-count reads waits for the timeout to elapse --
# some lines of this method are not visible here; confirm against the
# complete file.
5027 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5028 def test_session_timeout(self):
5029 """ NAT64 session timeout """
5030 self.icmp_id_in = 1234
5031 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5033 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5034 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# only the ICMP timeout is passed; the other timeouts are left to the
# call's defaults
5035 self.vapi.nat64_set_timeouts(icmp=5)
5037 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5038 self.pg0.add_stream(pkts)
5039 self.pg_enable_capture(self.pg_interfaces)
5041 capture = self.pg1.get_capture(len(pkts))
5043 ses_num_before_timeout = self.nat64_get_ses_num()
5047 # ICMP session after timeout
5048 ses_num_after_timeout = self.nat64_get_ses_num()
5049 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
# NAT64 translation of ICMP error messages.  Sessions are created first;
# then every IPv6 packet captured on pg0 is wrapped in an ICMPv6 destination
# unreachable (code 1) and must come out on pg1 as ICMP type 3 / code 13,
# and every IPv4 packet captured on pg1 is wrapped in ICMP type 3 / code 13
# and must come out on pg0 as ICMPv6 destination unreachable (code 1).
# NOTE(review): as shown, the pool call and one assertEqual are not
# terminated, the branch headers for the ICMP cases and the handlers around
# the logger.error calls are not visible -- some lines of this method are
# not visible here; confirm against the complete file.
5051 def test_icmp_error(self):
5052 """ NAT64 ICMP Error message translation """
5053 self.tcp_port_in = 6303
5054 self.udp_port_in = 6304
5055 self.icmp_id_in = 6305
# NOTE(review): ses_num_start is not read again in the lines shown
5057 ses_num_start = self.nat64_get_ses_num()
5059 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5061 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5062 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5064 # send some packets to create sessions
5065 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5066 self.pg0.add_stream(pkts)
5067 self.pg_enable_capture(self.pg_interfaces)
5069 capture_ip4 = self.pg1.get_capture(len(pkts))
5070 self.verify_capture_out(capture_ip4,
5071 nat_ip=self.nat_addr,
5072 dst_ip=self.pg1.remote_ip4)
5074 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5075 self.pg1.add_stream(pkts)
5076 self.pg_enable_capture(self.pg_interfaces)
5078 capture_ip6 = self.pg0.get_capture(len(pkts))
5079 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5080 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5081 self.pg0.remote_ip6)
# IPv6 -> IPv4: ICMPv6 errors quoting the packets captured on pg0
5084 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5085 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5086 ICMPv6DestUnreach(code=1) /
5087 packet[IPv6] for packet in capture_ip6]
5088 self.pg0.add_stream(pkts)
5089 self.pg_enable_capture(self.pg_interfaces)
5091 capture = self.pg1.get_capture(len(pkts))
5092 for packet in capture:
5094 self.assertEqual(packet[IP].src, self.nat_addr)
5095 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5096 self.assertEqual(packet[ICMP].type, 3)
5097 self.assertEqual(packet[ICMP].code, 13)
5098 inner = packet[IPerror]
5099 self.assertEqual(inner.src, self.pg1.remote_ip4)
5100 self.assertEqual(inner.dst, self.nat_addr)
5101 self.check_icmp_checksum(packet)
5102 if inner.haslayer(TCPerror):
5103 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
5104 elif inner.haslayer(UDPerror):
5105 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
5107 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
5109 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# IPv4 -> IPv6: ICMP errors quoting the packets captured on pg1
5113 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5114 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5115 ICMP(type=3, code=13) /
5116 packet[IP] for packet in capture_ip4]
5117 self.pg1.add_stream(pkts)
5118 self.pg_enable_capture(self.pg_interfaces)
5120 capture = self.pg0.get_capture(len(pkts))
5121 for packet in capture:
5123 self.assertEqual(packet[IPv6].src, ip.src)
5124 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5125 icmp = packet[ICMPv6DestUnreach]
5126 self.assertEqual(icmp.code, 1)
5127 inner = icmp[IPerror6]
5128 self.assertEqual(inner.src, self.pg0.remote_ip6)
5129 self.assertEqual(inner.dst, ip.src)
5130 self.check_icmpv6_checksum(packet)
5131 if inner.haslayer(TCPerror):
5132 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
5133 elif inner.haslayer(UDPerror):
5134 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
5136 self.assertEqual(inner[ICMPv6EchoRequest].id,
5139 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# NAT64 hairpinning: client and server are both hosts behind pg0.  The client
# reaches the server through the NAT address and the server's static BIB
# ports; every translated packet is therefore captured on pg0 again.  The
# test covers client -> server, server -> client and an ICMPv6 error that
# quotes the server -> client packets.
# NOTE(review): as shown, `pkts` is used without a visible assignment, the
# pool and static BIB calls are not terminated, and the branch headers for
# the UDP cases and the handlers around logger.error are not visible --
# some lines of this method are not visible here; confirm against the
# complete file.
5142 def test_hairpinning(self):
5143 """ NAT64 hairpinning """
5145 client = self.pg0.remote_hosts[0]
5146 server = self.pg0.remote_hosts[1]
5147 server_tcp_in_port = 22
5148 server_tcp_out_port = 4022
5149 server_udp_in_port = 23
5150 server_udp_out_port = 4023
5151 client_tcp_in_port = 1234
5152 client_udp_in_port = 1235
# placeholders; overwritten with the translated source ports seen in the
# first capture
5153 client_tcp_out_port = 0
5154 client_udp_out_port = 0
# IPv6 form of the NAT address: nat_addr appended to the 64:ff9b:: prefix
5155 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5156 nat_addr_ip6 = ip.src
5158 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5160 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5161 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5163 self.vapi.nat64_add_del_static_bib(server.ip6n,
5166 server_tcp_out_port,
5168 self.vapi.nat64_add_del_static_bib(server.ip6n,
5171 server_udp_out_port,
# client -> server: addressed to the NAT address and the server's outside
# ports
5176 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5177 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5178 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5180 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5181 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5182 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5184 self.pg0.add_stream(pkts)
5185 self.pg_enable_capture(self.pg_interfaces)
5187 capture = self.pg0.get_capture(len(pkts))
5188 for packet in capture:
5190 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5191 self.assertEqual(packet[IPv6].dst, server.ip6)
5192 if packet.haslayer(TCP):
5193 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5194 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5195 self.check_tcp_checksum(packet)
5196 client_tcp_out_port = packet[TCP].sport
5198 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5199 self.assertEqual(packet[UDP].dport, server_udp_in_port)
5200 self.check_udp_checksum(packet)
5201 client_udp_out_port = packet[UDP].sport
5203 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> client: addressed to the NAT address and the client's translated
# ports learned above
5208 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5209 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5210 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5212 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5213 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5214 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5216 self.pg0.add_stream(pkts)
5217 self.pg_enable_capture(self.pg_interfaces)
5219 capture = self.pg0.get_capture(len(pkts))
5220 for packet in capture:
5222 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5223 self.assertEqual(packet[IPv6].dst, client.ip6)
5224 if packet.haslayer(TCP):
5225 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5226 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5227 self.check_tcp_checksum(packet)
5229 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5230 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5231 self.check_udp_checksum(packet)
5233 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# ICMPv6 error from the client quoting each packet of the previous capture;
# it must reach the server with the quoted packet translated back
5238 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5239 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5240 ICMPv6DestUnreach(code=1) /
5241 packet[IPv6] for packet in capture]
5242 self.pg0.add_stream(pkts)
5243 self.pg_enable_capture(self.pg_interfaces)
5245 capture = self.pg0.get_capture(len(pkts))
5246 for packet in capture:
5248 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5249 self.assertEqual(packet[IPv6].dst, server.ip6)
5250 icmp = packet[ICMPv6DestUnreach]
5251 self.assertEqual(icmp.code, 1)
5252 inner = icmp[IPerror6]
5253 self.assertEqual(inner.src, server.ip6)
5254 self.assertEqual(inner.dst, nat_addr_ip6)
5255 self.check_icmpv6_checksum(packet)
5256 if inner.haslayer(TCPerror):
5257 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5258 self.assertEqual(inner[TCPerror].dport,
5259 client_tcp_out_port)
5261 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5262 self.assertEqual(inner[UDPerror].dport,
5263 client_udp_out_port)
5265 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# NAT64 network-specific prefixes: a /32 prefix is added for the default VRF
# (vrf_id 0 in the dump) and a /56 prefix for vrf1, then traffic is
# translated in both directions for pg0 with the first prefix and for pg2
# with the second.
# NOTE(review): as shown, the first pool call, the tenant prefix call and
# the create_stream_in_ip6(...) / compose_ip6(...) calls are not terminated,
# so part of their arguments is not visible -- confirm against the complete
# file.
5268 def test_prefix(self):
5269 """ NAT64 Network-Specific Prefix """
5271 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5273 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5274 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5275 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5276 self.vrf1_nat_addr_n,
5277 vrf_id=self.vrf1_id)
5278 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# prefix added without a vrf_id; the dump must report it with vrf_id 0
5281 global_pref64 = "2001:db8::"
5282 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5283 global_pref64_len = 32
5284 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5286 prefix = self.vapi.nat64_prefix_dump()
5287 self.assertEqual(len(prefix), 1)
5288 self.assertEqual(prefix[0].prefix, global_pref64_n)
5289 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5290 self.assertEqual(prefix[0].vrf_id, 0)
5292 # Add tenant specific prefix
5293 vrf1_pref64 = "2001:db8:122:300::"
5294 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5295 vrf1_pref64_len = 56
5296 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5298 vrf_id=self.vrf1_id)
5299 prefix = self.vapi.nat64_prefix_dump()
5300 self.assertEqual(len(prefix), 2)
# traffic for pg0, built with plen=global_pref64_len
5303 pkts = self.create_stream_in_ip6(self.pg0,
5306 plen=global_pref64_len)
5307 self.pg0.add_stream(pkts)
5308 self.pg_enable_capture(self.pg_interfaces)
5310 capture = self.pg1.get_capture(len(pkts))
5311 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5312 dst_ip=self.pg1.remote_ip4)
5314 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5315 self.pg1.add_stream(pkts)
5316 self.pg_enable_capture(self.pg_interfaces)
5318 capture = self.pg0.get_capture(len(pkts))
# NOTE(review): the value is named dst_ip but is passed where the other
# tests pass the expected IPv6 source address
5319 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5322 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5324 # Tenant specific prefix
5325 pkts = self.create_stream_in_ip6(self.pg2,
5328 plen=vrf1_pref64_len)
5329 self.pg2.add_stream(pkts)
5330 self.pg_enable_capture(self.pg_interfaces)
5332 capture = self.pg1.get_capture(len(pkts))
5333 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5334 dst_ip=self.pg1.remote_ip4)
5336 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5337 self.pg1.add_stream(pkts)
5338 self.pg_enable_capture(self.pg_interfaces)
5340 capture = self.pg2.get_capture(len(pkts))
5341 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5344 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
# NAT64 handling of a protocol other than TCP/UDP/ICMP: after a TCP packet
# from the inside host, an IPv6 packet with next header 47 is sent inside ->
# outside and must leave pg1 as IPv4 from nat_addr carrying a GRE layer; the
# reverse packet must arrive on pg0 as IPv6 with next header 47.
# NOTE(review): as shown, `packet` is used without a visible assignment, the
# pool call is not terminated, the layer between the outer and inner IP
# headers of both test packets is not visible, and the handlers around
# logger.error are not visible -- some lines of this method are not visible
# here; confirm against the complete file.
5346 def test_unknown_proto(self):
5347 """ NAT64 translate packet with unknown protocol """
5349 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5351 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5352 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5353 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
# a plain TCP packet inside -> outside is sent first
5356 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5357 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5358 TCP(sport=self.tcp_port_in, dport=20))
5359 self.pg0.add_stream(p)
5360 self.pg_enable_capture(self.pg_interfaces)
5362 p = self.pg1.get_capture(1)
# inside -> outside with next header 47
5364 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5365 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5367 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5368 TCP(sport=1234, dport=1234))
5369 self.pg0.add_stream(p)
5370 self.pg_enable_capture(self.pg_interfaces)
5372 p = self.pg1.get_capture(1)
5375 self.assertEqual(packet[IP].src, self.nat_addr)
5376 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5377 self.assertTrue(packet.haslayer(GRE))
5378 self.check_ip_checksum(packet)
5380 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# outside -> inside towards the NAT address
5384 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5385 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5387 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5388 TCP(sport=1234, dport=1234))
5389 self.pg1.add_stream(p)
5390 self.pg_enable_capture(self.pg_interfaces)
5392 p = self.pg0.get_capture(1)
5395 self.assertEqual(packet[IPv6].src, remote_ip6)
5396 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5397 self.assertEqual(packet[IPv6].nh, 47)
5399 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5402 def test_hairpinning_unknown_proto(self):
5403 """ NAT64 translate packet with unknown protocol - hairpinning """
# NOTE(review): the embedded line numbers in this listing have gaps, so
# several statements and call arguments are not visible; the comments
# describe only what is shown.
#
# Client and server are both remote hosts of pg0, so every stream below is
# queued on pg0 and also captured on pg0.
5405 client = self.pg0.remote_hosts[0]
5406 server = self.pg0.remote_hosts[1]
5407 server_tcp_in_port = 22
5408 server_tcp_out_port = 4022
5409 client_tcp_in_port = 1234
5410 client_tcp_out_port = 1235
# IPv4 NAT addresses for server and client, in packed form and as IPv6
# addresses under 64:ff9b::/96.
5411 server_nat_ip = "10.0.0.100"
5412 client_nat_ip = "10.0.0.110"
5413 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5414 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5415 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5416 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5418 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5420 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5421 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static BIB entries: two calls for the server and one for the client.
# NOTE(review): most of their arguments are on lines not shown here.
5423 self.vapi.nat64_add_del_static_bib(server.ip6n,
5426 server_tcp_out_port,
5429 self.vapi.nat64_add_del_static_bib(server.ip6n,
5435 self.vapi.nat64_add_del_static_bib(client.ip6n,
5438 client_tcp_out_port,
# Plain TCP packet from the client to the server's NAT address; one packet
# is expected back on pg0.
5442 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5443 IPv6(src=client.ip6, dst=server_nat_ip6) /
5444 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5445 self.pg0.add_stream(p)
5446 self.pg_enable_capture(self.pg_interfaces)
5448 p = self.pg0.get_capture(1)
# Client -> server with next header GRE wrapping an inner IPv4/TCP packet.
5450 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5451 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5453 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5454 TCP(sport=1234, dport=1234))
5455 self.pg0.add_stream(p)
5456 self.pg_enable_capture(self.pg_interfaces)
5458 p = self.pg0.get_capture(1)
# Expected: source is the client's NAT address in IPv6 form, destination
# is the server's real IPv6 address, next header unchanged.
5461 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5462 self.assertEqual(packet[IPv6].dst, server.ip6)
5463 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5465 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Server -> client, the mirror image of the exchange above.
5469 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5470 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5472 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5473 TCP(sport=1234, dport=1234))
5474 self.pg0.add_stream(p)
5475 self.pg_enable_capture(self.pg_interfaces)
5477 p = self.pg0.get_capture(1)
# Expected: source is the server's NAT address in IPv6 form, destination
# is the client's real IPv6 address, next header unchanged.
5480 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5481 self.assertEqual(packet[IPv6].dst, client.ip6)
5482 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5484 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5487 def test_one_armed_nat64(self):
5488 """ One armed NAT64 """
# NOTE(review): the embedded line numbers in this listing have gaps, so
# some statements (and the remaining compose_ip6 arguments) are not
# visible; the comments describe only what is shown.
5490 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5494 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
# pg3 is registered twice: once with the default arguments and once with
# is_inside=0, so the single interface is used for both directions.
5496 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5497 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
# IPv6 -> IPv4: TCP packet from pg3's IPv6 host to the IPv6 form of pg3's
# IPv4 host; the translated packet is captured on pg3 again.
5500 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5501 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5502 TCP(sport=12345, dport=80))
5503 self.pg3.add_stream(p)
5504 self.pg_enable_capture(self.pg_interfaces)
5506 capture = self.pg3.get_capture(1)
# Expected: source is the NAT address, destination is pg3's IPv4 host, the
# source port was changed (remembered as external_port), the destination
# port is untouched and both checksums are consistent.
5511 self.assertEqual(ip.src, self.nat_addr)
5512 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5513 self.assertNotEqual(tcp.sport, 12345)
5514 external_port = tcp.sport
5515 self.assertEqual(tcp.dport, 80)
5516 self.check_tcp_checksum(p)
5517 self.check_ip_checksum(p)
5519 self.logger.error(ppp("Unexpected or invalid packet:", p))
# IPv4 -> IPv6: reply addressed to the NAT address and the port learned
# above.
5523 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5524 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5525 TCP(sport=80, dport=external_port))
5526 self.pg3.add_stream(p)
5527 self.pg_enable_capture(self.pg_interfaces)
5529 capture = self.pg3.get_capture(1)
# Expected: source is the composed IPv6 address of the IPv4 host,
# destination is pg3's IPv6 host, and the original ports are restored.
5534 self.assertEqual(ip.src, remote_host_ip6)
5535 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5536 self.assertEqual(tcp.sport, 80)
5537 self.assertEqual(tcp.dport, 12345)
5538 self.check_tcp_checksum(p)
5540 self.logger.error(ppp("Unexpected or invalid packet:", p))
5543 def test_frag_in_order(self):
5544 """ NAT64 translate fragments arriving in order """
# NOTE(review): the embedded line numbers in this listing have gaps (the
# first payload definition and several call arguments are not visible);
# the comments describe only what is shown.
#
# A random inside source port is used for this run.
5545 self.tcp_port_in = random.randint(1025, 65535)
5547 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5549 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5550 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Remember how many reassembly entries exist before any traffic is sent.
5552 reass = self.vapi.nat_reass_dump()
5553 reass_n_start = len(reass)
# Inside -> outside: fragmented IPv6 stream from pg0 towards pg1's IPv4
# host, destination port 20.
5557 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5558 self.tcp_port_in, 20, data)
5559 self.pg0.add_stream(pkts)
5560 self.pg_enable_capture(self.pg_interfaces)
5562 frags = self.pg1.get_capture(len(pkts))
# The captured fragments are reassembled and verified; the destination
# port must be kept, the source port must differ from the inside port (it
# is stored as tcp_port_out) and the payload must be unchanged.
5563 p = self.reass_frags_and_verify(frags,
5565 self.pg1.remote_ip4)
5566 self.assertEqual(p[TCP].dport, 20)
5567 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5568 self.tcp_port_out = p[TCP].sport
5569 self.assertEqual(data, p[Raw].load)
# Outside -> inside: fragmented stream built with create_stream_frag on
# pg1, captured on pg0.
5572 data = "A" * 4 + "b" * 16 + "C" * 3
5573 pkts = self.create_stream_frag(self.pg1,
5578 self.pg1.add_stream(pkts)
5579 self.pg_enable_capture(self.pg_interfaces)
5581 frags = self.pg0.get_capture(len(pkts))
# Expected IPv6 source is pg1's IPv4 host under 64:ff9b::/96; the ports are
# the mirror of the first direction and the payload is unchanged.
5582 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5583 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5584 self.assertEqual(p[TCP].sport, 20)
5585 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5586 self.assertEqual(data, p[Raw].load)
# The two fragmented exchanges must have added exactly two reassembly
# entries compared with the count taken at the start.
5588 reass = self.vapi.nat_reass_dump()
5589 reass_n_end = len(reass)
5591 self.assertEqual(reass_n_end - reass_n_start, 2)
5593 def test_reass_hairpinning(self):
5594 """ NAT64 fragments hairpinning """
# NOTE(review): the embedded line numbers in this listing have gaps (the
# payload definition and several call arguments are not visible); the
# comments describe only what is shown.
#
# Client and server are both remote hosts of pg0; all ports are random.
5596 client = self.pg0.remote_hosts[0]
5597 server = self.pg0.remote_hosts[1]
5598 server_in_port = random.randint(1025, 65535)
5599 server_out_port = random.randint(1025, 65535)
5600 client_in_port = random.randint(1025, 65535)
# The NAT address in IPv6 form is produced by joining '64:ff9b::' with the
# dotted IPv4 NAT address and reading the src field back from a scapy IPv6
# header (presumably to obtain scapy's textual form for the comparison
# below -- confirm).
5601 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5602 nat_addr_ip6 = ip.src
5604 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5606 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5607 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5609 # add static BIB entry for server
5610 self.vapi.nat64_add_del_static_bib(server.ip6n,
5616 # send packet from host to server
5617 pkts = self.create_stream_frag_ip6(self.pg0,
5622 self.pg0.add_stream(pkts)
5623 self.pg_enable_capture(self.pg_interfaces)
# The fragments come back on pg0; after reassembly the source must be the
# NAT address in IPv6 form, the destination the server, the source port
# changed, the destination port the server's inside port, and the payload
# intact.
5625 frags = self.pg0.get_capture(len(pkts))
5626 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5627 self.assertNotEqual(p[TCP].sport, client_in_port)
5628 self.assertEqual(p[TCP].dport, server_in_port)
5629 self.assertEqual(data, p[Raw].load)
5631 def test_frag_out_of_order(self):
5632 """ NAT64 translate fragments arriving out of order """
# NOTE(review): the embedded line numbers in this listing have gaps; in
# particular whatever reorders the fragments before they are sent
# (around 5644 and 5660-5664) is not visible here. The comments describe
# only what is shown.
#
# A random inside source port is used for this run.
5633 self.tcp_port_in = random.randint(1025, 65535)
5635 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5637 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5638 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Inside -> outside: fragmented IPv6 stream from pg0 towards pg1's IPv4
# host, destination port 20.
5642 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5643 self.tcp_port_in, 20, data)
5645 self.pg0.add_stream(pkts)
5646 self.pg_enable_capture(self.pg_interfaces)
5648 frags = self.pg1.get_capture(len(pkts))
# After reassembly: destination port kept, source port translated (stored
# as tcp_port_out), payload unchanged.
5649 p = self.reass_frags_and_verify(frags,
5651 self.pg1.remote_ip4)
5652 self.assertEqual(p[TCP].dport, 20)
5653 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5654 self.tcp_port_out = p[TCP].sport
5655 self.assertEqual(data, p[Raw].load)
# Outside -> inside: fragmented stream built on pg1, captured on pg0.
5658 data = "A" * 4 + "B" * 16 + "C" * 3
5659 pkts = self.create_stream_frag(self.pg1,
5665 self.pg1.add_stream(pkts)
5666 self.pg_enable_capture(self.pg_interfaces)
5668 frags = self.pg0.get_capture(len(pkts))
# Expected IPv6 source is pg1's IPv4 host under 64:ff9b::/96; ports mirror
# the first direction and the payload is unchanged.
5669 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5670 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5671 self.assertEqual(p[TCP].sport, 20)
5672 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5673 self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface """
    # pg4 is registered as the source of NAT64 pool addresses. The pool is
    # then expected to follow the interface: empty while pg4 has no IPv4
    # address, holding exactly pg4's address once one is configured, and
    # empty again after the address is removed.
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (the NAT64 pool itself is dumped; the NAT44 address list is a
    # different table and says nothing about it)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    # (the assertion must look at this fresh dump, otherwise the removal
    # is never actually verified)
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
# NOTE(review): the embedded line numbers in this listing have gaps (the
# max_bibs / max_sessions definitions, list handling, loops over the
# capture and several call arguments are not visible); the comments
# describe only what is shown.
# The test runs only when extended tests are enabled.
5694 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5695 def test_ipfix_max_bibs_sessions(self):
5696 """ IPFIX logging maximum session and BIB entries exceeded """
5699 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5703 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5705 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5706 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# For every i below max_bibs a distinct source address fd01:aa::<i> is
# used to build two TCP packets, to destination ports 80 and 22.
5710 for i in range(0, max_bibs):
5711 src = "fd01:aa::%x" % (i)
5712 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5713 IPv6(src=src, dst=remote_host_ip6) /
5714 TCP(sport=12345, dport=80))
5716 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5717 IPv6(src=src, dst=remote_host_ip6) /
5718 TCP(sport=12345, dport=22))
5720 self.pg0.add_stream(pkts)
5721 self.pg_enable_capture(self.pg_interfaces)
# max_sessions translated packets are expected on pg1.
5723 self.pg1.get_capture(max_sessions)
# IPFIX export is pointed at pg3's remote host and NAT IPFIX logging is
# enabled only now, after the tables have been filled.
5725 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5726 src_address=self.pg3.local_ip4n,
5728 template_interval=10)
5729 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5730 src_port=self.ipfix_src_port)
# One more packet from the last source address used in the loop, to a new
# destination port (25): nothing may come out on pg1.
5732 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5733 IPv6(src=src, dst=remote_host_ip6) /
5734 TCP(sport=12345, dport=25))
5735 self.pg0.add_stream(p)
5736 self.pg_enable_capture(self.pg_interfaces)
5738 self.pg1.get_capture(0)
5739 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Nine IPFIX packets are expected on pg3 after the flush.
5740 capture = self.pg3.get_capture(9)
5741 ipfix = IPFIXDecoder()
5742 # first load template
5744 self.assertTrue(p.haslayer(IPFIX))
5745 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5746 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5747 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5748 self.assertEqual(p[UDP].dport, 4739)
5749 self.assertEqual(p[IPFIX].observationDomainID,
5750 self.ipfix_domain_id)
5751 if p.haslayer(Template):
5752 ipfix.add_template(p.getlayer(Template))
5753 # verify events in data set
5755 if p.haslayer(Data):
5756 data = ipfix.decode_data_set(p.getlayer(Set))
5757 self.verify_ipfix_max_sessions(data, max_sessions)
# A packet from a source address not used in the loop (pg0's own remote
# host): again nothing may come out on pg1.
5759 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5760 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5761 TCP(sport=12345, dport=80))
5762 self.pg0.add_stream(p)
5763 self.pg_enable_capture(self.pg_interfaces)
5765 self.pg1.get_capture(0)
5766 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# This time a single IPFIX packet is expected; the templates loaded
# earlier are reused to decode it.
5767 capture = self.pg3.get_capture(1)
5768 # verify events in data set
5770 self.assertTrue(p.haslayer(IPFIX))
5771 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5772 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5773 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5774 self.assertEqual(p[UDP].dport, 4739)
5775 self.assertEqual(p[IPFIX].observationDomainID,
5776 self.ipfix_domain_id)
5777 if p.haslayer(Data):
5778 data = ipfix.decode_data_set(p.getlayer(Set))
5779 self.verify_ipfix_max_bibs(data, max_bibs)
5781 def test_ipfix_max_frags(self):
5782 """ IPFIX logging maximum fragments pending reassembly exceeded """
# NOTE(review): the embedded line numbers in this listing have gaps (the
# payload definition, loops over the capture and several call arguments
# are not visible); the comments describe only what is shown.
5783 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5785 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5786 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# The IPv6 reassembly limit is set to max_frag=0 before traffic is sent.
5787 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5788 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5789 src_address=self.pg3.local_ip4n,
5791 template_interval=10)
5792 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5793 src_port=self.ipfix_src_port)
# A fragmented stream is built but only its last fragment is sent; no
# packet may appear on pg1.
5796 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5797 self.tcp_port_in, 20, data)
5798 self.pg0.add_stream(pkts[-1])
5799 self.pg_enable_capture(self.pg_interfaces)
5801 self.pg1.get_capture(0)
5802 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Nine IPFIX packets are expected on pg3 after the flush.
5803 capture = self.pg3.get_capture(9)
5804 ipfix = IPFIXDecoder()
5805 # first load template
5807 self.assertTrue(p.haslayer(IPFIX))
5808 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5809 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5810 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5811 self.assertEqual(p[UDP].dport, 4739)
5812 self.assertEqual(p[IPFIX].observationDomainID,
5813 self.ipfix_domain_id)
5814 if p.haslayer(Template):
5815 ipfix.add_template(p.getlayer(Template))
5816 # verify events in data set
# The decoded data set is checked against a limit of 0 and pg0's remote
# IPv6 address.
5818 if p.haslayer(Data):
5819 data = ipfix.decode_data_set(p.getlayer(Set))
5820 self.verify_ipfix_max_fragments_ip6(data, 0,
5821 self.pg0.remote_ip6n)
5823 def test_ipfix_bib_ses(self):
5824 """ IPFIX logging NAT64 BIB/session create and delete events """
# NOTE(review): the embedded line numbers in this listing have gaps (loops
# over the capture, else-branches and several call arguments are not
# visible); the comments describe only what is shown.
5825 self.tcp_port_in = random.randint(1025, 65535)
5826 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5830 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5832 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5833 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5834 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5835 src_address=self.pg3.local_ip4n,
5837 template_interval=10)
5838 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5839 src_port=self.ipfix_src_port)
# Create phase: one TCP packet from pg0 is translated and captured on pg1;
# its translated source port is stored as tcp_port_out.
5842 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5843 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5844 TCP(sport=self.tcp_port_in, dport=25))
5845 self.pg0.add_stream(p)
5846 self.pg_enable_capture(self.pg_interfaces)
5848 p = self.pg1.get_capture(1)
5849 self.tcp_port_out = p[0][TCP].sport
5850 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Ten IPFIX packets are expected on pg3 after the flush.
5851 capture = self.pg3.get_capture(10)
5852 ipfix = IPFIXDecoder()
5853 # first load template
5855 self.assertTrue(p.haslayer(IPFIX))
5856 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5857 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5858 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5859 self.assertEqual(p[UDP].dport, 4739)
5860 self.assertEqual(p[IPFIX].observationDomainID,
5861 self.ipfix_domain_id)
5862 if p.haslayer(Template):
5863 ipfix.add_template(p.getlayer(Template))
5864 # verify events in data set
# The byte stored under key 230 of the first record picks the verifier:
# 10 -> verify_ipfix_bib (second argument 1), 6 -> verify_ipfix_nat64_ses.
# NOTE(review): 230 is presumably the IPFIX natEvent element, with 10/11 =
# NAT64 BIB create/delete and 6/7 = NAT64 session create/delete -- confirm
# against the IANA registry.
5866 if p.haslayer(Data):
5867 data = ipfix.decode_data_set(p.getlayer(Set))
5868 if ord(data[0][230]) == 10:
5869 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5870 elif ord(data[0][230]) == 6:
5871 self.verify_ipfix_nat64_ses(data,
5873 self.pg0.remote_ip6n,
5874 self.pg1.remote_ip4,
5877 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Delete phase: the pool address call is issued again (its remaining
# arguments are not visible here; presumably it removes the address --
# confirm), then two IPFIX packets are expected after the flush.
5880 self.pg_enable_capture(self.pg_interfaces)
5881 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5884 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5885 capture = self.pg3.get_capture(2)
5886 # verify events in data set
5888 self.assertTrue(p.haslayer(IPFIX))
5889 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5890 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5891 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5892 self.assertEqual(p[UDP].dport, 4739)
5893 self.assertEqual(p[IPFIX].observationDomainID,
5894 self.ipfix_domain_id)
# Same dispatch as above with values 11 -> verify_ipfix_bib (second
# argument 0) and 7 -> verify_ipfix_nat64_ses.
5895 if p.haslayer(Data):
5896 data = ipfix.decode_data_set(p.getlayer(Set))
5897 if ord(data[0][230]) == 11:
5898 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5899 elif ord(data[0][230]) == 7:
5900 self.verify_ipfix_nat64_ses(data,
5902 self.pg0.remote_ip6n,
5903 self.pg1.remote_ip4,
5906 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Helper that dumps the NAT64 session table through the API.
# NOTE(review): the docstring delimiters and the statement after the dump
# (embedded line 5913) are not visible in this listing; per the docstring
# the helper presumably returns the number of dumped entries -- confirm.
5908 def nat64_get_ses_num(self):
5910 Return number of active NAT64 sessions.
5912 st = self.vapi.nat64_st_dump()
# Cleanup helper that undoes NAT64 configuration through the API: IPFIX
# logging, timeouts, interfaces, BIB entries, pool addresses and prefixes.
# NOTE(review): the embedded line numbers in this listing have gaps (the
# docstring delimiters, two loop/branch headers and most call arguments
# are not visible); the comments describe only what is shown.
5915 def clear_nat64(self):
5917 Clear NAT64 configuration.
5919 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5920 domain_id=self.ipfix_domain_id)
# IPFIX logging was switched off with the current values above; the
# attributes are then set back to 4739 and 1.
5921 self.ipfix_src_port = 4739
5922 self.ipfix_domain_id = 1
# Called without arguments (presumably this restores the default timeouts
# -- confirm).
5924 self.vapi.nat64_set_timeouts()
# Every dumped interface gets a nat64_add_del_interface call; those with
# is_inside > 1 get an additional one first (presumably interfaces that
# are registered as both inside and outside -- confirm).
5926 interfaces = self.vapi.nat64_interface_dump()
5927 for intf in interfaces:
5928 if intf.is_inside > 1:
5929 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5932 self.vapi.nat64_add_del_interface(intf.sw_if_index,
# BIB entries are dumped with argument 255 and passed to
# nat64_add_del_static_bib.
5936 bib = self.vapi.nat64_bib_dump(255)
5939 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
# Each pool address is passed to nat64_add_del_pool_addr_range.
5947 adresses = self.vapi.nat64_pool_addr_dump()
5948 for addr in adresses:
5949 self.vapi.nat64_add_del_pool_addr_range(addr.address,
# Each configured prefix is passed to nat64_add_del_prefix together with
# its VRF id.
5954 prefixes = self.vapi.nat64_prefix_dump()
5955 for prefix in prefixes:
5956 self.vapi.nat64_add_del_prefix(prefix.prefix,
5958 vrf_id=prefix.vrf_id,
5962 super(TestNAT64, self).tearDown()
# NOTE(review): the def line of this tearDown and any statement after the
# logging below are not visible in this listing.
# While VPP is still alive, the NAT64 pool, interfaces, prefixes, BIB,
# session table and virtual-reassembly state are written to the test log.
5963 if not self.vpp_dead:
5964 self.logger.info(self.vapi.cli("show nat64 pool"))
5965 self.logger.info(self.vapi.cli("show nat64 interfaces"))
5966 self.logger.info(self.vapi.cli("show nat64 prefix"))
5967 self.logger.info(self.vapi.cli("show nat64 bib all"))
5968 self.logger.info(self.vapi.cli("show nat64 session table all"))
5969 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
5973 class TestDSlite(MethodHolder):
5974 """ DS-Lite Test Cases """
# NOTE(review): the embedded line numbers in this listing have gaps
# (decorators, try/except scaffolding, some def lines and call arguments
# are not visible); the comments describe only what is shown.
#
# Class setup: NAT address 10.0.0.3, two packet-generator interfaces, pg0
# configured for IPv4 with ARP resolved, pg1 configured for IPv6 with two
# remote hosts and their neighbor entries.
5977 def setUpClass(cls):
5978 super(TestDSlite, cls).setUpClass()
5981 cls.nat_addr = '10.0.0.3'
5982 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5984 cls.create_pg_interfaces(range(2))
5986 cls.pg0.config_ip4()
5987 cls.pg0.resolve_arp()
5989 cls.pg1.config_ip6()
5990 cls.pg1.generate_remote_hosts(2)
5991 cls.pg1.configure_ipv6_neighbors()
5994 super(TestDSlite, cls).tearDownClass()
5997 def test_dslite(self):
5998 """ Test DS-Lite """
# The NAT address goes into the DS-Lite pool and the AFTR tunnel endpoint
# is set to the IPv6/IPv4 address pair below.
5999 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
6001 aftr_ip4 = '192.0.0.1'
6002 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6003 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6004 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6005 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# UDP, tunnel -> IPv4: an IPv4/UDP packet from 192.168.1.1 is carried in
# IPv6 from pg1's first remote host to the AFTR address.
6008 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6009 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6010 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6011 UDP(sport=20000, dport=10000))
6012 self.pg1.add_stream(p)
6013 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: no IPv6 layer, source is the NAT address, source port
# changed (remembered as out_port), destination port kept.
6015 capture = self.pg0.get_capture(1)
6016 capture = capture[0]
6017 self.assertFalse(capture.haslayer(IPv6))
6018 self.assertEqual(capture[IP].src, self.nat_addr)
6019 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6020 self.assertNotEqual(capture[UDP].sport, 20000)
6021 self.assertEqual(capture[UDP].dport, 10000)
6022 self.check_ip_checksum(capture)
6023 out_port = capture[UDP].sport
# UDP, IPv4 -> tunnel: reply addressed to the NAT address and out_port.
6025 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6026 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6027 UDP(sport=10000, dport=out_port))
6028 self.pg0.add_stream(p)
6029 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg1: IPv6 from the AFTR address to the first remote host,
# with the inner IPv4 destination and ports restored to the originals.
6031 capture = self.pg1.get_capture(1)
6032 capture = capture[0]
6033 self.assertEqual(capture[IPv6].src, aftr_ip6)
6034 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6035 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6036 self.assertEqual(capture[IP].dst, '192.168.1.1')
6037 self.assertEqual(capture[UDP].sport, 10000)
6038 self.assertEqual(capture[UDP].dport, 20000)
6039 self.check_ip_checksum(capture)
# TCP: same round trip, this time from pg1's second remote host, which
# uses the same inner IPv4 source address 192.168.1.1.
6042 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6043 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6044 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6045 TCP(sport=20001, dport=10001))
6046 self.pg1.add_stream(p)
6047 self.pg_enable_capture(self.pg_interfaces)
6049 capture = self.pg0.get_capture(1)
6050 capture = capture[0]
6051 self.assertFalse(capture.haslayer(IPv6))
6052 self.assertEqual(capture[IP].src, self.nat_addr)
6053 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6054 self.assertNotEqual(capture[TCP].sport, 20001)
6055 self.assertEqual(capture[TCP].dport, 10001)
6056 self.check_ip_checksum(capture)
6057 self.check_tcp_checksum(capture)
6058 out_port = capture[TCP].sport
6060 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6061 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6062 TCP(sport=10001, dport=out_port))
6063 self.pg0.add_stream(p)
6064 self.pg_enable_capture(self.pg_interfaces)
6066 capture = self.pg1.get_capture(1)
6067 capture = capture[0]
6068 self.assertEqual(capture[IPv6].src, aftr_ip6)
6069 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6070 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6071 self.assertEqual(capture[IP].dst, '192.168.1.1')
6072 self.assertEqual(capture[TCP].sport, 10001)
6073 self.assertEqual(capture[TCP].dport, 20001)
6074 self.check_ip_checksum(capture)
6075 self.check_tcp_checksum(capture)
# ICMP: echo request with id 4000 from the second remote host; the id is
# expected to be rewritten on the IPv4 side (remembered as out_id) and
# restored to 4000 on the echo reply going back into the tunnel.
6078 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6079 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6080 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6081 ICMP(id=4000, type='echo-request'))
6082 self.pg1.add_stream(p)
6083 self.pg_enable_capture(self.pg_interfaces)
6085 capture = self.pg0.get_capture(1)
6086 capture = capture[0]
6087 self.assertFalse(capture.haslayer(IPv6))
6088 self.assertEqual(capture[IP].src, self.nat_addr)
6089 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6090 self.assertNotEqual(capture[ICMP].id, 4000)
6091 self.check_ip_checksum(capture)
6092 self.check_icmp_checksum(capture)
6093 out_id = capture[ICMP].id
6095 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6096 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6097 ICMP(id=out_id, type='echo-reply'))
6098 self.pg0.add_stream(p)
6099 self.pg_enable_capture(self.pg_interfaces)
6101 capture = self.pg1.get_capture(1)
6102 capture = capture[0]
6103 self.assertEqual(capture[IPv6].src, aftr_ip6)
6104 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6105 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6106 self.assertEqual(capture[IP].dst, '192.168.1.1')
6107 self.assertEqual(capture[ICMP].id, 4000)
6108 self.check_ip_checksum(capture)
6109 self.check_icmp_checksum(capture)
6111 # ping DS-Lite AFTR tunnel endpoint address
6112 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6113 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
6114 ICMPv6EchoRequest())
6115 self.pg1.add_stream(p)
6116 self.pg_enable_capture(self.pg_interfaces)
# The echo reply must come back on pg1 from the AFTR address itself.
6118 capture = self.pg1.get_capture(1)
6119 self.assertEqual(1, len(capture))
6120 capture = capture[0]
6121 self.assertEqual(capture[IPv6].src, aftr_ip6)
6122 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6123 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Per-test teardown: while VPP is still alive, the DS-Lite pool, AFTR
# endpoint address and session table are written to the test log.
6126 super(TestDSlite, self).tearDown()
6127 if not self.vpp_dead:
6128 self.logger.info(self.vapi.cli("show dslite pool"))
6130 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6131 self.logger.info(self.vapi.cli("show dslite sessions"))
6134 class TestDSliteCE(MethodHolder):
6135 """ DS-Lite CE Test Cases """
# NOTE(review): the embedded line numbers in this listing have gaps
# (decorators, try/except scaffolding, some def lines and call arguments
# are not visible); the comments describe only what is shown.
#
# The VPP command line is extended with the section: nat { dslite ce }.
6138 def setUpConstants(cls):
6139 super(TestDSliteCE, cls).setUpConstants()
6140 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
# Class setup: two packet-generator interfaces, pg0 configured for IPv4
# with ARP resolved, pg1 configured for IPv6 with one remote host and its
# neighbor entry.
6143 def setUpClass(cls):
6144 super(TestDSliteCE, cls).setUpClass()
6147 cls.create_pg_interfaces(range(2))
6149 cls.pg0.config_ip4()
6150 cls.pg0.resolve_arp()
6152 cls.pg1.config_ip6()
6153 cls.pg1.generate_remote_hosts(1)
6154 cls.pg1.configure_ipv6_neighbors()
6157 super(TestDSliteCE, cls).tearDownClass()
6160 def test_dslite_ce(self):
6161 """ Test DS-Lite CE """
# B4 and AFTR tunnel endpoint addresses are configured from the literal
# IPv4/IPv6 pairs below.
6163 b4_ip4 = '192.0.0.2'
6164 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
6165 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
6166 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
6167 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
6169 aftr_ip4 = '192.0.0.1'
6170 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6171 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6172 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6173 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# A /128 route to the AFTR address is installed via pg1's remote host.
6175 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
6176 dst_address_length=128,
6177 next_hop_address=self.pg1.remote_ip6n,
6178 next_hop_sw_if_index=self.pg1.sw_if_index,
# IPv4 -> tunnel: a plain IPv4/UDP packet arrives on pg0.
6182 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6183 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6184 UDP(sport=10000, dport=20000))
6185 self.pg0.add_stream(p)
6186 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg1: IPv6 from the B4 address to the AFTR address, with the
# inner IPv4 addresses and UDP ports unchanged.
6188 capture = self.pg1.get_capture(1)
6189 capture = capture[0]
6190 self.assertEqual(capture[IPv6].src, b4_ip6)
6191 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6192 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6193 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6194 self.assertEqual(capture[UDP].sport, 10000)
6195 self.assertEqual(capture[UDP].dport, 20000)
6196 self.check_ip_checksum(capture)
# Tunnel -> IPv4: an IPv4/UDP packet carried in IPv6 from the AFTR address
# to the B4 address arrives on pg1.
6199 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6200 IPv6(dst=b4_ip6, src=aftr_ip6) /
6201 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6202 UDP(sport=20000, dport=10000))
6203 self.pg1.add_stream(p)
6204 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: no IPv6 layer, inner addresses and ports unchanged.
6206 capture = self.pg0.get_capture(1)
6207 capture = capture[0]
6208 self.assertFalse(capture.haslayer(IPv6))
6209 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6210 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6211 self.assertEqual(capture[UDP].sport, 20000)
6212 self.assertEqual(capture[UDP].dport, 10000)
6213 self.check_ip_checksum(capture)
6215 # ping DS-Lite B4 tunnel endpoint address
6216 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6217 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6218 ICMPv6EchoRequest())
6219 self.pg1.add_stream(p)
6220 self.pg_enable_capture(self.pg_interfaces)
# The echo reply must come back on pg1 from the B4 address itself.
6222 capture = self.pg1.get_capture(1)
6223 self.assertEqual(1, len(capture))
6224 capture = capture[0]
6225 self.assertEqual(capture[IPv6].src, b4_ip6)
6226 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6227 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Per-test teardown: while VPP is still alive, the AFTR and B4 endpoint
# addresses are queried through the CLI.
6230 super(TestDSliteCE, self).tearDown()
6231 if not self.vpp_dead:
6233 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6235 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
6238 class TestNAT66(MethodHolder):
6239 """ NAT66 Test Cases """
# NOTE(review): the embedded line numbers in this listing have gaps
# (decorators, try/except scaffolding, list handling, the transport layer
# of several packets and some call arguments are not visible); the
# comments describe only what is shown.
#
# Class setup: NAT address fd01:ff::2, two packet-generator interfaces,
# each with its IPv6 neighbors configured.
6242 def setUpClass(cls):
6243 super(TestNAT66, cls).setUpClass()
6246 cls.nat_addr = 'fd01:ff::2'
6247 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
6249 cls.create_pg_interfaces(range(2))
6250 cls.interfaces = list(cls.pg_interfaces)
6252 for i in cls.interfaces:
6255 i.configure_ipv6_neighbors()
6258 super(TestNAT66, cls).tearDownClass()
6261 def test_static(self):
6262 """ 1:1 NAT66 test """
# pg0 is registered with the default arguments and pg1 with is_inside=0; a
# static mapping is added for pg0's remote host.
6263 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6264 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6265 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
# Inside -> outside: four packets from pg0's remote host to pg1's remote
# host (the last two visibly ICMPv6 echo request and GRE/IP/TCP).
6270 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6271 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6274 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6275 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6278 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6279 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6280 ICMPv6EchoRequest())
6282 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6283 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6284 GRE() / IP() / TCP())
6286 self.pg0.add_stream(pkts)
6287 self.pg_enable_capture(self.pg_interfaces)
# Every packet captured on pg1 must carry the NAT address as source and an
# unchanged destination; the checksum of the layer that is present is
# verified.
6289 capture = self.pg1.get_capture(len(pkts))
6290 for packet in capture:
6292 self.assertEqual(packet[IPv6].src, self.nat_addr)
6293 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6294 if packet.haslayer(TCP):
6295 self.check_tcp_checksum(packet)
6296 elif packet.haslayer(UDP):
6297 self.check_udp_checksum(packet)
6298 elif packet.haslayer(ICMPv6EchoRequest):
6299 self.check_icmpv6_checksum(packet)
6301 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Outside -> inside: four packets from pg1's remote host addressed to the
# NAT address.
6306 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6307 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6310 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6311 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6314 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6315 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6318 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6319 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6320 GRE() / IP() / TCP())
6322 self.pg1.add_stream(pkts)
6323 self.pg_enable_capture(self.pg_interfaces)
# Every packet captured on pg0 must keep its source and have the
# destination rewritten to pg0's remote host.
6325 capture = self.pg0.get_capture(len(pkts))
6326 for packet in capture:
6328 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
6329 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6330 if packet.haslayer(TCP):
6331 self.check_tcp_checksum(packet)
6332 elif packet.haslayer(UDP):
6333 self.check_udp_checksum(packet)
6334 elif packet.haslayer(ICMPv6EchoReply):
6335 self.check_icmpv6_checksum(packet)
6337 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Exactly one static mapping must exist and its packet counter must be 8
# (presumably the four packets sent in each direction -- confirm).
6340 sm = self.vapi.nat66_static_mapping_dump()
6341 self.assertEqual(len(sm), 1)
6342 self.assertEqual(sm[0].total_pkts, 8)
# Cleanup helper: every dumped NAT66 interface and static mapping is
# passed back to the corresponding add/del API call (the remaining
# arguments are not visible in this listing).
6344 def clear_nat66(self):
6346 Clear NAT66 configuration.
6348 interfaces = self.vapi.nat66_interface_dump()
6349 for intf in interfaces:
6350 self.vapi.nat66_add_del_interface(intf.sw_if_index,
6354 static_mappings = self.vapi.nat66_static_mapping_dump()
6355 for sm in static_mappings:
6356 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
6357 sm.external_ip_address,
# Per-test teardown: while VPP is still alive, the NAT66 interfaces and
# static mappings are written to the test log.
6362 super(TestNAT66, self).tearDown()
6363 if not self.vpp_dead:
6364 self.logger.info(self.vapi.cli("show nat66 interfaces"))
6365 self.logger.info(self.vapi.cli("show nat66 static mappings"))
# Script entry point: when this file is executed directly, run the test
# cases through unittest using the framework's VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)