9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is re-parsed from its wire bytes, the parsed IP checksum
    field is dropped and the packet is built again; the checksum found in
    the rebuilt packet must equal the one carried by ``pkt``.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the parsed value first: otherwise the rebuild simply echoes the
    # received checksum and the assertion below could never fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is re-parsed from its wire bytes, the parsed TCP checksum
    field is dropped and the packet is built again; the checksum found in
    the rebuilt packet must equal the one carried by ``pkt``.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the parsed value first: otherwise the rebuild simply echoes the
    # received checksum and the assertion below could never fail.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is re-parsed from its wire bytes, the parsed UDP checksum
    field is dropped and the packet is built again; the checksum found in
    the rebuilt packet must equal the one carried by ``pkt``.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the parsed value first: otherwise the rebuild simply echoes the
    # received checksum and the assertion below could never fail.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    For every embedded layer present in ``pkt`` (IPerror, TCPerror,
    UDPerror, ICMPerror) the packet is re-parsed from its wire bytes, that
    layer's checksum is dropped, the packet is rebuilt and the resulting
    checksum is compared with the one carried by ``pkt``.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    embedded_layers = (
        (IPerror, 'IPerror'),
        (TCPerror, 'TCPerror'),
        (UDPerror, 'UDPerror'),
        (ICMPerror, 'ICMPerror'),
    )
    for layer, name in embedded_layers:
        if not pkt.haslayer(layer):
            continue
        # An embedded UDP checksum of 0 is not verified.
        if layer is UDPerror and pkt[name].chksum == 0:
            continue
        # Always start from a fresh parse of pkt. The ICMPerror check used
        # to reuse the copy left behind by an earlier branch, which is
        # unbound whenever none of the earlier branches ran.
        new = pkt.__class__(str(pkt))
        del new[name].chksum
        new = new.__class__(str(new))
        self.assertEqual(new[name].chksum, pkt[name].chksum)
def check_icmp_checksum(self, pkt):
    """
    Check ICMP checksum in IPv4 packet

    Rebuilds a copy of the packet with the ICMP checksum field dropped and
    compares the rebuilt checksum with the received one. When the packet
    carries an embedded IPerror layer, the embedded checksums are checked
    as well.

    :param pkt: Packet to check ICMP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    del rebuilt['ICMP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    actual = rebuilt['ICMP'].chksum
    expected = pkt['ICMP'].chksum
    self.assertEqual(actual, expected)
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Check ICMPv6 checksum in IPv6 packet

    Handles destination-unreachable, echo-request and echo-reply messages.
    For a destination-unreachable message the embedded packet checksums
    are checked too.

    :param pkt: Packet to check ICMPv6 checksum
    """
    icmpv6_layers = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach'),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
        (ICMPv6EchoReply, 'ICMPv6EchoReply'),
    )
    # The working copy is parsed once and carried from one matching layer
    # to the next.
    new = pkt.__class__(str(pkt))
    for layer, name in icmpv6_layers:
        if not pkt.haslayer(layer):
            continue
        del new[name].cksum
        new = new.__class__(str(new))
        self.assertEqual(new[name].cksum,
                         pkt[name].cksum)
        if layer is ICMPv6DestUnreach:
            self.check_icmp_errror_embedded(pkt)
def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param dst_ip: Destination address (Default use remote address of the
                   outside interface)
    :param ttl: TTL of generated packets
    :returns: List with one TCP, one UDP and one ICMP echo request packet
    """
    # Only fall back to the outside host when the caller gave no address.
    if dst_ip is None:
        dst_ip = out_if.remote_ip4

    l4_layers = (
        TCP(sport=self.tcp_port_in, dport=20),
        UDP(sport=self.udp_port_in, dport=20),
        ICMP(id=self.icmp_id_in, type='echo-request'),
    )
    return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
            IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
            l4
            for l4 in l4_layers]
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are written into the octets that follow the
    prefix, skipping octet 8 (RFC 6052 address format). Supported prefix
    lengths are 32, 40, 48, 56, 64 and 96; for any other length the prefix
    is returned with no IPv4 address embedded.

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length
    :returns: IPv4-embedded IPv6 addresses
    """
    # bytearray behaves the same on Python 2 and Python 3, unlike a list
    # built from the packed address.
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    if plen in (32, 40, 48, 56, 64, 96):
        # First four octet positions after the prefix, octet 8 excluded.
        slots = [i for i in range(plen // 8, 16) if i != 8][:4]
        for slot, octet in zip(slots, ip4_n):
            pref_n[slot] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def extract_ip4(self, ip6, plen):
    """
    Extract IPv4 address embedded in IPv6 addresses

    The IPv4 octets are read from the four octets that follow the prefix,
    skipping octet 8 (RFC 6052 address format).

    :param ip6: IPv6 address
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96)
    :returns: extracted IPv4 address
    :raises ValueError: if the prefix length is not one of the supported
                        values
    """
    if plen not in (32, 40, 48, 56, 64, 96):
        raise ValueError("unsupported IPv6 prefix length: %r" % (plen,))
    # bytearray behaves the same on Python 2 and Python 3.
    ip6_n = bytearray(socket.inet_pton(socket.AF_INET6, ip6))
    # First four octet positions after the prefix, octet 8 excluded.
    slots = [i for i in range(plen // 8, 16) if i != 8][:4]
    ip4_n = bytearray(ip6_n[slot] for slot in slots)
    return socket.inet_ntop(socket.AF_INET, bytes(ip4_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix (Default use 64:ff9b::)
    :param plen: NAT64 prefix length
    :returns: List with one TCP, one UDP and one ICMPv6 echo request
              packet
    """
    # Without an explicit prefix the outside host's IPv4 address is
    # appended to 64:ff9b::, otherwise it is embedded in the given prefix.
    if pref is None:
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
    else:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)

    l4_layers = (
        TCP(sport=self.tcp_port_in, dport=20),
        UDP(sport=self.udp_port_in, dport=20),
        ICMPv6EchoRequest(id=self.icmp_id_in),
    )
    return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
            IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
            l4
            for l4 in l4_layers]
def create_stream_out(self, out_if, dst_ip=None, ttl=64,
                      use_inside_ports=False):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :param use_inside_ports: Use inside NAT ports as destination ports
           instead of outside ports
    :returns: List with one TCP, one UDP and one ICMP echo reply packet
    """
    if dst_ip is None:
        dst_ip = self.nat_addr

    if use_inside_ports:
        tcp_port = self.tcp_port_in
        udp_port = self.udp_port_in
        icmp_id = self.icmp_id_in
    else:
        tcp_port = self.tcp_port_out
        udp_port = self.udp_port_out
        icmp_id = self.icmp_id_out

    l4_layers = (
        TCP(dport=tcp_port, sport=20),
        UDP(dport=udp_port, sport=20),
        ICMP(id=icmp_id, type='echo-reply'),
    )
    return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
            IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
            l4
            for l4 in l4_layers]
def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
    """
    Create IPv6 packet stream for outside network

    :param out_if: Outside interface
    :param src_ip: Source IPv6 address
    :param dst_ip: Destination IPv6 address
    :param hl: HL of generated packets
    :returns: List with one TCP, one UDP and one ICMPv6 echo reply packet
    """
    l4_layers = (
        TCP(dport=self.tcp_port_out, sport=20),
        UDP(dport=self.udp_port_out, sport=20),
        ICMPv6EchoReply(id=self.icmp_id_out),
    )
    return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
            IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
            l4
            for l4 in l4_layers]
362 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363 packet_num=3, dst_ip=None, is_ip6=False):
365 Verify captured packets on outside network
367 :param capture: Captured packets
368 :param nat_ip: Translated IP address (Default use global NAT address)
369 :param same_port: Sorce port number is not translated (Default False)
370 :param packet_num: Expected number of packets (Default 3)
371 :param dst_ip: Destination IP address (Default do not verify)
372 :param is_ip6: If L3 protocol is IPv6 (Default False)
376 ICMP46 = ICMPv6EchoRequest
381 nat_ip = self.nat_addr
382 self.assertEqual(packet_num, len(capture))
383 for packet in capture:
386 self.check_ip_checksum(packet)
387 self.assertEqual(packet[IP46].src, nat_ip)
388 if dst_ip is not None:
389 self.assertEqual(packet[IP46].dst, dst_ip)
390 if packet.haslayer(TCP):
392 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
395 packet[TCP].sport, self.tcp_port_in)
396 self.tcp_port_out = packet[TCP].sport
397 self.check_tcp_checksum(packet)
398 elif packet.haslayer(UDP):
400 self.assertEqual(packet[UDP].sport, self.udp_port_in)
403 packet[UDP].sport, self.udp_port_in)
404 self.udp_port_out = packet[UDP].sport
407 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
409 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410 self.icmp_id_out = packet[ICMP46].id
412 self.check_icmpv6_checksum(packet)
414 self.check_icmp_checksum(packet)
416 self.logger.error(ppp("Unexpected or invalid packet "
417 "(outside network):", packet))
420 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421 packet_num=3, dst_ip=None):
423 Verify captured packets on outside network
425 :param capture: Captured packets
426 :param nat_ip: Translated IP address
427 :param same_port: Sorce port number is not translated (Default False)
428 :param packet_num: Expected number of packets (Default 3)
429 :param dst_ip: Destination IP address (Default do not verify)
431 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
434 def verify_capture_in(self, capture, in_if, packet_num=3):
436 Verify captured packets on inside network
438 :param capture: Captured packets
439 :param in_if: Inside interface
440 :param packet_num: Expected number of packets (Default 3)
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
445 self.check_ip_checksum(packet)
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 if packet.haslayer(TCP):
448 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449 self.check_tcp_checksum(packet)
450 elif packet.haslayer(UDP):
451 self.assertEqual(packet[UDP].dport, self.udp_port_in)
453 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454 self.check_icmp_checksum(packet)
456 self.logger.error(ppp("Unexpected or invalid packet "
457 "(inside network):", packet))
460 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
462 Verify captured IPv6 packets on inside network
464 :param capture: Captured packets
465 :param src_ip: Source IP
466 :param dst_ip: Destination IP address
467 :param packet_num: Expected number of packets (Default 3)
469 self.assertEqual(packet_num, len(capture))
470 for packet in capture:
472 self.assertEqual(packet[IPv6].src, src_ip)
473 self.assertEqual(packet[IPv6].dst, dst_ip)
474 if packet.haslayer(TCP):
475 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476 self.check_tcp_checksum(packet)
477 elif packet.haslayer(UDP):
478 self.assertEqual(packet[UDP].dport, self.udp_port_in)
479 self.check_udp_checksum(packet)
481 self.assertEqual(packet[ICMPv6EchoReply].id,
483 self.check_icmpv6_checksum(packet)
485 self.logger.error(ppp("Unexpected or invalid packet "
486 "(inside network):", packet))
489 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
491 Verify captured packet that don't have to be translated
493 :param capture: Captured packets
494 :param ingress_if: Ingress interface
495 :param egress_if: Egress interface
497 for packet in capture:
499 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501 if packet.haslayer(TCP):
502 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503 elif packet.haslayer(UDP):
504 self.assertEqual(packet[UDP].sport, self.udp_port_in)
506 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
508 self.logger.error(ppp("Unexpected or invalid packet "
509 "(inside network):", packet))
512 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513 packet_num=3, icmp_type=11):
515 Verify captured packets with ICMP errors on outside network
517 :param capture: Captured packets
518 :param src_ip: Translated IP address or IP address of VPP
519 (Default use global NAT address)
520 :param packet_num: Expected number of packets (Default 3)
521 :param icmp_type: Type of error ICMP packet
522 we are expecting (Default 11)
525 src_ip = self.nat_addr
526 self.assertEqual(packet_num, len(capture))
527 for packet in capture:
529 self.assertEqual(packet[IP].src, src_ip)
530 self.assertTrue(packet.haslayer(ICMP))
532 self.assertEqual(icmp.type, icmp_type)
533 self.assertTrue(icmp.haslayer(IPerror))
534 inner_ip = icmp[IPerror]
535 if inner_ip.haslayer(TCPerror):
536 self.assertEqual(inner_ip[TCPerror].dport,
538 elif inner_ip.haslayer(UDPerror):
539 self.assertEqual(inner_ip[UDPerror].dport,
542 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
544 self.logger.error(ppp("Unexpected or invalid packet "
545 "(outside network):", packet))
548 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
551 Verify captured packets with ICMP errors on inside network
553 :param capture: Captured packets
554 :param in_if: Inside interface
555 :param packet_num: Expected number of packets (Default 3)
556 :param icmp_type: Type of error ICMP packet
557 we are expecting (Default 11)
559 self.assertEqual(packet_num, len(capture))
560 for packet in capture:
562 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563 self.assertTrue(packet.haslayer(ICMP))
565 self.assertEqual(icmp.type, icmp_type)
566 self.assertTrue(icmp.haslayer(IPerror))
567 inner_ip = icmp[IPerror]
568 if inner_ip.haslayer(TCPerror):
569 self.assertEqual(inner_ip[TCPerror].sport,
571 elif inner_ip.haslayer(UDPerror):
572 self.assertEqual(inner_ip[UDPerror].sport,
575 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
577 self.logger.error(ppp("Unexpected or invalid packet "
578 "(inside network):", packet))
581 def create_stream_frag(self, src_if, dst, sport, dport, data):
583 Create fragmented packet stream
585 :param src_if: Source interface
586 :param dst: Destination IPv4 address
587 :param sport: Source TCP port
588 :param dport: Destination TCP port
589 :param data: Payload data
592 id = random.randint(0, 65535)
593 p = (IP(src=src_if.remote_ip4, dst=dst) /
594 TCP(sport=sport, dport=dport) /
596 p = p.__class__(str(p))
597 chksum = p['TCP'].chksum
599 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601 TCP(sport=sport, dport=dport, chksum=chksum) /
604 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606 proto=IP_PROTOS.tcp) /
609 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
616 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617 pref=None, plen=0, frag_size=128):
619 Create fragmented packet stream
621 :param src_if: Source interface
622 :param dst: Destination IPv4 address
623 :param sport: Source TCP port
624 :param dport: Destination TCP port
625 :param data: Payload data
626 :param pref: NAT64 prefix
627 :param plen: NAT64 prefix length
628 :param fragsize: size of fragments
632 dst_ip6 = ''.join(['64:ff9b::', dst])
634 dst_ip6 = self.compose_ip6(dst, pref, plen)
636 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639 TCP(sport=sport, dport=dport) /
642 return fragment6(p, frag_size)
def reass_frags_and_verify(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    Every fragment is checked for the expected addresses and a valid IP
    checksum, then its payload is written at its fragment offset. The
    reassembled bytes are parsed as TCP or UDP according to the protocol
    of the first fragment; the TCP checksum is verified.

    :param frags: Captured fragments
    :param src: Source IPv4 address to verify
    :param dst: Destination IPv4 address to verify

    :returns: Reassembled IPv4 packet
    """
    reassembled = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IP].src, src)
        self.assertEqual(p[IP].dst, dst)
        self.check_ip_checksum(p)
        # The fragment offset field counts 8-byte units.
        reassembled.seek(p[IP].frag * 8)
        reassembled.write(p[IP].payload)
    first = frags[0][IP]
    ip = IP(src=first.src, dst=first.dst, proto=first.proto)
    if ip.proto == IP_PROTOS.tcp:
        p = (ip / TCP(reassembled.getvalue()))
        self.check_tcp_checksum(p)
    elif ip.proto == IP_PROTOS.udp:
        p = (ip / UDP(reassembled.getvalue()))
    # NOTE(review): for any other protocol p is still the last fragment
    # seen in the loop above - confirm that is the intended result.
    return p
671 def reass_frags_and_verify_ip6(self, frags, src, dst):
673 Reassemble and verify fragmented packet
675 :param frags: Captured fragments
676 :param src: Source IPv6 address to verify
677 :param dst: Destination IPv6 address to verify
679 :returns: Reassembled IPv6 packet
681 buffer = StringIO.StringIO()
683 self.assertEqual(p[IPv6].src, src)
684 self.assertEqual(p[IPv6].dst, dst)
685 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686 buffer.write(p[IPv6ExtHdrFragment].payload)
687 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688 nh=frags[0][IPv6ExtHdrFragment].nh)
689 if ip.nh == IP_PROTOS.tcp:
690 p = (ip / TCP(buffer.getvalue()))
691 self.check_tcp_checksum(p)
692 elif ip.nh == IP_PROTOS.udp:
693 p = (ip / UDP(buffer.getvalue()))
696 def verify_ipfix_nat44_ses(self, data):
698 Verify IPFIX NAT44 session create/delete event
700 :param data: Decoded IPFIX data records
702 nat44_ses_create_num = 0
703 nat44_ses_delete_num = 0
704 self.assertEqual(6, len(data))
707 self.assertIn(ord(record[230]), [4, 5])
708 if ord(record[230]) == 4:
709 nat44_ses_create_num += 1
711 nat44_ses_delete_num += 1
713 self.assertEqual(self.pg0.remote_ip4n, record[8])
714 # postNATSourceIPv4Address
715 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
718 self.assertEqual(struct.pack("!I", 0), record[234])
719 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720 if IP_PROTOS.icmp == ord(record[4]):
721 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722 self.assertEqual(struct.pack("!H", self.icmp_id_out),
724 elif IP_PROTOS.tcp == ord(record[4]):
725 self.assertEqual(struct.pack("!H", self.tcp_port_in),
727 self.assertEqual(struct.pack("!H", self.tcp_port_out),
729 elif IP_PROTOS.udp == ord(record[4]):
730 self.assertEqual(struct.pack("!H", self.udp_port_in),
732 self.assertEqual(struct.pack("!H", self.udp_port_out),
735 self.fail("Invalid protocol")
736 self.assertEqual(3, nat44_ses_create_num)
737 self.assertEqual(3, nat44_ses_delete_num)
739 def verify_ipfix_addr_exhausted(self, data):
741 Verify IPFIX NAT addresses event
743 :param data: Decoded IPFIX data records
745 self.assertEqual(1, len(data))
748 self.assertEqual(ord(record[230]), 3)
750 self.assertEqual(struct.pack("!I", 0), record[283])
752 def verify_ipfix_max_sessions(self, data, limit):
754 Verify IPFIX maximum session entries exceeded event
756 :param data: Decoded IPFIX data records
757 :param limit: Number of maximum session entries that can be created.
759 self.assertEqual(1, len(data))
762 self.assertEqual(ord(record[230]), 13)
763 # natQuotaExceededEvent
764 self.assertEqual(struct.pack("I", 1), record[466])
766 self.assertEqual(struct.pack("I", limit), record[471])
768 def verify_ipfix_max_bibs(self, data, limit):
770 Verify IPFIX maximum BIB entries exceeded event
772 :param data: Decoded IPFIX data records
773 :param limit: Number of maximum BIB entries that can be created.
775 self.assertEqual(1, len(data))
778 self.assertEqual(ord(record[230]), 13)
779 # natQuotaExceededEvent
780 self.assertEqual(struct.pack("I", 2), record[466])
782 self.assertEqual(struct.pack("I", limit), record[472])
784 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
786 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
788 :param data: Decoded IPFIX data records
789 :param limit: Number of maximum fragments pending reassembly
790 :param src_addr: IPv6 source address
792 self.assertEqual(1, len(data))
795 self.assertEqual(ord(record[230]), 13)
796 # natQuotaExceededEvent
797 self.assertEqual(struct.pack("I", 5), record[466])
798 # maxFragmentsPendingReassembly
799 self.assertEqual(struct.pack("I", limit), record[475])
801 self.assertEqual(src_addr, record[27])
803 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
805 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
807 :param data: Decoded IPFIX data records
808 :param limit: Number of maximum fragments pending reassembly
809 :param src_addr: IPv4 source address
811 self.assertEqual(1, len(data))
814 self.assertEqual(ord(record[230]), 13)
815 # natQuotaExceededEvent
816 self.assertEqual(struct.pack("I", 5), record[466])
817 # maxFragmentsPendingReassembly
818 self.assertEqual(struct.pack("I", limit), record[475])
820 self.assertEqual(src_addr, record[8])
822 def verify_ipfix_bib(self, data, is_create, src_addr):
824 Verify IPFIX NAT64 BIB create and delete events
826 :param data: Decoded IPFIX data records
827 :param is_create: Create event if nonzero value otherwise delete event
828 :param src_addr: IPv6 source address
830 self.assertEqual(1, len(data))
834 self.assertEqual(ord(record[230]), 10)
836 self.assertEqual(ord(record[230]), 11)
838 self.assertEqual(src_addr, record[27])
839 # postNATSourceIPv4Address
840 self.assertEqual(self.nat_addr_n, record[225])
842 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
844 self.assertEqual(struct.pack("!I", 0), record[234])
845 # sourceTransportPort
846 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847 # postNAPTSourceTransportPort
848 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
850 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
853 Verify IPFIX NAT64 session create and delete events
855 :param data: Decoded IPFIX data records
856 :param is_create: Create event if nonzero value otherwise delete event
857 :param src_addr: IPv6 source address
858 :param dst_addr: IPv4 destination address
859 :param dst_port: destination TCP port
861 self.assertEqual(1, len(data))
865 self.assertEqual(ord(record[230]), 6)
867 self.assertEqual(ord(record[230]), 7)
869 self.assertEqual(src_addr, record[27])
870 # destinationIPv6Address
871 self.assertEqual(socket.inet_pton(socket.AF_INET6,
872 self.compose_ip6(dst_addr,
876 # postNATSourceIPv4Address
877 self.assertEqual(self.nat_addr_n, record[225])
878 # postNATDestinationIPv4Address
879 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
882 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
884 self.assertEqual(struct.pack("!I", 0), record[234])
885 # sourceTransportPort
886 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887 # postNAPTSourceTransportPort
888 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889 # destinationTransportPort
890 self.assertEqual(struct.pack("!H", dst_port), record[11])
891 # postNAPTDestinationTransportPort
892 self.assertEqual(struct.pack("!H", dst_port), record[228])
895 class TestNAT44(MethodHolder):
896 """ NAT44 Test Cases """
900 super(TestNAT44, cls).setUpClass()
903 cls.tcp_port_in = 6303
904 cls.tcp_port_out = 6303
905 cls.udp_port_in = 6304
906 cls.udp_port_out = 6304
907 cls.icmp_id_in = 6305
908 cls.icmp_id_out = 6305
909 cls.nat_addr = '10.0.0.3'
910 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911 cls.ipfix_src_port = 4739
912 cls.ipfix_domain_id = 1
914 cls.create_pg_interfaces(range(10))
915 cls.interfaces = list(cls.pg_interfaces[0:4])
917 for i in cls.interfaces:
922 cls.pg0.generate_remote_hosts(3)
923 cls.pg0.configure_ipv4_neighbors()
925 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926 cls.vapi.ip_table_add_del(10, is_add=1)
927 cls.vapi.ip_table_add_del(20, is_add=1)
929 cls.pg4._local_ip4 = "172.16.255.1"
930 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932 cls.pg4.set_table_ip4(10)
933 cls.pg5._local_ip4 = "172.17.255.3"
934 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936 cls.pg5.set_table_ip4(10)
937 cls.pg6._local_ip4 = "172.16.255.1"
938 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940 cls.pg6.set_table_ip4(20)
941 for i in cls.overlapping_interfaces:
949 cls.pg9.generate_remote_hosts(2)
951 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
956 cls.pg9.resolve_arp()
957 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959 cls.pg9.resolve_arp()
962 super(TestNAT44, cls).tearDownClass()
965 def clear_nat44(self):
967 Clear NAT44 configuration.
969 # I found no elegant way to do this
970 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971 dst_address_length=32,
972 next_hop_address=self.pg7.remote_ip4n,
973 next_hop_sw_if_index=self.pg7.sw_if_index,
975 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976 dst_address_length=32,
977 next_hop_address=self.pg8.remote_ip4n,
978 next_hop_sw_if_index=self.pg8.sw_if_index,
981 for intf in [self.pg7, self.pg8]:
982 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
984 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
989 if self.pg7.has_ip4_config:
990 self.pg7.unconfig_ip4()
992 self.vapi.nat44_forwarding_enable_disable(0)
994 interfaces = self.vapi.nat44_interface_addr_dump()
995 for intf in interfaces:
996 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997 twice_nat=intf.twice_nat,
1000 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001 domain_id=self.ipfix_domain_id)
1002 self.ipfix_src_port = 4739
1003 self.ipfix_domain_id = 1
1005 interfaces = self.vapi.nat44_interface_dump()
1006 for intf in interfaces:
1007 if intf.is_inside > 1:
1008 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1011 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1015 interfaces = self.vapi.nat44_interface_output_feature_dump()
1016 for intf in interfaces:
1017 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1021 static_mappings = self.vapi.nat44_static_mapping_dump()
1022 for sm in static_mappings:
1023 self.vapi.nat44_add_del_static_mapping(
1024 sm.local_ip_address,
1025 sm.external_ip_address,
1026 local_port=sm.local_port,
1027 external_port=sm.external_port,
1028 addr_only=sm.addr_only,
1030 protocol=sm.protocol,
1031 twice_nat=sm.twice_nat,
1032 out2in_only=sm.out2in_only,
1036 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1037 for lb_sm in lb_static_mappings:
1038 self.vapi.nat44_add_del_lb_static_mapping(
1039 lb_sm.external_addr,
1040 lb_sm.external_port,
1042 vrf_id=lb_sm.vrf_id,
1043 twice_nat=lb_sm.twice_nat,
1044 out2in_only=lb_sm.out2in_only,
1050 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1051 for id_m in identity_mappings:
1052 self.vapi.nat44_add_del_identity_mapping(
1053 addr_only=id_m.addr_only,
1056 sw_if_index=id_m.sw_if_index,
1058 protocol=id_m.protocol,
1061 adresses = self.vapi.nat44_address_dump()
1062 for addr in adresses:
1063 self.vapi.nat44_add_del_address_range(addr.ip_address,
1065 twice_nat=addr.twice_nat,
1068 self.vapi.nat_set_reass()
1069 self.vapi.nat_set_reass(is_ip6=1)
1071 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1072 local_port=0, external_port=0, vrf_id=0,
1073 is_add=1, external_sw_if_index=0xFFFFFFFF,
1074 proto=0, twice_nat=0, out2in_only=0, tag=""):
1076 Add/delete NAT44 static mapping
1078 :param local_ip: Local IP address
1079 :param external_ip: External IP address
1080 :param local_port: Local port number (Optional)
1081 :param external_port: External port number (Optional)
1082 :param vrf_id: VRF ID (Default 0)
1083 :param is_add: 1 if add, 0 if delete (Default add)
1084 :param external_sw_if_index: External interface instead of IP address
1085 :param proto: IP protocol (Mandatory if port specified)
1086 :param twice_nat: 1 if translate external host address and port
1087 :param out2in_only: if 1 rule is matching only out2in direction
1088 :param tag: Opaque string tag
1091 if local_port and external_port:
1093 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1094 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1095 self.vapi.nat44_add_del_static_mapping(
1098 external_sw_if_index,
1109 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1111 Add/delete NAT44 address
1113 :param ip: IP address
1114 :param is_add: 1 if add, 0 if delete (Default add)
1115 :param twice_nat: twice NAT address for extenal hosts
1117 nat_addr = socket.inet_pton(socket.AF_INET, ip)
1118 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1120 twice_nat=twice_nat)
1122 def test_dynamic(self):
1123 """ NAT44 dynamic translation test """
1125 self.nat44_add_address(self.nat_addr)
1126 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1127 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1131 pkts = self.create_stream_in(self.pg0, self.pg1)
1132 self.pg0.add_stream(pkts)
1133 self.pg_enable_capture(self.pg_interfaces)
1135 capture = self.pg1.get_capture(len(pkts))
1136 self.verify_capture_out(capture)
1139 pkts = self.create_stream_out(self.pg1)
1140 self.pg1.add_stream(pkts)
1141 self.pg_enable_capture(self.pg_interfaces)
1143 capture = self.pg0.get_capture(len(pkts))
1144 self.verify_capture_in(capture, self.pg0)
1146 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1147 """ NAT44 handling of client packets with TTL=1 """
1149 self.nat44_add_address(self.nat_addr)
1150 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1151 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1154 # Client side - generate traffic
1155 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1156 self.pg0.add_stream(pkts)
1157 self.pg_enable_capture(self.pg_interfaces)
1160 # Client side - verify ICMP type 11 packets
1161 capture = self.pg0.get_capture(len(pkts))
1162 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1164 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1165 """ NAT44 handling of server packets with TTL=1 """
1167 self.nat44_add_address(self.nat_addr)
1168 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1169 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1172 # Client side - create sessions
1173 pkts = self.create_stream_in(self.pg0, self.pg1)
1174 self.pg0.add_stream(pkts)
1175 self.pg_enable_capture(self.pg_interfaces)
1178 # Server side - generate traffic
1179 capture = self.pg1.get_capture(len(pkts))
1180 self.verify_capture_out(capture)
1181 pkts = self.create_stream_out(self.pg1, ttl=1)
1182 self.pg1.add_stream(pkts)
1183 self.pg_enable_capture(self.pg_interfaces)
1186 # Server side - verify ICMP type 11 packets
1187 capture = self.pg1.get_capture(len(pkts))
1188 self.verify_capture_out_with_icmp_errors(capture,
1189 src_ip=self.pg1.local_ip4)
1191 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1192 """ NAT44 handling of error responses to client packets with TTL=2 """
# Outline: send a ttl=2 stream pg0 -> pg1, wrap every packet captured on pg1
# into an ICMP type 11 message addressed to self.nat_addr, inject those on
# pg1 and verify what arrives on pg0.
# NOTE(review): the pg1 feature call below is left open in this listing (its
# remaining arguments are not shown) - confirm against the complete file.
1194 self.nat44_add_address(self.nat_addr)
1195 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1196 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1199 # Client side - generate traffic
1200 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1201 self.pg0.add_stream(pkts)
1202 self.pg_enable_capture(self.pg_interfaces)
1205 # Server side - simulate ICMP type 11 response
# Each reply carries the IP layer of one captured packet as its payload, with
# outer src = pg1.remote_ip4 and outer dst = self.nat_addr.
1206 capture = self.pg1.get_capture(len(pkts))
1207 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1208 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1209 ICMP(type=11) / packet[IP] for packet in capture]
1210 self.pg1.add_stream(pkts)
1211 self.pg_enable_capture(self.pg_interfaces)
1214 # Client side - verify ICMP type 11 packets
1215 capture = self.pg0.get_capture(len(pkts))
1216 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1218 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1219 """ NAT44 handling of error responses to server packets with TTL=2 """
# Outline: (1) default-TTL stream pg0 -> pg1 checked with verify_capture_out,
# (2) ttl=2 stream from create_stream_out injected on pg1 and captured on
# pg0, (3) every packet captured on pg0 is wrapped into an ICMP type 11
# message sent from pg0, and the result is verified on pg1.
# NOTE(review): the pg1 feature call below is left open in this listing (its
# remaining arguments are not shown) - confirm against the complete file.
1221 self.nat44_add_address(self.nat_addr)
1222 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1223 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1226 # Client side - create sessions
1227 pkts = self.create_stream_in(self.pg0, self.pg1)
1228 self.pg0.add_stream(pkts)
1229 self.pg_enable_capture(self.pg_interfaces)
1232 # Server side - generate traffic
1233 capture = self.pg1.get_capture(len(pkts))
1234 self.verify_capture_out(capture)
1235 pkts = self.create_stream_out(self.pg1, ttl=2)
1236 self.pg1.add_stream(pkts)
1237 self.pg_enable_capture(self.pg_interfaces)
1240 # Client side - simulate ICMP type 11 response
# Outer header: src = pg0.remote_ip4, dst = pg1.remote_ip4; the payload is
# the IP layer of the packet captured on pg0.
1241 capture = self.pg0.get_capture(len(pkts))
1242 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1243 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1244 ICMP(type=11) / packet[IP] for packet in capture]
1245 self.pg0.add_stream(pkts)
1246 self.pg_enable_capture(self.pg_interfaces)
1249 # Server side - verify ICMP type 11 packets
1250 capture = self.pg1.get_capture(len(pkts))
1251 self.verify_capture_out_with_icmp_errors(capture)
1253 def test_ping_out_interface_from_outside(self):
1254 """ Ping NAT44 out interface from outside network """
# Outline: build one ICMP echo-request from pg1.remote_ip4 to pg1.local_ip4,
# inject it on pg1 and expect exactly one packet back on pg1 whose fields are
# checked below.
# NOTE(review): this listing does not show where `pkts` and `packet` are
# assigned, nor the rest of the pg1 feature call or the control flow around
# the logger.error call - confirm all of these against the complete file.
1256 self.nat44_add_address(self.nat_addr)
1257 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1258 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1261 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1262 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1263 ICMP(id=self.icmp_id_out, type='echo-request'))
1265 self.pg1.add_stream(pkts)
1266 self.pg_enable_capture(self.pg_interfaces)
1268 capture = self.pg1.get_capture(len(pkts))
1269 self.assertEqual(1, len(capture))
# Expected reply: addresses swapped relative to the request, ICMP type 0.
# Note the request is built with id=self.icmp_id_out while the reply id is
# compared against self.icmp_id_in.
1272 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1273 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1274 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1275 self.assertEqual(packet[ICMP].type, 0) # echo reply
1277 self.logger.error(ppp("Unexpected or invalid packet "
1278 "(outside network):", packet))
1281 def test_ping_internal_host_from_outside(self):
1282 """ Ping internal host from outside network """
# Outline: add a static mapping pg0.remote_ip4 <-> self.nat_addr, send one
# echo-request to self.nat_addr on pg1 and check it on pg0, then send one
# echo-reply from pg0 and check it on pg1.
# NOTE(review): the pg1 feature call below is left open in this listing (its
# remaining arguments are not shown) - confirm against the complete file.
1284 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1285 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1286 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Request direction: injected on pg1, one packet expected on pg0.
1290 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1291 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1292 ICMP(id=self.icmp_id_out, type='echo-request'))
1293 self.pg1.add_stream(pkt)
1294 self.pg_enable_capture(self.pg_interfaces)
1296 capture = self.pg0.get_capture(1)
1297 self.verify_capture_in(capture, self.pg0, packet_num=1)
1298 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Reply direction: injected on pg0, one packet expected on pg1.
1301 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1302 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1303 ICMP(id=self.icmp_id_in, type='echo-reply'))
1304 self.pg0.add_stream(pkt)
1305 self.pg_enable_capture(self.pg_interfaces)
1307 capture = self.pg1.get_capture(1)
1308 self.verify_capture_out(capture, same_port=True, packet_num=1)
1309 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1311 def test_forwarding(self):
1312 """ NAT44 forwarding test """
# Outline: call nat44_forwarding_enable_disable(1), add a static mapping
# between pg0.remote_ip4n and self.nat_addr_n, exercise traffic in both
# directions twice (see the two commented phases), then call
# nat44_forwarding_enable_disable(0) and nat44_add_del_static_mapping again.
# NOTE(review): several calls are left open in this listing (the pg1 feature
# call, the last verify_capture_out call and the final static-mapping call);
# their remaining arguments must be confirmed against the complete file.
1314 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1315 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1317 self.vapi.nat44_forwarding_enable_disable(1)
1319 real_ip = self.pg0.remote_ip4n
1320 alias_ip = self.nat_addr_n
1321 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1322 external_ip=alias_ip)
1325 # in2out - static mapping match
# Stream from create_stream_out injected on pg1, captured on pg0.
1327 pkts = self.create_stream_out(self.pg1)
1328 self.pg1.add_stream(pkts)
1329 self.pg_enable_capture(self.pg_interfaces)
1331 capture = self.pg0.get_capture(len(pkts))
1332 self.verify_capture_in(capture, self.pg0)
# Stream from create_stream_in injected on pg0, captured on pg1.
1334 pkts = self.create_stream_in(self.pg0, self.pg1)
1335 self.pg0.add_stream(pkts)
1336 self.pg_enable_capture(self.pg_interfaces)
1338 capture = self.pg1.get_capture(len(pkts))
1339 self.verify_capture_out(capture, same_port=True)
1341 # in2out - no static mapping match
# remote_hosts[0] is temporarily replaced by remote_hosts[1] and restored
# from host0 further down - presumably so that pg0.remote_ip4 refers to a
# host that has no static mapping; TODO confirm how remote_ip4 is derived.
1343 host0 = self.pg0.remote_hosts[0]
1344 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1346 pkts = self.create_stream_out(self.pg1,
1347 dst_ip=self.pg0.remote_ip4,
1348 use_inside_ports=True)
1349 self.pg1.add_stream(pkts)
1350 self.pg_enable_capture(self.pg_interfaces)
1352 capture = self.pg0.get_capture(len(pkts))
1353 self.verify_capture_in(capture, self.pg0)
1355 pkts = self.create_stream_in(self.pg0, self.pg1)
1356 self.pg0.add_stream(pkts)
1357 self.pg_enable_capture(self.pg_interfaces)
1359 capture = self.pg1.get_capture(len(pkts))
1360 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
# Restore the original first remote host saved in host0.
1363 self.pg0.remote_hosts[0] = host0
# Cleanup of the state configured at the top of the test.
1366 self.vapi.nat44_forwarding_enable_disable(0)
1367 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1368 external_ip=alias_ip,
1371 def test_static_in(self):
1372 """ 1:1 NAT initialized from inside network """
# Outline: map pg0.remote_ip4 to the literal address 10.0.0.10, check the
# static-mapping dump, then send create_stream_in traffic pg0 -> pg1 followed
# by create_stream_out traffic pg1 -> pg0.
# NOTE(review): the pg1 feature call below is left open in this listing (its
# remaining arguments are not shown) - confirm against the complete file.
1374 nat_ip = "10.0.0.10"
# These instance attributes are overwritten for this test.
1375 self.tcp_port_out = 6303
1376 self.udp_port_out = 6304
1377 self.icmp_id_out = 6305
1379 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1380 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1381 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Exactly one mapping is expected; its tag, cut at the first NUL character,
# must be empty (no tag is passed to nat44_add_static_mapping above).
1383 sm = self.vapi.nat44_static_mapping_dump()
1384 self.assertEqual(len(sm), 1)
1385 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
# pg0 -> pg1 direction.
1388 pkts = self.create_stream_in(self.pg0, self.pg1)
1389 self.pg0.add_stream(pkts)
1390 self.pg_enable_capture(self.pg_interfaces)
1392 capture = self.pg1.get_capture(len(pkts))
1393 self.verify_capture_out(capture, nat_ip, True)
# pg1 -> pg0 direction, addressed to nat_ip.
1396 pkts = self.create_stream_out(self.pg1, nat_ip)
1397 self.pg1.add_stream(pkts)
1398 self.pg_enable_capture(self.pg_interfaces)
1400 capture = self.pg0.get_capture(len(pkts))
1401 self.verify_capture_in(capture, self.pg0)
1403 def test_static_out(self):
1404 """ 1:1 NAT initialized from outside network """
# Outline: map pg0.remote_ip4 to the literal address 10.0.0.20 with a tag,
# check the static-mapping dump, then send create_stream_out traffic
# pg1 -> pg0 followed by create_stream_in traffic pg0 -> pg1.
# NOTE(review): the assignment of `tag` and the rest of the pg1 feature call
# are not shown in this listing - confirm against the complete file.
1406 nat_ip = "10.0.0.20"
# These instance attributes are overwritten for this test.
1407 self.tcp_port_out = 6303
1408 self.udp_port_out = 6304
1409 self.icmp_id_out = 6305
1412 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1413 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1414 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Exactly one mapping is expected; its tag, cut at the first NUL character,
# must equal the tag passed above.
1416 sm = self.vapi.nat44_static_mapping_dump()
1417 self.assertEqual(len(sm), 1)
1418 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
# pg1 -> pg0 direction, addressed to nat_ip.
1421 pkts = self.create_stream_out(self.pg1, nat_ip)
1422 self.pg1.add_stream(pkts)
1423 self.pg_enable_capture(self.pg_interfaces)
1425 capture = self.pg0.get_capture(len(pkts))
1426 self.verify_capture_in(capture, self.pg0)
# pg0 -> pg1 direction.
1429 pkts = self.create_stream_in(self.pg0, self.pg1)
1430 self.pg0.add_stream(pkts)
1431 self.pg_enable_capture(self.pg_interfaces)
1433 capture = self.pg1.get_capture(len(pkts))
1434 self.verify_capture_out(capture, nat_ip, True)
1436 def test_static_with_port_in(self):
1437 """ 1:1 NAPT initialized from inside network """
# Outline: add self.nat_addr, create one port-level static mapping per
# protocol (TCP, UDP, ICMP) from pg0.remote_ip4 to self.nat_addr, then send
# traffic pg0 -> pg1 followed by pg1 -> pg0.
# NOTE(review): the pg1 feature call below is left open in this listing (its
# remaining arguments are not shown) - confirm against the complete file.
# External ports/id used by the three mappings below.
1439 self.tcp_port_out = 3606
1440 self.udp_port_out = 3607
1441 self.icmp_id_out = 3608
1443 self.nat44_add_address(self.nat_addr)
# Each mapping pairs the *_in port/id with the matching *_out value.
1444 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1445 self.tcp_port_in, self.tcp_port_out,
1446 proto=IP_PROTOS.tcp)
1447 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1448 self.udp_port_in, self.udp_port_out,
1449 proto=IP_PROTOS.udp)
1450 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1451 self.icmp_id_in, self.icmp_id_out,
1452 proto=IP_PROTOS.icmp)
1453 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1454 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1 direction.
1458 pkts = self.create_stream_in(self.pg0, self.pg1)
1459 self.pg0.add_stream(pkts)
1460 self.pg_enable_capture(self.pg_interfaces)
1462 capture = self.pg1.get_capture(len(pkts))
1463 self.verify_capture_out(capture)
# pg1 -> pg0 direction.
1466 pkts = self.create_stream_out(self.pg1)
1467 self.pg1.add_stream(pkts)
1468 self.pg_enable_capture(self.pg_interfaces)
1470 capture = self.pg0.get_capture(len(pkts))
1471 self.verify_capture_in(capture, self.pg0)
1473 def test_static_with_port_out(self):
1474 """ 1:1 NAPT initialized from outside network """
# Outline: same configuration shape as test_static_with_port_in (one static
# mapping per protocol) but with different external ports and with the
# pg1 -> pg0 stream sent before the pg0 -> pg1 stream.
# NOTE(review): the pg1 feature call below is left open in this listing (its
# remaining arguments are not shown) - confirm against the complete file.
# External ports/id used by the three mappings below.
1476 self.tcp_port_out = 30606
1477 self.udp_port_out = 30607
1478 self.icmp_id_out = 30608
1480 self.nat44_add_address(self.nat_addr)
# Each mapping pairs the *_in port/id with the matching *_out value.
1481 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1482 self.tcp_port_in, self.tcp_port_out,
1483 proto=IP_PROTOS.tcp)
1484 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1485 self.udp_port_in, self.udp_port_out,
1486 proto=IP_PROTOS.udp)
1487 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1488 self.icmp_id_in, self.icmp_id_out,
1489 proto=IP_PROTOS.icmp)
1490 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1491 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg1 -> pg0 direction.
1495 pkts = self.create_stream_out(self.pg1)
1496 self.pg1.add_stream(pkts)
1497 self.pg_enable_capture(self.pg_interfaces)
1499 capture = self.pg0.get_capture(len(pkts))
1500 self.verify_capture_in(capture, self.pg0)
# pg0 -> pg1 direction.
1503 pkts = self.create_stream_in(self.pg0, self.pg1)
1504 self.pg0.add_stream(pkts)
1505 self.pg_enable_capture(self.pg_interfaces)
1507 capture = self.pg1.get_capture(len(pkts))
1508 self.verify_capture_out(capture)
1510 def test_static_with_port_out2(self):
1511 """ 1:1 NAPT symmetrical rule """
# Outline: with nat44_forwarding_enable_disable(1) and a single TCP static
# mapping created with out2in_only=1, the test sends six single packets and
# checks each result: client -> service, an ICMP type 11 message about it,
# service -> client, an ICMP type 11 message about that, and finally two
# packets addressed directly to the real addresses ("no translation").
# NOTE(review): this listing does not show the assignments of `local_port`,
# `external_port`, `ip`, `tcp` or `inner`, the statements that rebind `p`
# from each capture, the rest of the pg1 feature call, or the control flow
# around the logger.error calls - confirm all of these against the complete
# file before relying on the notes below.
1516 self.vapi.nat44_forwarding_enable_disable(1)
1517 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1518 local_port, external_port,
1519 proto=IP_PROTOS.tcp, out2in_only=1)
1520 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1521 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1524 # from client to service
1525 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1526 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1527 TCP(sport=12345, dport=external_port))
1528 self.pg1.add_stream(p)
1529 self.pg_enable_capture(self.pg_interfaces)
1531 capture = self.pg0.get_capture(1)
# Expected on pg0: destination rewritten to pg0.remote_ip4 / local_port.
1537 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1538 self.assertEqual(tcp.dport, local_port)
1539 self.check_tcp_checksum(p)
1540 self.check_ip_checksum(p)
1542 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 message sent from pg0 that embeds the IP layer of the packet
# just captured on pg0.
1546 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1547 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1548 ICMP(type=11) / capture[0][IP])
1549 self.pg0.add_stream(p)
1550 self.pg_enable_capture(self.pg_interfaces)
1552 capture = self.pg1.get_capture(1)
# Expected on pg1: outer source is self.nat_addr and the embedded header
# carries self.nat_addr / external_port as destination.
1555 self.assertEqual(p[IP].src, self.nat_addr)
1557 self.assertEqual(inner.dst, self.nat_addr)
1558 self.assertEqual(inner[TCPerror].dport, external_port)
1560 self.logger.error(ppp("Unexpected or invalid packet:", p))
1563 # from service back to client
1564 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1565 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1566 TCP(sport=local_port, dport=12345))
1567 self.pg0.add_stream(p)
1568 self.pg_enable_capture(self.pg_interfaces)
1570 capture = self.pg1.get_capture(1)
# Expected on pg1: source rewritten to self.nat_addr / external_port.
1575 self.assertEqual(ip.src, self.nat_addr)
1576 self.assertEqual(tcp.sport, external_port)
1577 self.check_tcp_checksum(p)
1578 self.check_ip_checksum(p)
1580 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 message sent from pg1 to self.nat_addr that embeds the IP
# layer of the packet just captured on pg1.
1584 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1585 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1586 ICMP(type=11) / capture[0][IP])
1587 self.pg1.add_stream(p)
1588 self.pg_enable_capture(self.pg_interfaces)
1590 capture = self.pg0.get_capture(1)
# Expected on pg0: outer destination is pg0.remote_ip4 and the embedded
# header carries pg0.remote_ip4 / local_port as source.
1593 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1595 self.assertEqual(inner.src, self.pg0.remote_ip4)
1596 self.assertEqual(inner[TCPerror].sport, local_port)
1598 self.logger.error(ppp("Unexpected or invalid packet:", p))
1601 # from client to server (no translation)
# Here the packet is addressed to pg0.remote_ip4 directly, not to nat_addr.
1602 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1603 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1604 TCP(sport=12346, dport=local_port))
1605 self.pg1.add_stream(p)
1606 self.pg_enable_capture(self.pg_interfaces)
1608 capture = self.pg0.get_capture(1)
1614 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1615 self.assertEqual(tcp.dport, local_port)
1616 self.check_tcp_checksum(p)
1617 self.check_ip_checksum(p)
1619 self.logger.error(ppp("Unexpected or invalid packet:", p))
1622 # from service back to client (no translation)
1623 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1624 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1625 TCP(sport=local_port, dport=12346))
1626 self.pg0.add_stream(p)
1627 self.pg_enable_capture(self.pg_interfaces)
1629 capture = self.pg1.get_capture(1)
# Expected on pg1: source address and port are unchanged.
1634 self.assertEqual(ip.src, self.pg0.remote_ip4)
1635 self.assertEqual(tcp.sport, local_port)
1636 self.check_tcp_checksum(p)
1637 self.check_ip_checksum(p)
1639 self.logger.error(ppp("Unexpected or invalid packet:", p))
1642 def test_static_vrf_aware(self):
1643 """ 1:1 NAT VRF awareness """
# Outline: create two static mappings (pg4.remote_ip4 -> 10.0.0.30 and
# pg0.remote_ip4 -> 10.0.0.40), then send traffic pg4 -> pg3, which must be
# captured on pg3 using nat_ip1, and pg0 -> pg3, for which pg3 must capture
# nothing.
# NOTE(review): both nat44_add_static_mapping calls and the pg3 feature call
# are left open in this listing; their remaining arguments (presumably the
# VRF selection, given the test name) must be confirmed against the complete
# file.
1645 nat_ip1 = "10.0.0.30"
1646 nat_ip2 = "10.0.0.40"
1647 self.tcp_port_out = 6303
1648 self.udp_port_out = 6304
1649 self.icmp_id_out = 6305
1651 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1653 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1655 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1657 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1658 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1660 # inside interface VRF match NAT44 static mapping VRF
1661 pkts = self.create_stream_in(self.pg4, self.pg3)
1662 self.pg4.add_stream(pkts)
1663 self.pg_enable_capture(self.pg_interfaces)
1665 capture = self.pg3.get_capture(len(pkts))
1666 self.verify_capture_out(capture, nat_ip1, True)
1668 # inside interface VRF don't match NAT44 static mapping VRF (packets
# NOTE(review): the comment above is cut off in this listing.
1670 pkts = self.create_stream_in(self.pg0, self.pg3)
1671 self.pg0.add_stream(pkts)
1672 self.pg_enable_capture(self.pg_interfaces)
1674 self.pg3.assert_nothing_captured()
1676 def test_identity_nat(self):
1677 """ Identity NAT """
# Outline: add an identity mapping for pg0.remote_ip4n, send one TCP packet
# from pg1.remote_ip4 to pg0.remote_ip4 on pg1 and check that the packet
# captured on pg0 keeps the same addresses and ports.
# NOTE(review): this listing does not show where `ip` and `tcp` are assigned
# or where `p` is rebound from the capture, nor the rest of the pg1 feature
# call or the control flow around logger.error - confirm against the
# complete file.
1679 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1680 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1681 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1684 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1685 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1686 TCP(sport=12345, dport=56789))
1687 self.pg1.add_stream(p)
1688 self.pg_enable_capture(self.pg_interfaces)
1690 capture = self.pg0.get_capture(1)
# All four values match what was put into the packet built above.
1695 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1696 self.assertEqual(ip.src, self.pg1.remote_ip4)
1697 self.assertEqual(tcp.dport, 56789)
1698 self.assertEqual(tcp.sport, 12345)
1699 self.check_tcp_checksum(p)
1700 self.check_ip_checksum(p)
1702 self.logger.error(ppp("Unexpected or invalid packet:", p))
1705 def test_static_lb(self):
1706 """ NAT44 local service load balancing """
# Outline: configure a load-balanced static mapping for self.nat_addr with
# two local servers (pg0.remote_hosts[0] and [1]), send one packet
# client -> service and one service -> client, then send one packet per
# address produced by ip4_range(pg1.remote_ip4, 10, 20) and compare how many
# landed on each server.
# NOTE(review): this listing omits many statements: the assignments of
# `external_port`, `local_port`, `server`, `pkts`, `ip`, `tcp`, `server1_n`
# and `server2_n`, most of the `locals` entries and of the
# nat44_add_del_lb_static_mapping arguments, the bodies of the `if`
# statements, and the control flow around logger.error. Confirm all of these
# against the complete file.
# External address in packed binary form for the API call below.
1707 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1710 server1 = self.pg0.remote_hosts[0]
1711 server2 = self.pg0.remote_hosts[1]
# NOTE(review): `locals` shadows the Python builtin of the same name.
1713 locals = [{'addr': server1.ip4n,
1716 {'addr': server2.ip4n,
1720 self.nat44_add_address(self.nat_addr)
1721 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1724 local_num=len(locals),
1726 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1727 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1730 # from client to service
1731 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1732 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1733 TCP(sport=12345, dport=external_port))
1734 self.pg1.add_stream(p)
1735 self.pg_enable_capture(self.pg_interfaces)
1737 capture = self.pg0.get_capture(1)
# Either server is an acceptable destination for this packet.
1743 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1744 if ip.dst == server1.ip4:
1748 self.assertEqual(tcp.dport, local_port)
1749 self.check_tcp_checksum(p)
1750 self.check_ip_checksum(p)
1752 self.logger.error(ppp("Unexpected or invalid packet:", p))
1755 # from service back to client
1756 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1757 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1758 TCP(sport=local_port, dport=12345))
1759 self.pg0.add_stream(p)
1760 self.pg_enable_capture(self.pg_interfaces)
1762 capture = self.pg1.get_capture(1)
# Expected on pg1: source rewritten to self.nat_addr / external_port.
1767 self.assertEqual(ip.src, self.nat_addr)
1768 self.assertEqual(tcp.sport, external_port)
1769 self.check_tcp_checksum(p)
1770 self.check_ip_checksum(p)
1772 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Distribution check: one packet per client address towards the service.
1778 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1780 for client in clients:
1781 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1782 IP(src=client, dst=self.nat_addr) /
1783 TCP(sport=12345, dport=external_port))
1785 self.pg1.add_stream(pkts)
1786 self.pg_enable_capture(self.pg_interfaces)
1788 capture = self.pg0.get_capture(len(pkts))
1790 if p[IP].dst == server1.ip4:
# server1 is required to have received strictly more packets than server2.
1794 self.assertTrue(server1_n > server2_n)
1796 def test_static_lb_2(self):
1797 """ NAT44 local service load balancing (asymmetrical rule) """
# Outline: with nat44_forwarding_enable_disable(1) and a load-balanced static
# mapping for self.nat_addr over two local servers, send four single packets:
# client -> service, service -> client, and then two packets exchanged
# directly with server1's real address ("no translation").
# NOTE(review): this listing omits many statements: the assignments of
# `external_port`, `local_port`, `server`, `ip` and `tcp`, the statements
# rebinding `p` from each capture, most of the `locals` entries and of the
# nat44_add_del_lb_static_mapping arguments, the body of the `if`, and the
# control flow around logger.error. Confirm against the complete file.
# External address in packed binary form for the API call below.
1798 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1801 server1 = self.pg0.remote_hosts[0]
1802 server2 = self.pg0.remote_hosts[1]
# NOTE(review): `locals` shadows the Python builtin of the same name.
1804 locals = [{'addr': server1.ip4n,
1807 {'addr': server2.ip4n,
1811 self.vapi.nat44_forwarding_enable_disable(1)
1812 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1816 local_num=len(locals),
1818 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1819 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1822 # from client to service
1823 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1824 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1825 TCP(sport=12345, dport=external_port))
1826 self.pg1.add_stream(p)
1827 self.pg_enable_capture(self.pg_interfaces)
1829 capture = self.pg0.get_capture(1)
# Either server is an acceptable destination for this packet.
1835 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1836 if ip.dst == server1.ip4:
1840 self.assertEqual(tcp.dport, local_port)
1841 self.check_tcp_checksum(p)
1842 self.check_ip_checksum(p)
1844 self.logger.error(ppp("Unexpected or invalid packet:", p))
1847 # from service back to client
1848 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1849 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1850 TCP(sport=local_port, dport=12345))
1851 self.pg0.add_stream(p)
1852 self.pg_enable_capture(self.pg_interfaces)
1854 capture = self.pg1.get_capture(1)
# Expected on pg1: source rewritten to self.nat_addr / external_port.
1859 self.assertEqual(ip.src, self.nat_addr)
1860 self.assertEqual(tcp.sport, external_port)
1861 self.check_tcp_checksum(p)
1862 self.check_ip_checksum(p)
1864 self.logger.error(ppp("Unexpected or invalid packet:", p))
1867 # from client to server (no translation)
# Addressed to server1.ip4 directly rather than to self.nat_addr.
1868 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1869 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1870 TCP(sport=12346, dport=local_port))
1871 self.pg1.add_stream(p)
1872 self.pg_enable_capture(self.pg_interfaces)
1874 capture = self.pg0.get_capture(1)
1880 self.assertEqual(ip.dst, server1.ip4)
1881 self.assertEqual(tcp.dport, local_port)
1882 self.check_tcp_checksum(p)
1883 self.check_ip_checksum(p)
1885 self.logger.error(ppp("Unexpected or invalid packet:", p))
1888 # from service back to client (no translation)
1889 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1890 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1891 TCP(sport=local_port, dport=12346))
1892 self.pg0.add_stream(p)
1893 self.pg_enable_capture(self.pg_interfaces)
1895 capture = self.pg1.get_capture(1)
# Expected on pg1: source address and port are unchanged.
1900 self.assertEqual(ip.src, server1.ip4)
1901 self.assertEqual(tcp.sport, local_port)
1902 self.check_tcp_checksum(p)
1903 self.check_ip_checksum(p)
1905 self.logger.error(ppp("Unexpected or invalid packet:", p))
1908 def test_multiple_inside_interfaces(self):
1909 """ NAT44 multiple non-overlapping address space inside interfaces """
# Outline: the NAT44 feature call is issued for pg0, pg1 and pg3 (pg2 gets no
# such call). Six streams follow, each injected on one interface and
# captured on another, in the order given by the section comments below.
# NOTE(review): the pg3 feature call below is left open in this listing (its
# remaining arguments are not shown) - confirm against the complete file.
1911 self.nat44_add_address(self.nat_addr)
1912 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1913 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1914 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1917 # between two NAT44 inside interfaces (no translation)
1918 pkts = self.create_stream_in(self.pg0, self.pg1)
1919 self.pg0.add_stream(pkts)
1920 self.pg_enable_capture(self.pg_interfaces)
1922 capture = self.pg1.get_capture(len(pkts))
1923 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1925 # from NAT44 inside to interface without NAT44 feature (no translation)
1926 pkts = self.create_stream_in(self.pg0, self.pg2)
1927 self.pg0.add_stream(pkts)
1928 self.pg_enable_capture(self.pg_interfaces)
1930 capture = self.pg2.get_capture(len(pkts))
1931 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1933 # in2out 1st interface
# pg0 -> pg3.
1934 pkts = self.create_stream_in(self.pg0, self.pg3)
1935 self.pg0.add_stream(pkts)
1936 self.pg_enable_capture(self.pg_interfaces)
1938 capture = self.pg3.get_capture(len(pkts))
1939 self.verify_capture_out(capture)
1941 # out2in 1st interface
# pg3 -> pg0.
1942 pkts = self.create_stream_out(self.pg3)
1943 self.pg3.add_stream(pkts)
1944 self.pg_enable_capture(self.pg_interfaces)
1946 capture = self.pg0.get_capture(len(pkts))
1947 self.verify_capture_in(capture, self.pg0)
1949 # in2out 2nd interface
# pg1 -> pg3.
1950 pkts = self.create_stream_in(self.pg1, self.pg3)
1951 self.pg1.add_stream(pkts)
1952 self.pg_enable_capture(self.pg_interfaces)
1954 capture = self.pg3.get_capture(len(pkts))
1955 self.verify_capture_out(capture)
1957 # out2in 2nd interface
# pg3 -> pg1.
1958 pkts = self.create_stream_out(self.pg3)
1959 self.pg3.add_stream(pkts)
1960 self.pg_enable_capture(self.pg_interfaces)
1962 capture = self.pg1.get_capture(len(pkts))
1963 self.verify_capture_in(capture, self.pg1)
1965 def test_inside_overlapping_interfaces(self):
1966 """ NAT44 multiple inside interfaces with overlapping address space """
# Outline: the NAT44 feature call is issued for pg3, pg4, pg5 and pg6, and a
# static mapping ties pg6.remote_ip4 to 10.0.0.10. The test then exercises
# pg4 -> pg5, a single TCP packet pg4 -> static_nat_ip captured on pg6,
# in2out/out2in pairs for pg4, pg5 and pg6 against pg3, and finishes with
# checks on the user, address and session dumps.
# NOTE(review): this listing omits several statements: the rest of the pg3
# feature call and of the pg6 static-mapping call, the assignments of `ip`
# and `tcp` and the rebinding of `p` after the pg6 capture, the loop header
# that binds `user`, the tails of a few multi-line calls, and the control
# flow around logger.error. Confirm against the complete file.
1968 static_nat_ip = "10.0.0.10"
1969 self.nat44_add_address(self.nat_addr)
1970 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1972 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1973 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1974 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1975 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1978 # between NAT44 inside interfaces with same VRF (no translation)
1979 pkts = self.create_stream_in(self.pg4, self.pg5)
1980 self.pg4.add_stream(pkts)
1981 self.pg_enable_capture(self.pg_interfaces)
1983 capture = self.pg5.get_capture(len(pkts))
1984 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1986 # between NAT44 inside interfaces with different VRF (hairpinning)
1987 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1988 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1989 TCP(sport=1234, dport=5678))
1990 self.pg4.add_stream(p)
1991 self.pg_enable_capture(self.pg_interfaces)
1993 capture = self.pg6.get_capture(1)
# Expected on pg6: source address is self.nat_addr with a source port that
# differs from 1234; destination is pg6.remote_ip4 with port 5678 unchanged.
1998 self.assertEqual(ip.src, self.nat_addr)
1999 self.assertEqual(ip.dst, self.pg6.remote_ip4)
2000 self.assertNotEqual(tcp.sport, 1234)
2001 self.assertEqual(tcp.dport, 5678)
2003 self.logger.error(ppp("Unexpected or invalid packet:", p))
2006 # in2out 1st interface
2007 pkts = self.create_stream_in(self.pg4, self.pg3)
2008 self.pg4.add_stream(pkts)
2009 self.pg_enable_capture(self.pg_interfaces)
2011 capture = self.pg3.get_capture(len(pkts))
2012 self.verify_capture_out(capture)
2014 # out2in 1st interface
2015 pkts = self.create_stream_out(self.pg3)
2016 self.pg3.add_stream(pkts)
2017 self.pg_enable_capture(self.pg_interfaces)
2019 capture = self.pg4.get_capture(len(pkts))
2020 self.verify_capture_in(capture, self.pg4)
2022 # in2out 2nd interface
2023 pkts = self.create_stream_in(self.pg5, self.pg3)
2024 self.pg5.add_stream(pkts)
2025 self.pg_enable_capture(self.pg_interfaces)
2027 capture = self.pg3.get_capture(len(pkts))
2028 self.verify_capture_out(capture)
2030 # out2in 2nd interface
2031 pkts = self.create_stream_out(self.pg3)
2032 self.pg3.add_stream(pkts)
2033 self.pg_enable_capture(self.pg_interfaces)
2035 capture = self.pg5.get_capture(len(pkts))
2036 self.verify_capture_in(capture, self.pg5)
# Session dump for pg5's remote address: exactly three non-static sessions
# are expected, in the fixed order TCP, UDP, ICMP, each using the single
# configured NAT address as outside address.
# NOTE(review): the second argument (10 here, 20 further down) is passed
# positionally; its meaning is not visible in this file view.
2039 addresses = self.vapi.nat44_address_dump()
2040 self.assertEqual(len(addresses), 1)
2041 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
2042 self.assertEqual(len(sessions), 3)
2043 for session in sessions:
2044 self.assertFalse(session.is_static)
# Only the first four bytes of inside_ip_address are compared.
2045 self.assertEqual(session.inside_ip_address[0:4],
2046 self.pg5.remote_ip4n)
2047 self.assertEqual(session.outside_ip_address,
2048 addresses[0].ip_address)
2049 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2050 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2051 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2052 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2053 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2054 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2055 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2056 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2057 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2059 # in2out 3rd interface
2060 pkts = self.create_stream_in(self.pg6, self.pg3)
2061 self.pg6.add_stream(pkts)
2062 self.pg_enable_capture(self.pg_interfaces)
2064 capture = self.pg3.get_capture(len(pkts))
2065 self.verify_capture_out(capture, static_nat_ip, True)
2067 # out2in 3rd interface
2068 pkts = self.create_stream_out(self.pg3, static_nat_ip)
2069 self.pg3.add_stream(pkts)
2070 self.pg_enable_capture(self.pg_interfaces)
2072 capture = self.pg6.get_capture(len(pkts))
2073 self.verify_capture_in(capture, self.pg6)
2075 # general user and session dump verifications
2076 users = self.vapi.nat44_user_dump()
2077 self.assertTrue(len(users) >= 3)
2078 addresses = self.vapi.nat44_address_dump()
2079 self.assertEqual(len(addresses), 1)
2081 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2083 for session in sessions:
2084 self.assertEqual(user.ip_address, session.inside_ip_address)
# Chained comparison: total_pkts must be positive and total_bytes must
# exceed total_pkts.
2085 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2086 self.assertTrue(session.protocol in
2087 [IP_PROTOS.tcp, IP_PROTOS.udp,
# Sessions of pg4's remote address: at least four, none static.
2091 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2092 self.assertTrue(len(sessions) >= 4)
2093 for session in sessions:
2094 self.assertFalse(session.is_static)
2095 self.assertEqual(session.inside_ip_address[0:4],
2096 self.pg4.remote_ip4n)
2097 self.assertEqual(session.outside_ip_address,
2098 addresses[0].ip_address)
# Sessions of pg6's remote address: at least three, all static, with the
# outside address equal to static_nat_ip.
2101 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2102 self.assertTrue(len(sessions) >= 3)
2103 for session in sessions:
2104 self.assertTrue(session.is_static)
2105 self.assertEqual(session.inside_ip_address[0:4],
2106 self.pg6.remote_ip4n)
# NOTE(review): comparing two map() results as lists and applying ord() to
# the elements of the address only works on Python 2 - confirm the
# interpreter this suite targets.
2107 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2108 map(int, static_nat_ip.split('.')))
2109 self.assertTrue(session.inside_port in
2110 [self.tcp_port_in, self.udp_port_in,
2113 def test_hairpinning(self):
2114 """ NAT44 hairpinning - 1:1 NAPT """
# Outline: host (pg0.remote_hosts[0]) and server (pg0.remote_hosts[1]) are
# both reached through pg0. A TCP static mapping exposes the server's port
# 5678 as port 8765 on self.nat_addr. One packet host -> nat_addr and one
# reply server -> nat_addr are injected on pg0, and both results are captured
# on pg0 as well.
# NOTE(review): this listing does not show the assignments of
# `host_in_port`, `ip` or `tcp`, the rebinding of `p` from each capture, the
# rest of the pg1 feature call, or the control flow around logger.error -
# confirm against the complete file.
2116 host = self.pg0.remote_hosts[0]
2117 server = self.pg0.remote_hosts[1]
2120 server_in_port = 5678
2121 server_out_port = 8765
2123 self.nat44_add_address(self.nat_addr)
2124 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2125 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2127 # add static mapping for server
2128 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2129 server_in_port, server_out_port,
2130 proto=IP_PROTOS.tcp)
2132 # send packet from host to server
2133 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2134 IP(src=host.ip4, dst=self.nat_addr) /
2135 TCP(sport=host_in_port, dport=server_out_port))
2136 self.pg0.add_stream(p)
2137 self.pg_enable_capture(self.pg_interfaces)
2139 capture = self.pg0.get_capture(1)
# Expected: source becomes self.nat_addr with a port different from
# host_in_port; destination becomes server.ip4 / server_in_port.
2144 self.assertEqual(ip.src, self.nat_addr)
2145 self.assertEqual(ip.dst, server.ip4)
2146 self.assertNotEqual(tcp.sport, host_in_port)
2147 self.assertEqual(tcp.dport, server_in_port)
2148 self.check_tcp_checksum(p)
# Remember the translated source port; the reply below is sent to it.
2149 host_out_port = tcp.sport
2151 self.logger.error(ppp("Unexpected or invalid packet:", p))
2154 # send reply from server to host
2155 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2156 IP(src=server.ip4, dst=self.nat_addr) /
2157 TCP(sport=server_in_port, dport=host_out_port))
2158 self.pg0.add_stream(p)
2159 self.pg_enable_capture(self.pg_interfaces)
2161 capture = self.pg0.get_capture(1)
# Expected: source becomes self.nat_addr / server_out_port and destination
# becomes host.ip4 / host_in_port.
2166 self.assertEqual(ip.src, self.nat_addr)
2167 self.assertEqual(ip.dst, host.ip4)
2168 self.assertEqual(tcp.sport, server_out_port)
2169 self.assertEqual(tcp.dport, host_in_port)
2170 self.check_tcp_checksum(p)
2172 self.logger.error(ppp("Unexpected or invalid packet:", p))
2175 def test_hairpinning2(self):
2176 """ NAT44 hairpinning - 1:1 NAT"""
2178 server1_nat_ip = "10.0.0.10"
2179 server2_nat_ip = "10.0.0.11"
2180 host = self.pg0.remote_hosts[0]
2181 server1 = self.pg0.remote_hosts[1]
2182 server2 = self.pg0.remote_hosts[2]
2183 server_tcp_port = 22
2184 server_udp_port = 20
2186 self.nat44_add_address(self.nat_addr)
2187 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2188 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2191 # add static mapping for servers
2192 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2193 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2197 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2198 IP(src=host.ip4, dst=server1_nat_ip) /
2199 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2201 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2202 IP(src=host.ip4, dst=server1_nat_ip) /
2203 UDP(sport=self.udp_port_in, dport=server_udp_port))
2205 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2206 IP(src=host.ip4, dst=server1_nat_ip) /
2207 ICMP(id=self.icmp_id_in, type='echo-request'))
2209 self.pg0.add_stream(pkts)
2210 self.pg_enable_capture(self.pg_interfaces)
2212 capture = self.pg0.get_capture(len(pkts))
2213 for packet in capture:
2215 self.assertEqual(packet[IP].src, self.nat_addr)
2216 self.assertEqual(packet[IP].dst, server1.ip4)
2217 if packet.haslayer(TCP):
2218 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2219 self.assertEqual(packet[TCP].dport, server_tcp_port)
2220 self.tcp_port_out = packet[TCP].sport
2221 self.check_tcp_checksum(packet)
2222 elif packet.haslayer(UDP):
2223 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2224 self.assertEqual(packet[UDP].dport, server_udp_port)
2225 self.udp_port_out = packet[UDP].sport
2227 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2228 self.icmp_id_out = packet[ICMP].id
2230 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2235 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2236 IP(src=server1.ip4, dst=self.nat_addr) /
2237 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2239 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2240 IP(src=server1.ip4, dst=self.nat_addr) /
2241 UDP(sport=server_udp_port, dport=self.udp_port_out))
2243 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2244 IP(src=server1.ip4, dst=self.nat_addr) /
2245 ICMP(id=self.icmp_id_out, type='echo-reply'))
2247 self.pg0.add_stream(pkts)
2248 self.pg_enable_capture(self.pg_interfaces)
2250 capture = self.pg0.get_capture(len(pkts))
2251 for packet in capture:
2253 self.assertEqual(packet[IP].src, server1_nat_ip)
2254 self.assertEqual(packet[IP].dst, host.ip4)
2255 if packet.haslayer(TCP):
2256 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2257 self.assertEqual(packet[TCP].sport, server_tcp_port)
2258 self.check_tcp_checksum(packet)
2259 elif packet.haslayer(UDP):
2260 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2261 self.assertEqual(packet[UDP].sport, server_udp_port)
2263 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2265 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2268 # server2 to server1
2270 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2271 IP(src=server2.ip4, dst=server1_nat_ip) /
2272 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2274 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2275 IP(src=server2.ip4, dst=server1_nat_ip) /
2276 UDP(sport=self.udp_port_in, dport=server_udp_port))
2278 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2279 IP(src=server2.ip4, dst=server1_nat_ip) /
2280 ICMP(id=self.icmp_id_in, type='echo-request'))
2282 self.pg0.add_stream(pkts)
2283 self.pg_enable_capture(self.pg_interfaces)
2285 capture = self.pg0.get_capture(len(pkts))
2286 for packet in capture:
2288 self.assertEqual(packet[IP].src, server2_nat_ip)
2289 self.assertEqual(packet[IP].dst, server1.ip4)
2290 if packet.haslayer(TCP):
2291 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2292 self.assertEqual(packet[TCP].dport, server_tcp_port)
2293 self.tcp_port_out = packet[TCP].sport
2294 self.check_tcp_checksum(packet)
2295 elif packet.haslayer(UDP):
2296 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2297 self.assertEqual(packet[UDP].dport, server_udp_port)
2298 self.udp_port_out = packet[UDP].sport
2300 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2301 self.icmp_id_out = packet[ICMP].id
2303 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2306 # server1 to server2
2308 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2309 IP(src=server1.ip4, dst=server2_nat_ip) /
2310 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2312 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2313 IP(src=server1.ip4, dst=server2_nat_ip) /
2314 UDP(sport=server_udp_port, dport=self.udp_port_out))
2316 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2317 IP(src=server1.ip4, dst=server2_nat_ip) /
2318 ICMP(id=self.icmp_id_out, type='echo-reply'))
2320 self.pg0.add_stream(pkts)
2321 self.pg_enable_capture(self.pg_interfaces)
2323 capture = self.pg0.get_capture(len(pkts))
2324 for packet in capture:
2326 self.assertEqual(packet[IP].src, server1_nat_ip)
2327 self.assertEqual(packet[IP].dst, server2.ip4)
2328 if packet.haslayer(TCP):
2329 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2330 self.assertEqual(packet[TCP].sport, server_tcp_port)
2331 self.check_tcp_checksum(packet)
2332 elif packet.haslayer(UDP):
2333 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2334 self.assertEqual(packet[UDP].sport, server_udp_port)
2336 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2338 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2341 def test_max_translations_per_user(self):
2342 """ MAX translations per user - recycle the least recently used """
# Sends max_translations_per_user + 5 TCP packets, each with a different
# source port, from one source address (pg0's remote host) and expects
# every one of them to be captured on pg1.
# NOTE(review): the trailing arguments of the pg1 feature call and the
# initialisation/filling of `pkts` are not visible here - confirm.
2344 self.nat44_add_address(self.nat_addr)
2345 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2346 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2349 # get maximum number of translations per user
2350 nat44_config = self.vapi.nat_show_config()
2352 # send more than maximum number of translations per user packets
2353 pkts_num = nat44_config.max_translations_per_user + 5
# Source ports run from 1025 upwards, one packet per port.
2355 for port in range(0, pkts_num):
2356 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2357 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2358 TCP(sport=1025 + port))
2360 self.pg0.add_stream(pkts)
2361 self.pg_enable_capture(self.pg_interfaces)
2364 # verify number of translated packet
# All pkts_num packets are requested from the pg1 capture, i.e. the flows
# beyond the per-user limit are still expected to be forwarded.
2365 self.pg1.get_capture(pkts_num)
2367 def test_interface_addr(self):
2368 """ Acquire NAT44 addresses from interface """
# Registers pg7 through nat44_add_interface_addr and then checks that the
# NAT44 address dump follows pg7's IPv4 address: empty before the address
# is configured, one entry while it is configured, empty after removal.
2369 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2371 # no address in NAT pool
2372 adresses = self.vapi.nat44_address_dump()
2373 self.assertEqual(0, len(adresses))
2375 # configure interface address and check NAT address pool
2376 self.pg7.config_ip4()
2377 adresses = self.vapi.nat44_address_dump()
2378 self.assertEqual(1, len(adresses))
# Only the first 4 bytes of ip_address are compared with local_ip4n
# (presumably the packed IPv4 form of pg7's address - confirm).
2379 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2381 # remove interface address and check NAT address pool
2382 self.pg7.unconfig_ip4()
2383 adresses = self.vapi.nat44_address_dump()
2384 self.assertEqual(0, len(adresses))
2386 def test_interface_addr_static_mapping(self):
2387 """ Static mapping with addresses from interface """
# Creates a static mapping whose external side is given as an interface
# (pg7) and checks how the dumped mapping changes while pg7's IPv4
# address is configured and removed.
# NOTE(review): `tag` is compared below but its assignment, and the
# remaining arguments of nat44_add_static_mapping, are not visible here.
2390 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2391 self.nat44_add_static_mapping(
2393 external_sw_if_index=self.pg7.sw_if_index,
2396 # static mappings with external interface
2397 static_mappings = self.vapi.nat44_static_mapping_dump()
2398 self.assertEqual(1, len(static_mappings))
2399 self.assertEqual(self.pg7.sw_if_index,
2400 static_mappings[0].external_sw_if_index)
# The dumped tag is cut at the first NUL before it is compared
# (presumably a fixed-size, NUL-padded field - confirm).
2401 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2403 # configure interface address and check static mappings
2404 self.pg7.config_ip4()
2405 static_mappings = self.vapi.nat44_static_mapping_dump()
2406 self.assertEqual(1, len(static_mappings))
# With an address on pg7 the mapping is expected to carry that address
# and to report external_sw_if_index as 0xFFFFFFFF instead of pg7's index.
2407 self.assertEqual(static_mappings[0].external_ip_address[0:4],
2408 self.pg7.local_ip4n)
2409 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2410 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2412 # remove interface address and check static mappings
2413 self.pg7.unconfig_ip4()
2414 static_mappings = self.vapi.nat44_static_mapping_dump()
2415 self.assertEqual(0, len(static_mappings))
2417 def test_interface_addr_identity_nat(self):
2418 """ Identity NAT with addresses from interface """
# Creates a TCP identity mapping that refers to pg7 by interface index and
# checks how the dumped mapping changes while pg7's IPv4 address is
# configured and removed.
# NOTE(review): `port` is compared below but its assignment, and the
# remaining arguments of nat44_add_del_identity_mapping, are not visible
# here.
2421 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2422 self.vapi.nat44_add_del_identity_mapping(
2423 sw_if_index=self.pg7.sw_if_index,
2425 protocol=IP_PROTOS.tcp,
2428 # identity mappings with external interface
2429 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2430 self.assertEqual(1, len(identity_mappings))
2431 self.assertEqual(self.pg7.sw_if_index,
2432 identity_mappings[0].sw_if_index)
2434 # configure interface address and check identity mappings
2435 self.pg7.config_ip4()
2436 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2437 self.assertEqual(1, len(identity_mappings))
# With an address on pg7 the mapping is expected to carry that address,
# sw_if_index 0xFFFFFFFF, and the port/protocol it was created with.
2438 self.assertEqual(identity_mappings[0].ip_address,
2439 self.pg7.local_ip4n)
2440 self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2441 self.assertEqual(port, identity_mappings[0].port)
2442 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2444 # remove interface address and check identity mappings
2445 self.pg7.unconfig_ip4()
2446 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2447 self.assertEqual(0, len(identity_mappings))
2449 def test_ipfix_nat44_sess(self):
2450 """ IPFIX logging NAT44 session created/delted """
# Configures an IPFIX exporter towards pg3's remote host, pushes a stream
# from pg0 to pg1, removes the NAT address again, flushes IPFIX and then
# checks the 9 packets captured on pg3.
2451 self.ipfix_domain_id = 10
2452 self.ipfix_src_port = 20202
2453 colector_port = 30303
# Make scapy dissect UDP payloads sent to port 30303 as IPFIX; the literal
# is the same value as colector_port above.
2454 bind_layers(UDP, IPFIX, dport=30303)
2455 self.nat44_add_address(self.nat_addr)
2456 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2457 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2459 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2460 src_address=self.pg3.local_ip4n,
2462 template_interval=10,
2463 collector_port=colector_port)
2464 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2465 src_port=self.ipfix_src_port)
2467 pkts = self.create_stream_in(self.pg0, self.pg1)
2468 self.pg0.add_stream(pkts)
2469 self.pg_enable_capture(self.pg_interfaces)
2471 capture = self.pg1.get_capture(len(pkts))
2472 self.verify_capture_out(capture)
# Called with is_add=0 (presumably removes the address so that session
# delete events are generated - confirm).
2473 self.nat44_add_address(self.nat_addr, is_add=0)
2474 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2475 capture = self.pg3.get_capture(9)
2476 ipfix = IPFIXDecoder()
# NOTE(review): `p` in both passes below is presumably each packet of
# `capture`; the loop headers are not visible here.
2477 # first load template
2479 self.assertTrue(p.haslayer(IPFIX))
2480 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2481 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2482 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2483 self.assertEqual(p[UDP].dport, colector_port)
2484 self.assertEqual(p[IPFIX].observationDomainID,
2485 self.ipfix_domain_id)
2486 if p.haslayer(Template):
2487 ipfix.add_template(p.getlayer(Template))
2488 # verify events in data set
2490 if p.haslayer(Data):
2491 data = ipfix.decode_data_set(p.getlayer(Set))
2492 self.verify_ipfix_nat44_ses(data)
2494 def test_ipfix_addr_exhausted(self):
2495 """ IPFIX logging NAT addresses exhausted """
# No nat44_add_address call is visible in this test, so the packet sent
# from pg0 is expected not to be forwarded (get_capture(0) on pg1); the
# resulting IPFIX records are then checked on pg3.
# NOTE(review): self.ipfix_domain_id / self.ipfix_src_port are read here
# but not assigned in this method - confirm where they are initialised.
2496 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2497 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2499 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2500 src_address=self.pg3.local_ip4n,
2502 template_interval=10)
2503 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2504 src_port=self.ipfix_src_port)
2506 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2507 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2509 self.pg0.add_stream(p)
2510 self.pg_enable_capture(self.pg_interfaces)
2512 capture = self.pg1.get_capture(0)
2513 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2514 capture = self.pg3.get_capture(9)
2515 ipfix = IPFIXDecoder()
# NOTE(review): `p` in both passes below is presumably each packet of
# `capture`; the loop headers are not visible here.
2516 # first load template
2518 self.assertTrue(p.haslayer(IPFIX))
2519 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2520 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2521 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# 4739 is the IANA-assigned IPFIX port; no collector_port is passed in the
# visible set_ipfix_exporter arguments above.
2522 self.assertEqual(p[UDP].dport, 4739)
2523 self.assertEqual(p[IPFIX].observationDomainID,
2524 self.ipfix_domain_id)
2525 if p.haslayer(Template):
2526 ipfix.add_template(p.getlayer(Template))
2527 # verify events in data set
2529 if p.haslayer(Data):
2530 data = ipfix.decode_data_set(p.getlayer(Set))
2531 self.verify_ipfix_addr_exhausted(data)
# Only runs when running_extended_tests() is true.
2533 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2534 def test_ipfix_max_sessions(self):
2535 """ IPFIX logging maximum session entries exceeded """
# Phase 1: send max_sessions packets, each from its own 10.10.x.y source,
# and expect all of them on pg1. Phase 2: enable IPFIX, send one more
# packet from pg0's remote host, expect nothing on pg1 and check the IPFIX
# records captured on pg3.
# NOTE(review): initialisation/filling of `pkts` and the transport layer of
# the packets are not visible here - confirm.
2536 self.nat44_add_address(self.nat_addr)
2537 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2538 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2541 nat44_config = self.vapi.nat_show_config()
2542 max_sessions = 10 * nat44_config.translation_buckets
2545 for i in range(0, max_sessions):
# Third octet = bits 8-15 of i, fourth octet = bits 0-7, so addresses are
# distinct only while i < 65536.
2546 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2547 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2548 IP(src=src, dst=self.pg1.remote_ip4) /
2551 self.pg0.add_stream(pkts)
2552 self.pg_enable_capture(self.pg_interfaces)
2555 self.pg1.get_capture(max_sessions)
# The exporter is configured only after the first phase has been captured.
2556 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2557 src_address=self.pg3.local_ip4n,
2559 template_interval=10)
2560 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2561 src_port=self.ipfix_src_port)
2563 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2564 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2566 self.pg0.add_stream(p)
2567 self.pg_enable_capture(self.pg_interfaces)
2569 self.pg1.get_capture(0)
2570 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2571 capture = self.pg3.get_capture(9)
2572 ipfix = IPFIXDecoder()
# NOTE(review): `p` in both passes below is presumably each packet of
# `capture`; the loop headers are not visible here.
2573 # first load template
2575 self.assertTrue(p.haslayer(IPFIX))
2576 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2577 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2578 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2579 self.assertEqual(p[UDP].dport, 4739)
2580 self.assertEqual(p[IPFIX].observationDomainID,
2581 self.ipfix_domain_id)
2582 if p.haslayer(Template):
2583 ipfix.add_template(p.getlayer(Template))
2584 # verify events in data set
2586 if p.haslayer(Data):
2587 data = ipfix.decode_data_set(p.getlayer(Set))
2588 self.verify_ipfix_max_sessions(data, max_sessions)
2590 def test_pool_addr_fib(self):
2591 """ NAT44 add pool addresses to FIB """
2592 static_addr = '10.0.0.10'
2593 self.nat44_add_address(self.nat_addr)
2594 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2595 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2597 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2600 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2601 ARP(op=ARP.who_has, pdst=self.nat_addr,
2602 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2603 self.pg1.add_stream(p)
2604 self.pg_enable_capture(self.pg_interfaces)
2606 capture = self.pg1.get_capture(1)
2607 self.assertTrue(capture[0].haslayer(ARP))
2608 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2611 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2612 ARP(op=ARP.who_has, pdst=static_addr,
2613 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2614 self.pg1.add_stream(p)
2615 self.pg_enable_capture(self.pg_interfaces)
2617 capture = self.pg1.get_capture(1)
2618 self.assertTrue(capture[0].haslayer(ARP))
2619 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2621 # send ARP to non-NAT44 interface
2622 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2623 ARP(op=ARP.who_has, pdst=self.nat_addr,
2624 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2625 self.pg2.add_stream(p)
2626 self.pg_enable_capture(self.pg_interfaces)
2628 capture = self.pg1.get_capture(0)
2630 # remove addresses and verify
2631 self.nat44_add_address(self.nat_addr, is_add=0)
2632 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2635 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2636 ARP(op=ARP.who_has, pdst=self.nat_addr,
2637 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2638 self.pg1.add_stream(p)
2639 self.pg_enable_capture(self.pg_interfaces)
2641 capture = self.pg1.get_capture(0)
2643 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2644 ARP(op=ARP.who_has, pdst=static_addr,
2645 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2646 self.pg1.add_stream(p)
2647 self.pg_enable_capture(self.pg_interfaces)
2649 capture = self.pg1.get_capture(0)
2651 def test_vrf_mode(self):
2652 """ NAT44 tenant VRF aware address pool mode """
# Puts pg0 and pg1 into two separate IPv4 tables, adds one pool address
# per table (nat_ip1 for vrf_id1, nat_ip2 for vrf_id2) and checks that
# traffic from each interface towards pg2 is translated to the address
# belonging to its own table.
# NOTE(review): the assignments of vrf_id1 / vrf_id2 and the trailing
# arguments of the pg2 feature call are not visible here - confirm.
2656 nat_ip1 = "10.0.0.10"
2657 nat_ip2 = "10.0.0.11"
# IPv4 addresses are removed before the table change and configured again
# afterwards.
2659 self.pg0.unconfig_ip4()
2660 self.pg1.unconfig_ip4()
2661 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2662 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2663 self.pg0.set_table_ip4(vrf_id1)
2664 self.pg1.set_table_ip4(vrf_id2)
2665 self.pg0.config_ip4()
2666 self.pg1.config_ip4()
2668 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2669 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2670 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2671 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2672 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2: expected source address after translation is nat_ip1.
2676 pkts = self.create_stream_in(self.pg0, self.pg2)
2677 self.pg0.add_stream(pkts)
2678 self.pg_enable_capture(self.pg_interfaces)
2680 capture = self.pg2.get_capture(len(pkts))
2681 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2: expected source address after translation is nat_ip2.
2684 pkts = self.create_stream_in(self.pg1, self.pg2)
2685 self.pg1.add_stream(pkts)
2686 self.pg_enable_capture(self.pg_interfaces)
2688 capture = self.pg2.get_capture(len(pkts))
2689 self.verify_capture_out(capture, nat_ip2)
# Cleanup: move both interfaces back to table 0 and delete the two tables.
2691 self.pg0.unconfig_ip4()
2692 self.pg1.unconfig_ip4()
2693 self.pg0.set_table_ip4(0)
2694 self.pg1.set_table_ip4(0)
2695 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2696 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2698 def test_vrf_feature_independent(self):
2699 """ NAT44 tenant VRF independent address pool mode """
# nat_ip1 is added without a vrf_id, nat_ip2 with vrf_id=99. Traffic from
# both pg0 and pg1 towards pg2 is expected to be translated to nat_ip1;
# nat_ip2 is never expected in the captures below.
# NOTE(review): the trailing arguments of the pg2 feature call are not
# visible here - confirm.
2701 nat_ip1 = "10.0.0.10"
2702 nat_ip2 = "10.0.0.11"
2704 self.nat44_add_address(nat_ip1)
2705 self.nat44_add_address(nat_ip2, vrf_id=99)
2706 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2707 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2708 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2
2712 pkts = self.create_stream_in(self.pg0, self.pg2)
2713 self.pg0.add_stream(pkts)
2714 self.pg_enable_capture(self.pg_interfaces)
2716 capture = self.pg2.get_capture(len(pkts))
2717 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2, again checked against nat_ip1.
2720 pkts = self.create_stream_in(self.pg1, self.pg2)
2721 self.pg1.add_stream(pkts)
2722 self.pg_enable_capture(self.pg_interfaces)
2724 capture = self.pg2.get_capture(len(pkts))
2725 self.verify_capture_out(capture, nat_ip1)
2727 def test_dynamic_ipless_interfaces(self):
2728 """ NAT44 interfaces without configured IP address """
# No config_ip4() call for pg7/pg8 appears in this method; reachability of
# their remote hosts is set up by hand with a neighbor entry and a /32
# route per interface, then translation is checked in both directions
# using the pool address self.nat_addr.
# NOTE(review): the trailing arguments of the neighbor calls and of the
# pg8 feature call are not visible here - confirm.
2730 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2731 mactobinary(self.pg7.remote_mac),
2732 self.pg7.remote_ip4n,
2734 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2735 mactobinary(self.pg8.remote_mac),
2736 self.pg8.remote_ip4n,
# Host routes (prefix length 32) to each remote host via its own interface.
2739 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2740 dst_address_length=32,
2741 next_hop_address=self.pg7.remote_ip4n,
2742 next_hop_sw_if_index=self.pg7.sw_if_index)
2743 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2744 dst_address_length=32,
2745 next_hop_address=self.pg8.remote_ip4n,
2746 next_hop_sw_if_index=self.pg8.sw_if_index)
2748 self.nat44_add_address(self.nat_addr)
2749 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2750 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8
2754 pkts = self.create_stream_in(self.pg7, self.pg8)
2755 self.pg7.add_stream(pkts)
2756 self.pg_enable_capture(self.pg_interfaces)
2758 capture = self.pg8.get_capture(len(pkts))
2759 self.verify_capture_out(capture)
# pg8 -> pg7, addressed to self.nat_addr.
2762 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2763 self.pg8.add_stream(pkts)
2764 self.pg_enable_capture(self.pg_interfaces)
2766 capture = self.pg7.get_capture(len(pkts))
2767 self.verify_capture_in(capture, self.pg7)
2769 def test_static_ipless_interfaces(self):
2770 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Same manual neighbor / host-route setup for pg7 and pg8 as in the
# dynamic variant, but with a static mapping from pg7's remote host to
# self.nat_addr instead of a pool address. The pg8 -> pg7 direction is
# exercised first here.
# NOTE(review): the trailing arguments of the neighbor calls and of the
# pg8 feature call are not visible here - confirm.
2772 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2773 mactobinary(self.pg7.remote_mac),
2774 self.pg7.remote_ip4n,
2776 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2777 mactobinary(self.pg8.remote_mac),
2778 self.pg8.remote_ip4n,
2781 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2782 dst_address_length=32,
2783 next_hop_address=self.pg7.remote_ip4n,
2784 next_hop_sw_if_index=self.pg7.sw_if_index)
2785 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2786 dst_address_length=32,
2787 next_hop_address=self.pg8.remote_ip4n,
2788 next_hop_sw_if_index=self.pg8.sw_if_index)
2790 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2791 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2792 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
2796 pkts = self.create_stream_out(self.pg8)
2797 self.pg8.add_stream(pkts)
2798 self.pg_enable_capture(self.pg_interfaces)
2800 capture = self.pg7.get_capture(len(pkts))
2801 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8; verify_capture_out gets an extra positional True here
# (presumably "ports are unchanged" for 1:1 NAT - confirm in the helper).
2804 pkts = self.create_stream_in(self.pg7, self.pg8)
2805 self.pg7.add_stream(pkts)
2806 self.pg_enable_capture(self.pg_interfaces)
2808 capture = self.pg8.get_capture(len(pkts))
2809 self.verify_capture_out(capture, self.nat_addr, True)
2811 def test_static_with_port_ipless_interfaces(self):
2812 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Same manual neighbor / host-route setup for pg7 and pg8 as the other
# "ipless" tests, with one static mapping per protocol (TCP, UDP, ICMP)
# from pg7's remote host to self.nat_addr using the fixed external
# ports / ICMP id assigned below.
# NOTE(review): the trailing arguments of the neighbor calls and of the
# pg8 feature call are not visible here - confirm.
2814 self.tcp_port_out = 30606
2815 self.udp_port_out = 30607
2816 self.icmp_id_out = 30608
2818 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2819 mactobinary(self.pg7.remote_mac),
2820 self.pg7.remote_ip4n,
2822 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2823 mactobinary(self.pg8.remote_mac),
2824 self.pg8.remote_ip4n,
2827 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2828 dst_address_length=32,
2829 next_hop_address=self.pg7.remote_ip4n,
2830 next_hop_sw_if_index=self.pg7.sw_if_index)
2831 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2832 dst_address_length=32,
2833 next_hop_address=self.pg8.remote_ip4n,
2834 next_hop_sw_if_index=self.pg8.sw_if_index)
2836 self.nat44_add_address(self.nat_addr)
# One mapping per protocol: (local port/id) -> (external port/id).
2837 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2838 self.tcp_port_in, self.tcp_port_out,
2839 proto=IP_PROTOS.tcp)
2840 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2841 self.udp_port_in, self.udp_port_out,
2842 proto=IP_PROTOS.udp)
2843 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2844 self.icmp_id_in, self.icmp_id_out,
2845 proto=IP_PROTOS.icmp)
2846 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2847 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
2851 pkts = self.create_stream_out(self.pg8)
2852 self.pg8.add_stream(pkts)
2853 self.pg_enable_capture(self.pg_interfaces)
2855 capture = self.pg7.get_capture(len(pkts))
2856 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8
2859 pkts = self.create_stream_in(self.pg7, self.pg8)
2860 self.pg7.add_stream(pkts)
2861 self.pg_enable_capture(self.pg_interfaces)
2863 capture = self.pg8.get_capture(len(pkts))
2864 self.verify_capture_out(capture)
2866 def test_static_unknown_proto(self):
2867 """ 1:1 NAT translate packet with unknown protocol """
# With a static mapping pg0-remote <-> nat_ip, sends a tunnelled packet
# (outer IP carrying an inner IP/TCP packet) in each direction and checks
# only the outer addresses, the presence of a GRE layer and the IP
# checksum of what is captured.
# NOTE(review): the layer between the outer and inner IP headers, the
# assignment of `packet` from the capture and the try/except around the
# assertions are not visible here - confirm.
2868 nat_ip = "10.0.0.10"
2869 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2870 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2871 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1: outer source is expected to become nat_ip.
2875 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2876 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2878 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2879 TCP(sport=1234, dport=1234))
2880 self.pg0.add_stream(p)
2881 self.pg_enable_capture(self.pg_interfaces)
2883 p = self.pg1.get_capture(1)
2886 self.assertEqual(packet[IP].src, nat_ip)
2887 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2888 self.assertTrue(packet.haslayer(GRE))
2889 self.check_ip_checksum(packet)
2891 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0: outer destination nat_ip is expected to become pg0's remote
# address.
2895 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2896 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2898 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2899 TCP(sport=1234, dport=1234))
2900 self.pg1.add_stream(p)
2901 self.pg_enable_capture(self.pg_interfaces)
2903 p = self.pg0.get_capture(1)
2906 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2907 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2908 self.assertTrue(packet.haslayer(GRE))
2909 self.check_ip_checksum(packet)
2911 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2914 def test_hairpinning_static_unknown_proto(self):
2915 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# Host and server are both remote hosts on pg0, each with its own static
# mapping. Tunnelled packets are sent to the peer's NAT address and are
# expected back on pg0 with the source rewritten to the sender's NAT
# address and the destination rewritten to the peer's real address.
# NOTE(review): the layer between the outer and inner IP headers, the
# assignment of `packet` from the capture and the try/except around the
# assertions are not visible here - confirm.
2917 host = self.pg0.remote_hosts[0]
2918 server = self.pg0.remote_hosts[1]
2920 host_nat_ip = "10.0.0.10"
2921 server_nat_ip = "10.0.0.11"
2923 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2924 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2925 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2926 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server (addressed to server_nat_ip)
2930 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2931 IP(src=host.ip4, dst=server_nat_ip) /
2933 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2934 TCP(sport=1234, dport=1234))
2935 self.pg0.add_stream(p)
2936 self.pg_enable_capture(self.pg_interfaces)
2938 p = self.pg0.get_capture(1)
2941 self.assertEqual(packet[IP].src, host_nat_ip)
2942 self.assertEqual(packet[IP].dst, server.ip4)
2943 self.assertTrue(packet.haslayer(GRE))
2944 self.check_ip_checksum(packet)
2946 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host (addressed to host_nat_ip)
2950 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2951 IP(src=server.ip4, dst=host_nat_ip) /
2953 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2954 TCP(sport=1234, dport=1234))
2955 self.pg0.add_stream(p)
2956 self.pg_enable_capture(self.pg_interfaces)
2958 p = self.pg0.get_capture(1)
2961 self.assertEqual(packet[IP].src, server_nat_ip)
2962 self.assertEqual(packet[IP].dst, host.ip4)
2963 self.assertTrue(packet.haslayer(GRE))
2964 self.check_ip_checksum(packet)
2966 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2969 def test_unknown_proto(self):
2970 """ NAT44 translate packet with unknown protocol """
# With only a pool address configured, a plain TCP packet is sent from pg0
# to pg1 first, and only then the tunnelled packets whose outer addresses,
# GRE layer and IP checksum are checked in both directions.
# NOTE(review): the layer between the outer and inner IP headers, the
# assignment of `packet` from the capture and the try/except around the
# assertions are not visible here - confirm.
2971 self.nat44_add_address(self.nat_addr)
2972 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2973 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Initial TCP packet pg0 -> pg1 (presumably to create a session for this
# host before the unknown-protocol traffic - confirm).
2977 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2978 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2979 TCP(sport=self.tcp_port_in, dport=20))
2980 self.pg0.add_stream(p)
2981 self.pg_enable_capture(self.pg_interfaces)
2983 p = self.pg1.get_capture(1)
# pg0 -> pg1: outer source is expected to become self.nat_addr.
2985 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2986 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2988 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2989 TCP(sport=1234, dport=1234))
2990 self.pg0.add_stream(p)
2991 self.pg_enable_capture(self.pg_interfaces)
2993 p = self.pg1.get_capture(1)
2996 self.assertEqual(packet[IP].src, self.nat_addr)
2997 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2998 self.assertTrue(packet.haslayer(GRE))
2999 self.check_ip_checksum(packet)
3001 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0: outer destination self.nat_addr is expected to become pg0's
# remote address.
3005 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3006 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3008 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3009 TCP(sport=1234, dport=1234))
3010 self.pg1.add_stream(p)
3011 self.pg_enable_capture(self.pg_interfaces)
3013 p = self.pg0.get_capture(1)
3016 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3017 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3018 self.assertTrue(packet.haslayer(GRE))
3019 self.check_ip_checksum(packet)
3021 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3024 def test_hairpinning_unknown_proto(self):
3025 """ NAT44 translate packet with unknown protocol - hairpinning """
# Host and server are both remote hosts on pg0. The server has a static
# mapping to server_nat_ip, the host uses the pool address self.nat_addr.
# A TCP packet host -> server is sent first, then tunnelled packets are
# checked in both directions on pg0.
# NOTE(review): the assignment of host_in_port, the layer between the
# outer and inner IP headers, the assignment of `packet` from the capture
# and the try/except around the assertions are not visible here - confirm.
3026 host = self.pg0.remote_hosts[0]
3027 server = self.pg0.remote_hosts[1]
3030 server_in_port = 5678
3031 server_out_port = 8765
3032 server_nat_ip = "10.0.0.11"
3034 self.nat44_add_address(self.nat_addr)
3035 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3036 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3039 # add static mapping for server
3040 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# Initial TCP packet host -> server_nat_ip; only its arrival on pg0 is
# checked.
3043 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3044 IP(src=host.ip4, dst=server_nat_ip) /
3045 TCP(sport=host_in_port, dport=server_out_port))
3046 self.pg0.add_stream(p)
3047 self.pg_enable_capture(self.pg_interfaces)
3049 capture = self.pg0.get_capture(1)
# host -> server: expected source self.nat_addr, destination server.ip4.
3051 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3052 IP(src=host.ip4, dst=server_nat_ip) /
3054 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3055 TCP(sport=1234, dport=1234))
3056 self.pg0.add_stream(p)
3057 self.pg_enable_capture(self.pg_interfaces)
3059 p = self.pg0.get_capture(1)
3062 self.assertEqual(packet[IP].src, self.nat_addr)
3063 self.assertEqual(packet[IP].dst, server.ip4)
3064 self.assertTrue(packet.haslayer(GRE))
3065 self.check_ip_checksum(packet)
3067 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host: expected source server_nat_ip, destination host.ip4.
3071 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3072 IP(src=server.ip4, dst=self.nat_addr) /
3074 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3075 TCP(sport=1234, dport=1234))
3076 self.pg0.add_stream(p)
3077 self.pg_enable_capture(self.pg_interfaces)
3079 p = self.pg0.get_capture(1)
3082 self.assertEqual(packet[IP].src, server_nat_ip)
3083 self.assertEqual(packet[IP].dst, host.ip4)
3084 self.assertTrue(packet.haslayer(GRE))
3085 self.check_ip_checksum(packet)
3087 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3090 def test_output_feature(self):
3091 """ NAT44 interface output feature (in2out postrouting) """
# Uses nat44_interface_add_del_output_feature (rather than the plain
# interface feature) on pg0, pg1 and pg3, then checks three flows:
# pg0 -> pg3 translated, pg3 -> pg0 translated back, and pg2 -> pg0
# forwarded without translation.
# NOTE(review): the trailing arguments of the pg3 call are not visible
# here - confirm.
3092 self.nat44_add_address(self.nat_addr)
3093 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3094 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3095 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3
3099 pkts = self.create_stream_in(self.pg0, self.pg3)
3100 self.pg0.add_stream(pkts)
3101 self.pg_enable_capture(self.pg_interfaces)
3103 capture = self.pg3.get_capture(len(pkts))
3104 self.verify_capture_out(capture)
# pg3 -> pg0
3107 pkts = self.create_stream_out(self.pg3)
3108 self.pg3.add_stream(pkts)
3109 self.pg_enable_capture(self.pg_interfaces)
3111 capture = self.pg0.get_capture(len(pkts))
3112 self.verify_capture_in(capture, self.pg0)
3114 # from non-NAT interface to NAT inside interface
3115 pkts = self.create_stream_in(self.pg2, self.pg0)
3116 self.pg2.add_stream(pkts)
3117 self.pg_enable_capture(self.pg_interfaces)
3119 capture = self.pg0.get_capture(len(pkts))
3120 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3122 def test_output_feature_vrf_aware(self):
3123 """ NAT44 interface output feature VRF aware (in2out postrouting) """
# Two pool addresses are added, one for vrf_id 10 and one for vrf_id 20.
# Traffic pg4 <-> pg3 is checked against nat_ip_vrf10 and traffic
# pg6 <-> pg3 against nat_ip_vrf20.
# NOTE(review): the last argument of each ip_add_del_route call
# (presumably the table id, one route per VRF) and the trailing arguments
# of the pg3 feature call are not visible here; the VRF membership of
# pg4/pg6 is configured outside this method - confirm.
3124 nat_ip_vrf10 = "10.0.0.10"
3125 nat_ip_vrf20 = "10.0.0.20"
# Two host routes to pg3's remote host via pg3.
3127 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3128 dst_address_length=32,
3129 next_hop_address=self.pg3.remote_ip4n,
3130 next_hop_sw_if_index=self.pg3.sw_if_index,
3132 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3133 dst_address_length=32,
3134 next_hop_address=self.pg3.remote_ip4n,
3135 next_hop_sw_if_index=self.pg3.sw_if_index,
3138 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3139 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3140 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3141 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3142 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3, expected translated to nat_ip_vrf10
3146 pkts = self.create_stream_in(self.pg4, self.pg3)
3147 self.pg4.add_stream(pkts)
3148 self.pg_enable_capture(self.pg_interfaces)
3150 capture = self.pg3.get_capture(len(pkts))
3151 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> pg4, addressed to nat_ip_vrf10
3154 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3155 self.pg3.add_stream(pkts)
3156 self.pg_enable_capture(self.pg_interfaces)
3158 capture = self.pg4.get_capture(len(pkts))
3159 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3, expected translated to nat_ip_vrf20
3162 pkts = self.create_stream_in(self.pg6, self.pg3)
3163 self.pg6.add_stream(pkts)
3164 self.pg_enable_capture(self.pg_interfaces)
3166 capture = self.pg3.get_capture(len(pkts))
3167 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> pg6, addressed to nat_ip_vrf20
3170 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3171 self.pg3.add_stream(pkts)
3172 self.pg_enable_capture(self.pg_interfaces)
3174 capture = self.pg6.get_capture(len(pkts))
3175 self.verify_capture_in(capture, self.pg6)
3177 def test_output_feature_hairpinning(self):
3178 """ NAT44 interface output feature hairpinning (in2out postrouting) """
# Host and server are both remote hosts on pg0. The server is reachable
# through a TCP static mapping self.nat_addr:server_out_port ->
# server.ip4:server_in_port. A request host -> server and the reply
# server -> host are each expected back on pg0 with rewritten addresses
# and ports.
# NOTE(review): the assignment of host_in_port, the extraction of
# p / ip / tcp from the capture, the try/except around the assertions and
# the trailing arguments of the pg1 feature call are not visible here -
# confirm.
3179 host = self.pg0.remote_hosts[0]
3180 server = self.pg0.remote_hosts[1]
3183 server_in_port = 5678
3184 server_out_port = 8765
3186 self.nat44_add_address(self.nat_addr)
3187 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3188 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3191 # add static mapping for server
3192 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3193 server_in_port, server_out_port,
3194 proto=IP_PROTOS.tcp)
3196 # send packet from host to server
3197 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3198 IP(src=host.ip4, dst=self.nat_addr) /
3199 TCP(sport=host_in_port, dport=server_out_port))
3200 self.pg0.add_stream(p)
3201 self.pg_enable_capture(self.pg_interfaces)
3203 capture = self.pg0.get_capture(1)
# Expected: source rewritten to self.nat_addr with a new source port,
# destination rewritten to the server's real address and port.
3208 self.assertEqual(ip.src, self.nat_addr)
3209 self.assertEqual(ip.dst, server.ip4)
3210 self.assertNotEqual(tcp.sport, host_in_port)
3211 self.assertEqual(tcp.dport, server_in_port)
3212 self.check_tcp_checksum(p)
# Remember the translated source port; the reply below is sent to it.
3213 host_out_port = tcp.sport
3215 self.logger.error(ppp("Unexpected or invalid packet:", p))
3218 # send reply from server to host
3219 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3220 IP(src=server.ip4, dst=self.nat_addr) /
3221 TCP(sport=server_in_port, dport=host_out_port))
3222 self.pg0.add_stream(p)
3223 self.pg_enable_capture(self.pg_interfaces)
3225 capture = self.pg0.get_capture(1)
# Expected: source shown as self.nat_addr:server_out_port, destination
# restored to the host's original address and port.
3230 self.assertEqual(ip.src, self.nat_addr)
3231 self.assertEqual(ip.dst, host.ip4)
3232 self.assertEqual(tcp.sport, server_out_port)
3233 self.assertEqual(tcp.dport, host_in_port)
3234 self.check_tcp_checksum(p)
3236 self.logger.error(ppp("Unexpected or invalid packet:", p))
# One-armed NAT44: both hosts are remote hosts of pg9, the NAT feature is
# enabled on pg9 twice (the second call carries extra arguments that are not
# visible here), and every packet enters and leaves through pg9.
# NOTE(review): the embedded line numbers skip (3243-3244, 3248-3250,
# 3258-3261, ...), so the assignments of ip and tcp are not visible in this
# listing - confirm against the complete file.
3239 def test_one_armed_nat44(self):
3240 """ One armed NAT44 """
3241 remote_host = self.pg9.remote_hosts[0]
3242 local_host = self.pg9.remote_hosts[1]
3245 self.nat44_add_address(self.nat_addr)
3246 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3247 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local host -> remote host, TCP 12345 -> 80
3251 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3252 IP(src=local_host.ip4, dst=remote_host.ip4) /
3253 TCP(sport=12345, dport=80))
3254 self.pg9.add_stream(p)
3255 self.pg_enable_capture(self.pg_interfaces)
3257 capture = self.pg9.get_capture(1)
3262 self.assertEqual(ip.src, self.nat_addr)
3263 self.assertEqual(ip.dst, remote_host.ip4)
3264 self.assertNotEqual(tcp.sport, 12345)
# the translated source port is the destination port of the reply below
3265 external_port = tcp.sport
3266 self.assertEqual(tcp.dport, 80)
3267 self.check_tcp_checksum(p)
3268 self.check_ip_checksum(p)
3270 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote host -> NAT address, must be translated back to the local host
3274 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3275 IP(src=remote_host.ip4, dst=self.nat_addr) /
3276 TCP(sport=80, dport=external_port))
3277 self.pg9.add_stream(p)
3278 self.pg_enable_capture(self.pg_interfaces)
3280 capture = self.pg9.get_capture(1)
3285 self.assertEqual(ip.src, remote_host.ip4)
3286 self.assertEqual(ip.dst, local_host.ip4)
3287 self.assertEqual(tcp.sport, 80)
3288 self.assertEqual(tcp.dport, 12345)
3289 self.check_tcp_checksum(p)
3290 self.check_ip_checksum(p)
3292 self.logger.error(ppp("Unexpected or invalid packet:", p))
# One-armed NAT44 on pg9 combined with a TCP static mapping created with
# out2in_only=1 and a NAT address added with twice_nat=1; forwarding is
# enabled first via nat44_forwarding_enable_disable(1).
# NOTE(review): the embedded line numbers skip (3299-3302, 3308, 3311-3312,
# 3321-3325, ...), so the assignments of local_port, external_port, ip and
# tcp and the last argument(s) of the static mapping call are not visible in
# this listing - confirm against the complete file.
3295 def test_one_armed_nat44_static(self):
3296 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3297 remote_host = self.pg9.remote_hosts[0]
3298 local_host = self.pg9.remote_hosts[1]
3303 self.vapi.nat44_forwarding_enable_disable(1)
3304 self.nat44_add_address(self.nat_addr, twice_nat=1)
3305 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3306 local_port, external_port,
3307 proto=IP_PROTOS.tcp, out2in_only=1,
3309 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3310 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3313 # from client to service
3314 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3315 IP(src=remote_host.ip4, dst=self.nat_addr) /
3316 TCP(sport=12345, dport=external_port))
3317 self.pg9.add_stream(p)
3318 self.pg_enable_capture(self.pg_interfaces)
3320 capture = self.pg9.get_capture(1)
# both ends are rewritten: destination to the local host/port, source to
# the NAT address with a port different from the client's 12345
3326 self.assertEqual(ip.dst, local_host.ip4)
3327 self.assertEqual(ip.src, self.nat_addr)
3328 self.assertEqual(tcp.dport, local_port)
3329 self.assertNotEqual(tcp.sport, 12345)
3330 eh_port_in = tcp.sport
3331 self.check_tcp_checksum(p)
3332 self.check_ip_checksum(p)
3334 self.logger.error(ppp("Unexpected or invalid packet:", p))
3337 # from service back to client
3338 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3339 IP(src=local_host.ip4, dst=self.nat_addr) /
3340 TCP(sport=local_port, dport=eh_port_in))
3341 self.pg9.add_stream(p)
3342 self.pg_enable_capture(self.pg_interfaces)
3344 capture = self.pg9.get_capture(1)
3349 self.assertEqual(ip.src, self.nat_addr)
3350 self.assertEqual(ip.dst, remote_host.ip4)
3351 self.assertEqual(tcp.sport, external_port)
3352 self.assertEqual(tcp.dport, 12345)
3353 self.check_tcp_checksum(p)
3354 self.check_ip_checksum(p)
3356 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Creates sessions by sending the inside stream from pg0 towards pg1, then
# deletes two of them through the API and checks that the user's session
# count dropped by exactly two.
# NOTE(review): the embedded line numbers skip (3364-3365, 3369, 3381-3382),
# so the trailing arguments of the second feature call and of the second
# nat44_del_session call are not visible in this listing.
3359 def test_del_session(self):
3360 """ Delete NAT44 session """
3361 self.nat44_add_address(self.nat_addr)
3362 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3363 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3366 pkts = self.create_stream_in(self.pg0, self.pg1)
3367 self.pg0.add_stream(pkts)
3368 self.pg_enable_capture(self.pg_interfaces)
3370 capture = self.pg1.get_capture(len(pkts))
3372 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3373 nsessions = len(sessions)
# first deletion is keyed by the session's inside address/port/protocol
3375 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3376 sessions[0].inside_port,
3377 sessions[0].protocol)
# second deletion is keyed by the session's outside address/port/protocol
3378 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3379 sessions[1].outside_port,
3380 sessions[1].protocol,
3383 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3384 self.assertEqual(nsessions - len(sessions), 2)
def test_set_get_reass(self):
    """ NAT44 set/get virtual fragmentation reassembly """
    # Snapshot the current settings and derive the values to program from
    # them, so the test does not depend on the defaults.
    initial = self.vapi.nat_get_reass()
    wanted_timeout = initial.ip4_timeout + 5
    wanted_max_reass = initial.ip4_max_reass * 2
    wanted_max_frag = initial.ip4_max_frag * 2

    self.vapi.nat_set_reass(timeout=wanted_timeout,
                            max_reass=wanted_max_reass,
                            max_frag=wanted_max_frag)

    # Read the settings back and compare them field by field.
    updated = self.vapi.nat_get_reass()
    self.assertEqual(wanted_timeout, updated.ip4_timeout)
    self.assertEqual(wanted_max_reass, updated.ip4_max_reass)
    self.assertEqual(wanted_max_frag, updated.ip4_max_frag)

    # The drop-fragments flag is set on its own and read back separately.
    self.vapi.nat_set_reass(drop_frag=1)
    drop_frag_cfg = self.vapi.nat_get_reass()
    self.assertTrue(drop_frag_cfg.ip4_drop_frag)
# Sends a fragmented TCP payload from pg0, reassembles what arrives on pg1
# and checks ports and payload; then does the same in the opposite
# direction. Finally checks that nat_reass_dump() returns two more entries
# than it did before any traffic was sent.
# NOTE(review): the embedded line numbers skip (3408-3409, 3419-3421, 3424,
# 3427, 3436-3439, ...), so most arguments of create_stream_frag and one
# argument of the first reass_frags_and_verify call are not visible in this
# listing - confirm against the complete file.
3403 def test_frag_in_order(self):
3404 """ NAT44 translate fragments arriving in order """
3405 self.nat44_add_address(self.nat_addr)
3406 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3407 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# 23-byte payload: 4 x "A", 16 x "B", 3 x "C"
3410 data = "A" * 4 + "B" * 16 + "C" * 3
3411 self.tcp_port_in = random.randint(1025, 65535)
# baseline number of reassembly entries, compared at the end of the test
3413 reass = self.vapi.nat_reass_dump()
3414 reass_n_start = len(reass)
3417 pkts = self.create_stream_frag(self.pg0,
3418 self.pg1.remote_ip4,
3422 self.pg0.add_stream(pkts)
3423 self.pg_enable_capture(self.pg_interfaces)
3425 frags = self.pg1.get_capture(len(pkts))
3426 p = self.reass_frags_and_verify(frags,
3428 self.pg1.remote_ip4)
3429 self.assertEqual(p[TCP].dport, 20)
3430 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# keep the translated source port on the instance
3431 self.tcp_port_out = p[TCP].sport
3432 self.assertEqual(data, p[Raw].load)
3435 pkts = self.create_stream_frag(self.pg1,
3440 self.pg1.add_stream(pkts)
3441 self.pg_enable_capture(self.pg_interfaces)
3443 frags = self.pg0.get_capture(len(pkts))
3444 p = self.reass_frags_and_verify(frags,
3445 self.pg1.remote_ip4,
3446 self.pg0.remote_ip4)
3447 self.assertEqual(p[TCP].sport, 20)
3448 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3449 self.assertEqual(data, p[Raw].load)
3451 reass = self.vapi.nat_reass_dump()
3452 reass_n_end = len(reass)
3454 self.assertEqual(reass_n_end - reass_n_start, 2)
# Fragmented hairpinning: host and server are both remote hosts on pg0; the
# fragments are sent on pg0 and the translated fragments are captured on pg0
# again, reassembled, and checked against the server's inside port and the
# original payload.
# NOTE(review): the embedded line numbers skip (3461, 3469, 3477-3480, 3483,
# 3486-3487), so the arguments of create_stream_frag and of
# reass_frags_and_verify are mostly not visible in this listing.
3456 def test_reass_hairpinning(self):
3457 """ NAT44 fragments hairpinning """
3458 host = self.pg0.remote_hosts[0]
3459 server = self.pg0.remote_hosts[1]
3460 host_in_port = random.randint(1025, 65535)
3462 server_in_port = random.randint(1025, 65535)
3463 server_out_port = random.randint(1025, 65535)
3464 data = "A" * 4 + "B" * 16 + "C" * 3
3466 self.nat44_add_address(self.nat_addr)
3467 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3468 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3470 # add static mapping for server
3471 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3472 server_in_port, server_out_port,
3473 proto=IP_PROTOS.tcp)
3475 # send packet from host to server
3476 pkts = self.create_stream_frag(self.pg0,
3481 self.pg0.add_stream(pkts)
3482 self.pg_enable_capture(self.pg_interfaces)
3484 frags = self.pg0.get_capture(len(pkts))
3485 p = self.reass_frags_and_verify(frags,
3488 self.assertNotEqual(p[TCP].sport, host_in_port)
3489 self.assertEqual(p[TCP].dport, server_in_port)
3490 self.assertEqual(data, p[Raw].load)
3492 def test_frag_out_of_order(self):
3493 """ NAT44 translate fragments arriving out of order """
3494 self.nat44_add_address(self.nat_addr)
3495 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3496 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3499 data = "A" * 4 + "B" * 16 + "C" * 3
3500 random.randint(1025, 65535)
3503 pkts = self.create_stream_frag(self.pg0,
3504 self.pg1.remote_ip4,
3509 self.pg0.add_stream(pkts)
3510 self.pg_enable_capture(self.pg_interfaces)
3512 frags = self.pg1.get_capture(len(pkts))
3513 p = self.reass_frags_and_verify(frags,
3515 self.pg1.remote_ip4)
3516 self.assertEqual(p[TCP].dport, 20)
3517 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3518 self.tcp_port_out = p[TCP].sport
3519 self.assertEqual(data, p[Raw].load)
3522 pkts = self.create_stream_frag(self.pg1,
3528 self.pg1.add_stream(pkts)
3529 self.pg_enable_capture(self.pg_interfaces)
3531 frags = self.pg0.get_capture(len(pkts))
3532 p = self.reass_frags_and_verify(frags,
3533 self.pg1.remote_ip4,
3534 self.pg0.remote_ip4)
3535 self.assertEqual(p[TCP].sport, 20)
3536 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3537 self.assertEqual(data, p[Raw].load)
# Switches the address/port assignment algorithm to map-e via CLI (psid 10,
# psid-offset 6, psid-len 6), sends one TCP packet from pg0 to pg1 and
# checks the translated source port.
# NOTE(review): the embedded line numbers skip (3544, 3553, 3555-3558, ...),
# so the assignments of ip and tcp are not visible in this listing.
3539 def test_port_restricted(self):
3540 """ Port restricted NAT44 (MAP-E CE) """
3541 self.nat44_add_address(self.nat_addr)
3542 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3543 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3545 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3546 "psid-offset 6 psid-len 6")
3548 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3549 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3550 TCP(sport=4567, dport=22))
3551 self.pg0.add_stream(p)
3552 self.pg_enable_capture(self.pg_interfaces)
3554 capture = self.pg1.get_capture(1)
3559 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3560 self.assertEqual(ip.src, self.nat_addr)
3561 self.assertEqual(tcp.dport, 22)
3562 self.assertNotEqual(tcp.sport, 4567)
# bits 6..11 of the translated source port must equal 10, the psid value
# passed on the CLI above
3563 self.assertEqual((tcp.sport >> 6) & 63, 10)
3564 self.check_tcp_checksum(p)
3565 self.check_ip_checksum(p)
3567 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Twice-NAT scenario: a regular NAT address plus a second address added with
# twice_nat=1, and a TCP static mapping for pg0's remote host. A packet from
# pg1 to the NAT address is expected on pg0 with BOTH addresses rewritten;
# the reply is expected on pg1 with the original addressing restored.
# NOTE(review): the embedded line numbers skip (3571, 3573-3576, 3581,
# 3584-3585, 3593-3596, ...), so the docstring, the assignments of port_in,
# port_out, eh_port_out, ip and tcp, and the last argument(s) of the static
# mapping call are not visible in this listing.
3570 def test_twice_nat(self):
3572 twice_nat_addr = '10.0.1.3'
3577 self.nat44_add_address(self.nat_addr)
3578 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3579 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3580 port_in, port_out, proto=IP_PROTOS.tcp,
3582 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3583 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# outside host -> NAT address:port_out
3586 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3587 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3588 TCP(sport=eh_port_out, dport=port_out))
3589 self.pg1.add_stream(p)
3590 self.pg_enable_capture(self.pg_interfaces)
3592 capture = self.pg0.get_capture(1)
3597 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3598 self.assertEqual(ip.src, twice_nat_addr)
3599 self.assertEqual(tcp.dport, port_in)
3600 self.assertNotEqual(tcp.sport, eh_port_out)
# translated source port of the external host; the reply is sent to it
3601 eh_port_in = tcp.sport
3602 self.check_tcp_checksum(p)
3603 self.check_ip_checksum(p)
3605 self.logger.error(ppp("Unexpected or invalid packet:", p))
# inside host -> twice-NAT address:eh_port_in
3608 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3609 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3610 TCP(sport=port_in, dport=eh_port_in))
3611 self.pg0.add_stream(p)
3612 self.pg_enable_capture(self.pg_interfaces)
3614 capture = self.pg1.get_capture(1)
3619 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3620 self.assertEqual(ip.src, self.nat_addr)
3621 self.assertEqual(tcp.dport, eh_port_out)
3622 self.assertEqual(tcp.sport, port_out)
3623 self.check_tcp_checksum(p)
3624 self.check_ip_checksum(p)
3626 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Twice-NAT combined with a load-balancing static mapping over two servers
# (both remote hosts of pg0). The packet from pg1 may be delivered to either
# server, so the destination is checked with assertIn and the reply is built
# from whichever server was chosen.
# NOTE(review): the embedded line numbers skip (3633-3636, 3641-3642,
# 3644-3646, 3651-3653, 3655, 3667-3671, 3675-3677, ...), so the assignments
# of external_port, local_port, eh_port_out, server, ip and tcp, the
# remaining keys of the locals entries and most arguments of
# nat44_add_del_lb_static_mapping are not visible in this listing.
3629 def test_twice_nat_lb(self):
3630 """ Twice NAT44 local service load balancing """
3631 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3632 twice_nat_addr = '10.0.1.3'
3637 server1 = self.pg0.remote_hosts[0]
3638 server2 = self.pg0.remote_hosts[1]
3640 locals = [{'addr': server1.ip4n,
3643 {'addr': server2.ip4n,
3647 self.nat44_add_address(self.nat_addr)
3648 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3650 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3654 local_num=len(locals),
3656 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3657 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# outside host -> NAT address:external_port
3660 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3661 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3662 TCP(sport=eh_port_out, dport=external_port))
3663 self.pg1.add_stream(p)
3664 self.pg_enable_capture(self.pg_interfaces)
3666 capture = self.pg0.get_capture(1)
3672 self.assertEqual(ip.src, twice_nat_addr)
3673 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3674 if ip.dst == server1.ip4:
3678 self.assertNotEqual(tcp.sport, eh_port_out)
3679 eh_port_in = tcp.sport
3680 self.assertEqual(tcp.dport, local_port)
3681 self.check_tcp_checksum(p)
3682 self.check_ip_checksum(p)
3684 self.logger.error(ppp("Unexpected or invalid packet:", p))
# chosen server -> twice-NAT address:eh_port_in
3687 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3688 IP(src=server.ip4, dst=twice_nat_addr) /
3689 TCP(sport=local_port, dport=eh_port_in))
3690 self.pg0.add_stream(p)
3691 self.pg_enable_capture(self.pg_interfaces)
3693 capture = self.pg1.get_capture(1)
3698 self.assertEqual(ip.src, self.nat_addr)
3699 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3700 self.assertEqual(tcp.sport, external_port)
3701 self.assertEqual(tcp.dport, eh_port_out)
3702 self.check_tcp_checksum(p)
3703 self.check_ip_checksum(p)
3705 self.logger.error(ppp("Unexpected or invalid packet:", p))
def test_twice_nat_interface_addr(self):
    """ Acquire twice NAT44 addresses from interface """
    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)

    # Before the interface gets an address the NAT pool must be empty.
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))

    # Configuring the interface address must add exactly one pool entry
    # carrying that address and the twice-NAT flag.
    self.pg7.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    entry = pool[0]
    self.assertEqual(entry.ip_address[0:4], self.pg7.local_ip4n)
    self.assertEqual(entry.twice_nat, 1)

    # Removing the interface address must empty the pool again.
    self.pg7.unconfig_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))
# Sets the reassembly limit to max_frag=0, configures the IPFIX exporter
# towards pg3 and sends only the LAST fragment of a fragmented stream.
# Nothing is expected on pg1; after "ipfix flush" nine packets are expected
# on pg3, whose templates are loaded into the decoder before the data sets
# are decoded and verified.
# NOTE(review): the embedded line numbers skip (3733, 3737, 3746-3748, 3751,
# 3757, 3768, ...), so the loop headers that bind p, one exporter argument
# and most create_stream_frag arguments are not visible in this listing.
3728 def test_ipfix_max_frags(self):
3729 """ IPFIX logging maximum fragments pending reassembly exceeded """
3730 self.nat44_add_address(self.nat_addr)
3731 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3732 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3734 self.vapi.nat_set_reass(max_frag=0)
3735 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3736 src_address=self.pg3.local_ip4n,
3738 template_interval=10)
3739 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3740 src_port=self.ipfix_src_port)
3742 data = "A" * 4 + "B" * 16 + "C" * 3
3743 self.tcp_port_in = random.randint(1025, 65535)
3744 pkts = self.create_stream_frag(self.pg0,
3745 self.pg1.remote_ip4,
# only the last fragment of the stream is injected
3749 self.pg0.add_stream(pkts[-1])
3750 self.pg_enable_capture(self.pg_interfaces)
3752 frags = self.pg1.get_capture(0)
3753 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3754 capture = self.pg3.get_capture(9)
3755 ipfix = IPFIXDecoder()
3756 # first load template
3758 self.assertTrue(p.haslayer(IPFIX))
3759 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3760 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3761 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3762 self.assertEqual(p[UDP].dport, 4739)
3763 self.assertEqual(p[IPFIX].observationDomainID,
3764 self.ipfix_domain_id)
3765 if p.haslayer(Template):
3766 ipfix.add_template(p.getlayer(Template))
3767 # verify events in data set
3769 if p.haslayer(Data):
# note: from here on `data` holds decoded IPFIX records, no longer the
# payload string built above
3770 data = ipfix.decode_data_set(p.getlayer(Set))
3771 self.verify_ipfix_max_fragments_ip4(data, 0,
3772 self.pg0.remote_ip4n)
# Per-test cleanup of TestNAT44: after the parent tearDown, and only while
# VPP is still alive, the NAT44 state is written to the log through a series
# of "show" CLI commands, and the address/port assignment algorithm is reset
# to "default" (test_port_restricted switches it to map-e).
# NOTE(review): the embedded line numbers skip (3773-3774, 3784-3786), so
# the `def` line of this method and any statements after the last CLI call
# are not visible in this listing.
3775 super(TestNAT44, self).tearDown()
3776 if not self.vpp_dead:
3777 self.logger.info(self.vapi.cli("show nat44 addresses"))
3778 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3779 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3780 self.logger.info(self.vapi.cli("show nat44 interface address"))
3781 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3782 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3783 self.vapi.cli("nat addr-port-assignment-alg default")
# Test class that starts VPP with the NAT plugin option "out2in dpo" and
# exercises NAT44 together with a MAP translation domain (464XLAT CE): pg0
# is the IPv4 side, pg1 the IPv6 side.
# NOTE(review): the embedded line numbers skip throughout this class (e.g.
# 3789-3790, 3794-3795, 3798-3799, 3824-3825, 3827-3828), so decorators,
# try/except lines, pg_start() calls and some trailing call arguments are
# not visible in this listing - confirm against the complete file.
3787 class TestNAT44Out2InDPO(MethodHolder):
3788 """ NAT44 Test Cases using out2in DPO """
# Adds `nat { out2in dpo }` to the VPP command line.
3791 def setUpConstants(cls):
3792 super(TestNAT44Out2InDPO, cls).setUpConstants()
3793 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
# Defines ports/addresses used by the stream helpers, creates pg0 and pg1,
# configures IPv4 on pg0 and IPv6 on pg1, and installs an IPv6 default route
# (all-zero prefix, length 0) via pg1's remote address.
3796 def setUpClass(cls):
3797 super(TestNAT44Out2InDPO, cls).setUpClass()
3800 cls.tcp_port_in = 6303
3801 cls.tcp_port_out = 6303
3802 cls.udp_port_in = 6304
3803 cls.udp_port_out = 6304
3804 cls.icmp_id_in = 6305
3805 cls.icmp_id_out = 6305
3806 cls.nat_addr = '10.0.0.3'
3807 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3808 cls.dst_ip4 = '192.168.70.1'
3810 cls.create_pg_interfaces(range(2))
3813 cls.pg0.config_ip4()
3814 cls.pg0.resolve_arp()
3817 cls.pg1.config_ip6()
3818 cls.pg1.resolve_ndp()
3820 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
3821 dst_address_length=0,
3822 next_hop_address=cls.pg1.remote_ip6n,
3823 next_hop_sw_if_index=cls.pg1.sw_if_index)
# NOTE(review): presumably the body of an exception handler whose header
# line is not visible here - verify against the complete file.
3826 super(TestNAT44Out2InDPO, cls).tearDownClass()
# Stores the destination ('1:2:3::') and source ('4:5:6::') IPv6 prefixes,
# both with length 96, on the instance and creates a MAP domain from them
# with is_translation=1.
3829 def configure_xlat(self):
3830 self.dst_ip6_pfx = '1:2:3::'
3831 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3833 self.dst_ip6_pfx_len = 96
3834 self.src_ip6_pfx = '4:5:6::'
3835 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3837 self.src_ip6_pfx_len = 96
3838 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3839 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3840 '\x00\x00\x00\x00', 0, is_translation=1,
# With NAT44 enabled on pg0: IPv4 traffic from pg0 must appear on pg1 as
# IPv6 whose source is composed from the NAT address; the IPv6 replies must
# come back to pg0 as IPv4. The NAT configuration is removed at the end.
3843 def test_464xlat_ce(self):
3844 """ Test 464XLAT CE with NAT44 """
3846 self.configure_xlat()
3848 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3849 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
3851 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3852 self.dst_ip6_pfx_len)
3853 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3854 self.src_ip6_pfx_len)
3857 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3858 self.pg0.add_stream(pkts)
3859 self.pg_enable_capture(self.pg_interfaces)
3861 capture = self.pg1.get_capture(len(pkts))
3862 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
3865 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3867 self.pg1.add_stream(pkts)
3868 self.pg_enable_capture(self.pg_interfaces)
3870 capture = self.pg0.get_capture(len(pkts))
3871 self.verify_capture_in(capture, self.pg0)
3873 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3875 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
3876 self.nat_addr_n, is_add=0)
# Without any NAT44 configuration: the IPv6 source is composed from pg0's
# own remote IPv4 address and the ports are expected to be unchanged
# (same_port=True).
3878 def test_464xlat_ce_no_nat(self):
3879 """ Test 464XLAT CE without NAT44 """
3881 self.configure_xlat()
3883 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3884 self.dst_ip6_pfx_len)
3885 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3886 self.src_ip6_pfx_len)
3888 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3889 self.pg0.add_stream(pkts)
3890 self.pg_enable_capture(self.pg_interfaces)
3892 capture = self.pg1.get_capture(len(pkts))
3893 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3894 nat_ip=out_dst_ip6, same_port=True)
3896 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3897 self.pg1.add_stream(pkts)
3898 self.pg_enable_capture(self.pg_interfaces)
3900 capture = self.pg0.get_capture(len(pkts))
3901 self.verify_capture_in(capture, self.pg0)
3904 class TestDeterministicNAT(MethodHolder):
3905 """ Deterministic NAT Test Cases """
# Adds `nat { deterministic }` to the VPP command line so the NAT plugin
# starts in deterministic mode (test_deterministic_mode asserts the flag).
# NOTE(review): line 3907 is absent from this listing; presumably it holds
# the decorator for this cls-taking method - verify.
3908 def setUpConstants(cls):
3909 super(TestDeterministicNAT, cls).setUpConstants()
3910 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
# Defines the ports, ICMP id and NAT address used by the stream helpers of
# this class, creates three pg interfaces and generates two remote hosts on
# pg0 with IPv4 neighbor entries.
# NOTE(review): the embedded line numbers skip (3915-3916, 3928-3931,
# 3934-3935, 3937), so the body of the interface loop and the lines around
# the tearDownClass call are not visible in this listing; that call is
# presumably inside an exception handler - verify.
3913 def setUpClass(cls):
3914 super(TestDeterministicNAT, cls).setUpClass()
3917 cls.tcp_port_in = 6303
3918 cls.tcp_external_port = 6303
3919 cls.udp_port_in = 6304
3920 cls.udp_external_port = 6304
3921 cls.icmp_id_in = 6305
3922 cls.nat_addr = '10.0.0.3'
3924 cls.create_pg_interfaces(range(3))
3925 cls.interfaces = list(cls.pg_interfaces)
3927 for i in cls.interfaces:
3932 cls.pg0.generate_remote_hosts(2)
3933 cls.pg0.configure_ipv4_neighbors()
3936 super(TestDeterministicNAT, cls).tearDownClass()
# Builds one TCP, one UDP and one ICMP echo-request packet from the inside
# interface's remote host towards the outside interface's remote host, all
# with the requested TTL.
# NOTE(review): the embedded line numbers skip (3940, 3942, 3946-3948,
# 3952-3954, ...), so the docstring delimiters and the statements that
# collect and return the packets are not visible in this listing.
3939 def create_stream_in(self, in_if, out_if, ttl=64):
3941 Create packet stream for inside network
3943 :param in_if: Inside interface
3944 :param out_if: Outside interface
3945 :param ttl: TTL of generated packets
3949 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3950 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3951 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3955 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3956 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3957 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3961 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3962 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3963 ICMP(id=self.icmp_id_in, type='echo-request'))
# Builds one TCP, one UDP and one ICMP echo-reply packet from the outside
# interface's remote host towards dst_ip. The destination ports / ICMP id
# are self.tcp_port_out, self.udp_port_out and self.icmp_external_id, which
# verify_capture_out of this class records from captured packets, so that
# helper has to run first.
# NOTE(review): the embedded line numbers skip (3969, 3971, 3975-3976,
# 3978-3979, 3983-3985, ...), so the docstring delimiters, the condition
# guarding the dst_ip default and the statements that collect and return the
# packets are not visible in this listing.
3968 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3970 Create packet stream for outside network
3972 :param out_if: Outside interface
3973 :param dst_ip: Destination IP address (Default use global NAT address)
3974 :param ttl: TTL of generated packets
3977 dst_ip = self.nat_addr
3980 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3981 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3982 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3986 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3987 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3988 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3992 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3993 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3994 ICMP(id=self.icmp_external_id, type='echo-reply'))
# Checks the number of captured packets and that each one has the expected
# translated source address, and records the translated TCP/UDP source port
# or ICMP id on the instance (tcp_port_out, udp_port_out, icmp_external_id)
# for later use by create_stream_out and the session checks.
# NOTE(review): the docstring below documents a `same_port` parameter that
# this signature does not have (and spells "Source" as "Sorce"); it looks
# copied from another helper and should be cleaned up in the complete file.
# NOTE(review): the embedded line numbers skip (4000, 4002, 4007-4008, 4012,
# 4018, 4020, 4023-4024), so the docstring delimiters, the nat_ip default
# guard and the try/except/else lines are not visible in this listing.
3999 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4001 Verify captured packets on outside network
4003 :param capture: Captured packets
4004 :param nat_ip: Translated IP address (Default use global NAT address)
4005 :param same_port: Sorce port number is not translated (Default False)
4006 :param packet_num: Expected number of packets (Default 3)
4009 nat_ip = self.nat_addr
4010 self.assertEqual(packet_num, len(capture))
4011 for packet in capture:
4013 self.assertEqual(packet[IP].src, nat_ip)
4014 if packet.haslayer(TCP):
4015 self.tcp_port_out = packet[TCP].sport
4016 elif packet.haslayer(UDP):
4017 self.udp_port_out = packet[UDP].sport
4019 self.icmp_external_id = packet[ICMP].id
4021 self.logger.error(ppp("Unexpected or invalid packet "
4022 "(outside network):", packet))
# Drives a TCP three-way handshake through the NAT: SYN from the inside
# host, SYN+ACK from the outside host addressed to the NAT address and the
# translated port, then ACK from the inside host. The translated source port
# is stored in self.tcp_port_out.
# NOTE(review): the embedded line numbers skip (4026, 4028, 4031-4032,
# 4037-4038, 4040, 4042, 4049, 4059-4060, 4064-4065, ...), so the TCP flags
# arguments, the add_stream calls for the inside packets, the rebinding of p
# before tcp_port_out is read and the try/except lines are not visible in
# this listing.
4025 def initiate_tcp_session(self, in_if, out_if):
4027 Initiates TCP session
4029 :param in_if: Inside interface
4030 :param out_if: Outside interface
4033 # SYN packet in->out
4034 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4035 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4036 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4039 self.pg_enable_capture(self.pg_interfaces)
4041 capture = out_if.get_capture(1)
4043 self.tcp_port_out = p[TCP].sport
4045 # SYN + ACK packet out->in
4046 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
4047 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
4048 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4050 out_if.add_stream(p)
4051 self.pg_enable_capture(self.pg_interfaces)
4053 in_if.get_capture(1)
4055 # ACK packet in->out
4056 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4057 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4058 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4061 self.pg_enable_capture(self.pg_interfaces)
4063 out_if.get_capture(1)
4066 self.logger.error("TCP 3 way handshake failed")
# Expects exactly one decoded IPFIX record and checks four of its fields,
# which are indexed by IPFIX information element id.
# Per the IANA IPFIX registry: 230 is natEvent, 466 natQuotaExceededEvent,
# 473 maxEntriesPerUser and 8 sourceIPv4Address.
# The bytes '\xe8\x03\x00\x00' read little-endian are 1000.
# NOTE(review): the embedded line numbers skip (4070, 4072, 4074, 4076-4077,
# 4081, 4083), so the docstring delimiters and the assignment of `record`
# are not visible in this listing.
4069 def verify_ipfix_max_entries_per_user(self, data):
4071 Verify IPFIX maximum entries per user exceeded event
4073 :param data: Decoded IPFIX data records
4075 self.assertEqual(1, len(data))
4078 self.assertEqual(ord(record[230]), 13)
4079 # natQuotaExceededEvent
4080 self.assertEqual('\x03\x00\x00\x00', record[466])
4082 self.assertEqual('\xe8\x03\x00\x00', record[473])
4084 self.assertEqual(self.pg0.remote_ip4n, record[8])
# Control-plane check of deterministic NAT: verifies the plugin reports
# deterministic mode, adds one in->out mapping, checks the forward and
# reverse lookups for a sample inside address, checks the mapping dump, then
# clears the configuration and checks that the dump is empty.
# NOTE(review): the embedded line numbers skip (4094-4096, ...), so the
# assignments of in_plen and out_plen are not visible in this listing.
4086 def test_deterministic_mode(self):
4087 """ NAT plugin run deterministic mode """
4088 in_addr = '172.16.255.0'
4089 out_addr = '172.17.255.50'
4090 in_addr_t = '172.16.255.20'
4091 in_addr_n = socket.inet_aton(in_addr)
4092 out_addr_n = socket.inet_aton(out_addr)
4093 in_addr_t_n = socket.inet_aton(in_addr_t)
4097 nat_config = self.vapi.nat_show_config()
4098 self.assertEqual(1, nat_config.deterministic)
4100 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# forward lookup of the sample inside address, then reverse lookup using the
# outside address and the upper bound of the returned port range
4102 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4103 self.assertEqual(rep1.out_addr[:4], out_addr_n)
4104 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4105 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
4107 deterministic_mappings = self.vapi.nat_det_map_dump()
4108 self.assertEqual(len(deterministic_mappings), 1)
4109 dsm = deterministic_mappings[0]
4110 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4111 self.assertEqual(in_plen, dsm.in_plen)
4112 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4113 self.assertEqual(out_plen, dsm.out_plen)
4115 self.clear_nat_det()
4116 deterministic_mappings = self.vapi.nat_det_map_dump()
4117 self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts """
    # Read the current timeouts and program each one ten seconds higher,
    # so the test does not depend on the default values.
    before = self.vapi.nat_det_get_timeouts()
    step = 10

    self.vapi.nat_det_set_timeouts(before.udp + step,
                                   before.tcp_established + step,
                                   before.tcp_transitory + step,
                                   before.icmp + step)

    after = self.vapi.nat_det_get_timeouts()

    # Compare against the exact values written. Checking only "not equal
    # to the old value" would also pass if a timeout were stored in the
    # wrong field or with the wrong value.
    self.assertEqual(before.udp + step, after.udp)
    self.assertEqual(before.icmp + step, after.icmp)
    self.assertEqual(before.tcp_established + step,
                     after.tcp_established)
    self.assertEqual(before.tcp_transitory + step,
                     after.tcp_transitory)
# Data-plane check of deterministic NAT: maps pg0's remote host to nat_ip,
# sends the inside stream to pg1 and the outside stream back to pg0, then
# dumps the host's sessions and checks three of them (external address,
# inside port, translated port and, for TCP/UDP, external port).
# NOTE(review): the embedded line numbers skip (4143, 4145, 4148-4150, 4154,
# 4169-4171, 4176-4178, 4183-4185), so the prefix-length arguments of the
# mapping call and the statements that bind `s` before each group of
# assertions are not visible in this listing.
4137 def test_det_in(self):
4138 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4140 nat_ip = "10.0.0.10"
4142 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4144 socket.inet_aton(nat_ip),
4146 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4147 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4151 pkts = self.create_stream_in(self.pg0, self.pg1)
4152 self.pg0.add_stream(pkts)
4153 self.pg_enable_capture(self.pg_interfaces)
4155 capture = self.pg1.get_capture(len(pkts))
# also records the translated ports / ICMP id used by create_stream_out
4156 self.verify_capture_out(capture, nat_ip)
4159 pkts = self.create_stream_out(self.pg1, nat_ip)
4160 self.pg1.add_stream(pkts)
4161 self.pg_enable_capture(self.pg_interfaces)
4163 capture = self.pg0.get_capture(len(pkts))
4164 self.verify_capture_in(capture, self.pg0)
4167 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4168 self.assertEqual(len(sessions), 3)
# session compared against the TCP ports
4172 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4173 self.assertEqual(s.in_port, self.tcp_port_in)
4174 self.assertEqual(s.out_port, self.tcp_port_out)
4175 self.assertEqual(s.ext_port, self.tcp_external_port)
# session compared against the UDP ports
4179 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4180 self.assertEqual(s.in_port, self.udp_port_in)
4181 self.assertEqual(s.out_port, self.udp_port_out)
4182 self.assertEqual(s.ext_port, self.udp_external_port)
# session compared against the ICMP identifiers
4186 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4187 self.assertEqual(s.in_port, self.icmp_id_in)
4188 self.assertEqual(s.out_port, self.icmp_external_id)
# Two inside hosts (remote hosts 0 and 1 of pg0) share one deterministic
# mapping to nat_ip. Each sends a TCP packet with the same source port; both
# must leave pg1 with nat_ip as source, and the translated ports are
# remembered. Replies to each translated port must reach the matching host.
# Finally both sessions are closed through the API (one from the outside
# view, one from the inside view) and the mapping's session counter is
# checked after each close.
# NOTE(review): the embedded line numbers skip (4192, 4194, 4196, 4201,
# 4203, 4206-4208, 4214, 4216-4219, 4294, 4296, 4301, 4303, ...), so the
# assignments of port_in, ip and tcp, the prefix lengths of the mapping call
# and the port arguments of the close-session calls are not visible in this
# listing.
4190 def test_multiple_users(self):
4191 """ Deterministic NAT multiple users """
4193 nat_ip = "10.0.0.10"
4195 external_port = 6303
4197 host0 = self.pg0.remote_hosts[0]
4198 host1 = self.pg0.remote_hosts[1]
4200 self.vapi.nat_det_add_del_map(host0.ip4n,
4202 socket.inet_aton(nat_ip),
4204 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4205 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host0 -> outside
4209 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4210 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4211 TCP(sport=port_in, dport=external_port))
4212 self.pg0.add_stream(p)
4213 self.pg_enable_capture(self.pg_interfaces)
4215 capture = self.pg1.get_capture(1)
4220 self.assertEqual(ip.src, nat_ip)
4221 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4222 self.assertEqual(tcp.dport, external_port)
4223 port_out0 = tcp.sport
4225 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1 -> outside, same inside source port as host0
4229 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4230 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4231 TCP(sport=port_in, dport=external_port))
4232 self.pg0.add_stream(p)
4233 self.pg_enable_capture(self.pg_interfaces)
4235 capture = self.pg1.get_capture(1)
4240 self.assertEqual(ip.src, nat_ip)
4241 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4242 self.assertEqual(tcp.dport, external_port)
4243 port_out1 = tcp.sport
4245 self.logger.error(ppp("Unexpected or invalid packet:", p))
# one mapping, two sessions
4248 dms = self.vapi.nat_det_map_dump()
4249 self.assertEqual(1, len(dms))
4250 self.assertEqual(2, dms[0].ses_num)
# outside -> host0's translated port
4253 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4254 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4255 TCP(sport=external_port, dport=port_out0))
4256 self.pg1.add_stream(p)
4257 self.pg_enable_capture(self.pg_interfaces)
4259 capture = self.pg0.get_capture(1)
4264 self.assertEqual(ip.src, self.pg1.remote_ip4)
4265 self.assertEqual(ip.dst, host0.ip4)
4266 self.assertEqual(tcp.dport, port_in)
4267 self.assertEqual(tcp.sport, external_port)
4269 self.logger.error(ppp("Unexpected or invalid packet:", p))
# outside -> host1's translated port
4273 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4274 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4275 TCP(sport=external_port, dport=port_out1))
4276 self.pg1.add_stream(p)
4277 self.pg_enable_capture(self.pg_interfaces)
4279 capture = self.pg0.get_capture(1)
4284 self.assertEqual(ip.src, self.pg1.remote_ip4)
4285 self.assertEqual(ip.dst, host1.ip4)
4286 self.assertEqual(tcp.dport, port_in)
4287 self.assertEqual(tcp.sport, external_port)
4289 self.logger.error(ppp("Unexpected or invalid packet", p))
4292 # session close api test
4293 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4295 self.pg1.remote_ip4n,
4297 dms = self.vapi.nat_det_map_dump()
4298 self.assertEqual(dms[0].ses_num, 1)
4300 self.vapi.nat_det_close_session_in(host0.ip4n,
4302 self.pg1.remote_ip4n,
4304 dms = self.vapi.nat_det_map_dump()
4305 self.assertEqual(dms[0].ses_num, 0)
# Establishes a TCP session through the deterministic NAT, then closes it
# starting from the inside host: FIN in->out, ACK and FIN out->in (sent
# together as `pkts`, two packets expected on pg0), ACK in->out. Afterwards
# the mapping's session counter must be zero.
# NOTE(review): the embedded line numbers skip (4310, 4312, 4315-4316,
# 4325, 4328, 4330-4332, 4337-4339, 4344-4346, 4365, 4367, ...), so the TCP
# flags arguments, the construction of `pkts` and the try/except lines are
# not visible in this listing.
4307 def test_tcp_session_close_detection_in(self):
4308 """ Deterministic NAT TCP session close from inside network """
4309 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4311 socket.inet_aton(self.nat_addr),
4313 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4314 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4317 self.initiate_tcp_session(self.pg0, self.pg1)
4319 # close the session from inside
4321 # FIN packet in -> out
4322 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4323 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4324 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4326 self.pg0.add_stream(p)
4327 self.pg_enable_capture(self.pg_interfaces)
4329 self.pg1.get_capture(1)
4333 # ACK packet out -> in
4334 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4335 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4336 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4340 # FIN packet out -> in
4341 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4342 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4343 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4347 self.pg1.add_stream(pkts)
4348 self.pg_enable_capture(self.pg_interfaces)
4350 self.pg0.get_capture(2)
4352 # ACK packet in -> out
4353 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4354 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4355 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4357 self.pg0.add_stream(p)
4358 self.pg_enable_capture(self.pg_interfaces)
4360 self.pg1.get_capture(1)
4362 # Check if deterministic NAT44 closed the session
4363 dms = self.vapi.nat_det_map_dump()
4364 self.assertEqual(0, dms[0].ses_num)
4366 self.logger.error("TCP session termination failed")
4369 def test_tcp_session_close_detection_out(self):
4370 """ Deterministic NAT TCP session close from outside network """
# Purpose: mirror image of the inside-initiated close test. The close
# sequence starts on the outside interface (pg1) and the mapping must report
# zero sessions at the end.
# NOTE(review): gaps in the embedded line numbers show that some original
# lines are absent from this listing; calls that look unterminated continue
# on those absent lines.
4371 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4373 socket.inet_aton(self.nat_addr),
4375 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4376 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4379 self.initiate_tcp_session(self.pg0, self.pg1)
4381 # close the session from outside
4383 # FIN packet out -> in
4384 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4385 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4386 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4388 self.pg1.add_stream(p)
4389 self.pg_enable_capture(self.pg_interfaces)
4391 self.pg0.get_capture(1)
4395 # ACK packet in -> out
4396 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4397 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4398 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the comment on the next line repeats "ACK packet in -> out".
# By symmetry with the inside-initiated test this second packet is presumably
# the FIN; its flags line is not in this listing -- confirm.
4402 # ACK packet in -> out
4403 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4404 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4405 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# Both inside-originated packets go out as one stream ("pkts"); two packets
# are expected on pg1.
4409 self.pg0.add_stream(pkts)
4410 self.pg_enable_capture(self.pg_interfaces)
4412 self.pg1.get_capture(2)
4414 # ACK packet out -> in
4415 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4416 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4417 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4419 self.pg1.add_stream(p)
4420 self.pg_enable_capture(self.pg_interfaces)
4422 self.pg0.get_capture(1)
4424 # Check if deterministic NAT44 closed the session
4425 dms = self.vapi.nat_det_map_dump()
4426 self.assertEqual(0, dms[0].ses_num)
# NOTE(review): the error log below presumably sits in a failure handler whose
# control-flow lines are not part of this listing -- confirm.
4428 self.logger.error("TCP session termination failed")
4431 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4432 def test_session_timeout(self):
4433 """ Deterministic NAT session timeouts """
# Purpose: open a TCP session, set all four deterministic-NAT timeouts to 5,
# push an inside->outside stream through, and then expect the mapping to
# report zero sessions. Only runs when running_extended_tests() is true.
# NOTE(review): gaps in the embedded line numbers show that some original
# lines are absent from this listing; calls that look unterminated continue
# on those absent lines.
4434 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4436 socket.inet_aton(self.nat_addr),
4438 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4439 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4442 self.initiate_tcp_session(self.pg0, self.pg1)
4443 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4444 pkts = self.create_stream_in(self.pg0, self.pg1)
4445 self.pg0.add_stream(pkts)
4446 self.pg_enable_capture(self.pg_interfaces)
4448 capture = self.pg1.get_capture(len(pkts))
# NOTE(review): a wait longer than the configured timeouts presumably happens
# on one of the lines missing between 4448 and 4451 (sleep is imported at the
# top of the file) -- confirm.
4451 dms = self.vapi.nat_det_map_dump()
4452 self.assertEqual(0, dms[0].ses_num)
4454 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4455 def test_session_limit_per_user(self):
4456 """ Deterministic NAT maximum sessions per user limit """
# Purpose: create 1000 UDP flows from one inside host (source ports
# 1025..2024), then send one more flow and verify that it is not forwarded,
# that an ICMP error comes back on pg0, that the mapping still counts 1000
# sessions, and that an IPFIX event is exported via pg2.
# NOTE(review): gaps in the embedded line numbers show that some original
# lines are absent from this listing; calls that look unterminated continue
# on those absent lines.
4457 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4459 socket.inet_aton(self.nat_addr),
4461 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4462 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# IPFIX export: collector is pg2's remote address, source is pg2's local one.
4464 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4465 src_address=self.pg2.local_ip4n,
4467 template_interval=10)
4468 self.vapi.nat_ipfix()
# range(1025, 2025) yields exactly 1000 distinct source ports.
4471 for port in range(1025, 2025):
4472 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4473 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4474 UDP(sport=port, dport=port))
4477 self.pg0.add_stream(pkts)
4478 self.pg_enable_capture(self.pg_interfaces)
4480 capture = self.pg1.get_capture(len(pkts))
# One extra flow beyond the 1000 already created.
4482 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4483 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4484 UDP(sport=3001, dport=3002))
4485 self.pg0.add_stream(p)
4486 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): the value bound to "capture" here is overwritten two lines
# later without being read.
4488 capture = self.pg1.assert_nothing_captured()
4490 # verify ICMP error packet
4491 capture = self.pg0.get_capture(1)
4493 self.assertTrue(p.haslayer(ICMP))
# ICMP type 3 / code 1 is "destination unreachable / host unreachable".
4495 self.assertEqual(icmp.type, 3)
4496 self.assertEqual(icmp.code, 1)
4497 self.assertTrue(icmp.haslayer(IPerror))
4498 inner_ip = icmp[IPerror]
# The embedded datagram must be the rejected flow (ports 3001 -> 3002).
4499 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4500 self.assertEqual(inner_ip[UDPerror].dport, 3002)
4502 dms = self.vapi.nat_det_map_dump()
4504 self.assertEqual(1000, dms[0].ses_num)
4506 # verify IPFIX logging
4507 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4509 capture = self.pg2.get_capture(2)
4510 ipfix = IPFIXDecoder()
4511 # first load template
4513 self.assertTrue(p.haslayer(IPFIX))
4514 if p.haslayer(Template):
4515 ipfix.add_template(p.getlayer(Template))
4516 # verify events in data set
4518 if p.haslayer(Data):
4519 data = ipfix.decode_data_set(p.getlayer(Set))
4520 self.verify_ipfix_max_entries_per_user(data)
4522 def clear_nat_det(self):
4524 Clear deterministic NAT configuration.
# Steps visible here: disable NAT IPFIX logging, call nat_det_set_timeouts()
# with no arguments (presumably restoring defaults -- confirm), then walk the
# dumped deterministic mappings and the dumped NAT44 interfaces, issuing an
# add/del call for each.
# NOTE(review): the remaining arguments of both add/del calls are on lines
# that are absent from this listing, so the delete flag is not visible.
4526 self.vapi.nat_ipfix(enable=0)
4527 self.vapi.nat_det_set_timeouts()
4528 deterministic_mappings = self.vapi.nat_det_map_dump()
4529 for dsm in deterministic_mappings:
4530 self.vapi.nat_det_add_del_map(dsm.in_addr,
4536 interfaces = self.vapi.nat44_interface_dump()
4537 for intf in interfaces:
4538 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# NOTE(review): these lines read as the body of TestDeterministicNAT.tearDown;
# the "def" line itself (numbered 4542) is absent from this listing.
# After the parent tearDown, and only while VPP is still alive, the NAT44
# interface list and the deterministic mappings, timeouts and sessions are
# queried through the CLI for the log, and then clear_nat_det() is called.
4543 super(TestDeterministicNAT, self).tearDown()
4544 if not self.vpp_dead:
4545 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4547 self.vapi.cli("show nat44 deterministic mappings"))
4549 self.vapi.cli("show nat44 deterministic timeouts"))
4551 self.vapi.cli("show nat44 deterministic sessions"))
4552 self.clear_nat_det()
4555 class TestNAT64(MethodHolder):
4556 """ NAT64 Test Cases """
def setUpConstants(cls):
    """Extend the inherited VPP command line with a NAT64 stanza.

    The parent ``setUpConstants`` runs first; afterwards a ``nat { ... }``
    section carrying the ``nat64 bib hash buckets 128`` and
    ``nat64 st hash buckets 256`` settings is appended to
    ``cls.vpp_cmdline``.
    """
    # NOTE(review): the ``cls`` receiver implies a ``@classmethod`` decorator
    # on the line directly above this block -- confirm it is present.
    super(TestNAT64, cls).setUpConstants()
    nat64_stanza = [
        "nat", "{",
        "nat64 bib hash buckets 128",
        "nat64 st hash buckets 256",
        "}",
    ]
    cls.vpp_cmdline.extend(nat64_stanza)
4565 def setUpClass(cls):
4566 super(TestNAT64, cls).setUpClass()
# Class-wide fixture for the NAT64 tests: default inside/outside port and ICMP
# id values, the NAT pool addresses (text and packed form), IPFIX constants,
# five packet-generator interfaces, and their IPv4/IPv6 configuration.
# NOTE(review): gaps in the embedded line numbers show that some original
# lines are absent from this listing (for instance the assignment of
# cls.vrf1_id and the loop bodies that configure each interface).
4569 cls.tcp_port_in = 6303
4570 cls.tcp_port_out = 6303
4571 cls.udp_port_in = 6304
4572 cls.udp_port_out = 6304
4573 cls.icmp_id_in = 6305
4574 cls.icmp_id_out = 6305
4575 cls.nat_addr = '10.0.0.3'
4576 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4578 cls.vrf1_nat_addr = '10.0.10.3'
4579 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4581 cls.ipfix_src_port = 4739
4582 cls.ipfix_domain_id = 1
# Interface roles: pg0 and pg2 are the IPv6 side, pg1 is the IPv4 side.
4584 cls.create_pg_interfaces(range(5))
4585 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4586 cls.ip6_interfaces.append(cls.pg_interfaces[2])
4587 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
# pg2 is moved into a separate IPv6 table identified by cls.vrf1_id.
4589 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4591 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
# Two remote hosts on pg0; the hairpinning tests read remote_hosts[0] and [1].
4593 cls.pg0.generate_remote_hosts(2)
4595 for i in cls.ip6_interfaces:
4598 i.configure_ipv6_neighbors()
4600 for i in cls.ip4_interfaces:
# pg3 gets both address families; the one-armed NAT64 test uses it for both
# directions.
4606 cls.pg3.config_ip4()
4607 cls.pg3.resolve_arp()
4608 cls.pg3.config_ip6()
4609 cls.pg3.configure_ipv6_neighbors()
# NOTE(review): the call below is presumably cleanup inside an error handler
# of this method (the handler lines are not in this listing) -- confirm.
4612 super(TestNAT64, cls).tearDownClass()
def test_pool(self):
    """ Add/delete address to NAT64 pool """
    # A one-address range: start and end are the same packed IPv4 address.
    pool_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
    api = self.vapi

    # Add the address; it must be the only entry the pool dump reports.
    api.nat64_add_del_pool_addr_range(pool_addr, pool_addr)
    dumped = api.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    self.assertEqual(dumped[0].address, pool_addr)

    # Remove it again; the pool dump must come back empty.
    api.nat64_add_del_pool_addr_range(pool_addr, pool_addr, is_add=0)
    dumped = api.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
4630 def test_interface(self):
4631 """ Enable/disable NAT64 feature on the interface """
# Purpose: enable NAT64 on pg0 (default direction) and on pg1 with
# is_inside=0, check the interface dump and the per-interface feature
# listing, then disable both and expect an empty dump.
# NOTE(review): pg0_found / pg1_found are asserted below but the lines that
# assign them are absent from this listing (the embedded numbering skips).
4632 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4633 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4635 interfaces = self.vapi.nat64_interface_dump()
4636 self.assertEqual(len(interfaces), 2)
4639 for intf in interfaces:
4640 if intf.sw_if_index == self.pg0.sw_if_index:
4641 self.assertEqual(intf.is_inside, 1)
4643 elif intf.sw_if_index == self.pg1.sw_if_index:
4644 self.assertEqual(intf.is_inside, 0)
4646 self.assertTrue(pg0_found)
4647 self.assertTrue(pg1_found)
# str.find() returns -1 when the substring is absent, so these two checks
# require the node names to appear in the CLI output.
4649 features = self.vapi.cli("show interface features pg0")
4650 self.assertNotEqual(features.find('nat64-in2out'), -1)
4651 features = self.vapi.cli("show interface features pg1")
4652 self.assertNotEqual(features.find('nat64-out2in'), -1)
4654 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4655 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4657 interfaces = self.vapi.nat64_interface_dump()
4658 self.assertEqual(len(interfaces), 0)
4660 def test_static_bib(self):
4661 """ Add/delete static BIB entry """
# Purpose: add a static TCP BIB entry mapping an IPv6 inside address to the
# IPv4 address 10.1.1.3, verify it appears once in the TCP BIB dump with the
# expected addresses and ports, then issue a second add/del call and expect
# no static entry to remain.
# NOTE(review): in_port, out_port, bibe and static_bib_num are used below but
# the lines that define them, and the remaining call arguments, are absent
# from this listing.
4662 in_addr = socket.inet_pton(socket.AF_INET6,
4663 '2001:db8:85a3::8a2e:370:7334')
4664 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4667 proto = IP_PROTOS.tcp
4669 self.vapi.nat64_add_del_static_bib(in_addr,
4674 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4679 self.assertEqual(bibe.i_addr, in_addr)
4680 self.assertEqual(bibe.o_addr, out_addr)
4681 self.assertEqual(bibe.i_port, in_port)
4682 self.assertEqual(bibe.o_port, out_port)
4683 self.assertEqual(static_bib_num, 1)
4685 self.vapi.nat64_add_del_static_bib(in_addr,
4691 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4696 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    # Fields are checked in this fixed order in both phases.
    field_order = ('udp', 'icmp', 'tcp_trans', 'tcp_est', 'tcp_incoming_syn')
    default_values = (300, 60, 240, 7440, 6)
    custom_values = {
        'udp': 200,
        'icmp': 30,
        'tcp_trans': 250,
        'tcp_est': 7450,
        'tcp_incoming_syn': 10,
    }

    # verify default values
    reply = self.vapi.nat64_get_timeouts()
    for name, expected in zip(field_order, default_values):
        self.assertEqual(getattr(reply, name), expected)

    # set and verify custom values
    self.vapi.nat64_set_timeouts(**custom_values)
    reply = self.vapi.nat64_get_timeouts()
    for name in field_order:
        self.assertEqual(getattr(reply, name), custom_values[name])
4718 def test_dynamic(self):
4719 """ NAT64 dynamic translation test """
# Purpose: with one pool address and pg0 (inside) / pg1 (outside) enabled,
# run an IPv6->IPv4 stream and the IPv4->IPv6 replies twice, check that the
# session count grew by exactly 3, then repeat one round trip for a tenant
# on pg2 that uses the VRF-specific pool address.
# NOTE(review): gaps in the embedded line numbers show that some original
# lines are absent from this listing; calls that look unterminated continue
# on those absent lines.
4720 self.tcp_port_in = 6303
4721 self.udp_port_in = 6304
4722 self.icmp_id_in = 6305
4724 ses_num_start = self.nat64_get_ses_num()
4726 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4728 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4729 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4732 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4733 self.pg0.add_stream(pkts)
4734 self.pg_enable_capture(self.pg_interfaces)
4736 capture = self.pg1.get_capture(len(pkts))
4737 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4738 dst_ip=self.pg1.remote_ip4)
4741 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4742 self.pg1.add_stream(pkts)
4743 self.pg_enable_capture(self.pg_interfaces)
4745 capture = self.pg0.get_capture(len(pkts))
# A throwaway scapy IPv6 header is built only to obtain the textual form of
# the 64:ff9b:: prefix joined with pg1's IPv4 address; ip[IPv6].src is then
# the source address expected on translated replies.
4746 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4747 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# Second pass over the same flows.
4750 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4751 self.pg0.add_stream(pkts)
4752 self.pg_enable_capture(self.pg_interfaces)
4754 capture = self.pg1.get_capture(len(pkts))
4755 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4756 dst_ip=self.pg1.remote_ip4)
4759 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4760 self.pg1.add_stream(pkts)
4761 self.pg_enable_capture(self.pg_interfaces)
4763 capture = self.pg0.get_capture(len(pkts))
4764 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4766 ses_num_end = self.nat64_get_ses_num()
# Exactly three new sessions are expected even though the flows ran twice.
4768 self.assertEqual(ses_num_end - ses_num_start, 3)
4770 # tenant with specific VRF
4771 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4772 self.vrf1_nat_addr_n,
4773 vrf_id=self.vrf1_id)
4774 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4776 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4777 self.pg2.add_stream(pkts)
4778 self.pg_enable_capture(self.pg_interfaces)
4780 capture = self.pg1.get_capture(len(pkts))
4781 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4782 dst_ip=self.pg1.remote_ip4)
4784 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4785 self.pg1.add_stream(pkts)
4786 self.pg_enable_capture(self.pg_interfaces)
4788 capture = self.pg2.get_capture(len(pkts))
4789 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4791 def test_static(self):
4792 """ NAT64 static translation test """
# Purpose: install three static BIB entries for pg0's remote IPv6 address,
# send traffic in both directions, and check that the outside ports equal
# the inside ones (same_port=True) and that three sessions were created.
# NOTE(review): gaps in the embedded line numbers show that some original
# lines are absent from this listing; the arguments of the three
# nat64_add_del_static_bib calls after the first one are not visible.
4793 self.tcp_port_in = 60303
4794 self.udp_port_in = 60304
4795 self.icmp_id_in = 60305
4796 self.tcp_port_out = 60303
4797 self.udp_port_out = 60304
4798 self.icmp_id_out = 60305
4800 ses_num_start = self.nat64_get_ses_num()
4802 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4804 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4805 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4807 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4812 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4817 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4824 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4825 self.pg0.add_stream(pkts)
4826 self.pg_enable_capture(self.pg_interfaces)
4828 capture = self.pg1.get_capture(len(pkts))
4829 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4830 dst_ip=self.pg1.remote_ip4, same_port=True)
4833 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4834 self.pg1.add_stream(pkts)
4835 self.pg_enable_capture(self.pg_interfaces)
4837 capture = self.pg0.get_capture(len(pkts))
# The scapy IPv6 object is used only to obtain the textual form of the
# 64:ff9b:: prefix joined with pg1's IPv4 address.
4838 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4839 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4841 ses_num_end = self.nat64_get_ses_num()
4843 self.assertEqual(ses_num_end - ses_num_start, 3)
4845 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4846 def test_session_timeout(self):
4847 """ NAT64 session timeout """
# Purpose: set the ICMP timeout to 5, create sessions with an IPv6 stream,
# and check that the session count taken afterwards differs from the one
# taken right after the traffic. Only runs when running_extended_tests() is
# true.
# NOTE(review): gaps in the embedded line numbers show that some original
# lines are absent from this listing; the wait between the two session
# counts is presumably on one of them -- confirm.
4848 self.icmp_id_in = 1234
4849 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4851 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4852 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4853 self.vapi.nat64_set_timeouts(icmp=5)
4855 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4856 self.pg0.add_stream(pkts)
4857 self.pg_enable_capture(self.pg_interfaces)
4859 capture = self.pg1.get_capture(len(pkts))
4861 ses_num_before_timeout = self.nat64_get_ses_num()
4865 # ICMP session after timeout
4866 ses_num_after_timeout = self.nat64_get_ses_num()
# Only inequality is asserted; the exact number of expired sessions is not.
4867 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4869 def test_icmp_error(self):
4870 """ NAT64 ICMP Error message translation """
# Purpose: create sessions in both directions, then wrap the captured packets
# in ICMP error messages and verify the translation each way:
#   ICMPv6 Destination Unreachable code 1 -> ICMP type 3 code 13
#   ICMP type 3 code 13 -> ICMPv6 Destination Unreachable code 1
# including the addresses and ports/ids of the embedded packet.
# NOTE(review): gaps in the embedded line numbers show that some original
# lines are absent from this listing (control-flow lines around the
# assertions among them).
4871 self.tcp_port_in = 6303
4872 self.udp_port_in = 6304
4873 self.icmp_id_in = 6305
4875 ses_num_start = self.nat64_get_ses_num()
4877 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4879 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4880 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4882 # send some packets to create sessions
4883 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4884 self.pg0.add_stream(pkts)
4885 self.pg_enable_capture(self.pg_interfaces)
4887 capture_ip4 = self.pg1.get_capture(len(pkts))
4888 self.verify_capture_out(capture_ip4,
4889 nat_ip=self.nat_addr,
4890 dst_ip=self.pg1.remote_ip4)
4892 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4893 self.pg1.add_stream(pkts)
4894 self.pg_enable_capture(self.pg_interfaces)
4896 capture_ip6 = self.pg0.get_capture(len(pkts))
4897 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4898 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4899 self.pg0.remote_ip6)
# Inside -> outside: each packet previously delivered to the IPv6 host is
# embedded in an ICMPv6 error sent back from that host.
4902 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4903 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4904 ICMPv6DestUnreach(code=1) /
4905 packet[IPv6] for packet in capture_ip6]
4906 self.pg0.add_stream(pkts)
4907 self.pg_enable_capture(self.pg_interfaces)
4909 capture = self.pg1.get_capture(len(pkts))
4910 for packet in capture:
4912 self.assertEqual(packet[IP].src, self.nat_addr)
4913 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4914 self.assertEqual(packet[ICMP].type, 3)
4915 self.assertEqual(packet[ICMP].code, 13)
4916 inner = packet[IPerror]
4917 self.assertEqual(inner.src, self.pg1.remote_ip4)
4918 self.assertEqual(inner.dst, self.nat_addr)
4919 self.check_icmp_checksum(packet)
4920 if inner.haslayer(TCPerror):
4921 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4922 elif inner.haslayer(UDPerror):
4923 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4925 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4927 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Outside -> inside: each packet previously delivered to the IPv4 host is
# embedded in an ICMP error sent back from that host.
4931 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4932 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4933 ICMP(type=3, code=13) /
4934 packet[IP] for packet in capture_ip4]
4935 self.pg1.add_stream(pkts)
4936 self.pg_enable_capture(self.pg_interfaces)
4938 capture = self.pg0.get_capture(len(pkts))
4939 for packet in capture:
4941 self.assertEqual(packet[IPv6].src, ip.src)
4942 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4943 icmp = packet[ICMPv6DestUnreach]
4944 self.assertEqual(icmp.code, 1)
4945 inner = icmp[IPerror6]
4946 self.assertEqual(inner.src, self.pg0.remote_ip6)
4947 self.assertEqual(inner.dst, ip.src)
4948 self.check_icmpv6_checksum(packet)
4949 if inner.haslayer(TCPerror):
4950 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4951 elif inner.haslayer(UDPerror):
4952 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4954 self.assertEqual(inner[ICMPv6EchoRequest].id,
4957 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4960 def test_hairpinning(self):
4961 """ NAT64 hairpinning """
# Purpose: client and server are both remote hosts on pg0. The client
# addresses the server through the NAT address (64:ff9b:: prefix joined with
# self.nat_addr), so traffic enters and leaves on pg0. Three phases are
# checked: client -> server, server -> client, and ICMPv6 errors generated
# from the packets of the second phase.
# NOTE(review): gaps in the embedded line numbers show that some original
# lines are absent from this listing (remaining call arguments, the lines
# that build "pkts", and control-flow lines around the assertions).
4963 client = self.pg0.remote_hosts[0]
4964 server = self.pg0.remote_hosts[1]
4965 server_tcp_in_port = 22
4966 server_tcp_out_port = 4022
4967 server_udp_in_port = 23
4968 server_udp_out_port = 4023
4969 client_tcp_in_port = 1234
4970 client_udp_in_port = 1235
# Placeholders; the real outside ports are learned from the first capture.
4971 client_tcp_out_port = 0
4972 client_udp_out_port = 0
4973 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4974 nat_addr_ip6 = ip.src
4976 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4978 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4979 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static BIB entries for the server (one call references the TCP outside
# port, the other the UDP outside port).
4981 self.vapi.nat64_add_del_static_bib(server.ip6n,
4984 server_tcp_out_port,
4986 self.vapi.nat64_add_del_static_bib(server.ip6n,
4989 server_udp_out_port,
# Phase 1: client -> server via the NAT address.
4994 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4995 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4996 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4998 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4999 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5000 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5002 self.pg0.add_stream(pkts)
5003 self.pg_enable_capture(self.pg_interfaces)
5005 capture = self.pg0.get_capture(len(pkts))
5006 for packet in capture:
5008 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5009 self.assertEqual(packet[IPv6].dst, server.ip6)
5010 if packet.haslayer(TCP):
5011 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5012 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5013 self.check_tcp_checksum(packet)
# Remember the translated source port for the reply phase.
5014 client_tcp_out_port = packet[TCP].sport
5016 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5017 self.assertEqual(packet[UDP].dport, server_udp_in_port)
5018 self.check_udp_checksum(packet)
5019 client_udp_out_port = packet[UDP].sport
5021 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Phase 2: server -> client, addressed to the ports learned above.
5026 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5027 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5028 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5030 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5031 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5032 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5034 self.pg0.add_stream(pkts)
5035 self.pg_enable_capture(self.pg_interfaces)
5037 capture = self.pg0.get_capture(len(pkts))
5038 for packet in capture:
5040 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5041 self.assertEqual(packet[IPv6].dst, client.ip6)
5042 if packet.haslayer(TCP):
5043 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5044 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5045 self.check_tcp_checksum(packet)
5047 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5048 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5049 self.check_udp_checksum(packet)
5051 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Phase 3: the client answers each packet it just received with an ICMPv6
# Destination Unreachable (code 1) that embeds that packet.
5056 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5057 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5058 ICMPv6DestUnreach(code=1) /
5059 packet[IPv6] for packet in capture]
5060 self.pg0.add_stream(pkts)
5061 self.pg_enable_capture(self.pg_interfaces)
5063 capture = self.pg0.get_capture(len(pkts))
5064 for packet in capture:
5066 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5067 self.assertEqual(packet[IPv6].dst, server.ip6)
5068 icmp = packet[ICMPv6DestUnreach]
5069 self.assertEqual(icmp.code, 1)
5070 inner = icmp[IPerror6]
5071 self.assertEqual(inner.src, server.ip6)
5072 self.assertEqual(inner.dst, nat_addr_ip6)
5073 self.check_icmpv6_checksum(packet)
5074 if inner.haslayer(TCPerror):
5075 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5076 self.assertEqual(inner[TCPerror].dport,
5077 client_tcp_out_port)
5079 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5080 self.assertEqual(inner[UDPerror].dport,
5081 client_udp_out_port)
5083 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5086 def test_prefix(self):
5087 """ NAT64 Network-Specific Prefix """
# Purpose: configure a /32 prefix (2001:db8::) and a /56 prefix
# (2001:db8:122:300::) bound to self.vrf1_id, check the prefix dump, then
# run a round trip for pg0 with the first prefix and for pg2 with the second,
# verifying that replies carry an IPv6 source composed from the matching
# prefix.
# NOTE(review): gaps in the embedded line numbers show that some original
# lines are absent from this listing; calls that look unterminated continue
# on those absent lines.
5089 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5091 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5092 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5093 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5094 self.vrf1_nat_addr_n,
5095 vrf_id=self.vrf1_id)
5096 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5099 global_pref64 = "2001:db8::"
5100 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5101 global_pref64_len = 32
5102 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
# The first prefix is added without a vrf_id and is reported with vrf_id 0.
5104 prefix = self.vapi.nat64_prefix_dump()
5105 self.assertEqual(len(prefix), 1)
5106 self.assertEqual(prefix[0].prefix, global_pref64_n)
5107 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5108 self.assertEqual(prefix[0].vrf_id, 0)
5110 # Add tenant specific prefix
5111 vrf1_pref64 = "2001:db8:122:300::"
5112 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5113 vrf1_pref64_len = 56
5114 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5116 vrf_id=self.vrf1_id)
5117 prefix = self.vapi.nat64_prefix_dump()
5118 self.assertEqual(len(prefix), 2)
# Round trip for pg0 using the /32 prefix.
5121 pkts = self.create_stream_in_ip6(self.pg0,
5124 plen=global_pref64_len)
5125 self.pg0.add_stream(pkts)
5126 self.pg_enable_capture(self.pg_interfaces)
5128 capture = self.pg1.get_capture(len(pkts))
5129 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5130 dst_ip=self.pg1.remote_ip4)
5132 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5133 self.pg1.add_stream(pkts)
5134 self.pg_enable_capture(self.pg_interfaces)
5136 capture = self.pg0.get_capture(len(pkts))
# NOTE(review): "dst_ip" is passed to verify_capture_in_ip6 in the position
# where other tests in this file pass the expected source address.
5137 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5140 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5142 # Tenant specific prefix
5143 pkts = self.create_stream_in_ip6(self.pg2,
5146 plen=vrf1_pref64_len)
5147 self.pg2.add_stream(pkts)
5148 self.pg_enable_capture(self.pg_interfaces)
5150 capture = self.pg1.get_capture(len(pkts))
5151 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5152 dst_ip=self.pg1.remote_ip4)
5154 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5155 self.pg1.add_stream(pkts)
5156 self.pg_enable_capture(self.pg_interfaces)
5158 capture = self.pg2.get_capture(len(pkts))
5159 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5162 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5164 def test_unknown_proto(self):
5165 """ NAT64 translate packet with unknown protocol """
# Purpose: after a TCP packet has gone through, send a packet whose IPv6
# next-header is 47 and check that it is translated to IPv4 with a GRE layer
# present; then send the reverse IPv4 packet and check that it arrives as
# IPv6 with next-header 47.
# NOTE(review): gaps in the embedded line numbers show that some original
# lines are absent from this listing (the layer between the outer and inner
# IP headers, the binding of "packet", and control-flow lines).
5167 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5169 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5170 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5171 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
# A plain TCP packet is sent first, presumably to create the binding that
# the unknown-protocol packet relies on -- confirm.
5174 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5175 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5176 TCP(sport=self.tcp_port_in, dport=20))
5177 self.pg0.add_stream(p)
5178 self.pg_enable_capture(self.pg_interfaces)
5180 p = self.pg1.get_capture(1)
# nh=47 is the IANA protocol number for GRE; the hairpinning variant of this
# test spells the same value as IP_PROTOS.gre.
5182 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5183 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5185 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5186 TCP(sport=1234, dport=1234))
5187 self.pg0.add_stream(p)
5188 self.pg_enable_capture(self.pg_interfaces)
5190 p = self.pg1.get_capture(1)
5193 self.assertEqual(packet[IP].src, self.nat_addr)
5194 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5195 self.assertTrue(packet.haslayer(GRE))
5196 self.check_ip_checksum(packet)
5198 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Reverse direction: IPv4 from pg1 towards the NAT address.
5202 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5203 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5205 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5206 TCP(sport=1234, dport=1234))
5207 self.pg1.add_stream(p)
5208 self.pg_enable_capture(self.pg_interfaces)
5210 p = self.pg0.get_capture(1)
5213 self.assertEqual(packet[IPv6].src, remote_ip6)
5214 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5215 self.assertEqual(packet[IPv6].nh, 47)
5217 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5220 def test_hairpinning_unknown_proto(self):
5221 """ NAT64 translate packet with unknown protocol - hairpinning """
# Purpose: client and server are both remote hosts on pg0 and each has its
# own NAT IPv4 address (10.0.0.110 and 10.0.0.100). After a TCP packet from
# client to server, packets with next-header GRE are exchanged in both
# directions and checked for the expected translated IPv6 source and
# destination addresses.
# NOTE(review): gaps in the embedded line numbers show that some original
# lines are absent from this listing (remaining call arguments, the layer
# between the outer and inner IP headers, the binding of "packet", and
# control-flow lines).
5223 client = self.pg0.remote_hosts[0]
5224 server = self.pg0.remote_hosts[1]
5225 server_tcp_in_port = 22
5226 server_tcp_out_port = 4022
5227 client_tcp_in_port = 1234
5228 client_tcp_out_port = 1235
5229 server_nat_ip = "10.0.0.100"
5230 client_nat_ip = "10.0.0.110"
5231 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5232 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
# IPv6 forms of both NAT addresses under the 64:ff9b::/96 prefix.
5233 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5234 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5236 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5238 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5239 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Three static BIB entries: two for the server, one for the client.
5241 self.vapi.nat64_add_del_static_bib(server.ip6n,
5244 server_tcp_out_port,
5247 self.vapi.nat64_add_del_static_bib(server.ip6n,
5253 self.vapi.nat64_add_del_static_bib(client.ip6n,
5256 client_tcp_out_port,
# Plain TCP packet first; one packet is expected back on pg0.
5260 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5261 IPv6(src=client.ip6, dst=server_nat_ip6) /
5262 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5263 self.pg0.add_stream(p)
5264 self.pg_enable_capture(self.pg_interfaces)
5266 p = self.pg0.get_capture(1)
# Client -> server with next-header GRE.
5268 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5269 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5271 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5272 TCP(sport=1234, dport=1234))
5273 self.pg0.add_stream(p)
5274 self.pg_enable_capture(self.pg_interfaces)
5276 p = self.pg0.get_capture(1)
5279 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5280 self.assertEqual(packet[IPv6].dst, server.ip6)
5281 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5283 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Server -> client with next-header GRE.
5287 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5288 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5290 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5291 TCP(sport=1234, dport=1234))
5292 self.pg0.add_stream(p)
5293 self.pg_enable_capture(self.pg_interfaces)
5295 p = self.pg0.get_capture(1)
5298 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5299 self.assertEqual(packet[IPv6].dst, client.ip6)
5300 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5302 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5305 def test_one_armed_nat64(self):
5306 """ One armed NAT64 """
# Purpose: pg3 is enabled both as an inside and as an outside NAT64
# interface. An IPv6 TCP packet sent on pg3 must come back on pg3 as IPv4
# from self.nat_addr with a changed source port, and the IPv4 reply to that
# port must come back on pg3 as IPv6 addressed to the original port 12345.
# NOTE(review): gaps in the embedded line numbers show that some original
# lines are absent from this listing (remaining call arguments, the bindings
# of "ip" and "tcp", and control-flow lines).
5308 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5312 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5314 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5315 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
# IPv6 -> IPv4 direction.
5318 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5319 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5320 TCP(sport=12345, dport=80))
5321 self.pg3.add_stream(p)
5322 self.pg_enable_capture(self.pg_interfaces)
5324 capture = self.pg3.get_capture(1)
5329 self.assertEqual(ip.src, self.nat_addr)
5330 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5331 self.assertNotEqual(tcp.sport, 12345)
# Remember the translated source port; the reply below is addressed to it.
5332 external_port = tcp.sport
5333 self.assertEqual(tcp.dport, 80)
5334 self.check_tcp_checksum(p)
5335 self.check_ip_checksum(p)
5337 self.logger.error(ppp("Unexpected or invalid packet:", p))
# IPv4 -> IPv6 direction.
5341 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5342 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5343 TCP(sport=80, dport=external_port))
5344 self.pg3.add_stream(p)
5345 self.pg_enable_capture(self.pg_interfaces)
5347 capture = self.pg3.get_capture(1)
5352 self.assertEqual(ip.src, remote_host_ip6)
5353 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5354 self.assertEqual(tcp.sport, 80)
5355 self.assertEqual(tcp.dport, 12345)
5356 self.check_tcp_checksum(p)
5358 self.logger.error(ppp("Unexpected or invalid packet:", p))
5361 def test_frag_in_order(self):
5362 """ NAT64 translate fragments arriving in order """
# Purpose: send a fragmented TCP payload from the IPv6 side, reassemble what
# arrives on pg1 and check ports and payload; then send a fragmented reply
# from the IPv4 side, reassemble what arrives on pg0 and check it likewise.
# Finally the number of entries in the NAT reassembly dump must have grown
# by 2.
# NOTE(review): gaps in the embedded line numbers show that some original
# lines are absent from this listing (the first definition of "data" and
# several call arguments among them).
# The inside TCP port is chosen at random for each run.
5363 self.tcp_port_in = random.randint(1025, 65535)
5365 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5367 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5368 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5370 reass = self.vapi.nat_reass_dump()
5371 reass_n_start = len(reass)
5375 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5376 self.tcp_port_in, 20, data)
5377 self.pg0.add_stream(pkts)
5378 self.pg_enable_capture(self.pg_interfaces)
5380 frags = self.pg1.get_capture(len(pkts))
5381 p = self.reass_frags_and_verify(frags,
5383 self.pg1.remote_ip4)
5384 self.assertEqual(p[TCP].dport, 20)
5385 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Remember the translated source port for later use.
5386 self.tcp_port_out = p[TCP].sport
5387 self.assertEqual(data, p[Raw].load)
# Reply payload: 23 bytes in three runs of distinct characters.
5390 data = "A" * 4 + "b" * 16 + "C" * 3
5391 pkts = self.create_stream_frag(self.pg1,
5396 self.pg1.add_stream(pkts)
5397 self.pg_enable_capture(self.pg_interfaces)
5399 frags = self.pg0.get_capture(len(pkts))
5400 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5401 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5402 self.assertEqual(p[TCP].sport, 20)
5403 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5404 self.assertEqual(data, p[Raw].load)
5406 reass = self.vapi.nat_reass_dump()
5407 reass_n_end = len(reass)
5409 self.assertEqual(reass_n_end - reass_n_start, 2)
# Hairpinning with fragments: client and server are both hosts on pg0, the
# fragmented request is sent on pg0 and the translated fragments are captured
# on pg0 as well.
# NOTE(review): this listing omits some original lines (remaining call
# arguments and the assignment of `data`).
5411 def test_reass_hairpinning(self):
5412 """ NAT64 fragments hairpinning """
5414 client = self.pg0.remote_hosts[0]
5415 server = self.pg0.remote_hosts[1]
5416 server_in_port = random.randint(1025, 65535)
5417 server_out_port = random.randint(1025, 65535)
5418 client_in_port = random.randint(1025, 65535)
# Build the IPv6 form of the NAT address by passing '64:ff9b::' + nat_addr
# through a scapy IPv6 header and reading the src field back (presumably to
# obtain scapy's normalised textual form - TODO confirm).
5419 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5420 nat_addr_ip6 = ip.src
5422 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5424 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5425 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5427 # add static BIB entry for server
5428 self.vapi.nat64_add_del_static_bib(server.ip6n,
5434 # send packet from host to server
5435 pkts = self.create_stream_frag_ip6(self.pg0,
5440 self.pg0.add_stream(pkts)
5441 self.pg_enable_capture(self.pg_interfaces)
# Capture happens on pg0, the same interface the fragments were sent on.
5443 frags = self.pg0.get_capture(len(pkts))
# The reassembled packet must come from the NAT address (IPv6 form) to the
# server, with a rewritten source port and the server's inside port.
5444 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5445 self.assertNotEqual(p[TCP].sport, client_in_port)
5446 self.assertEqual(p[TCP].dport, server_in_port)
5447 self.assertEqual(data, p[Raw].load)
# Same flow as the in-order fragment test (fragments in on pg0 / out on pg1,
# then the reverse), with the assertions applied to the reassembled packets.
# NOTE(review): the step that actually reorders the fragments is not visible
# in this listing (some original lines are omitted); confirm against the
# full file.
5449 def test_frag_out_of_order(self):
5450 """ NAT64 translate fragments arriving out of order """
5451 self.tcp_port_in = random.randint(1025, 65535)
5453 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5455 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5456 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPv6 side -> IPv4 side.
5460 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5461 self.tcp_port_in, 20, data)
5463 self.pg0.add_stream(pkts)
5464 self.pg_enable_capture(self.pg_interfaces)
5466 frags = self.pg1.get_capture(len(pkts))
5467 p = self.reass_frags_and_verify(frags,
5469 self.pg1.remote_ip4)
# Destination port and payload preserved; source port must be rewritten.
5470 self.assertEqual(p[TCP].dport, 20)
5471 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5472 self.tcp_port_out = p[TCP].sport
5473 self.assertEqual(data, p[Raw].load)
# IPv4 side -> IPv6 side, with a fresh payload.
5476 data = "A" * 4 + "B" * 16 + "C" * 3
5477 pkts = self.create_stream_frag(self.pg1,
5483 self.pg1.add_stream(pkts)
5484 self.pg_enable_capture(self.pg_interfaces)
5486 frags = self.pg0.get_capture(len(pkts))
# Expected IPv6 source: pg1's IPv4 peer embedded in the 64:ff9b::/96 prefix.
5487 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5488 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5489 self.assertEqual(p[TCP].sport, 20)
5490 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5491 self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface """
    # Ask NAT64 to take its pool address from pg4; pg4 has no IPv4 address
    # configured at this point.
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (dump the NAT64 pool itself, not the NAT44 address table, so this
    # check looks at the same table as the two checks below)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    # (assert on the fresh dump; checking a stale earlier result would make
    # this assertion pass regardless of whether the address was removed)
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
# Runs only when running_extended_tests() is true. Sends traffic from
# max_bibs distinct IPv6 sources, then sends further packets that are expected
# to be dropped and checks the IPFIX records exported towards pg3.
# NOTE(review): this listing omits some original lines - `max_bibs`,
# `max_sessions`, `pkts` and the loops over `capture` are not visible here.
5512 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5513 def test_ipfix_max_bibs_sessions(self):
5514 """ IPFIX logging maximum session and BIB entries exceeded """
5517 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5521 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5523 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5524 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Each iteration uses a distinct source address (fd01:aa::<i>) and builds two
# TCP packets, to ports 80 and 22.
5528 for i in range(0, max_bibs):
5529 src = "fd01:aa::%x" % (i)
5530 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5531 IPv6(src=src, dst=remote_host_ip6) /
5532 TCP(sport=12345, dport=80))
5534 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5535 IPv6(src=src, dst=remote_host_ip6) /
5536 TCP(sport=12345, dport=22))
5538 self.pg0.add_stream(pkts)
5539 self.pg_enable_capture(self.pg_interfaces)
# All max_sessions packets are expected to be forwarded to pg1.
5541 self.pg1.get_capture(max_sessions)
# Enable IPFIX export towards pg3's peer only after the tables are filled.
5543 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5544 src_address=self.pg3.local_ip4n,
5546 template_interval=10)
5547 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5548 src_port=self.ipfix_src_port)
# One more packet from the last source used in the loop, to a new port (25);
# nothing is expected on pg1 (presumably the session limit is hit).
5550 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5551 IPv6(src=src, dst=remote_host_ip6) /
5552 TCP(sport=12345, dport=25))
5553 self.pg0.add_stream(p)
5554 self.pg_enable_capture(self.pg_interfaces)
5556 self.pg1.get_capture(0)
5557 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Nine IPFIX packets are expected on pg3 after the flush.
5558 capture = self.pg3.get_capture(9)
5559 ipfix = IPFIXDecoder()
5560 # first load template
5562 self.assertTrue(p.haslayer(IPFIX))
5563 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5564 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5565 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5566 self.assertEqual(p[UDP].dport, 4739)
5567 self.assertEqual(p[IPFIX].observationDomainID,
5568 self.ipfix_domain_id)
5569 if p.haslayer(Template):
5570 ipfix.add_template(p.getlayer(Template))
5571 # verify events in data set
5573 if p.haslayer(Data):
5574 data = ipfix.decode_data_set(p.getlayer(Set))
5575 self.verify_ipfix_max_sessions(data, max_sessions)
# Packet from a source not used in the loop (pg0's own peer); nothing is
# expected on pg1 (presumably the BIB limit is hit).
5577 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5578 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5579 TCP(sport=12345, dport=80))
5580 self.pg0.add_stream(p)
5581 self.pg_enable_capture(self.pg_interfaces)
5583 self.pg1.get_capture(0)
5584 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# A single IPFIX packet is expected this time; the decoder created above
# (with its loaded templates) is reused.
5585 capture = self.pg3.get_capture(1)
5586 # verify events in data set
5588 self.assertTrue(p.haslayer(IPFIX))
5589 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5590 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5591 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5592 self.assertEqual(p[UDP].dport, 4739)
5593 self.assertEqual(p[IPFIX].observationDomainID,
5594 self.ipfix_domain_id)
5595 if p.haslayer(Data):
5596 data = ipfix.decode_data_set(p.getlayer(Set))
5597 self.verify_ipfix_max_bibs(data, max_bibs)
# Sets the IPv6 reassembly fragment limit to 0, sends a single fragment and
# checks the IPFIX records exported towards pg3.
# NOTE(review): this listing omits some original lines (remaining call
# arguments, the assignment of `data`, the loops over `capture`).
5599 def test_ipfix_max_frags(self):
5600 """ IPFIX logging maximum fragments pending reassembly exceeded """
5601 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5603 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5604 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# max_frag=0 for IPv6 (is_ip6=1), so any pending fragment exceeds the limit.
5605 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5606 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5607 src_address=self.pg3.local_ip4n,
5609 template_interval=10)
5610 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5611 src_port=self.ipfix_src_port)
5614 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5615 self.tcp_port_in, 20, data)
# Only the last fragment of the stream is sent; nothing is expected on pg1.
5616 self.pg0.add_stream(pkts[-1])
5617 self.pg_enable_capture(self.pg_interfaces)
5619 self.pg1.get_capture(0)
5620 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Nine IPFIX packets are expected on pg3 after the flush.
5621 capture = self.pg3.get_capture(9)
5622 ipfix = IPFIXDecoder()
5623 # first load template
5625 self.assertTrue(p.haslayer(IPFIX))
5626 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5627 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5628 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5629 self.assertEqual(p[UDP].dport, 4739)
5630 self.assertEqual(p[IPFIX].observationDomainID,
5631 self.ipfix_domain_id)
5632 if p.haslayer(Template):
5633 ipfix.add_template(p.getlayer(Template))
5634 # verify events in data set
5636 if p.haslayer(Data):
5637 data = ipfix.decode_data_set(p.getlayer(Set))
# Expected record: limit 0, source address of pg0's IPv6 peer.
5638 self.verify_ipfix_max_fragments_ip6(data, 0,
5639 self.pg0.remote_ip6n)
# Creates one NAT64 session with IPFIX logging enabled and checks the exported
# records, then issues a pool-address call (arguments not visible here) and
# checks the records exported after it.
# NOTE(review): this listing omits some original lines (remaining call
# arguments and the loops over `capture`).
5641 def test_ipfix_bib_ses(self):
5642 """ IPFIX logging NAT64 BIB/session create and delete events """
5643 self.tcp_port_in = random.randint(1025, 65535)
5644 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5648 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5650 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5651 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5652 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5653 src_address=self.pg3.local_ip4n,
5655 template_interval=10)
5656 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5657 src_port=self.ipfix_src_port)
# One TCP packet from pg0's IPv6 peer to port 25 triggers the translation.
5660 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5661 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5662 TCP(sport=self.tcp_port_in, dport=25))
5663 self.pg0.add_stream(p)
5664 self.pg_enable_capture(self.pg_interfaces)
# `p` is rebound to the capture list here; the translated source port of
# the first captured packet is stored as tcp_port_out.
5666 p = self.pg1.get_capture(1)
5667 self.tcp_port_out = p[0][TCP].sport
5668 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Ten IPFIX packets are expected on pg3 after the flush.
5669 capture = self.pg3.get_capture(10)
5670 ipfix = IPFIXDecoder()
5671 # first load template
5673 self.assertTrue(p.haslayer(IPFIX))
5674 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5675 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5676 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5677 self.assertEqual(p[UDP].dport, 4739)
5678 self.assertEqual(p[IPFIX].observationDomainID,
5679 self.ipfix_domain_id)
5680 if p.haslayer(Template):
5681 ipfix.add_template(p.getlayer(Template))
5682 # verify events in data set
5684 if p.haslayer(Data):
5685 data = ipfix.decode_data_set(p.getlayer(Set))
# Field 230 of the first record selects the verifier: 10 -> BIB check with
# flag 1, 6 -> session check. NOTE(review): 230 is presumably the IPFIX
# natEvent element, with 10/6 = NAT64 BIB/session create - confirm.
5686 if ord(data[0][230]) == 10:
5687 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5688 elif ord(data[0][230]) == 6:
5689 self.verify_ipfix_nat64_ses(data,
5691 self.pg0.remote_ip6n,
5692 self.pg1.remote_ip4,
5695 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Second phase: the pool-address call below (arguments not visible in this
# listing) is followed by two more IPFIX packets.
5698 self.pg_enable_capture(self.pg_interfaces)
5699 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5702 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5703 capture = self.pg3.get_capture(2)
5704 # verify events in data set
5706 self.assertTrue(p.haslayer(IPFIX))
5707 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5708 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5709 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5710 self.assertEqual(p[UDP].dport, 4739)
5711 self.assertEqual(p[IPFIX].observationDomainID,
5712 self.ipfix_domain_id)
5713 if p.haslayer(Data):
5714 data = ipfix.decode_data_set(p.getlayer(Set))
# Same dispatch as above with values 11 -> BIB check with flag 0 and
# 7 -> session check (presumably the matching delete events - confirm).
5715 if ord(data[0][230]) == 11:
5716 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5717 elif ord(data[0][230]) == 7:
5718 self.verify_ipfix_nat64_ses(data,
5720 self.pg0.remote_ip6n,
5721 self.pg1.remote_ip4,
5724 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Helper that dumps the NAT64 session table via self.vapi.nat64_st_dump().
# NOTE(review): the docstring delimiters and the statement that returns the
# count are not visible in this listing; confirm against the full file.
5726 def nat64_get_ses_num(self):
5728 Return number of active NAT64 sessions.
5730 st = self.vapi.nat64_st_dump()
# Cleanup helper: disables NAT IPFIX logging, then walks the NAT64 interface,
# BIB, pool-address and prefix dumps and issues the matching add_del call for
# each entry.
# NOTE(review): the docstring delimiters and the remaining arguments of the
# add_del calls are not visible in this listing.
5733 def clear_nat64(self):
5735 Clear NAT64 configuration.
5737 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5738 domain_id=self.ipfix_domain_id)
# After disabling with the current values, store 4739 and 1 as the IPFIX
# source port and domain id on the instance.
5739 self.ipfix_src_port = 4739
5740 self.ipfix_domain_id = 1
# Called without arguments (presumably restores default timeouts - confirm).
5742 self.vapi.nat64_set_timeouts()
5744 interfaces = self.vapi.nat64_interface_dump()
5745 for intf in interfaces:
# is_inside > 1 gets an extra call before the common one (presumably the
# value marks an interface registered in both directions - TODO confirm).
5746 if intf.is_inside > 1:
5747 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5750 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5754 bib = self.vapi.nat64_bib_dump(255)
5757 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
5765 adresses = self.vapi.nat64_pool_addr_dump()
5766 for addr in adresses:
5767 self.vapi.nat64_add_del_pool_addr_range(addr.address,
5772 prefixes = self.vapi.nat64_prefix_dump()
5773 for prefix in prefixes:
5774 self.vapi.nat64_add_del_prefix(prefix.prefix,
5776 vrf_id=prefix.vrf_id,
# Body of TestNAT64's tearDown (the `def` line is not visible in this
# listing): runs the parent tearDown and, while VPP is still alive, logs the
# output of the NAT64 "show" CLI commands.
5780 super(TestNAT64, self).tearDown()
5781 if not self.vpp_dead:
5782 self.logger.info(self.vapi.cli("show nat64 pool"))
5783 self.logger.info(self.vapi.cli("show nat64 interfaces"))
5784 self.logger.info(self.vapi.cli("show nat64 prefix"))
5785 self.logger.info(self.vapi.cli("show nat64 bib all"))
5786 self.logger.info(self.vapi.cli("show nat64 session table all"))
5787 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# DS-Lite test case: pg0 is the IPv4 side and pg1 the IPv6 side. IPv4 packets
# encapsulated in IPv6 and addressed to the AFTR address are sent on pg1 and
# expected on pg0 as plain IPv4 from the NAT pool address; replies are expected
# back on pg1 re-encapsulated in IPv6.
# NOTE(review): this listing omits some original lines (some method headers,
# remaining call arguments, the step that starts the traffic); confirm
# details against the full file.
5791 class TestDSlite(MethodHolder):
5792 """ DS-Lite Test Cases """
5795 def setUpClass(cls):
5796 super(TestDSlite, cls).setUpClass()
# NAT pool address, kept both as text and in packed binary form.
5799 cls.nat_addr = '10.0.0.3'
5800 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5802 cls.create_pg_interfaces(range(2))
# pg0: IPv4 only.
5804 cls.pg0.config_ip4()
5805 cls.pg0.resolve_arp()
# pg1: IPv6 only, with two remote hosts used as tunnel peers by the test.
5807 cls.pg1.config_ip6()
5808 cls.pg1.generate_remote_hosts(2)
5809 cls.pg1.configure_ipv6_neighbors()
5812 super(TestDSlite, cls).tearDownClass()
5815 def test_dslite(self):
5816 """ Test DS-Lite """
5817 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
# AFTR tunnel endpoint addresses (IPv4 and IPv6), configured in packed form.
5819 aftr_ip4 = '192.0.0.1'
5820 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5821 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5822 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5823 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# UDP, tunnel -> IPv4: inner source 192.168.1.1:20000 from remote host 0.
5826 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5827 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
5828 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5829 UDP(sport=20000, dport=10000))
5830 self.pg1.add_stream(p)
5831 self.pg_enable_capture(self.pg_interfaces)
# `capture` is rebound from the capture list to its single packet.
5833 capture = self.pg0.get_capture(1)
5834 capture = capture[0]
# Decapsulated and translated: no IPv6 layer, source is the NAT address,
# source port rewritten, destination port preserved.
5835 self.assertFalse(capture.haslayer(IPv6))
5836 self.assertEqual(capture[IP].src, self.nat_addr)
5837 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5838 self.assertNotEqual(capture[UDP].sport, 20000)
5839 self.assertEqual(capture[UDP].dport, 10000)
5840 self.check_ip_checksum(capture)
5841 out_port = capture[UDP].sport
# UDP, IPv4 -> tunnel: reply to the translated port.
5843 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5844 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5845 UDP(sport=10000, dport=out_port))
5846 self.pg0.add_stream(p)
5847 self.pg_enable_capture(self.pg_interfaces)
5849 capture = self.pg1.get_capture(1)
5850 capture = capture[0]
# Re-encapsulated towards remote host 0 with the original inner address/port.
5851 self.assertEqual(capture[IPv6].src, aftr_ip6)
5852 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5853 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5854 self.assertEqual(capture[IP].dst, '192.168.1.1')
5855 self.assertEqual(capture[UDP].sport, 10000)
5856 self.assertEqual(capture[UDP].dport, 20000)
5857 self.check_ip_checksum(capture)
# TCP, tunnel -> IPv4: same inner source address, but from remote host 1.
5860 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5861 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5862 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5863 TCP(sport=20001, dport=10001))
5864 self.pg1.add_stream(p)
5865 self.pg_enable_capture(self.pg_interfaces)
5867 capture = self.pg0.get_capture(1)
5868 capture = capture[0]
5869 self.assertFalse(capture.haslayer(IPv6))
5870 self.assertEqual(capture[IP].src, self.nat_addr)
5871 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5872 self.assertNotEqual(capture[TCP].sport, 20001)
5873 self.assertEqual(capture[TCP].dport, 10001)
5874 self.check_ip_checksum(capture)
5875 self.check_tcp_checksum(capture)
5876 out_port = capture[TCP].sport
# TCP, IPv4 -> tunnel: reply must be tunnelled to remote host 1.
5878 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5879 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5880 TCP(sport=10001, dport=out_port))
5881 self.pg0.add_stream(p)
5882 self.pg_enable_capture(self.pg_interfaces)
5884 capture = self.pg1.get_capture(1)
5885 capture = capture[0]
5886 self.assertEqual(capture[IPv6].src, aftr_ip6)
5887 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5888 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5889 self.assertEqual(capture[IP].dst, '192.168.1.1')
5890 self.assertEqual(capture[TCP].sport, 10001)
5891 self.assertEqual(capture[TCP].dport, 20001)
5892 self.check_ip_checksum(capture)
5893 self.check_tcp_checksum(capture)
# ICMP echo, tunnel -> IPv4: the ICMP id (4000) must be rewritten.
5896 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5897 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5898 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5899 ICMP(id=4000, type='echo-request'))
5900 self.pg1.add_stream(p)
5901 self.pg_enable_capture(self.pg_interfaces)
5903 capture = self.pg0.get_capture(1)
5904 capture = capture[0]
5905 self.assertFalse(capture.haslayer(IPv6))
5906 self.assertEqual(capture[IP].src, self.nat_addr)
5907 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5908 self.assertNotEqual(capture[ICMP].id, 4000)
5909 self.check_ip_checksum(capture)
5910 self.check_icmp_checksum(capture)
5911 out_id = capture[ICMP].id
# ICMP echo reply, IPv4 -> tunnel: the original id (4000) must be restored.
5913 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5914 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5915 ICMP(id=out_id, type='echo-reply'))
5916 self.pg0.add_stream(p)
5917 self.pg_enable_capture(self.pg_interfaces)
5919 capture = self.pg1.get_capture(1)
5920 capture = capture[0]
5921 self.assertEqual(capture[IPv6].src, aftr_ip6)
5922 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5923 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5924 self.assertEqual(capture[IP].dst, '192.168.1.1')
5925 self.assertEqual(capture[ICMP].id, 4000)
5926 self.check_ip_checksum(capture)
5927 self.check_icmp_checksum(capture)
5929 # ping DS-Lite AFTR tunnel endpoint address
5930 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5931 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
5932 ICMPv6EchoRequest())
5933 self.pg1.add_stream(p)
5934 self.pg_enable_capture(self.pg_interfaces)
# The echo reply is expected on pg1 itself, sourced from the AFTR address.
5936 capture = self.pg1.get_capture(1)
5937 self.assertEqual(1, len(capture))
5938 capture = capture[0]
5939 self.assertEqual(capture[IPv6].src, aftr_ip6)
5940 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5941 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# tearDown body: parent tearDown, then log DS-Lite state while VPP is alive.
5944 super(TestDSlite, self).tearDown()
5945 if not self.vpp_dead:
5946 self.logger.info(self.vapi.cli("show dslite pool"))
5948 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5949 self.logger.info(self.vapi.cli("show dslite sessions"))
# DS-Lite CE test case: VPP is started with "nat { dslite ce }" on its command
# line. Plain IPv4 packets sent on pg0 are expected on pg1 encapsulated in IPv6
# from the B4 address to the AFTR address; encapsulated packets sent on pg1
# are expected on pg0 as plain IPv4.
# NOTE(review): this listing omits some original lines (some method headers,
# remaining call arguments, the step that starts the traffic); confirm
# details against the full file.
5952 class TestDSliteCE(MethodHolder):
5953 """ DS-Lite CE Test Cases """
5956 def setUpConstants(cls):
5957 super(TestDSliteCE, cls).setUpConstants()
# Append the CE-mode NAT stanza to the VPP command line built by the parent.
5958 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
5961 def setUpClass(cls):
5962 super(TestDSliteCE, cls).setUpClass()
5965 cls.create_pg_interfaces(range(2))
# pg0: IPv4 only.
5967 cls.pg0.config_ip4()
5968 cls.pg0.resolve_arp()
# pg1: IPv6 only, with a single remote host.
5970 cls.pg1.config_ip6()
5971 cls.pg1.generate_remote_hosts(1)
5972 cls.pg1.configure_ipv6_neighbors()
5975 super(TestDSliteCE, cls).tearDownClass()
5978 def test_dslite_ce(self):
5979 """ Test DS-Lite CE """
# B4 tunnel endpoint addresses, configured in packed form.
5981 b4_ip4 = '192.0.0.2'
5982 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
5983 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
5984 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
5985 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
# AFTR tunnel endpoint addresses, configured in packed form.
5987 aftr_ip4 = '192.0.0.1'
5988 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5989 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5990 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5991 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# Host route (/128) for the AFTR IPv6 address via pg1's peer.
5993 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
5994 dst_address_length=128,
5995 next_hop_address=self.pg1.remote_ip6n,
5996 next_hop_sw_if_index=self.pg1.sw_if_index,
# IPv4 -> tunnel: plain UDP packet in on pg0.
6000 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6001 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6002 UDP(sport=10000, dport=20000))
6003 self.pg0.add_stream(p)
6004 self.pg_enable_capture(self.pg_interfaces)
# `capture` is rebound from the capture list to its single packet.
6006 capture = self.pg1.get_capture(1)
6007 capture = capture[0]
# Encapsulated B4 -> AFTR; inner addresses and ports are unchanged.
6008 self.assertEqual(capture[IPv6].src, b4_ip6)
6009 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6010 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6011 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6012 self.assertEqual(capture[UDP].sport, 10000)
6013 self.assertEqual(capture[UDP].dport, 20000)
6014 self.check_ip_checksum(capture)
# Tunnel -> IPv4: encapsulated reply from AFTR to B4 in on pg1.
6017 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6018 IPv6(dst=b4_ip6, src=aftr_ip6) /
6019 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6020 UDP(sport=20000, dport=10000))
6021 self.pg1.add_stream(p)
6022 self.pg_enable_capture(self.pg_interfaces)
6024 capture = self.pg0.get_capture(1)
6025 capture = capture[0]
# Decapsulated: no IPv6 layer, inner addresses and ports unchanged.
6026 self.assertFalse(capture.haslayer(IPv6))
6027 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6028 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6029 self.assertEqual(capture[UDP].sport, 20000)
6030 self.assertEqual(capture[UDP].dport, 10000)
6031 self.check_ip_checksum(capture)
6033 # ping DS-Lite B4 tunnel endpoint address
6034 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6035 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6036 ICMPv6EchoRequest())
6037 self.pg1.add_stream(p)
6038 self.pg_enable_capture(self.pg_interfaces)
# The echo reply is expected on pg1 itself, sourced from the B4 address.
6040 capture = self.pg1.get_capture(1)
6041 self.assertEqual(1, len(capture))
6042 capture = capture[0]
6043 self.assertEqual(capture[IPv6].src, b4_ip6)
6044 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6045 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# tearDown body: parent tearDown, then query the tunnel endpoint addresses
# while VPP is alive.
6048 super(TestDSliteCE, self).tearDown()
6049 if not self.vpp_dead:
6051 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6053 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
if __name__ == "__main__":
    # Run as a script: hand this module's test cases to unittest, using the
    # VppTestRunner imported from the framework module.
    unittest.main(testRunner=VppTestRunner)