9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is re-parsed from its raw bytes, the IP checksum field is
    removed and the packet is serialised again so that the checksum is
    recalculated.  The recalculated value must match the one carried by
    the packet under test.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the parsed checksum so that it is recomputed on rebuild.  Without
    # this the round trip simply reproduces the captured value and the
    # assertion below can never fail (same pattern as check_icmp_checksum).
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is re-parsed from its raw bytes, the TCP checksum field is
    removed and the packet is serialised again so that the checksum is
    recalculated.  The recalculated value must match the one carried by
    the packet under test.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the parsed checksum so that it is recomputed on rebuild.  Without
    # this the round trip simply reproduces the captured value and the
    # assertion below can never fail (same pattern as check_icmp_checksum).
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is re-parsed from its raw bytes, the UDP checksum field is
    removed and the packet is serialised again so that the checksum is
    recalculated.  The recalculated value must match the one carried by
    the packet under test.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the parsed checksum so that it is recomputed on rebuild.  Without
    # this the round trip simply reproduces the captured value and the
    # assertion below can never fail (same pattern as check_icmp_checksum).
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check checksums of the packet embedded in an ICMP error

    For every embedded layer present (IPerror, TCPerror, UDPerror,
    ICMPerror) the packet is rebuilt with that layer's checksum removed
    and the recalculated value is compared with the captured one.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    def verify_layer(layer):
        # Always start from a fresh copy of the captured packet so that
        # each layer is verified independently of the others.  Previously
        # the ICMPerror check reused a packet left over from an earlier
        # branch and raised NameError when no earlier branch had run.
        rebuilt = pkt.__class__(str(pkt))
        del rebuilt[layer].chksum
        rebuilt = rebuilt.__class__(str(rebuilt))
        self.assertEqual(rebuilt[layer].chksum, pkt[layer].chksum)

    if pkt.haslayer(IPerror):
        verify_layer('IPerror')

    if pkt.haslayer(TCPerror):
        verify_layer('TCPerror')

    # An embedded UDP checksum of 0 is not verified.
    if pkt.haslayer(UDPerror) and pkt['UDPerror'].chksum != 0:
        verify_layer('UDPerror')

    if pkt.haslayer(ICMPerror):
        verify_layer('ICMPerror')
def check_icmp_checksum(self, pkt):
    """
    Check ICMP checksum in IPv4 packet

    The ICMP checksum is recalculated on a rebuilt copy of the packet and
    compared with the captured value.  When the packet embeds an IP header
    (ICMP error message) the embedded checksums are verified as well.

    :param pkt: Packet to check ICMP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    # Removing the field makes the rebuild below recalculate it.
    del rebuilt['ICMP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    recalculated = rebuilt['ICMP'].chksum
    self.assertEqual(recalculated, pkt['ICMP'].chksum)
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Check ICMPv6 checksum in IPv6 packet

    Handles Destination Unreachable, Echo Request and Echo Reply messages.
    For Destination Unreachable the checksums of the embedded packet are
    verified too.

    :param pkt: Packet to check ICMPv6 checksum
    """
    # (layer class, whether the message embeds the offending packet)
    icmpv6_layers = (
        (ICMPv6DestUnreach, True),
        (ICMPv6EchoRequest, False),
        (ICMPv6EchoReply, False),
    )
    rebuilt = pkt.__class__(str(pkt))
    for layer_cls, has_embedded in icmpv6_layers:
        if not pkt.haslayer(layer_cls):
            continue
        layer = layer_cls.__name__
        # Removing the field makes the rebuild below recalculate it.
        del rebuilt[layer].cksum
        rebuilt = rebuilt.__class__(str(rebuilt))
        self.assertEqual(rebuilt[layer].cksum, pkt[layer].cksum)
        if has_embedded:
            self.check_icmp_errror_embedded(pkt)
137 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
139 Create packet stream for inside network
141 :param in_if: Inside interface
142 :param out_if: Outside interface
143 :param dst_ip: Destination address
144 :param ttl: TTL of generated packets
147 dst_ip = out_if.remote_ip4
151 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153 TCP(sport=self.tcp_port_in, dport=20))
157 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159 UDP(sport=self.udp_port_in, dport=20))
163 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165 ICMP(id=self.icmp_id_in, type='echo-request'))
170 def compose_ip6(self, ip4, pref, plen):
172 Compose IPv4-embedded IPv6 addresses
174 :param ip4: IPv4 address
175 :param pref: IPv6 prefix
176 :param plen: IPv6 prefix length
177 :returns: IPv4-embedded IPv6 addresses
179 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
195 pref_n[10] = ip4_n[3]
199 pref_n[10] = ip4_n[2]
200 pref_n[11] = ip4_n[3]
203 pref_n[10] = ip4_n[1]
204 pref_n[11] = ip4_n[2]
205 pref_n[12] = ip4_n[3]
207 pref_n[12] = ip4_n[0]
208 pref_n[13] = ip4_n[1]
209 pref_n[14] = ip4_n[2]
210 pref_n[15] = ip4_n[3]
211 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
213 def extract_ip4(self, ip6, plen):
215 Extract IPv4 address embedded in IPv6 addresses
217 :param ip6: IPv6 address
218 :param plen: IPv6 prefix length
219 :returns: extracted IPv4 address
221 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
253 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
255 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
257 Create IPv6 packet stream for inside network
259 :param in_if: Inside interface
260 :param out_if: Outside interface
261 :param hlim: Hop Limit of generated packets
262 :param pref: NAT64 prefix
263 :param plen: NAT64 prefix length
267 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
269 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
272 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274 TCP(sport=self.tcp_port_in, dport=20))
278 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280 UDP(sport=self.udp_port_in, dport=20))
284 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286 ICMPv6EchoRequest(id=self.icmp_id_in))
291 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292 use_inside_ports=False):
294 Create packet stream for outside network
296 :param out_if: Outside interface
297 :param dst_ip: Destination IP address (Default use global NAT address)
298 :param ttl: TTL of generated packets
299 :param use_inside_ports: Use inside NAT ports as destination ports
300 instead of outside ports
303 dst_ip = self.nat_addr
304 if not use_inside_ports:
305 tcp_port = self.tcp_port_out
306 udp_port = self.udp_port_out
307 icmp_id = self.icmp_id_out
309 tcp_port = self.tcp_port_in
310 udp_port = self.udp_port_in
311 icmp_id = self.icmp_id_in
314 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316 TCP(dport=tcp_port, sport=20))
320 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322 UDP(dport=udp_port, sport=20))
326 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328 ICMP(id=icmp_id, type='echo-reply'))
333 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
335 Create packet stream for outside network
337 :param out_if: Outside interface
338 :param dst_ip: Destination IP address (Default use global NAT address)
339 :param hl: HL of generated packets
343 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345 TCP(dport=self.tcp_port_out, sport=20))
349 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351 UDP(dport=self.udp_port_out, sport=20))
355 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357 ICMPv6EchoReply(id=self.icmp_id_out))
362 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363 packet_num=3, dst_ip=None, is_ip6=False):
365 Verify captured packets on outside network
367 :param capture: Captured packets
368 :param nat_ip: Translated IP address (Default use global NAT address)
369 :param same_port: Source port number is not translated (Default False)
370 :param packet_num: Expected number of packets (Default 3)
371 :param dst_ip: Destination IP address (Default do not verify)
372 :param is_ip6: If L3 protocol is IPv6 (Default False)
376 ICMP46 = ICMPv6EchoRequest
381 nat_ip = self.nat_addr
382 self.assertEqual(packet_num, len(capture))
383 for packet in capture:
386 self.check_ip_checksum(packet)
387 self.assertEqual(packet[IP46].src, nat_ip)
388 if dst_ip is not None:
389 self.assertEqual(packet[IP46].dst, dst_ip)
390 if packet.haslayer(TCP):
392 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
395 packet[TCP].sport, self.tcp_port_in)
396 self.tcp_port_out = packet[TCP].sport
397 self.check_tcp_checksum(packet)
398 elif packet.haslayer(UDP):
400 self.assertEqual(packet[UDP].sport, self.udp_port_in)
403 packet[UDP].sport, self.udp_port_in)
404 self.udp_port_out = packet[UDP].sport
407 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
409 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410 self.icmp_id_out = packet[ICMP46].id
412 self.check_icmpv6_checksum(packet)
414 self.check_icmp_checksum(packet)
416 self.logger.error(ppp("Unexpected or invalid packet "
417 "(outside network):", packet))
420 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421 packet_num=3, dst_ip=None):
423 Verify captured packets on outside network
425 :param capture: Captured packets
426 :param nat_ip: Translated IP address
427 :param same_port: Source port number is not translated (Default False)
428 :param packet_num: Expected number of packets (Default 3)
429 :param dst_ip: Destination IP address (Default do not verify)
431 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
434 def verify_capture_in(self, capture, in_if, packet_num=3):
436 Verify captured packets on inside network
438 :param capture: Captured packets
439 :param in_if: Inside interface
440 :param packet_num: Expected number of packets (Default 3)
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
445 self.check_ip_checksum(packet)
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 if packet.haslayer(TCP):
448 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449 self.check_tcp_checksum(packet)
450 elif packet.haslayer(UDP):
451 self.assertEqual(packet[UDP].dport, self.udp_port_in)
453 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454 self.check_icmp_checksum(packet)
456 self.logger.error(ppp("Unexpected or invalid packet "
457 "(inside network):", packet))
460 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
462 Verify captured IPv6 packets on inside network
464 :param capture: Captured packets
465 :param src_ip: Source IP
466 :param dst_ip: Destination IP address
467 :param packet_num: Expected number of packets (Default 3)
469 self.assertEqual(packet_num, len(capture))
470 for packet in capture:
472 self.assertEqual(packet[IPv6].src, src_ip)
473 self.assertEqual(packet[IPv6].dst, dst_ip)
474 if packet.haslayer(TCP):
475 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476 self.check_tcp_checksum(packet)
477 elif packet.haslayer(UDP):
478 self.assertEqual(packet[UDP].dport, self.udp_port_in)
479 self.check_udp_checksum(packet)
481 self.assertEqual(packet[ICMPv6EchoReply].id,
483 self.check_icmpv6_checksum(packet)
485 self.logger.error(ppp("Unexpected or invalid packet "
486 "(inside network):", packet))
489 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
491 Verify captured packet that don't have to be translated
493 :param capture: Captured packets
494 :param ingress_if: Ingress interface
495 :param egress_if: Egress interface
497 for packet in capture:
499 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501 if packet.haslayer(TCP):
502 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503 elif packet.haslayer(UDP):
504 self.assertEqual(packet[UDP].sport, self.udp_port_in)
506 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
508 self.logger.error(ppp("Unexpected or invalid packet "
509 "(inside network):", packet))
512 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513 packet_num=3, icmp_type=11):
515 Verify captured packets with ICMP errors on outside network
517 :param capture: Captured packets
518 :param src_ip: Translated IP address or IP address of VPP
519 (Default use global NAT address)
520 :param packet_num: Expected number of packets (Default 3)
521 :param icmp_type: Type of error ICMP packet
522 we are expecting (Default 11)
525 src_ip = self.nat_addr
526 self.assertEqual(packet_num, len(capture))
527 for packet in capture:
529 self.assertEqual(packet[IP].src, src_ip)
530 self.assertTrue(packet.haslayer(ICMP))
532 self.assertEqual(icmp.type, icmp_type)
533 self.assertTrue(icmp.haslayer(IPerror))
534 inner_ip = icmp[IPerror]
535 if inner_ip.haslayer(TCPerror):
536 self.assertEqual(inner_ip[TCPerror].dport,
538 elif inner_ip.haslayer(UDPerror):
539 self.assertEqual(inner_ip[UDPerror].dport,
542 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
544 self.logger.error(ppp("Unexpected or invalid packet "
545 "(outside network):", packet))
548 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
551 Verify captured packets with ICMP errors on inside network
553 :param capture: Captured packets
554 :param in_if: Inside interface
555 :param packet_num: Expected number of packets (Default 3)
556 :param icmp_type: Type of error ICMP packet
557 we are expecting (Default 11)
559 self.assertEqual(packet_num, len(capture))
560 for packet in capture:
562 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563 self.assertTrue(packet.haslayer(ICMP))
565 self.assertEqual(icmp.type, icmp_type)
566 self.assertTrue(icmp.haslayer(IPerror))
567 inner_ip = icmp[IPerror]
568 if inner_ip.haslayer(TCPerror):
569 self.assertEqual(inner_ip[TCPerror].sport,
571 elif inner_ip.haslayer(UDPerror):
572 self.assertEqual(inner_ip[UDPerror].sport,
575 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
577 self.logger.error(ppp("Unexpected or invalid packet "
578 "(inside network):", packet))
581 def create_stream_frag(self, src_if, dst, sport, dport, data):
583 Create fragmented packet stream
585 :param src_if: Source interface
586 :param dst: Destination IPv4 address
587 :param sport: Source TCP port
588 :param dport: Destination TCP port
589 :param data: Payload data
592 id = random.randint(0, 65535)
593 p = (IP(src=src_if.remote_ip4, dst=dst) /
594 TCP(sport=sport, dport=dport) /
596 p = p.__class__(str(p))
597 chksum = p['TCP'].chksum
599 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601 TCP(sport=sport, dport=dport, chksum=chksum) /
604 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606 proto=IP_PROTOS.tcp) /
609 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
616 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617 pref=None, plen=0, frag_size=128):
619 Create fragmented packet stream
621 :param src_if: Source interface
622 :param dst: Destination IPv4 address
623 :param sport: Source TCP port
624 :param dport: Destination TCP port
625 :param data: Payload data
626 :param pref: NAT64 prefix
627 :param plen: NAT64 prefix length
628 :param frag_size: size of fragments
632 dst_ip6 = ''.join(['64:ff9b::', dst])
634 dst_ip6 = self.compose_ip6(dst, pref, plen)
636 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639 TCP(sport=sport, dport=dport) /
642 return fragment6(p, frag_size)
644 def reass_frags_and_verify(self, frags, src, dst):
646 Reassemble and verify fragmented packet
648 :param frags: Captured fragments
649 :param src: Source IPv4 address to verify
650 :param dst: Destination IPv4 address to verify
652 :returns: Reassembled IPv4 packet
654 buffer = StringIO.StringIO()
656 self.assertEqual(p[IP].src, src)
657 self.assertEqual(p[IP].dst, dst)
658 self.check_ip_checksum(p)
659 buffer.seek(p[IP].frag * 8)
660 buffer.write(p[IP].payload)
661 ip = frags[0].getlayer(IP)
662 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663 proto=frags[0][IP].proto)
664 if ip.proto == IP_PROTOS.tcp:
665 p = (ip / TCP(buffer.getvalue()))
666 self.check_tcp_checksum(p)
667 elif ip.proto == IP_PROTOS.udp:
668 p = (ip / UDP(buffer.getvalue()))
671 def reass_frags_and_verify_ip6(self, frags, src, dst):
673 Reassemble and verify fragmented packet
675 :param frags: Captured fragments
676 :param src: Source IPv6 address to verify
677 :param dst: Destination IPv6 address to verify
679 :returns: Reassembled IPv6 packet
681 buffer = StringIO.StringIO()
683 self.assertEqual(p[IPv6].src, src)
684 self.assertEqual(p[IPv6].dst, dst)
685 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686 buffer.write(p[IPv6ExtHdrFragment].payload)
687 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688 nh=frags[0][IPv6ExtHdrFragment].nh)
689 if ip.nh == IP_PROTOS.tcp:
690 p = (ip / TCP(buffer.getvalue()))
691 self.check_tcp_checksum(p)
692 elif ip.nh == IP_PROTOS.udp:
693 p = (ip / UDP(buffer.getvalue()))
696 def verify_ipfix_nat44_ses(self, data):
698 Verify IPFIX NAT44 session create/delete event
700 :param data: Decoded IPFIX data records
702 nat44_ses_create_num = 0
703 nat44_ses_delete_num = 0
704 self.assertEqual(6, len(data))
707 self.assertIn(ord(record[230]), [4, 5])
708 if ord(record[230]) == 4:
709 nat44_ses_create_num += 1
711 nat44_ses_delete_num += 1
713 self.assertEqual(self.pg0.remote_ip4n, record[8])
714 # postNATSourceIPv4Address
715 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
718 self.assertEqual(struct.pack("!I", 0), record[234])
719 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720 if IP_PROTOS.icmp == ord(record[4]):
721 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722 self.assertEqual(struct.pack("!H", self.icmp_id_out),
724 elif IP_PROTOS.tcp == ord(record[4]):
725 self.assertEqual(struct.pack("!H", self.tcp_port_in),
727 self.assertEqual(struct.pack("!H", self.tcp_port_out),
729 elif IP_PROTOS.udp == ord(record[4]):
730 self.assertEqual(struct.pack("!H", self.udp_port_in),
732 self.assertEqual(struct.pack("!H", self.udp_port_out),
735 self.fail("Invalid protocol")
736 self.assertEqual(3, nat44_ses_create_num)
737 self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    Exactly one record is expected; its fields are looked up by numeric
    key (presumably the IPFIX information element ID - confirm against
    the decoder).

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    # The single record was previously never bound to a name.
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 3)
    # natPoolID
    self.assertEqual(struct.pack("!I", 0), record[283])
def verify_ipfix_max_sessions(self, data, limit):
    """
    Verify IPFIX maximum session entries exceeded event

    Exactly one record is expected; its fields are looked up by numeric
    key (presumably the IPFIX information element ID - confirm against
    the decoder).

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum session entries that can be created.
    """
    self.assertEqual(1, len(data))
    # The single record was previously never bound to a name.
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): "I" packs in native byte order, unlike the "!"-prefixed
    # formats used by other checks in this file - presumably this mirrors
    # how the exporter encodes these fields; confirm before changing.
    self.assertEqual(struct.pack("I", 1), record[466])
    # maxSessionEntries
    self.assertEqual(struct.pack("I", limit), record[471])
def verify_ipfix_max_bibs(self, data, limit):
    """
    Verify IPFIX maximum BIB entries exceeded event

    Exactly one record is expected; its fields are looked up by numeric
    key (presumably the IPFIX information element ID - confirm against
    the decoder).

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum BIB entries that can be created.
    """
    self.assertEqual(1, len(data))
    # The single record was previously never bound to a name.
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): "I" packs in native byte order, unlike the "!"-prefixed
    # formats used by other checks in this file - presumably this mirrors
    # how the exporter encodes these fields; confirm before changing.
    self.assertEqual(struct.pack("I", 2), record[466])
    # maxBIBEntries
    self.assertEqual(struct.pack("I", limit), record[472])
def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event

    Exactly one record is expected; its fields are looked up by numeric
    key (presumably the IPFIX information element ID - confirm against
    the decoder).

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    # The single record was previously never bound to a name.
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): "I" packs in native byte order, unlike the "!"-prefixed
    # formats used by other checks in this file - presumably this mirrors
    # how the exporter encodes these fields; confirm before changing.
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event

    Exactly one record is expected; its fields are looked up by numeric
    key (presumably the IPFIX information element ID - confirm against
    the decoder).

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv4 source address
    """
    self.assertEqual(1, len(data))
    # The single record was previously never bound to a name.
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): "I" packs in native byte order, unlike the "!"-prefixed
    # formats used by other checks in this file - presumably this mirrors
    # how the exporter encodes these fields; confirm before changing.
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv4Address
    self.assertEqual(src_addr, record[8])
822 def verify_ipfix_bib(self, data, is_create, src_addr):
824 Verify IPFIX NAT64 BIB create and delete events
826 :param data: Decoded IPFIX data records
827 :param is_create: Create event if nonzero value otherwise delete event
828 :param src_addr: IPv6 source address
830 self.assertEqual(1, len(data))
834 self.assertEqual(ord(record[230]), 10)
836 self.assertEqual(ord(record[230]), 11)
838 self.assertEqual(src_addr, record[27])
839 # postNATSourceIPv4Address
840 self.assertEqual(self.nat_addr_n, record[225])
842 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
844 self.assertEqual(struct.pack("!I", 0), record[234])
845 # sourceTransportPort
846 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847 # postNAPTSourceTransportPort
848 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
850 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
853 Verify IPFIX NAT64 session create and delete events
855 :param data: Decoded IPFIX data records
856 :param is_create: Create event if nonzero value otherwise delete event
857 :param src_addr: IPv6 source address
858 :param dst_addr: IPv4 destination address
859 :param dst_port: destination TCP port
861 self.assertEqual(1, len(data))
865 self.assertEqual(ord(record[230]), 6)
867 self.assertEqual(ord(record[230]), 7)
869 self.assertEqual(src_addr, record[27])
870 # destinationIPv6Address
871 self.assertEqual(socket.inet_pton(socket.AF_INET6,
872 self.compose_ip6(dst_addr,
876 # postNATSourceIPv4Address
877 self.assertEqual(self.nat_addr_n, record[225])
878 # postNATDestinationIPv4Address
879 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
882 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
884 self.assertEqual(struct.pack("!I", 0), record[234])
885 # sourceTransportPort
886 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887 # postNAPTSourceTransportPort
888 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889 # destinationTransportPort
890 self.assertEqual(struct.pack("!H", dst_port), record[11])
891 # postNAPTDestinationTransportPort
892 self.assertEqual(struct.pack("!H", dst_port), record[228])
895 class TestNAT44(MethodHolder):
896 """ NAT44 Test Cases """
900 super(TestNAT44, cls).setUpClass()
903 cls.tcp_port_in = 6303
904 cls.tcp_port_out = 6303
905 cls.udp_port_in = 6304
906 cls.udp_port_out = 6304
907 cls.icmp_id_in = 6305
908 cls.icmp_id_out = 6305
909 cls.nat_addr = '10.0.0.3'
910 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911 cls.ipfix_src_port = 4739
912 cls.ipfix_domain_id = 1
914 cls.create_pg_interfaces(range(10))
915 cls.interfaces = list(cls.pg_interfaces[0:4])
917 for i in cls.interfaces:
922 cls.pg0.generate_remote_hosts(3)
923 cls.pg0.configure_ipv4_neighbors()
925 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926 cls.vapi.ip_table_add_del(10, is_add=1)
927 cls.vapi.ip_table_add_del(20, is_add=1)
929 cls.pg4._local_ip4 = "172.16.255.1"
930 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932 cls.pg4.set_table_ip4(10)
933 cls.pg5._local_ip4 = "172.17.255.3"
934 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936 cls.pg5.set_table_ip4(10)
937 cls.pg6._local_ip4 = "172.16.255.1"
938 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940 cls.pg6.set_table_ip4(20)
941 for i in cls.overlapping_interfaces:
949 cls.pg9.generate_remote_hosts(2)
951 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
956 cls.pg9.resolve_arp()
957 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959 cls.pg9.resolve_arp()
962 super(TestNAT44, cls).tearDownClass()
965 def clear_nat44(self):
967 Clear NAT44 configuration.
969 # I found no elegant way to do this
970 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971 dst_address_length=32,
972 next_hop_address=self.pg7.remote_ip4n,
973 next_hop_sw_if_index=self.pg7.sw_if_index,
975 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976 dst_address_length=32,
977 next_hop_address=self.pg8.remote_ip4n,
978 next_hop_sw_if_index=self.pg8.sw_if_index,
981 for intf in [self.pg7, self.pg8]:
982 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
984 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
989 if self.pg7.has_ip4_config:
990 self.pg7.unconfig_ip4()
992 self.vapi.nat44_forwarding_enable_disable(0)
994 interfaces = self.vapi.nat44_interface_addr_dump()
995 for intf in interfaces:
996 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997 twice_nat=intf.twice_nat,
1000 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001 domain_id=self.ipfix_domain_id)
1002 self.ipfix_src_port = 4739
1003 self.ipfix_domain_id = 1
1005 interfaces = self.vapi.nat44_interface_dump()
1006 for intf in interfaces:
1007 if intf.is_inside > 1:
1008 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1011 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1015 interfaces = self.vapi.nat44_interface_output_feature_dump()
1016 for intf in interfaces:
1017 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1021 static_mappings = self.vapi.nat44_static_mapping_dump()
1022 for sm in static_mappings:
1023 self.vapi.nat44_add_del_static_mapping(
1024 sm.local_ip_address,
1025 sm.external_ip_address,
1026 local_port=sm.local_port,
1027 external_port=sm.external_port,
1028 addr_only=sm.addr_only,
1030 protocol=sm.protocol,
1031 twice_nat=sm.twice_nat,
1032 out2in_only=sm.out2in_only,
1036 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1037 for lb_sm in lb_static_mappings:
1038 self.vapi.nat44_add_del_lb_static_mapping(
1039 lb_sm.external_addr,
1040 lb_sm.external_port,
1042 vrf_id=lb_sm.vrf_id,
1043 twice_nat=lb_sm.twice_nat,
1044 out2in_only=lb_sm.out2in_only,
1050 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1051 for id_m in identity_mappings:
1052 self.vapi.nat44_add_del_identity_mapping(
1053 addr_only=id_m.addr_only,
1056 sw_if_index=id_m.sw_if_index,
1058 protocol=id_m.protocol,
1061 adresses = self.vapi.nat44_address_dump()
1062 for addr in adresses:
1063 self.vapi.nat44_add_del_address_range(addr.ip_address,
1065 twice_nat=addr.twice_nat,
1068 self.vapi.nat_set_reass()
1069 self.vapi.nat_set_reass(is_ip6=1)
1071 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1072 local_port=0, external_port=0, vrf_id=0,
1073 is_add=1, external_sw_if_index=0xFFFFFFFF,
1074 proto=0, twice_nat=0, out2in_only=0, tag=""):
1076 Add/delete NAT44 static mapping
1078 :param local_ip: Local IP address
1079 :param external_ip: External IP address
1080 :param local_port: Local port number (Optional)
1081 :param external_port: External port number (Optional)
1082 :param vrf_id: VRF ID (Default 0)
1083 :param is_add: 1 if add, 0 if delete (Default add)
1084 :param external_sw_if_index: External interface instead of IP address
1085 :param proto: IP protocol (Mandatory if port specified)
1086 :param twice_nat: 1 if translate external host address and port
1087 :param out2in_only: if 1 rule is matching only out2in direction
1088 :param tag: Opaque string tag
1091 if local_port and external_port:
1093 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1094 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1095 self.vapi.nat44_add_del_static_mapping(
1098 external_sw_if_index,
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
    """
    Add/delete NAT44 address

    The address is submitted as a single-address range (first == last).

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID passed through to the address range API
                   (Default 0xFFFFFFFF)
    :param twice_nat: twice NAT address for external hosts
    """
    nat_addr = socket.inet_pton(socket.AF_INET, ip)
    # vrf_id must be forwarded, otherwise the caller's value is silently
    # ignored.
    self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
                                          vrf_id=vrf_id,
                                          twice_nat=twice_nat)
1122 def test_dynamic(self):
1123 """ NAT44 dynamic translation test """
1125 self.nat44_add_address(self.nat_addr)
1126 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1127 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1131 pkts = self.create_stream_in(self.pg0, self.pg1)
1132 self.pg0.add_stream(pkts)
1133 self.pg_enable_capture(self.pg_interfaces)
1135 capture = self.pg1.get_capture(len(pkts))
1136 self.verify_capture_out(capture)
1139 pkts = self.create_stream_out(self.pg1)
1140 self.pg1.add_stream(pkts)
1141 self.pg_enable_capture(self.pg_interfaces)
1143 capture = self.pg0.get_capture(len(pkts))
1144 self.verify_capture_in(capture, self.pg0)
1146 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1147 """ NAT44 handling of client packets with TTL=1 """
1149 self.nat44_add_address(self.nat_addr)
1150 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1151 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1154 # Client side - generate traffic
1155 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1156 self.pg0.add_stream(pkts)
1157 self.pg_enable_capture(self.pg_interfaces)
1160 # Client side - verify ICMP type 11 packets
1161 capture = self.pg0.get_capture(len(pkts))
1162 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1164 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1165 """ NAT44 handling of server packets with TTL=1 """
1167 self.nat44_add_address(self.nat_addr)
1168 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1169 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1172 # Client side - create sessions
1173 pkts = self.create_stream_in(self.pg0, self.pg1)
1174 self.pg0.add_stream(pkts)
1175 self.pg_enable_capture(self.pg_interfaces)
1178 # Server side - generate traffic
1179 capture = self.pg1.get_capture(len(pkts))
1180 self.verify_capture_out(capture)
1181 pkts = self.create_stream_out(self.pg1, ttl=1)
1182 self.pg1.add_stream(pkts)
1183 self.pg_enable_capture(self.pg_interfaces)
1186 # Server side - verify ICMP type 11 packets
1187 capture = self.pg1.get_capture(len(pkts))
1188 self.verify_capture_out_with_icmp_errors(capture,
1189 src_ip=self.pg1.local_ip4)
1191 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1192 """ NAT44 handling of error responses to client packets with TTL=2 """
# Flow: a TTL=2 stream goes pg0 -> pg1; for each packet captured on pg1 the
# test fabricates an ICMP type 11 reply and injects it on pg1; the replies
# are then expected on pg0 and validated by
# verify_capture_in_with_icmp_errors.
# NAT44 setup: register the NAT address and configure the feature on pg0
# and pg1 (the trailing argument(s) of the pg1 call are not visible here).
1194 self.nat44_add_address(self.nat_addr)
1195 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1196 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1199 # Client side - generate traffic
1200 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1201 self.pg0.add_stream(pkts)
1202 self.pg_enable_capture(self.pg_interfaces)
1205 # Server side - simulate ICMP type 11 response
# One error packet per captured packet: sent from pg1's remote host to the
# NAT address, with the captured packet's IP layer as the ICMP payload.
1206 capture = self.pg1.get_capture(len(pkts))
1207 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1208 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1209 ICMP(type=11) / packet[IP] for packet in capture]
1210 self.pg1.add_stream(pkts)
1211 self.pg_enable_capture(self.pg_interfaces)
1214 # Client side - verify ICMP type 11 packets
1215 capture = self.pg0.get_capture(len(pkts))
1216 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1218 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1219 """ NAT44 handling of error responses to server packets with TTL=2 """
# Flow: sessions are created with a pg0 -> pg1 stream, a TTL=2 stream is
# then sent from pg1, and for each packet captured on pg0 an ICMP type 11
# reply is fabricated and injected on pg0; the replies are expected on pg1
# and validated by verify_capture_out_with_icmp_errors.
# NAT44 setup: register the NAT address and configure the feature on pg0
# and pg1 (the trailing argument(s) of the pg1 call are not visible here).
1221 self.nat44_add_address(self.nat_addr)
1222 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1223 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1226 # Client side - create sessions
1227 pkts = self.create_stream_in(self.pg0, self.pg1)
1228 self.pg0.add_stream(pkts)
1229 self.pg_enable_capture(self.pg_interfaces)
1232 # Server side - generate traffic
1233 capture = self.pg1.get_capture(len(pkts))
1234 self.verify_capture_out(capture)
1235 pkts = self.create_stream_out(self.pg1, ttl=2)
1236 self.pg1.add_stream(pkts)
1237 self.pg_enable_capture(self.pg_interfaces)
1240 # Client side - simulate ICMP type 11 response
# One error packet per captured packet: sent from pg0's remote host to
# pg1's remote host, with the captured packet's IP layer as ICMP payload.
1241 capture = self.pg0.get_capture(len(pkts))
1242 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1243 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1244 ICMP(type=11) / packet[IP] for packet in capture]
1245 self.pg0.add_stream(pkts)
1246 self.pg_enable_capture(self.pg_interfaces)
1249 # Server side - verify ICMP type 11 packets
1250 capture = self.pg1.get_capture(len(pkts))
1251 self.verify_capture_out_with_icmp_errors(capture)
1253 def test_ping_out_interface_from_outside(self):
1254 """ Ping NAT44 out interface from outside network """
# Flow: an ICMP echo-request addressed to pg1's own local address is sent
# in on pg1, and a single echo reply (ICMP type 0) with swapped addresses
# is expected back on pg1.
# NOTE(review): 'pkts' and 'packet' are used below but are not assigned in
# the lines visible here - confirm where they are bound.
1256 self.nat44_add_address(self.nat_addr)
1257 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1258 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Echo request from pg1's remote host to pg1's local (interface) address.
1261 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1262 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1263 ICMP(id=self.icmp_id_out, type='echo-request'))
1265 self.pg1.add_stream(pkts)
1266 self.pg_enable_capture(self.pg_interfaces)
1268 capture = self.pg1.get_capture(len(pkts))
1269 self.assertEqual(1, len(capture))
# The reply must come from the interface address back to the requester.
# NOTE(review): the request above uses icmp_id_out while the reply id is
# compared with icmp_id_in - confirm both hold the same value at this point.
1272 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1273 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1274 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1275 self.assertEqual(packet[ICMP].type, 0) # echo reply
1277 self.logger.error(ppp("Unexpected or invalid packet "
1278 "(outside network):", packet))
1281 def test_ping_internal_host_from_outside(self):
1282 """ Ping internal host from outside network """
# Flow: with a static mapping between pg0's remote host and the NAT
# address, an echo-request sent on pg1 to the NAT address is expected on
# pg0, and the echo-reply sent back on pg0 is expected on pg1.
1284 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1285 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1286 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Echo request from pg1's remote host to the NAT address.
1290 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1291 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1292 ICMP(id=self.icmp_id_out, type='echo-request'))
1293 self.pg1.add_stream(pkt)
1294 self.pg_enable_capture(self.pg_interfaces)
# Exactly one packet is expected on pg0 and it must still be ICMP.
1296 capture = self.pg0.get_capture(1)
1297 self.verify_capture_in(capture, self.pg0, packet_num=1)
1298 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Echo reply from pg0's remote host back to pg1's remote host.
1301 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1302 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1303 ICMP(id=self.icmp_id_in, type='echo-reply'))
1304 self.pg0.add_stream(pkt)
1305 self.pg_enable_capture(self.pg_interfaces)
# Exactly one packet is expected on pg1 and it must still be ICMP.
1307 capture = self.pg1.get_capture(1)
1308 self.verify_capture_out(capture, same_port=True, packet_num=1)
1309 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1311 def test_forwarding(self):
1312 """ NAT44 forwarding test """
# Flow: with forwarding switched on (argument 1) and a static mapping
# between pg0's remote host and the NAT address, traffic is exercised in
# both directions twice - once for the mapped host and once after swapping
# in a different remote host. Forwarding is switched off again (argument 0)
# at the end.
1314 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1315 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1317 self.vapi.nat44_forwarding_enable_disable(1)
# Keep both addresses in locals so the same pair can be passed to the
# second nat44_add_del_static_mapping call at the end of the test.
1319 real_ip = self.pg0.remote_ip4n
1320 alias_ip = self.nat_addr_n
1321 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1322 external_ip=alias_ip)
1325 # in2out - static mapping match
1327 pkts = self.create_stream_out(self.pg1)
1328 self.pg1.add_stream(pkts)
1329 self.pg_enable_capture(self.pg_interfaces)
1331 capture = self.pg0.get_capture(len(pkts))
1332 self.verify_capture_in(capture, self.pg0)
1334 pkts = self.create_stream_in(self.pg0, self.pg1)
1335 self.pg0.add_stream(pkts)
1336 self.pg_enable_capture(self.pg_interfaces)
1338 capture = self.pg1.get_capture(len(pkts))
1339 self.verify_capture_out(capture, same_port=True)
1341 # in2out - no static mapping match
# Temporarily put remote_hosts[1] in slot 0 (presumably so pg0.remote_ip4
# now refers to a host without a static mapping - confirm); the saved
# entry is restored further down.
1343 host0 = self.pg0.remote_hosts[0]
1344 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1346 pkts = self.create_stream_out(self.pg1,
1347 dst_ip=self.pg0.remote_ip4,
1348 use_inside_ports=True)
1349 self.pg1.add_stream(pkts)
1350 self.pg_enable_capture(self.pg_interfaces)
1352 capture = self.pg0.get_capture(len(pkts))
1353 self.verify_capture_in(capture, self.pg0)
1355 pkts = self.create_stream_in(self.pg0, self.pg1)
1356 self.pg0.add_stream(pkts)
1357 self.pg_enable_capture(self.pg_interfaces)
# The host's own address is passed as nat_ip to the verifier here
# (remaining arguments of this call are not visible in this view).
1359 capture = self.pg1.get_capture(len(pkts))
1360 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
# Undo the remote_hosts[0] swap made above.
1363 self.pg0.remote_hosts[0] = host0
# Cleanup: switch forwarding off and call the static-mapping API again for
# the same address pair (trailing argument(s) not visible here).
1366 self.vapi.nat44_forwarding_enable_disable(0)
1367 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1368 external_ip=alias_ip,
1371 def test_static_in(self):
1372 """ 1:1 NAT initialized from inside network """
# Flow: a static mapping pg0 remote host <-> 10.0.0.10 is created, the
# mapping dump is checked, then traffic is sent pg0 -> pg1 first and
# pg1 -> pg0 second.
# The *_out attributes are overwritten with fixed values; presumably the
# stream/verify helpers read them - confirm against their definitions.
1374 nat_ip = "10.0.0.10"
1375 self.tcp_port_out = 6303
1376 self.udp_port_out = 6304
1377 self.icmp_id_out = 6305
1379 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1380 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1381 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Exactly one mapping is expected; its tag (text before the first NUL
# byte) must be empty, and protocol and both ports must be 0.
1383 sm = self.vapi.nat44_static_mapping_dump()
1384 self.assertEqual(len(sm), 1)
1385 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1386 self.assertEqual(sm[0].protocol, 0)
1387 self.assertEqual(sm[0].local_port, 0)
1388 self.assertEqual(sm[0].external_port, 0)
# Inside-originated traffic: pg0 -> pg1.
1391 pkts = self.create_stream_in(self.pg0, self.pg1)
1392 self.pg0.add_stream(pkts)
1393 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): positional arguments here are presumably nat_ip and
# same_port (both appear as keywords elsewhere in this file) - confirm.
1395 capture = self.pg1.get_capture(len(pkts))
1396 self.verify_capture_out(capture, nat_ip, True)
# Return traffic: pg1 -> pg0, addressed to the static NAT address.
1399 pkts = self.create_stream_out(self.pg1, nat_ip)
1400 self.pg1.add_stream(pkts)
1401 self.pg_enable_capture(self.pg_interfaces)
1403 capture = self.pg0.get_capture(len(pkts))
1404 self.verify_capture_in(capture, self.pg0)
1406 def test_static_out(self):
1407 """ 1:1 NAT initialized from outside network """
# Flow: a tagged static mapping pg0 remote host <-> 10.0.0.20 is created,
# the tag is read back from the mapping dump, then traffic is sent
# pg1 -> pg0 first and pg0 -> pg1 second.
# NOTE(review): 'tag' is used below but is not assigned in the lines
# visible here - confirm where it is defined.
1409 nat_ip = "10.0.0.20"
1410 self.tcp_port_out = 6303
1411 self.udp_port_out = 6304
1412 self.icmp_id_out = 6305
1415 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1416 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1417 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Exactly one mapping is expected; its tag (text before the first NUL
# byte) must equal the tag passed in above.
1419 sm = self.vapi.nat44_static_mapping_dump()
1420 self.assertEqual(len(sm), 1)
1421 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
# Outside-originated traffic: pg1 -> pg0, addressed to the NAT address.
1424 pkts = self.create_stream_out(self.pg1, nat_ip)
1425 self.pg1.add_stream(pkts)
1426 self.pg_enable_capture(self.pg_interfaces)
1428 capture = self.pg0.get_capture(len(pkts))
1429 self.verify_capture_in(capture, self.pg0)
# Return traffic: pg0 -> pg1.
1432 pkts = self.create_stream_in(self.pg0, self.pg1)
1433 self.pg0.add_stream(pkts)
1434 self.pg_enable_capture(self.pg_interfaces)
1436 capture = self.pg1.get_capture(len(pkts))
1437 self.verify_capture_out(capture, nat_ip, True)
1439 def test_static_with_port_in(self):
1440 """ 1:1 NAPT initialized from inside network """
# Flow: three per-protocol static mappings (TCP, UDP, ICMP) are created
# between pg0's remote host and the NAT address, then traffic is sent
# pg0 -> pg1 first and pg1 -> pg0 second.
# Fixed outside port/id values that are also used in the mappings below.
1442 self.tcp_port_out = 3606
1443 self.udp_port_out = 3607
1444 self.icmp_id_out = 3608
1446 self.nat44_add_address(self.nat_addr)
# One mapping per protocol, pairing the *_in port/id with the *_out one.
1447 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1448 self.tcp_port_in, self.tcp_port_out,
1449 proto=IP_PROTOS.tcp)
1450 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1451 self.udp_port_in, self.udp_port_out,
1452 proto=IP_PROTOS.udp)
1453 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1454 self.icmp_id_in, self.icmp_id_out,
1455 proto=IP_PROTOS.icmp)
1456 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1457 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Inside-originated traffic: pg0 -> pg1.
1461 pkts = self.create_stream_in(self.pg0, self.pg1)
1462 self.pg0.add_stream(pkts)
1463 self.pg_enable_capture(self.pg_interfaces)
1465 capture = self.pg1.get_capture(len(pkts))
1466 self.verify_capture_out(capture)
# Return traffic: pg1 -> pg0.
1469 pkts = self.create_stream_out(self.pg1)
1470 self.pg1.add_stream(pkts)
1471 self.pg_enable_capture(self.pg_interfaces)
1473 capture = self.pg0.get_capture(len(pkts))
1474 self.verify_capture_in(capture, self.pg0)
1476 def test_static_with_port_out(self):
1477 """ 1:1 NAPT initialized from outside network """
# Flow: three per-protocol static mappings (TCP, UDP, ICMP) are created
# between pg0's remote host and the NAT address, then traffic is sent
# pg1 -> pg0 first and pg0 -> pg1 second.
# Fixed outside port/id values that are also used in the mappings below.
1479 self.tcp_port_out = 30606
1480 self.udp_port_out = 30607
1481 self.icmp_id_out = 30608
1483 self.nat44_add_address(self.nat_addr)
# One mapping per protocol, pairing the *_in port/id with the *_out one.
1484 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1485 self.tcp_port_in, self.tcp_port_out,
1486 proto=IP_PROTOS.tcp)
1487 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1488 self.udp_port_in, self.udp_port_out,
1489 proto=IP_PROTOS.udp)
1490 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1491 self.icmp_id_in, self.icmp_id_out,
1492 proto=IP_PROTOS.icmp)
1493 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1494 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside-originated traffic: pg1 -> pg0.
1498 pkts = self.create_stream_out(self.pg1)
1499 self.pg1.add_stream(pkts)
1500 self.pg_enable_capture(self.pg_interfaces)
1502 capture = self.pg0.get_capture(len(pkts))
1503 self.verify_capture_in(capture, self.pg0)
# Return traffic: pg0 -> pg1.
1506 pkts = self.create_stream_in(self.pg0, self.pg1)
1507 self.pg0.add_stream(pkts)
1508 self.pg_enable_capture(self.pg_interfaces)
1510 capture = self.pg1.get_capture(len(pkts))
1511 self.verify_capture_out(capture)
1513 def test_static_with_port_out2(self):
1514 """ 1:1 NAPT symmetrical rule """
# Flow: with forwarding switched on and a TCP static mapping created with
# out2in_only=1, six single-packet exchanges are checked: client->service,
# an ICMP error for it, service->client, an ICMP error for that, and then
# a client->server / server->client pair that is expected to pass with
# addresses and ports unchanged.
# NOTE(review): local_port, external_port, ip, tcp and inner are used below
# but are not assigned in the lines visible here; 'p' is presumably rebound
# to the captured packet before each group of assertions - confirm.
1519 self.vapi.nat44_forwarding_enable_disable(1)
1520 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1521 local_port, external_port,
1522 proto=IP_PROTOS.tcp, out2in_only=1)
1523 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1524 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1527 # from client to service
1528 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1529 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1530 TCP(sport=12345, dport=external_port))
1531 self.pg1.add_stream(p)
1532 self.pg_enable_capture(self.pg_interfaces)
# On pg0 the destination must be the inside host and the local port.
1534 capture = self.pg0.get_capture(1)
1540 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1541 self.assertEqual(tcp.dport, local_port)
1542 self.check_tcp_checksum(p)
1543 self.check_ip_checksum(p)
1545 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 error sent from the inside host, carrying the IP layer of
# the packet captured above as payload.
1549 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1550 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1551 ICMP(type=11) / capture[0][IP])
1552 self.pg0.add_stream(p)
1553 self.pg_enable_capture(self.pg_interfaces)
# On pg1 the outer source and the embedded destination must both be the
# NAT address, and the embedded TCP destination port the external port.
1555 capture = self.pg1.get_capture(1)
1558 self.assertEqual(p[IP].src, self.nat_addr)
1560 self.assertEqual(inner.dst, self.nat_addr)
1561 self.assertEqual(inner[TCPerror].dport, external_port)
1563 self.logger.error(ppp("Unexpected or invalid packet:", p))
1566 # from service back to client
1567 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1568 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1569 TCP(sport=local_port, dport=12345))
1570 self.pg0.add_stream(p)
1571 self.pg_enable_capture(self.pg_interfaces)
# On pg1 the source must be the NAT address and the external port.
1573 capture = self.pg1.get_capture(1)
1578 self.assertEqual(ip.src, self.nat_addr)
1579 self.assertEqual(tcp.sport, external_port)
1580 self.check_tcp_checksum(p)
1581 self.check_ip_checksum(p)
1583 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 error sent from the outside host to the NAT address,
# carrying the IP layer of the packet captured above as payload.
1587 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1588 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1589 ICMP(type=11) / capture[0][IP])
1590 self.pg1.add_stream(p)
1591 self.pg_enable_capture(self.pg_interfaces)
# On pg0 the outer destination and the embedded source must both be the
# inside host, and the embedded TCP source port the local port.
1593 capture = self.pg0.get_capture(1)
1596 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1598 self.assertEqual(inner.src, self.pg0.remote_ip4)
1599 self.assertEqual(inner[TCPerror].sport, local_port)
1601 self.logger.error(ppp("Unexpected or invalid packet:", p))
1604 # from client to server (no translation)
# Addressed directly to the inside host's real address and local port.
1605 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1606 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1607 TCP(sport=12346, dport=local_port))
1608 self.pg1.add_stream(p)
1609 self.pg_enable_capture(self.pg_interfaces)
1611 capture = self.pg0.get_capture(1)
1617 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1618 self.assertEqual(tcp.dport, local_port)
1619 self.check_tcp_checksum(p)
1620 self.check_ip_checksum(p)
1622 self.logger.error(ppp("Unexpected or invalid packet:", p))
1625 # from service back to client (no translation)
1626 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1627 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1628 TCP(sport=local_port, dport=12346))
1629 self.pg0.add_stream(p)
1630 self.pg_enable_capture(self.pg_interfaces)
# Source address and port must still be the inside host's own values.
1632 capture = self.pg1.get_capture(1)
1637 self.assertEqual(ip.src, self.pg0.remote_ip4)
1638 self.assertEqual(tcp.sport, local_port)
1639 self.check_tcp_checksum(p)
1640 self.check_ip_checksum(p)
1642 self.logger.error(ppp("Unexpected or invalid packet:", p))
1645 def test_static_vrf_aware(self):
1646 """ 1:1 NAT VRF awareness """
# Flow: two static mappings are created (pg4 remote host <-> 10.0.0.30 and
# pg0 remote host <-> 10.0.0.40); traffic pg4 -> pg3 must come out with
# nat_ip1, while traffic pg0 -> pg3 must produce nothing on pg3.
1648 nat_ip1 = "10.0.0.30"
1649 nat_ip2 = "10.0.0.40"
1650 self.tcp_port_out = 6303
1651 self.udp_port_out = 6304
1652 self.icmp_id_out = 6305
# NOTE(review): the trailing arguments of the next three calls are not
# visible here (presumably VRF ids / interface role) - confirm.
1654 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1656 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1658 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1660 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1661 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1663 # inside interface VRF match NAT44 static mapping VRF
1664 pkts = self.create_stream_in(self.pg4, self.pg3)
1665 self.pg4.add_stream(pkts)
1666 self.pg_enable_capture(self.pg_interfaces)
1668 capture = self.pg3.get_capture(len(pkts))
1669 self.verify_capture_out(capture, nat_ip1, True)
1671 # inside interface VRF don't match NAT44 static mapping VRF (packets
1673 pkts = self.create_stream_in(self.pg0, self.pg3)
1674 self.pg0.add_stream(pkts)
1675 self.pg_enable_capture(self.pg_interfaces)
# Nothing from the pg0 stream may show up on pg3.
1677 self.pg3.assert_nothing_captured()
1679 def test_identity_nat(self):
1680 """ Identity NAT """
# Flow: an identity mapping is added for pg0's remote host address, one TCP
# packet is sent from pg1 directly to that address, and the packet seen on
# pg0 must keep the same addresses and ports.
# NOTE(review): 'ip' and 'tcp' are used below but are not assigned in the
# lines visible here; 'p' is presumably rebound to the captured packet
# before the assertions - confirm.
1682 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1683 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1684 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1687 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1688 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1689 TCP(sport=12345, dport=56789))
1690 self.pg1.add_stream(p)
1691 self.pg_enable_capture(self.pg_interfaces)
# All four address/port fields must equal the values that were sent.
1693 capture = self.pg0.get_capture(1)
1698 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1699 self.assertEqual(ip.src, self.pg1.remote_ip4)
1700 self.assertEqual(tcp.dport, 56789)
1701 self.assertEqual(tcp.sport, 12345)
1702 self.check_tcp_checksum(p)
1703 self.check_ip_checksum(p)
1705 self.logger.error(ppp("Unexpected or invalid packet:", p))
1708 def test_static_lb(self):
1709 """ NAT44 local service load balancing """
# Flow: a load-balancing static mapping is created for two pg0 remote hosts
# behind the NAT address; a single client->service packet and its reply
# are checked, then one packet per client address from an ip4_range is
# sent and the per-server hit counts are compared.
# NOTE(review): external_port, local_port, server, ip, tcp, pkts,
# server1_n and server2_n are used below but are not assigned in the lines
# visible here - confirm their definitions.
# Packed binary form of the NAT address for the API call below.
1710 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1713 server1 = self.pg0.remote_hosts[0]
1714 server2 = self.pg0.remote_hosts[1]
# NOTE(review): the name 'locals' shadows the built-in function of the same
# name inside this method. The remaining keys of each dict are not visible.
1716 locals = [{'addr': server1.ip4n,
1719 {'addr': server2.ip4n,
1723 self.nat44_add_address(self.nat_addr)
1724 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1727 local_num=len(locals),
1729 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1730 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1733 # from client to service
1734 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1735 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1736 TCP(sport=12345, dport=external_port))
1737 self.pg1.add_stream(p)
1738 self.pg_enable_capture(self.pg_interfaces)
# The packet may be delivered to either server; the branch on ip.dst
# presumably records which one was chosen (its body is not visible here).
1740 capture = self.pg0.get_capture(1)
1746 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1747 if ip.dst == server1.ip4:
1751 self.assertEqual(tcp.dport, local_port)
1752 self.check_tcp_checksum(p)
1753 self.check_ip_checksum(p)
1755 self.logger.error(ppp("Unexpected or invalid packet:", p))
1758 # from service back to client
1759 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1760 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1761 TCP(sport=local_port, dport=12345))
1762 self.pg0.add_stream(p)
1763 self.pg_enable_capture(self.pg_interfaces)
# On pg1 the source must be the NAT address and the external port.
1765 capture = self.pg1.get_capture(1)
1770 self.assertEqual(ip.src, self.nat_addr)
1771 self.assertEqual(tcp.sport, external_port)
1772 self.check_tcp_checksum(p)
1773 self.check_ip_checksum(p)
1775 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Many-client phase: one packet per address produced by ip4_range.
1781 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1783 for client in clients:
1784 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1785 IP(src=client, dst=self.nat_addr) /
1786 TCP(sport=12345, dport=external_port))
1788 self.pg1.add_stream(pkts)
1789 self.pg_enable_capture(self.pg_interfaces)
# server1 is required to have received strictly more packets than server2
# (presumably reflecting the configured weights - not visible here).
1791 capture = self.pg0.get_capture(len(pkts))
1793 if p[IP].dst == server1.ip4:
1797 self.assertTrue(server1_n > server2_n)
1799 def test_static_lb_2(self):
1800 """ NAT44 local service load balancing (asymmetrical rule) """
# Flow: with forwarding switched on and a load-balancing static mapping for
# two pg0 remote hosts, four single-packet exchanges are checked:
# client->service, service->client, and a client->server1 /
# server1->client pair that must pass with addresses and ports unchanged.
# NOTE(review): external_port, local_port, server, ip and tcp are used
# below but are not assigned in the lines visible here - confirm.
# Packed binary form of the NAT address for the API call below.
1801 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1804 server1 = self.pg0.remote_hosts[0]
1805 server2 = self.pg0.remote_hosts[1]
# NOTE(review): the name 'locals' shadows the built-in function of the same
# name inside this method. The remaining keys of each dict are not visible.
1807 locals = [{'addr': server1.ip4n,
1810 {'addr': server2.ip4n,
1814 self.vapi.nat44_forwarding_enable_disable(1)
1815 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1819 local_num=len(locals),
1821 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1822 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1825 # from client to service
1826 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1827 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1828 TCP(sport=12345, dport=external_port))
1829 self.pg1.add_stream(p)
1830 self.pg_enable_capture(self.pg_interfaces)
# The packet may be delivered to either server; the branch on ip.dst
# presumably records which one was chosen (its body is not visible here).
1832 capture = self.pg0.get_capture(1)
1838 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1839 if ip.dst == server1.ip4:
1843 self.assertEqual(tcp.dport, local_port)
1844 self.check_tcp_checksum(p)
1845 self.check_ip_checksum(p)
1847 self.logger.error(ppp("Unexpected or invalid packet:", p))
1850 # from service back to client
1851 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1852 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1853 TCP(sport=local_port, dport=12345))
1854 self.pg0.add_stream(p)
1855 self.pg_enable_capture(self.pg_interfaces)
# On pg1 the source must be the NAT address and the external port.
1857 capture = self.pg1.get_capture(1)
1862 self.assertEqual(ip.src, self.nat_addr)
1863 self.assertEqual(tcp.sport, external_port)
1864 self.check_tcp_checksum(p)
1865 self.check_ip_checksum(p)
1867 self.logger.error(ppp("Unexpected or invalid packet:", p))
1870 # from client to server (no translation)
# Addressed directly to server1's real address and the local port.
1871 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1872 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1873 TCP(sport=12346, dport=local_port))
1874 self.pg1.add_stream(p)
1875 self.pg_enable_capture(self.pg_interfaces)
1877 capture = self.pg0.get_capture(1)
1883 self.assertEqual(ip.dst, server1.ip4)
1884 self.assertEqual(tcp.dport, local_port)
1885 self.check_tcp_checksum(p)
1886 self.check_ip_checksum(p)
1888 self.logger.error(ppp("Unexpected or invalid packet:", p))
1891 # from service back to client (no translation)
1892 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1893 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1894 TCP(sport=local_port, dport=12346))
1895 self.pg0.add_stream(p)
1896 self.pg_enable_capture(self.pg_interfaces)
# Source address and port must still be server1's own values.
1898 capture = self.pg1.get_capture(1)
1903 self.assertEqual(ip.src, server1.ip4)
1904 self.assertEqual(tcp.sport, local_port)
1905 self.check_tcp_checksum(p)
1906 self.check_ip_checksum(p)
1908 self.logger.error(ppp("Unexpected or invalid packet:", p))
1911 def test_multiple_inside_interfaces(self):
1912 """ NAT44 multiple non-overlapping address space inside interfaces """
# Flow: the NAT44 feature is configured on pg0, pg1 and pg3 (pg3 with extra
# arguments not visible here); pg2 is left without the feature. Six
# streams are then sent, each captured on the expected peer interface and
# checked with the matching verify_capture_* helper.
1914 self.nat44_add_address(self.nat_addr)
1915 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1916 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1917 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1920 # between two NAT44 inside interfaces (no translation)
1921 pkts = self.create_stream_in(self.pg0, self.pg1)
1922 self.pg0.add_stream(pkts)
1923 self.pg_enable_capture(self.pg_interfaces)
1925 capture = self.pg1.get_capture(len(pkts))
1926 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1928 # from NAT44 inside to interface without NAT44 feature (no translation)
1929 pkts = self.create_stream_in(self.pg0, self.pg2)
1930 self.pg0.add_stream(pkts)
1931 self.pg_enable_capture(self.pg_interfaces)
1933 capture = self.pg2.get_capture(len(pkts))
1934 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1936 # in2out 1st interface
1937 pkts = self.create_stream_in(self.pg0, self.pg3)
1938 self.pg0.add_stream(pkts)
1939 self.pg_enable_capture(self.pg_interfaces)
1941 capture = self.pg3.get_capture(len(pkts))
1942 self.verify_capture_out(capture)
1944 # out2in 1st interface
1945 pkts = self.create_stream_out(self.pg3)
1946 self.pg3.add_stream(pkts)
1947 self.pg_enable_capture(self.pg_interfaces)
1949 capture = self.pg0.get_capture(len(pkts))
1950 self.verify_capture_in(capture, self.pg0)
1952 # in2out 2nd interface
1953 pkts = self.create_stream_in(self.pg1, self.pg3)
1954 self.pg1.add_stream(pkts)
1955 self.pg_enable_capture(self.pg_interfaces)
1957 capture = self.pg3.get_capture(len(pkts))
1958 self.verify_capture_out(capture)
1960 # out2in 2nd interface
1961 pkts = self.create_stream_out(self.pg3)
1962 self.pg3.add_stream(pkts)
1963 self.pg_enable_capture(self.pg_interfaces)
1965 capture = self.pg1.get_capture(len(pkts))
1966 self.verify_capture_in(capture, self.pg1)
1968 def test_inside_overlapping_interfaces(self):
1969 """ NAT44 multiple inside interfaces with overlapping address space """
# Flow: the NAT44 feature is configured on pg3 (with extra arguments not
# visible here), pg4, pg5 and pg6, plus a static mapping for pg6's remote
# host. Traffic is exercised between the inside interfaces, via a hairpin
# through the static address, and in both directions against pg3 for each
# inside interface; user and session dumps are checked along the way.
# NOTE(review): 'ip', 'tcp' and 'user' are used below but are not assigned
# in the lines visible here - confirm where they are bound.
1971 static_nat_ip = "10.0.0.10"
1972 self.nat44_add_address(self.nat_addr)
1973 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1975 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1976 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1977 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1978 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1981 # between NAT44 inside interfaces with same VRF (no translation)
1982 pkts = self.create_stream_in(self.pg4, self.pg5)
1983 self.pg4.add_stream(pkts)
1984 self.pg_enable_capture(self.pg_interfaces)
1986 capture = self.pg5.get_capture(len(pkts))
1987 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1989 # between NAT44 inside interfaces with different VRF (hairpinning)
1990 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1991 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1992 TCP(sport=1234, dport=5678))
1993 self.pg4.add_stream(p)
1994 self.pg_enable_capture(self.pg_interfaces)
# On pg6 the source must be the NAT address with a source port different
# from the one sent, and the destination must be pg6's remote host with
# the destination port unchanged.
1996 capture = self.pg6.get_capture(1)
2001 self.assertEqual(ip.src, self.nat_addr)
2002 self.assertEqual(ip.dst, self.pg6.remote_ip4)
2003 self.assertNotEqual(tcp.sport, 1234)
2004 self.assertEqual(tcp.dport, 5678)
2006 self.logger.error(ppp("Unexpected or invalid packet:", p))
2009 # in2out 1st interface
2010 pkts = self.create_stream_in(self.pg4, self.pg3)
2011 self.pg4.add_stream(pkts)
2012 self.pg_enable_capture(self.pg_interfaces)
2014 capture = self.pg3.get_capture(len(pkts))
2015 self.verify_capture_out(capture)
2017 # out2in 1st interface
2018 pkts = self.create_stream_out(self.pg3)
2019 self.pg3.add_stream(pkts)
2020 self.pg_enable_capture(self.pg_interfaces)
2022 capture = self.pg4.get_capture(len(pkts))
2023 self.verify_capture_in(capture, self.pg4)
2025 # in2out 2nd interface
2026 pkts = self.create_stream_in(self.pg5, self.pg3)
2027 self.pg5.add_stream(pkts)
2028 self.pg_enable_capture(self.pg_interfaces)
2030 capture = self.pg3.get_capture(len(pkts))
2031 self.verify_capture_out(capture)
2033 # out2in 2nd interface
2034 pkts = self.create_stream_out(self.pg3)
2035 self.pg3.add_stream(pkts)
2036 self.pg_enable_capture(self.pg_interfaces)
2038 capture = self.pg5.get_capture(len(pkts))
2039 self.verify_capture_in(capture, self.pg5)
# Session dump for pg5's remote host: exactly three non-static sessions are
# expected, in the order TCP, UDP, ICMP, with matching inside/outside
# ports. NOTE(review): the second dump argument (10) is presumably a
# VRF/table id - confirm.
2042 addresses = self.vapi.nat44_address_dump()
2043 self.assertEqual(len(addresses), 1)
2044 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
2045 self.assertEqual(len(sessions), 3)
2046 for session in sessions:
2047 self.assertFalse(session.is_static)
# Only the first four bytes of the inside address field are compared.
2048 self.assertEqual(session.inside_ip_address[0:4],
2049 self.pg5.remote_ip4n)
2050 self.assertEqual(session.outside_ip_address,
2051 addresses[0].ip_address)
2052 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2053 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2054 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2055 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2056 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2057 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2058 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2059 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2060 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2062 # in2out 3rd interface
2063 pkts = self.create_stream_in(self.pg6, self.pg3)
2064 self.pg6.add_stream(pkts)
2065 self.pg_enable_capture(self.pg_interfaces)
2067 capture = self.pg3.get_capture(len(pkts))
2068 self.verify_capture_out(capture, static_nat_ip, True)
2070 # out2in 3rd interface
2071 pkts = self.create_stream_out(self.pg3, static_nat_ip)
2072 self.pg3.add_stream(pkts)
2073 self.pg_enable_capture(self.pg_interfaces)
2075 capture = self.pg6.get_capture(len(pkts))
2076 self.verify_capture_in(capture, self.pg6)
2078 # general user and session dump verifications
2079 users = self.vapi.nat44_user_dump()
2080 self.assertTrue(len(users) >= 3)
2081 addresses = self.vapi.nat44_address_dump()
2082 self.assertEqual(len(addresses), 1)
2084 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2086 for session in sessions:
2087 self.assertEqual(user.ip_address, session.inside_ip_address)
# Chained comparison: total_bytes > total_pkts and total_pkts > 0.
2088 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2089 self.assertTrue(session.protocol in
2090 [IP_PROTOS.tcp, IP_PROTOS.udp,
# Sessions of pg4's remote host: at least four, none of them static.
2094 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2095 self.assertTrue(len(sessions) >= 4)
2096 for session in sessions:
2097 self.assertFalse(session.is_static)
2098 self.assertEqual(session.inside_ip_address[0:4],
2099 self.pg4.remote_ip4n)
2100 self.assertEqual(session.outside_ip_address,
2101 addresses[0].ip_address)
# Sessions of pg6's remote host: at least three, all static, with the
# static NAT address as outside address.
2104 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2105 self.assertTrue(len(sessions) >= 3)
2106 for session in sessions:
2107 self.assertTrue(session.is_static)
2108 self.assertEqual(session.inside_ip_address[0:4],
2109 self.pg6.remote_ip4n)
# Compares the first four address bytes (as ordinals) with the dotted-quad
# octets of static_nat_ip. NOTE(review): this relies on map() returning
# lists (Python 2 behaviour); iterator-returning map() would not compare
# equal.
2110 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2111 map(int, static_nat_ip.split('.')))
2112 self.assertTrue(session.inside_port in
2113 [self.tcp_port_in, self.udp_port_in,
2116 def test_hairpinning(self):
2117 """ NAT44 hairpinning - 1:1 NAPT """
# Flow: host and server are both remote hosts on pg0. A TCP static mapping
# exposes the server's port 5678 as port 8765 on the NAT address. The host
# sends to the NAT address and the packet must come back out of pg0 towards
# the server; the server's reply to the NAT address must come back out of
# pg0 towards the host.
# NOTE(review): host_in_port, ip and tcp are used below but are not
# assigned in the lines visible here; 'p' is presumably rebound to the
# captured packet before each group of assertions - confirm.
2119 host = self.pg0.remote_hosts[0]
2120 server = self.pg0.remote_hosts[1]
2123 server_in_port = 5678
2124 server_out_port = 8765
2126 self.nat44_add_address(self.nat_addr)
2127 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2128 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2130 # add static mapping for server
2131 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2132 server_in_port, server_out_port,
2133 proto=IP_PROTOS.tcp)
2135 # send packet from host to server
2136 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2137 IP(src=host.ip4, dst=self.nat_addr) /
2138 TCP(sport=host_in_port, dport=server_out_port))
2139 self.pg0.add_stream(p)
2140 self.pg_enable_capture(self.pg_interfaces)
# Captured on pg0 again: source must be the NAT address with a source port
# different from the host's, destination the server's real address/port.
2142 capture = self.pg0.get_capture(1)
2147 self.assertEqual(ip.src, self.nat_addr)
2148 self.assertEqual(ip.dst, server.ip4)
2149 self.assertNotEqual(tcp.sport, host_in_port)
2150 self.assertEqual(tcp.dport, server_in_port)
2151 self.check_tcp_checksum(p)
# Remember the observed source port; the server's reply is sent to it.
2152 host_out_port = tcp.sport
2154 self.logger.error(ppp("Unexpected or invalid packet:", p))
2157 # send reply from server to host
2158 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2159 IP(src=server.ip4, dst=self.nat_addr) /
2160 TCP(sport=server_in_port, dport=host_out_port))
2161 self.pg0.add_stream(p)
2162 self.pg_enable_capture(self.pg_interfaces)
# Captured on pg0 again: source must be the NAT address with the server's
# external port, destination the host's real address and original port.
2164 capture = self.pg0.get_capture(1)
2169 self.assertEqual(ip.src, self.nat_addr)
2170 self.assertEqual(ip.dst, host.ip4)
2171 self.assertEqual(tcp.sport, server_out_port)
2172 self.assertEqual(tcp.dport, host_in_port)
2173 self.check_tcp_checksum(p)
2175 self.logger.error(ppp("Unexpected or invalid packet:", p))
2178 def test_hairpinning2(self):
2179 """ NAT44 hairpinning - 1:1 NAT"""
2181 server1_nat_ip = "10.0.0.10"
2182 server2_nat_ip = "10.0.0.11"
2183 host = self.pg0.remote_hosts[0]
2184 server1 = self.pg0.remote_hosts[1]
2185 server2 = self.pg0.remote_hosts[2]
2186 server_tcp_port = 22
2187 server_udp_port = 20
2189 self.nat44_add_address(self.nat_addr)
2190 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2191 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2194 # add static mapping for servers
2195 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2196 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2200 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2201 IP(src=host.ip4, dst=server1_nat_ip) /
2202 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2204 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2205 IP(src=host.ip4, dst=server1_nat_ip) /
2206 UDP(sport=self.udp_port_in, dport=server_udp_port))
2208 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2209 IP(src=host.ip4, dst=server1_nat_ip) /
2210 ICMP(id=self.icmp_id_in, type='echo-request'))
2212 self.pg0.add_stream(pkts)
2213 self.pg_enable_capture(self.pg_interfaces)
2215 capture = self.pg0.get_capture(len(pkts))
2216 for packet in capture:
2218 self.assertEqual(packet[IP].src, self.nat_addr)
2219 self.assertEqual(packet[IP].dst, server1.ip4)
2220 if packet.haslayer(TCP):
2221 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2222 self.assertEqual(packet[TCP].dport, server_tcp_port)
2223 self.tcp_port_out = packet[TCP].sport
2224 self.check_tcp_checksum(packet)
2225 elif packet.haslayer(UDP):
2226 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2227 self.assertEqual(packet[UDP].dport, server_udp_port)
2228 self.udp_port_out = packet[UDP].sport
2230 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2231 self.icmp_id_out = packet[ICMP].id
2233 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2238 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2239 IP(src=server1.ip4, dst=self.nat_addr) /
2240 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2242 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2243 IP(src=server1.ip4, dst=self.nat_addr) /
2244 UDP(sport=server_udp_port, dport=self.udp_port_out))
2246 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2247 IP(src=server1.ip4, dst=self.nat_addr) /
2248 ICMP(id=self.icmp_id_out, type='echo-reply'))
2250 self.pg0.add_stream(pkts)
2251 self.pg_enable_capture(self.pg_interfaces)
2253 capture = self.pg0.get_capture(len(pkts))
2254 for packet in capture:
2256 self.assertEqual(packet[IP].src, server1_nat_ip)
2257 self.assertEqual(packet[IP].dst, host.ip4)
2258 if packet.haslayer(TCP):
2259 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2260 self.assertEqual(packet[TCP].sport, server_tcp_port)
2261 self.check_tcp_checksum(packet)
2262 elif packet.haslayer(UDP):
2263 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2264 self.assertEqual(packet[UDP].sport, server_udp_port)
2266 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2268 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2271 # server2 to server1
2273 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2274 IP(src=server2.ip4, dst=server1_nat_ip) /
2275 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2277 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2278 IP(src=server2.ip4, dst=server1_nat_ip) /
2279 UDP(sport=self.udp_port_in, dport=server_udp_port))
2281 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2282 IP(src=server2.ip4, dst=server1_nat_ip) /
2283 ICMP(id=self.icmp_id_in, type='echo-request'))
2285 self.pg0.add_stream(pkts)
2286 self.pg_enable_capture(self.pg_interfaces)
2288 capture = self.pg0.get_capture(len(pkts))
2289 for packet in capture:
2291 self.assertEqual(packet[IP].src, server2_nat_ip)
2292 self.assertEqual(packet[IP].dst, server1.ip4)
2293 if packet.haslayer(TCP):
2294 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2295 self.assertEqual(packet[TCP].dport, server_tcp_port)
2296 self.tcp_port_out = packet[TCP].sport
2297 self.check_tcp_checksum(packet)
2298 elif packet.haslayer(UDP):
2299 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2300 self.assertEqual(packet[UDP].dport, server_udp_port)
2301 self.udp_port_out = packet[UDP].sport
2303 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2304 self.icmp_id_out = packet[ICMP].id
2306 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2309 # server1 to server2
2311 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2312 IP(src=server1.ip4, dst=server2_nat_ip) /
2313 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2315 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2316 IP(src=server1.ip4, dst=server2_nat_ip) /
2317 UDP(sport=server_udp_port, dport=self.udp_port_out))
2319 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2320 IP(src=server1.ip4, dst=server2_nat_ip) /
2321 ICMP(id=self.icmp_id_out, type='echo-reply'))
2323 self.pg0.add_stream(pkts)
2324 self.pg_enable_capture(self.pg_interfaces)
2326 capture = self.pg0.get_capture(len(pkts))
2327 for packet in capture:
2329 self.assertEqual(packet[IP].src, server1_nat_ip)
2330 self.assertEqual(packet[IP].dst, server2.ip4)
2331 if packet.haslayer(TCP):
2332 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2333 self.assertEqual(packet[TCP].sport, server_tcp_port)
2334 self.check_tcp_checksum(packet)
2335 elif packet.haslayer(UDP):
2336 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2337 self.assertEqual(packet[UDP].sport, server_udp_port)
2339 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2341 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Test: one inside host opens more TCP flows than the per-user translation
# limit. The limit is read from nat_show_config(); limit + 5 packets with
# source ports 1025, 1026, ... are built from pg0 towards pg1 and a capture
# of that many packets is requested on pg1.
# NOTE(review): the embedded numbering skips 2350, 2357, 2362 and 2365 in
# this view - the call on 2349 is left unterminated and `pkts` is never
# visibly created, filled or sent. Restore those lines from the full file.
2344 def test_max_translations_per_user(self):
2345 """ MAX translations per user - recycle the least recently used """
2347 self.nat44_add_address(self.nat_addr)
2348 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2349 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2352 # get maximum number of translations per user
2353 nat44_config = self.vapi.nat_show_config()
2355 # send more than maximum number of translations per user packets
2356 pkts_num = nat44_config.max_translations_per_user + 5
2358 for port in range(0, pkts_num):
# each packet differs only in its TCP source port
2359 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2360 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2361 TCP(sport=1025 + port))
2363 self.pg0.add_stream(pkts)
2364 self.pg_enable_capture(self.pg_interfaces)
2367 # verify number of translated packet
2368 self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire NAT44 addresses from interface

    Registers pg7 as the source of NAT44 pool addresses and checks that
    the pool dump follows the interface: empty while pg7 has no IPv4
    address, exactly pg7's local address once it is configured, and empty
    again after the address is removed.
    """
    nat_if = self.pg7
    self.vapi.nat44_add_interface_addr(nat_if.sw_if_index)

    # interface has no address yet -> nothing in the NAT pool
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))

    # address configured on the interface -> it shows up in the pool
    nat_if.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    # only the first four bytes of the dumped address field are compared
    self.assertEqual(pool[0].ip_address[:4], nat_if.local_ip4n)

    # address removed from the interface -> pool is empty again
    nat_if.unconfig_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))
# Test: a static mapping whose external address is taken from interface
# pg7. The mapping dump is checked in three states: before pg7 has an
# address (mapping refers to pg7's sw_if_index), after config_ip4()
# (mapping carries pg7's local address and sw_if_index 0xFFFFFFFF), and
# after unconfig_ip4() (no mappings dumped).
# NOTE(review): lines 2391, 2395 and 2397 are absent from this view, so
# `tag` is never visibly assigned and the nat44_add_static_mapping(...)
# call on 2394 is missing arguments and its closing parenthesis.
2389 def test_interface_addr_static_mapping(self):
2390 """ Static mapping with addresses from interface """
2393 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2394 self.nat44_add_static_mapping(
2396 external_sw_if_index=self.pg7.sw_if_index,
2399 # static mappings with external interface
2400 static_mappings = self.vapi.nat44_static_mapping_dump()
2401 self.assertEqual(1, len(static_mappings))
2402 self.assertEqual(self.pg7.sw_if_index,
2403 static_mappings[0].external_sw_if_index)
# the dumped tag is cut at its first NUL byte before comparison
2404 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2406 # configure interface address and check static mappings
2407 self.pg7.config_ip4()
2408 static_mappings = self.vapi.nat44_static_mapping_dump()
2409 self.assertEqual(1, len(static_mappings))
2410 self.assertEqual(static_mappings[0].external_ip_address[0:4],
2411 self.pg7.local_ip4n)
# presumably 0xFFFFFFFF means "no interface" once resolved - TODO confirm
2412 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2413 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2415 # remove interface address and check static mappings
2416 self.pg7.unconfig_ip4()
2417 static_mappings = self.vapi.nat44_static_mapping_dump()
2418 self.assertEqual(0, len(static_mappings))
# Test: a TCP identity mapping bound to interface pg7 instead of a fixed
# address. The identity-mapping dump is checked before pg7 has an address
# (entry refers to pg7's sw_if_index), after config_ip4() (entry carries
# pg7's local address, sw_if_index 0xFFFFFFFF, the expected port and TCP
# protocol) and after unconfig_ip4() (nothing dumped).
# NOTE(review): lines 2422, 2427 and 2429 are absent from this view, so
# `port` is never visibly assigned and the call on 2425 is unterminated.
2420 def test_interface_addr_identity_nat(self):
2421 """ Identity NAT with addresses from interface """
2424 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2425 self.vapi.nat44_add_del_identity_mapping(
2426 sw_if_index=self.pg7.sw_if_index,
2428 protocol=IP_PROTOS.tcp,
2431 # identity mappings with external interface
2432 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2433 self.assertEqual(1, len(identity_mappings))
2434 self.assertEqual(self.pg7.sw_if_index,
2435 identity_mappings[0].sw_if_index)
2437 # configure interface address and check identity mappings
2438 self.pg7.config_ip4()
2439 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2440 self.assertEqual(1, len(identity_mappings))
2441 self.assertEqual(identity_mappings[0].ip_address,
2442 self.pg7.local_ip4n)
# presumably 0xFFFFFFFF means "no interface" once resolved - TODO confirm
2443 self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2444 self.assertEqual(port, identity_mappings[0].port)
2445 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2447 # remove interface address and check identity mappings
2448 self.pg7.unconfig_ip4()
2449 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2450 self.assertEqual(0, len(identity_mappings))
# Test: IPFIX export of NAT44 session events. The exporter is pointed at
# pg3 on UDP port 30303, inside->outside traffic is sent from pg0 to pg1,
# the NAT address is then removed, the exporter is flushed through the
# CLI and 9 IPFIX packets are requested on pg3. Each packet's addresses,
# ports and observation domain are checked; templates are loaded into the
# decoder and data sets are handed to verify_ipfix_nat44_ses().
# NOTE(review): docstring typo "delted" (deleted); local is spelled
# `colector_port`.
# NOTE(review): lines 2461, 2464, 2473, 2481 and 2492 are absent from this
# view - two calls are unterminated and the assertions below use `p`
# without a visible loop header over `capture`.
2452 def test_ipfix_nat44_sess(self):
2453 """ IPFIX logging NAT44 session created/delted """
2454 self.ipfix_domain_id = 10
2455 self.ipfix_src_port = 20202
2456 colector_port = 30303
# teach scapy to dissect UDP datagrams to port 30303 as IPFIX; the literal
# repeats the value of colector_port
2457 bind_layers(UDP, IPFIX, dport=30303)
2458 self.nat44_add_address(self.nat_addr)
2459 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2460 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2462 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2463 src_address=self.pg3.local_ip4n,
2465 template_interval=10,
2466 collector_port=colector_port)
2467 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2468 src_port=self.ipfix_src_port)
2470 pkts = self.create_stream_in(self.pg0, self.pg1)
2471 self.pg0.add_stream(pkts)
2472 self.pg_enable_capture(self.pg_interfaces)
2474 capture = self.pg1.get_capture(len(pkts))
2475 self.verify_capture_out(capture)
# removing the pool address; presumably this is what triggers the
# session-delete events - TODO confirm
2476 self.nat44_add_address(self.nat_addr, is_add=0)
2477 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2478 capture = self.pg3.get_capture(9)
2479 ipfix = IPFIXDecoder()
2480 # first load template
2482 self.assertTrue(p.haslayer(IPFIX))
2483 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2484 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2485 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2486 self.assertEqual(p[UDP].dport, colector_port)
2487 self.assertEqual(p[IPFIX].observationDomainID,
2488 self.ipfix_domain_id)
2489 if p.haslayer(Template):
2490 ipfix.add_template(p.getlayer(Template))
2491 # verify events in data set
2493 if p.haslayer(Data):
2494 data = ipfix.decode_data_set(p.getlayer(Set))
2495 self.verify_ipfix_nat44_ses(data)
# Test: IPFIX export of the "addresses exhausted" event. NAT44 is enabled
# on pg0/pg1 without adding any pool address, one packet is sent from
# pg0, no packet is expected on pg1, and after a CLI flush 9 IPFIX packets
# are requested on pg3. Headers are checked against the exporter settings
# (UDP destination port 4739) and data sets are handed to
# verify_ipfix_addr_exhausted().
# NOTE(review): lines 2501, 2504, 2511, 2514, 2520 and 2531 are absent from
# this view - calls and the packet expression are unterminated and the
# assertions use `p` without a visible loop header over `capture`.
2497 def test_ipfix_addr_exhausted(self):
2498 """ IPFIX logging NAT addresses exhausted """
2499 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2500 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# no collector_port is passed here, unlike test_ipfix_nat44_sess
2502 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2503 src_address=self.pg3.local_ip4n,
2505 template_interval=10)
2506 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2507 src_port=self.ipfix_src_port)
2509 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2510 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2512 self.pg0.add_stream(p)
2513 self.pg_enable_capture(self.pg_interfaces)
# nothing may be forwarded; the captured value itself is not used
2515 capture = self.pg1.get_capture(0)
2516 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2517 capture = self.pg3.get_capture(9)
2518 ipfix = IPFIXDecoder()
2519 # first load template
2521 self.assertTrue(p.haslayer(IPFIX))
2522 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2523 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2524 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# presumably 4739 is the exporter's default collector port - TODO confirm
2525 self.assertEqual(p[UDP].dport, 4739)
2526 self.assertEqual(p[IPFIX].observationDomainID,
2527 self.ipfix_domain_id)
2528 if p.haslayer(Template):
2529 ipfix.add_template(p.getlayer(Template))
2530 # verify events in data set
2532 if p.haslayer(Data):
2533 data = ipfix.decode_data_set(p.getlayer(Set))
2534 self.verify_ipfix_addr_exhausted(data)
# Test (extended runs only): IPFIX export of the "maximum session entries
# exceeded" event. max_sessions is computed as 10 * translation_buckets
# from nat_show_config(); that many packets with distinct 10.10.x.y
# sources are sent from pg0 and captured on pg1. IPFIX is then enabled,
# one more packet is sent and expected NOT to appear on pg1, the exporter
# is flushed and 9 IPFIX packets are requested on pg3 and verified.
# NOTE(review): lines 2542, 2547, 2552-2553, 2556, 2561, 2568, 2571, 2577
# and 2588 are absent from this view - several calls and both packet
# expressions are unterminated, `pkts` is never visibly created/filled,
# and the assertions use `p` without a visible loop header.
2536 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2537 def test_ipfix_max_sessions(self):
2538 """ IPFIX logging maximum session entries exceeded """
2539 self.nat44_add_address(self.nat_addr)
2540 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2541 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2544 nat44_config = self.vapi.nat_show_config()
2545 max_sessions = 10 * nat44_config.translation_buckets
2548 for i in range(0, max_sessions):
# bits 8-15 of the counter become the third octet, bits 0-7 the fourth
2549 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2550 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2551 IP(src=src, dst=self.pg1.remote_ip4) /
2554 self.pg0.add_stream(pkts)
2555 self.pg_enable_capture(self.pg_interfaces)
2558 self.pg1.get_capture(max_sessions)
# IPFIX is configured only after the fill-up traffic above has been sent
2559 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2560 src_address=self.pg3.local_ip4n,
2562 template_interval=10)
2563 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2564 src_port=self.ipfix_src_port)
2566 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2567 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2569 self.pg0.add_stream(p)
2570 self.pg_enable_capture(self.pg_interfaces)
# the extra packet must not be forwarded
2572 self.pg1.get_capture(0)
2573 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2574 capture = self.pg3.get_capture(9)
2575 ipfix = IPFIXDecoder()
2576 # first load template
2578 self.assertTrue(p.haslayer(IPFIX))
2579 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2580 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2581 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# presumably 4739 is the exporter's default collector port - TODO confirm
2582 self.assertEqual(p[UDP].dport, 4739)
2583 self.assertEqual(p[IPFIX].observationDomainID,
2584 self.ipfix_domain_id)
2585 if p.haslayer(Template):
2586 ipfix.add_template(p.getlayer(Template))
2587 # verify events in data set
2589 if p.haslayer(Data):
2590 data = ipfix.decode_data_set(p.getlayer(Set))
2591 self.verify_ipfix_max_sessions(data, max_sessions)
def test_pool_addr_fib(self):
    """ NAT44 add pool addresses to FIB

    Sends broadcast ARP who-has requests for the NAT pool address and for
    a 1:1 static mapping address and checks what comes back on pg1:

    * while both addresses are configured, a request arriving on pg1
      must yield exactly one ARP is-at reply;
    * a request for the pool address arriving on pg2 must yield nothing
      on pg1;
    * after both addresses are removed, requests on pg1 must yield
      nothing.
    """
    static_addr = '10.0.0.10'

    def send_who_has(pg_if, target_ip):
        # Inject one broadcast ARP request for target_ip on pg_if, using
        # the interface's remote peer as the sender.
        request = (Ether(src=pg_if.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
                   ARP(op=ARP.who_has, pdst=target_ip,
                       psrc=pg_if.remote_ip4, hwsrc=pg_if.remote_mac))
        pg_if.add_stream(request)
        self.pg_enable_capture(self.pg_interfaces)
        # NOTE(review): pg_start() is not visible in the damaged source
        # (its line was dropped); restored here - confirm against the
        # full file.
        self.pg_start()

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # NOTE(review): the continuation of this call was dropped from the
    # source; is_inside=0 (pg1 as outside interface) is restored on the
    # assumption that it matches the full file - confirm.
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)

    # NAT44 pool address, then 1:1 NAT address: both must be answered
    for target_ip in (self.nat_addr, static_addr):
        send_who_has(self.pg1, target_ip)
        capture = self.pg1.get_capture(1)
        self.assertTrue(capture[0].haslayer(ARP))
        # The original used assertTrue(op, ARP.is_at), which treats
        # ARP.is_at as the failure message and never compares the opcode.
        self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # send ARP to non-NAT44 interface
    send_who_has(self.pg2, self.nat_addr)
    self.pg1.get_capture(0)

    # remove addresses and verify
    self.nat44_add_address(self.nat_addr, is_add=0)
    # NOTE(review): the continuation of this call was dropped from the
    # source; is_add=0 is restored to mirror the removal above - confirm.
    self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                  is_add=0)

    for target_ip in (self.nat_addr, static_addr):
        send_who_has(self.pg1, target_ip)
        self.pg1.get_capture(0)
# Test: per-VRF NAT address pools. pg0 and pg1 are moved into two separate
# IP tables, one pool address is added per VRF, and traffic from pg0 /
# pg1 towards pg2 is expected to leave with nat_ip1 / nat_ip2
# respectively. The interfaces are then put back into table 0 and both
# tables are deleted.
# NOTE(review): lines 2657-2658, 2676, 2682 and 2690 are absent from this
# view - `vrf_id1` / `vrf_id2` are never visibly assigned and the call on
# 2675 is unterminated.
2654 def test_vrf_mode(self):
2655 """ NAT44 tenant VRF aware address pool mode """
2659 nat_ip1 = "10.0.0.10"
2660 nat_ip2 = "10.0.0.11"
# addresses are removed before the table change and re-applied afterwards
2662 self.pg0.unconfig_ip4()
2663 self.pg1.unconfig_ip4()
2664 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2665 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2666 self.pg0.set_table_ip4(vrf_id1)
2667 self.pg1.set_table_ip4(vrf_id2)
2668 self.pg0.config_ip4()
2669 self.pg1.config_ip4()
2671 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2672 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2673 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2674 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2675 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# traffic entering on pg0 must be translated to nat_ip1
2679 pkts = self.create_stream_in(self.pg0, self.pg2)
2680 self.pg0.add_stream(pkts)
2681 self.pg_enable_capture(self.pg_interfaces)
2683 capture = self.pg2.get_capture(len(pkts))
2684 self.verify_capture_out(capture, nat_ip1)
# traffic entering on pg1 must be translated to nat_ip2
2687 pkts = self.create_stream_in(self.pg1, self.pg2)
2688 self.pg1.add_stream(pkts)
2689 self.pg_enable_capture(self.pg_interfaces)
2691 capture = self.pg2.get_capture(len(pkts))
2692 self.verify_capture_out(capture, nat_ip2)
# cleanup: back to table 0, then delete the two tables
# NOTE(review): no config_ip4() follows within this view - confirm
2694 self.pg0.unconfig_ip4()
2695 self.pg1.unconfig_ip4()
2696 self.pg0.set_table_ip4(0)
2697 self.pg1.set_table_ip4(0)
2698 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2699 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
# Test: one pool address is added without a VRF and a second one with
# vrf_id=99. Traffic from pg0 and from pg1 towards pg2 is expected to be
# translated to nat_ip1 in both cases; nat_ip2 is never expected.
# NOTE(review): lines 2712, 2718 and 2726 are absent from this view - the
# call on 2711 is unterminated.
2701 def test_vrf_feature_independent(self):
2702 """ NAT44 tenant VRF independent address pool mode """
2704 nat_ip1 = "10.0.0.10"
2705 nat_ip2 = "10.0.0.11"
2707 self.nat44_add_address(nat_ip1)
2708 self.nat44_add_address(nat_ip2, vrf_id=99)
2709 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2710 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2711 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# first inside interface
2715 pkts = self.create_stream_in(self.pg0, self.pg2)
2716 self.pg0.add_stream(pkts)
2717 self.pg_enable_capture(self.pg_interfaces)
2719 capture = self.pg2.get_capture(len(pkts))
2720 self.verify_capture_out(capture, nat_ip1)
# second inside interface - same expected address as the first
2723 pkts = self.create_stream_in(self.pg1, self.pg2)
2724 self.pg1.add_stream(pkts)
2725 self.pg_enable_capture(self.pg_interfaces)
2727 capture = self.pg2.get_capture(len(pkts))
2728 self.verify_capture_out(capture, nat_ip1)
# Test: dynamic NAT44 between pg7 and pg8. No config_ip4() is called on
# either interface here; reachability is set up with explicit neighbor
# entries and /32 routes to each interface's remote peer. Traffic is then
# checked in both directions (pg7 -> pg8 translated out, pg8 -> nat_addr
# translated back in to pg7).
# NOTE(review): lines 2736, 2740, 2754, 2760 and 2768 are absent from this
# view - the neighbor calls and the call on 2753 are unterminated.
2730 def test_dynamic_ipless_interfaces(self):
2731 """ NAT44 interfaces without configured IP address """
2733 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2734 mactobinary(self.pg7.remote_mac),
2735 self.pg7.remote_ip4n,
2737 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2738 mactobinary(self.pg8.remote_mac),
2739 self.pg8.remote_ip4n,
# host routes (/32) to each peer via its own interface
2742 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2743 dst_address_length=32,
2744 next_hop_address=self.pg7.remote_ip4n,
2745 next_hop_sw_if_index=self.pg7.sw_if_index)
2746 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2747 dst_address_length=32,
2748 next_hop_address=self.pg8.remote_ip4n,
2749 next_hop_sw_if_index=self.pg8.sw_if_index)
2751 self.nat44_add_address(self.nat_addr)
2752 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2753 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8 direction
2757 pkts = self.create_stream_in(self.pg7, self.pg8)
2758 self.pg7.add_stream(pkts)
2759 self.pg_enable_capture(self.pg_interfaces)
2761 capture = self.pg8.get_capture(len(pkts))
2762 self.verify_capture_out(capture)
# pg8 -> pg7 direction, addressed to the NAT address
2765 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2766 self.pg8.add_stream(pkts)
2767 self.pg_enable_capture(self.pg_interfaces)
2769 capture = self.pg7.get_capture(len(pkts))
2770 self.verify_capture_in(capture, self.pg7)
# Test: 1:1 static NAT between pg7 and pg8. No config_ip4() is called on
# either interface here; reachability comes from explicit neighbor
# entries and /32 routes. pg7's remote address is statically mapped to
# nat_addr; the pg8 -> pg7 direction is sent first, then pg7 -> pg8.
# NOTE(review): lines 2778, 2782, 2796, 2802 and 2810 are absent from this
# view - the neighbor calls and the call on 2795 are unterminated.
2772 def test_static_ipless_interfaces(self):
2773 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2775 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2776 mactobinary(self.pg7.remote_mac),
2777 self.pg7.remote_ip4n,
2779 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2780 mactobinary(self.pg8.remote_mac),
2781 self.pg8.remote_ip4n,
# host routes (/32) to each peer via its own interface
2784 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2785 dst_address_length=32,
2786 next_hop_address=self.pg7.remote_ip4n,
2787 next_hop_sw_if_index=self.pg7.sw_if_index)
2788 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2789 dst_address_length=32,
2790 next_hop_address=self.pg8.remote_ip4n,
2791 next_hop_sw_if_index=self.pg8.sw_if_index)
2793 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2794 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2795 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 direction
2799 pkts = self.create_stream_out(self.pg8)
2800 self.pg8.add_stream(pkts)
2801 self.pg_enable_capture(self.pg_interfaces)
2803 capture = self.pg7.get_capture(len(pkts))
2804 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8 direction
2807 pkts = self.create_stream_in(self.pg7, self.pg8)
2808 self.pg7.add_stream(pkts)
2809 self.pg_enable_capture(self.pg_interfaces)
2811 capture = self.pg8.get_capture(len(pkts))
# NOTE(review): the meaning of the third positional argument (True) is
# defined by verify_capture_out, which is outside this view
2812 self.verify_capture_out(capture, self.nat_addr, True)
# Test: 1:1 NAPT (static mappings with ports) between pg7 and pg8. No
# config_ip4() is called on either interface here. Fixed outside
# port/ID values are chosen for TCP, UDP and ICMP, a static mapping per
# protocol maps pg7's remote address to nat_addr, and traffic is checked
# in both directions.
# NOTE(review): lines 2824, 2828, 2851, 2857 and 2865 are absent from this
# view - the neighbor calls and the call on 2850 are unterminated.
2814 def test_static_with_port_ipless_interfaces(self):
2815 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# outside ports / ICMP id that the static mappings below translate to
2817 self.tcp_port_out = 30606
2818 self.udp_port_out = 30607
2819 self.icmp_id_out = 30608
2821 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2822 mactobinary(self.pg7.remote_mac),
2823 self.pg7.remote_ip4n,
2825 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2826 mactobinary(self.pg8.remote_mac),
2827 self.pg8.remote_ip4n,
# host routes (/32) to each peer via its own interface
2830 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2831 dst_address_length=32,
2832 next_hop_address=self.pg7.remote_ip4n,
2833 next_hop_sw_if_index=self.pg7.sw_if_index)
2834 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2835 dst_address_length=32,
2836 next_hop_address=self.pg8.remote_ip4n,
2837 next_hop_sw_if_index=self.pg8.sw_if_index)
2839 self.nat44_add_address(self.nat_addr)
# one port-specific static mapping per protocol
2840 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2841 self.tcp_port_in, self.tcp_port_out,
2842 proto=IP_PROTOS.tcp)
2843 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2844 self.udp_port_in, self.udp_port_out,
2845 proto=IP_PROTOS.udp)
2846 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2847 self.icmp_id_in, self.icmp_id_out,
2848 proto=IP_PROTOS.icmp)
2849 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2850 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 direction
2854 pkts = self.create_stream_out(self.pg8)
2855 self.pg8.add_stream(pkts)
2856 self.pg_enable_capture(self.pg_interfaces)
2858 capture = self.pg7.get_capture(len(pkts))
2859 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8 direction
2862 pkts = self.create_stream_in(self.pg7, self.pg8)
2863 self.pg7.add_stream(pkts)
2864 self.pg_enable_capture(self.pg_interfaces)
2866 capture = self.pg8.get_capture(len(pkts))
2867 self.verify_capture_out(capture)
# Test: 1:1 static NAT of a packet carrying a protocol other than
# TCP/UDP/ICMP. pg0's remote address is statically mapped to nat_ip; a
# packet from pg0 to pg1 must leave with source nat_ip, and the reverse
# packet addressed to nat_ip must arrive on pg0 with the original
# addresses. Both checks also require a GRE layer and a valid IP checksum.
# NOTE(review): lines 2875, 2880, 2885, 2887-2888, 2893, 2895, 2900, 2905,
# 2907-2908, 2913 and 2915 are absent from this view - the outer-protocol
# layer of each packet expression is not shown (the assertions expect
# GRE), `packet` is never visibly assigned, and the logger.error calls
# presumably sit in exception handlers that are not shown.
2869 def test_static_unknown_proto(self):
2870 """ 1:1 NAT translate packet with unknown protocol """
2871 nat_ip = "10.0.0.10"
2872 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2873 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2874 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1 direction
2878 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2879 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2881 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2882 TCP(sport=1234, dport=1234))
2883 self.pg0.add_stream(p)
2884 self.pg_enable_capture(self.pg_interfaces)
2886 p = self.pg1.get_capture(1)
2889 self.assertEqual(packet[IP].src, nat_ip)
2890 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2891 self.assertTrue(packet.haslayer(GRE))
2892 self.check_ip_checksum(packet)
2894 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0 direction, addressed to the static NAT address
2898 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2899 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2901 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2902 TCP(sport=1234, dport=1234))
2903 self.pg1.add_stream(p)
2904 self.pg_enable_capture(self.pg_interfaces)
2906 p = self.pg0.get_capture(1)
2909 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2910 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2911 self.assertTrue(packet.haslayer(GRE))
2912 self.check_ip_checksum(packet)
2914 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Test: hairpinning with 1:1 static NAT for a non-TCP/UDP/ICMP protocol.
# Host and server are both remote hosts on pg0, each with its own static
# mapping. A packet from the host to the server's NAT address must come
# back out of pg0 with source host_nat_ip and destination server.ip4; the
# reverse packet must come back with source server_nat_ip and destination
# host.ip4. Both checks require a GRE layer and a valid IP checksum.
# NOTE(review): lines 2930, 2935, 2940, 2942-2943, 2948, 2950, 2955, 2960,
# 2962-2963, 2968 and 2970 are absent from this view - the outer-protocol
# layer of each packet expression is not shown, `packet` is never visibly
# assigned, and the logger.error calls presumably sit in exception
# handlers that are not shown.
2917 def test_hairpinning_static_unknown_proto(self):
2918 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2920 host = self.pg0.remote_hosts[0]
2921 server = self.pg0.remote_hosts[1]
2923 host_nat_ip = "10.0.0.10"
2924 server_nat_ip = "10.0.0.11"
2926 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2927 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2928 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2929 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server, sent and captured on the same interface (pg0)
2933 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2934 IP(src=host.ip4, dst=server_nat_ip) /
2936 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2937 TCP(sport=1234, dport=1234))
2938 self.pg0.add_stream(p)
2939 self.pg_enable_capture(self.pg_interfaces)
2941 p = self.pg0.get_capture(1)
2944 self.assertEqual(packet[IP].src, host_nat_ip)
2945 self.assertEqual(packet[IP].dst, server.ip4)
2946 self.assertTrue(packet.haslayer(GRE))
2947 self.check_ip_checksum(packet)
2949 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host, again on pg0
2953 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2954 IP(src=server.ip4, dst=host_nat_ip) /
2956 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2957 TCP(sport=1234, dport=1234))
2958 self.pg0.add_stream(p)
2959 self.pg_enable_capture(self.pg_interfaces)
2961 p = self.pg0.get_capture(1)
2964 self.assertEqual(packet[IP].src, server_nat_ip)
2965 self.assertEqual(packet[IP].dst, host.ip4)
2966 self.assertTrue(packet.haslayer(GRE))
2967 self.check_ip_checksum(packet)
2969 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Test: dynamic NAT44 of a packet carrying a protocol other than
# TCP/UDP/ICMP. A plain TCP packet is sent from pg0 to pg1 first, then
# the unknown-protocol packet, which must leave pg1 with source nat_addr;
# the reverse packet addressed to nat_addr must arrive on pg0 with the
# original addresses. Both checks require a GRE layer and a valid IP
# checksum.
# NOTE(review): lines 2977, 2985, 2990, 2995, 2997-2998, 3003, 3005, 3010,
# 3015, 3017-3018, 3023 and 3025 are absent from this view - the
# outer-protocol layer of each packet expression is not shown, `packet`
# is never visibly assigned, and the logger.error calls presumably sit in
# exception handlers that are not shown.
2972 def test_unknown_proto(self):
2973 """ NAT44 translate packet with unknown protocol """
2974 self.nat44_add_address(self.nat_addr)
2975 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2976 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# ordinary TCP packet first; presumably this sets up the translation the
# unknown-protocol packet relies on - TODO confirm
2980 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2981 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2982 TCP(sport=self.tcp_port_in, dport=20))
2983 self.pg0.add_stream(p)
2984 self.pg_enable_capture(self.pg_interfaces)
2986 p = self.pg1.get_capture(1)
# pg0 -> pg1 direction, unknown protocol
2988 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2989 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2991 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2992 TCP(sport=1234, dport=1234))
2993 self.pg0.add_stream(p)
2994 self.pg_enable_capture(self.pg_interfaces)
2996 p = self.pg1.get_capture(1)
2999 self.assertEqual(packet[IP].src, self.nat_addr)
3000 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3001 self.assertTrue(packet.haslayer(GRE))
3002 self.check_ip_checksum(packet)
3004 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0 direction, addressed to the NAT address
3008 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3009 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3011 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3012 TCP(sport=1234, dport=1234))
3013 self.pg1.add_stream(p)
3014 self.pg_enable_capture(self.pg_interfaces)
3016 p = self.pg0.get_capture(1)
3019 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3020 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3021 self.assertTrue(packet.haslayer(GRE))
3022 self.check_ip_checksum(packet)
3024 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Test: hairpinning of a non-TCP/UDP/ICMP protocol with a dynamic NAT
# address for the host and a static mapping for the server, both on pg0.
# A TCP packet is sent from host to server first, then the
# unknown-protocol packet, which must come back out of pg0 with source
# nat_addr and destination server.ip4; the reverse packet must come back
# with source server_nat_ip and destination host.ip4.
# NOTE(review): lines 3031-3032, 3040, 3051, 3056, 3061, 3063-3064, 3069,
# 3071, 3076, 3081, 3083-3084, 3089 and 3091 are absent from this view -
# `host_in_port` is never visibly assigned, the outer-protocol layer of
# the packet expressions is not shown, `packet` is never visibly assigned,
# and the logger.error calls presumably sit in exception handlers that are
# not shown.
3027 def test_hairpinning_unknown_proto(self):
3028 """ NAT44 translate packet with unknown protocol - hairpinning """
3029 host = self.pg0.remote_hosts[0]
3030 server = self.pg0.remote_hosts[1]
# server_in_port is not referenced again within the visible lines
3033 server_in_port = 5678
3034 server_out_port = 8765
3035 server_nat_ip = "10.0.0.11"
3037 self.nat44_add_address(self.nat_addr)
3038 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3039 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3042 # add static mapping for server
3043 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# ordinary TCP packet host -> server first
3046 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3047 IP(src=host.ip4, dst=server_nat_ip) /
3048 TCP(sport=host_in_port, dport=server_out_port))
3049 self.pg0.add_stream(p)
3050 self.pg_enable_capture(self.pg_interfaces)
3052 capture = self.pg0.get_capture(1)
# host -> server, unknown protocol
3054 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3055 IP(src=host.ip4, dst=server_nat_ip) /
3057 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3058 TCP(sport=1234, dport=1234))
3059 self.pg0.add_stream(p)
3060 self.pg_enable_capture(self.pg_interfaces)
3062 p = self.pg0.get_capture(1)
3065 self.assertEqual(packet[IP].src, self.nat_addr)
3066 self.assertEqual(packet[IP].dst, server.ip4)
3067 self.assertTrue(packet.haslayer(GRE))
3068 self.check_ip_checksum(packet)
3070 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host, addressed to the dynamic NAT address
3074 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3075 IP(src=server.ip4, dst=self.nat_addr) /
3077 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3078 TCP(sport=1234, dport=1234))
3079 self.pg0.add_stream(p)
3080 self.pg_enable_capture(self.pg_interfaces)
3082 p = self.pg0.get_capture(1)
3085 self.assertEqual(packet[IP].src, server_nat_ip)
3086 self.assertEqual(packet[IP].dst, host.ip4)
3087 self.assertTrue(packet.haslayer(GRE))
3088 self.check_ip_checksum(packet)
3090 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Test: NAT44 configured through the interface *output* feature API on
# pg0, pg1 and pg3. Three flows are checked: pg0 -> pg3 (translated out),
# pg3 -> pg0 (translated back in), and pg2 -> pg0, which must pass with
# no translation.
# NOTE(review): lines 3099, 3105, 3113 and 3121 are absent from this view -
# the call on 3098 is unterminated.
3093 def test_output_feature(self):
3094 """ NAT44 interface output feature (in2out postrouting) """
3095 self.nat44_add_address(self.nat_addr)
3096 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3097 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3098 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3 direction
3102 pkts = self.create_stream_in(self.pg0, self.pg3)
3103 self.pg0.add_stream(pkts)
3104 self.pg_enable_capture(self.pg_interfaces)
3106 capture = self.pg3.get_capture(len(pkts))
3107 self.verify_capture_out(capture)
# pg3 -> pg0 direction
3110 pkts = self.create_stream_out(self.pg3)
3111 self.pg3.add_stream(pkts)
3112 self.pg_enable_capture(self.pg_interfaces)
3114 capture = self.pg0.get_capture(len(pkts))
3115 self.verify_capture_in(capture, self.pg0)
3117 # from non-NAT interface to NAT inside interface
3118 pkts = self.create_stream_in(self.pg2, self.pg0)
3119 self.pg2.add_stream(pkts)
3120 self.pg_enable_capture(self.pg_interfaces)
3122 capture = self.pg0.get_capture(len(pkts))
3123 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
# Test: output-feature NAT44 with one pool address per VRF (vrf 10 and
# vrf 20). Traffic from pg4 to pg3 must be translated to nat_ip_vrf10 and
# traffic from pg6 to pg3 to nat_ip_vrf20; return traffic addressed to
# each NAT address must arrive back on pg4 / pg6 respectively.
# NOTE(review): lines 3134, 3139, 3146, 3152, 3160, 3168 and 3176 are
# absent from this view - both ip_add_del_route calls and the call on 3145
# are unterminated. Presumably pg4 and pg6 belong to VRF 10 and 20 -
# that setup is not visible here.
3125 def test_output_feature_vrf_aware(self):
3126 """ NAT44 interface output feature VRF aware (in2out postrouting) """
3127 nat_ip_vrf10 = "10.0.0.10"
3128 nat_ip_vrf20 = "10.0.0.20"
# two /32 routes to pg3's peer; the visible arguments are identical, so the
# missing trailing argument of each call is what distinguishes them
3130 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3131 dst_address_length=32,
3132 next_hop_address=self.pg3.remote_ip4n,
3133 next_hop_sw_if_index=self.pg3.sw_if_index,
3135 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3136 dst_address_length=32,
3137 next_hop_address=self.pg3.remote_ip4n,
3138 next_hop_sw_if_index=self.pg3.sw_if_index,
3141 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3142 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3143 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3144 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3145 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3 must use the vrf 10 address
3149 pkts = self.create_stream_in(self.pg4, self.pg3)
3150 self.pg4.add_stream(pkts)
3151 self.pg_enable_capture(self.pg_interfaces)
3153 capture = self.pg3.get_capture(len(pkts))
3154 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# return traffic to the vrf 10 address must reach pg4
3157 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3158 self.pg3.add_stream(pkts)
3159 self.pg_enable_capture(self.pg_interfaces)
3161 capture = self.pg4.get_capture(len(pkts))
3162 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3 must use the vrf 20 address
3165 pkts = self.create_stream_in(self.pg6, self.pg3)
3166 self.pg6.add_stream(pkts)
3167 self.pg_enable_capture(self.pg_interfaces)
3169 capture = self.pg3.get_capture(len(pkts))
3170 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# return traffic to the vrf 20 address must reach pg6
3173 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3174 self.pg3.add_stream(pkts)
3175 self.pg_enable_capture(self.pg_interfaces)
3177 capture = self.pg6.get_capture(len(pkts))
3178 self.verify_capture_in(capture, self.pg6)
# Test: hairpinning with the output-feature NAT44 API. Host and server
# are both remote hosts on pg0; the server is reachable through a TCP
# static mapping nat_addr:server_out_port -> server.ip4:server_in_port.
# The host's packet must come back out of pg0 with source nat_addr, a
# changed source port and the server's inside port as destination; the
# server's reply must come back with source nat_addr:server_out_port and
# the host's original address and port as destination.
# NOTE(review): lines 3184-3185, 3192, 3205, 3207-3210, 3217, 3219, 3227,
# 3229-3232, 3238 and 3240 are absent from this view - `host_in_port`,
# `ip` and `tcp` are never visibly assigned, the call on 3191 is
# unterminated, and the logger.error calls presumably sit in exception
# handlers that are not shown.
3180 def test_output_feature_hairpinning(self):
3181 """ NAT44 interface output feature hairpinning (in2out postrouting) """
3182 host = self.pg0.remote_hosts[0]
3183 server = self.pg0.remote_hosts[1]
3186 server_in_port = 5678
3187 server_out_port = 8765
3189 self.nat44_add_address(self.nat_addr)
3190 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3191 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3194 # add static mapping for server
3195 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3196 server_in_port, server_out_port,
3197 proto=IP_PROTOS.tcp)
3199 # send packet from host to server
3200 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3201 IP(src=host.ip4, dst=self.nat_addr) /
3202 TCP(sport=host_in_port, dport=server_out_port))
3203 self.pg0.add_stream(p)
3204 self.pg_enable_capture(self.pg_interfaces)
3206 capture = self.pg0.get_capture(1)
3211 self.assertEqual(ip.src, self.nat_addr)
3212 self.assertEqual(ip.dst, server.ip4)
3213 self.assertNotEqual(tcp.sport, host_in_port)
3214 self.assertEqual(tcp.dport, server_in_port)
3215 self.check_tcp_checksum(p)
# remember the translated source port; the server's reply is sent to it
3216 host_out_port = tcp.sport
3218 self.logger.error(ppp("Unexpected or invalid packet:", p))
3221 # send reply from server to host
3222 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3223 IP(src=server.ip4, dst=self.nat_addr) /
3224 TCP(sport=server_in_port, dport=host_out_port))
3225 self.pg0.add_stream(p)
3226 self.pg_enable_capture(self.pg_interfaces)
3228 capture = self.pg0.get_capture(1)
3233 self.assertEqual(ip.src, self.nat_addr)
3234 self.assertEqual(ip.dst, host.ip4)
3235 self.assertEqual(tcp.sport, server_out_port)
3236 self.assertEqual(tcp.dport, host_in_port)
3237 self.check_tcp_checksum(p)
3239 self.logger.error(ppp("Unexpected or invalid packet:", p))
3242 def test_one_armed_nat44(self):
3243 """ One armed NAT44 """
# Scenario: both the local host and the remote host are reached over the same
# interface (pg9); traffic enters and leaves on pg9 in both directions.
# NOTE(review): this listing has gaps -- ip and tcp are read below without a
# visible assignment and one call has no visible closing argument. Confirm
# against the complete file before changing any code.
3244 remote_host = self.pg9.remote_hosts[0]
3245 local_host = self.pg9.remote_hosts[1]
3248 self.nat44_add_address(self.nat_addr)
# pg9 is passed to the NAT feature call twice; the second call carries
# additional argument(s) that are not visible here.
3249 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3250 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# Local host -> remote host, TCP 12345 -> 80.
3254 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3255 IP(src=local_host.ip4, dst=remote_host.ip4) /
3256 TCP(sport=12345, dport=80))
3257 self.pg9.add_stream(p)
3258 self.pg_enable_capture(self.pg_interfaces)
3260 capture = self.pg9.get_capture(1)
# Expected: source address becomes the NAT address and the source port
# changes; destination address and port are untouched.
3265 self.assertEqual(ip.src, self.nat_addr)
3266 self.assertEqual(ip.dst, remote_host.ip4)
3267 self.assertNotEqual(tcp.sport, 12345)
# The translated port is reused as the destination port of the return packet.
3268 external_port = tcp.sport
3269 self.assertEqual(tcp.dport, 80)
3270 self.check_tcp_checksum(p)
3271 self.check_ip_checksum(p)
# NOTE(review): presumably the failure path of an elided try/except -- confirm.
3273 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Remote host -> NAT address, addressed to the translated port.
3277 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3278 IP(src=remote_host.ip4, dst=self.nat_addr) /
3279 TCP(sport=80, dport=external_port))
3280 self.pg9.add_stream(p)
3281 self.pg_enable_capture(self.pg_interfaces)
3283 capture = self.pg9.get_capture(1)
# Expected: destination restored to the local host and its original port.
3288 self.assertEqual(ip.src, remote_host.ip4)
3289 self.assertEqual(ip.dst, local_host.ip4)
3290 self.assertEqual(tcp.sport, 80)
3291 self.assertEqual(tcp.dport, 12345)
3292 self.check_tcp_checksum(p)
3293 self.check_ip_checksum(p)
3295 self.logger.error(ppp("Unexpected or invalid packet:", p))
3298 def test_one_armed_nat44_static(self):
3299 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
# Scenario: client (remote_host) and service (local_host) are both behind pg9.
# The service is published through a static mapping on the NAT address, and
# the NAT address itself is added with twice_nat=1.
# NOTE(review): this listing has gaps -- local_port, external_port, ip and tcp
# are read below without a visible assignment and two calls have no visible
# closing argument. Confirm against the complete file before changing code.
3300 remote_host = self.pg9.remote_hosts[0]
3301 local_host = self.pg9.remote_hosts[1]
3306 self.vapi.nat44_forwarding_enable_disable(1)
3307 self.nat44_add_address(self.nat_addr, twice_nat=1)
3308 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3309 local_port, external_port,
3310 proto=IP_PROTOS.tcp, out2in_only=1,
3312 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3313 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3316 # from client to service
3317 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3318 IP(src=remote_host.ip4, dst=self.nat_addr) /
3319 TCP(sport=12345, dport=external_port))
3320 self.pg9.add_stream(p)
3321 self.pg_enable_capture(self.pg_interfaces)
3323 capture = self.pg9.get_capture(1)
# Expected: both ends are rewritten -- destination becomes the service's own
# address and port, source becomes the NAT address with a different port.
3329 self.assertEqual(ip.dst, local_host.ip4)
3330 self.assertEqual(ip.src, self.nat_addr)
3331 self.assertEqual(tcp.dport, local_port)
3332 self.assertNotEqual(tcp.sport, 12345)
# The rewritten client port is the destination port of the service's reply.
3333 eh_port_in = tcp.sport
3334 self.check_tcp_checksum(p)
3335 self.check_ip_checksum(p)
# NOTE(review): presumably the failure path of an elided try/except -- confirm.
3337 self.logger.error(ppp("Unexpected or invalid packet:", p))
3340 # from service back to client
3341 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3342 IP(src=local_host.ip4, dst=self.nat_addr) /
3343 TCP(sport=local_port, dport=eh_port_in))
3344 self.pg9.add_stream(p)
3345 self.pg_enable_capture(self.pg_interfaces)
3347 capture = self.pg9.get_capture(1)
# Expected: the client sees the reply coming from the NAT address and the
# published external port, delivered to its original address and port.
3352 self.assertEqual(ip.src, self.nat_addr)
3353 self.assertEqual(ip.dst, remote_host.ip4)
3354 self.assertEqual(tcp.sport, external_port)
3355 self.assertEqual(tcp.dport, 12345)
3356 self.check_tcp_checksum(p)
3357 self.check_ip_checksum(p)
3359 self.logger.error(ppp("Unexpected or invalid packet:", p))
3362 def test_del_session(self):
3363 """ Delete NAT44 session """
# Creates sessions by sending inside traffic, deletes two of them through the
# API and checks that the per-user session dump shrinks by exactly two.
# NOTE(review): this listing has gaps -- two calls below have no visible
# closing argument. Confirm against the complete file before changing code.
3364 self.nat44_add_address(self.nat_addr)
3365 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3366 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3369 pkts = self.create_stream_in(self.pg0, self.pg1)
3370 self.pg0.add_stream(pkts)
3371 self.pg_enable_capture(self.pg_interfaces)
3373 capture = self.pg1.get_capture(len(pkts))
# Baseline: number of sessions of the pg0 remote host before any deletion.
3375 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3376 nsessions = len(sessions)
# First deletion is keyed by the inside address/port of session 0.
3378 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3379 sessions[0].inside_port,
3380 sessions[0].protocol)
# Second deletion is keyed by the outside address/port of session 1; its
# trailing argument(s) are not visible in this listing.
3381 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3382 sessions[1].outside_port,
3383 sessions[1].protocol,
3386 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3387 self.assertEqual(nsessions - len(sessions), 2)
def test_set_get_reass(self):
    """ NAT44 set/get virtual fragmentation reassembly

    Read the current reassembly configuration, write back a modified
    timeout / max_reass / max_frag triple, and check that a second read
    reports the written values. Finally switch on drop_frag and check
    that the flag is reported as set.
    """
    before = self.vapi.nat_get_reass()

    # Push a configuration derived from the current one.
    self.vapi.nat_set_reass(
        timeout=before.ip4_timeout + 5,
        max_reass=before.ip4_max_reass * 2,
        max_frag=before.ip4_max_frag * 2)

    after = self.vapi.nat_get_reass()

    # Each value read back must match what was just written.
    self.assertEqual(before.ip4_timeout + 5, after.ip4_timeout)
    self.assertEqual(before.ip4_max_reass * 2, after.ip4_max_reass)
    self.assertEqual(before.ip4_max_frag * 2, after.ip4_max_frag)

    # drop_frag is set on its own and verified on a fresh read.
    self.vapi.nat_set_reass(drop_frag=1)
    current = self.vapi.nat_get_reass()
    self.assertTrue(current.ip4_drop_frag)
3406 def test_frag_in_order(self):
3407 """ NAT44 translate fragments arriving in order """
# Sends a fragmented TCP payload inside -> outside and then outside -> inside,
# reassembles what is captured and checks ports and payload. Also checks that
# the reassembly dump grows by two entries over the test.
# NOTE(review): this listing has gaps -- several argument lists below are cut
# short. Confirm against the complete file before changing any code.
3408 self.nat44_add_address(self.nat_addr)
3409 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3410 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# 23-character payload (4 + 16 + 3) and a random inside source port.
3413 data = "A" * 4 + "B" * 16 + "C" * 3
3414 self.tcp_port_in = random.randint(1025, 65535)
# Number of reassembly entries before any traffic is sent.
3416 reass = self.vapi.nat_reass_dump()
3417 reass_n_start = len(reass)
# Fragments sent from pg0 towards the pg1 remote address.
3420 pkts = self.create_stream_frag(self.pg0,
3421 self.pg1.remote_ip4,
3425 self.pg0.add_stream(pkts)
3426 self.pg_enable_capture(self.pg_interfaces)
3428 frags = self.pg1.get_capture(len(pkts))
3429 p = self.reass_frags_and_verify(frags,
3431 self.pg1.remote_ip4)
# Destination port 20 is kept, the source port must have been translated.
3432 self.assertEqual(p[TCP].dport, 20)
3433 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3434 self.tcp_port_out = p[TCP].sport
3435 self.assertEqual(data, p[Raw].load)
# Return direction: fragments sent from pg1, captured on pg0.
3438 pkts = self.create_stream_frag(self.pg1,
3443 self.pg1.add_stream(pkts)
3444 self.pg_enable_capture(self.pg_interfaces)
3446 frags = self.pg0.get_capture(len(pkts))
3447 p = self.reass_frags_and_verify(frags,
3448 self.pg1.remote_ip4,
3449 self.pg0.remote_ip4)
# The destination port must be translated back to the inside port.
3450 self.assertEqual(p[TCP].sport, 20)
3451 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3452 self.assertEqual(data, p[Raw].load)
3454 reass = self.vapi.nat_reass_dump()
3455 reass_n_end = len(reass)
# One new reassembly entry is expected per direction.
3457 self.assertEqual(reass_n_end - reass_n_start, 2)
3459 def test_reass_hairpinning(self):
3460 """ NAT44 fragments hairpinning """
# Scenario: client and server both sit behind pg0; the client sends a
# fragmented TCP payload to the server's static mapping and the translated
# fragments are expected back on pg0.
# NOTE(review): this listing has gaps -- several argument lists below are cut
# short. Confirm against the complete file before changing any code.
3461 host = self.pg0.remote_hosts[0]
3462 server = self.pg0.remote_hosts[1]
# All ports are drawn at random from 1025..65535.
3463 host_in_port = random.randint(1025, 65535)
3465 server_in_port = random.randint(1025, 65535)
3466 server_out_port = random.randint(1025, 65535)
3467 data = "A" * 4 + "B" * 16 + "C" * 3
3469 self.nat44_add_address(self.nat_addr)
3470 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3471 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3473 # add static mapping for server
3474 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3475 server_in_port, server_out_port,
3476 proto=IP_PROTOS.tcp)
3478 # send packet from host to server
3479 pkts = self.create_stream_frag(self.pg0,
3484 self.pg0.add_stream(pkts)
3485 self.pg_enable_capture(self.pg_interfaces)
# Hairpinned fragments come back out of the interface they entered on.
3487 frags = self.pg0.get_capture(len(pkts))
3488 p = self.reass_frags_and_verify(frags,
# Source port must differ from the client's port; destination port must be
# the server's internal port; payload must survive reassembly unchanged.
3491 self.assertNotEqual(p[TCP].sport, host_in_port)
3492 self.assertEqual(p[TCP].dport, server_in_port)
3493 self.assertEqual(data, p[Raw].load)
3495 def test_frag_out_of_order(self):
3496 """ NAT44 translate fragments arriving out of order """
# Same two-direction fragment exchange as the in-order test, but the
# fragments are expected to be submitted out of order.
# NOTE(review): this listing has gaps -- several argument lists below are cut
# short and the statement(s) that reorder the fragments are not visible.
# Confirm against the complete file before changing any code.
3497 self.nat44_add_address(self.nat_addr)
3498 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3499 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3502 data = "A" * 4 + "B" * 16 + "C" * 3
# NOTE(review): the result of the next call is discarded, yet self.tcp_port_in
# is compared against further down. test_frag_in_order assigns the same
# expression to self.tcp_port_in; the assignment looks to be missing here.
3503 random.randint(1025, 65535)
# Fragments sent from pg0 towards the pg1 remote address.
3506 pkts = self.create_stream_frag(self.pg0,
3507 self.pg1.remote_ip4,
3512 self.pg0.add_stream(pkts)
3513 self.pg_enable_capture(self.pg_interfaces)
3515 frags = self.pg1.get_capture(len(pkts))
3516 p = self.reass_frags_and_verify(frags,
3518 self.pg1.remote_ip4)
# Destination port 20 is kept, the source port must have been translated.
3519 self.assertEqual(p[TCP].dport, 20)
3520 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3521 self.tcp_port_out = p[TCP].sport
3522 self.assertEqual(data, p[Raw].load)
# Return direction: fragments sent from pg1, captured on pg0.
3525 pkts = self.create_stream_frag(self.pg1,
3531 self.pg1.add_stream(pkts)
3532 self.pg_enable_capture(self.pg_interfaces)
3534 frags = self.pg0.get_capture(len(pkts))
3535 p = self.reass_frags_and_verify(frags,
3536 self.pg1.remote_ip4,
3537 self.pg0.remote_ip4)
3538 self.assertEqual(p[TCP].sport, 20)
3539 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3540 self.assertEqual(data, p[Raw].load)
3542 def test_port_restricted(self):
3543 """ Port restricted NAT44 (MAP-E CE) """
# Switches the port assignment algorithm through the CLI, sends one TCP packet
# inside -> outside and checks which source port was allocated.
# NOTE(review): this listing has gaps -- ip and tcp are read below without a
# visible assignment and one call has no visible closing argument. Confirm
# against the complete file before changing any code.
3544 self.nat44_add_address(self.nat_addr)
3545 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3546 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3548 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3549 "psid-offset 6 psid-len 6")
3551 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3552 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3553 TCP(sport=4567, dport=22))
3554 self.pg0.add_stream(p)
3555 self.pg_enable_capture(self.pg_interfaces)
3557 capture = self.pg1.get_capture(1)
3562 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3563 self.assertEqual(ip.src, self.nat_addr)
3564 self.assertEqual(tcp.dport, 22)
3565 self.assertNotEqual(tcp.sport, 4567)
# Bits 6..11 of the allocated source port must equal 10, the same number
# that follows "psid" in the CLI command above.
3566 self.assertEqual((tcp.sport >> 6) & 63, 10)
3567 self.check_tcp_checksum(p)
3568 self.check_ip_checksum(p)
# NOTE(review): presumably the failure path of an elided try/except -- confirm.
3570 self.logger.error(ppp("Unexpected or invalid packet:", p))
3573 def test_twice_nat(self):
# Scenario: an outside host on pg1 reaches a service on the pg0 remote host
# through a static mapping; both the destination and the source of the packet
# are expected to be rewritten, the source to twice_nat_addr.
# NOTE(review): this listing has gaps -- port_in, port_out, eh_port_out, ip
# and tcp are read below without a visible assignment and two calls have no
# visible closing argument. Confirm against the complete file before editing.
3575 twice_nat_addr = '10.0.1.3'
# Two pool addresses: the regular NAT address and one flagged twice_nat=1.
3580 self.nat44_add_address(self.nat_addr)
3581 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3582 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3583 port_in, port_out, proto=IP_PROTOS.tcp,
3585 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3586 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside host -> NAT address on the mapped external port.
3589 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3590 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3591 TCP(sport=eh_port_out, dport=port_out))
3592 self.pg1.add_stream(p)
3593 self.pg_enable_capture(self.pg_interfaces)
3595 capture = self.pg0.get_capture(1)
# Expected on pg0: destination is the service's own address and port, source
# is twice_nat_addr with a port different from the one the outside host used.
3600 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3601 self.assertEqual(ip.src, twice_nat_addr)
3602 self.assertEqual(tcp.dport, port_in)
3603 self.assertNotEqual(tcp.sport, eh_port_out)
# The rewritten source port is the destination port of the service's reply.
3604 eh_port_in = tcp.sport
3605 self.check_tcp_checksum(p)
3606 self.check_ip_checksum(p)
# NOTE(review): presumably the failure path of an elided try/except -- confirm.
3608 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Service -> twice_nat_addr, addressed to the rewritten port.
3611 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3612 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3613 TCP(sport=port_in, dport=eh_port_in))
3614 self.pg0.add_stream(p)
3615 self.pg_enable_capture(self.pg_interfaces)
3617 capture = self.pg1.get_capture(1)
# Expected on pg1: both rewrites are undone from the outside host's view.
3622 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3623 self.assertEqual(ip.src, self.nat_addr)
3624 self.assertEqual(tcp.dport, eh_port_out)
3625 self.assertEqual(tcp.sport, port_out)
3626 self.check_tcp_checksum(p)
3627 self.check_ip_checksum(p)
3629 self.logger.error(ppp("Unexpected or invalid packet:", p))
3632 def test_twice_nat_lb(self):
3633 """ Twice NAT44 local service load balancing """
# Scenario: one external address/port is mapped onto two servers behind pg0;
# an outside host on pg1 connects and either server may be selected.
# NOTE(review): this listing has gaps -- external_port, local_port,
# eh_port_out, server, ip and tcp are read below without a visible assignment,
# and the locals list and two calls are cut short. Confirm against the
# complete file before changing any code.
3634 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3635 twice_nat_addr = '10.0.1.3'
3640 server1 = self.pg0.remote_hosts[0]
3641 server2 = self.pg0.remote_hosts[1]
# One dictionary per backend server; only the 'addr' keys are visible here.
3643 locals = [{'addr': server1.ip4n,
3646 {'addr': server2.ip4n,
3650 self.nat44_add_address(self.nat_addr)
3651 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3653 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3657 local_num=len(locals),
3659 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3660 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside host -> NAT address on the load-balanced external port.
3663 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3664 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3665 TCP(sport=eh_port_out, dport=external_port))
3666 self.pg1.add_stream(p)
3667 self.pg_enable_capture(self.pg_interfaces)
3669 capture = self.pg0.get_capture(1)
# Expected on pg0: source is twice_nat_addr, destination is one of the two
# servers. The body of the branch on the chosen server is not visible.
3675 self.assertEqual(ip.src, twice_nat_addr)
3676 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3677 if ip.dst == server1.ip4:
3681 self.assertNotEqual(tcp.sport, eh_port_out)
# The rewritten source port is the destination port of the server's reply.
3682 eh_port_in = tcp.sport
3683 self.assertEqual(tcp.dport, local_port)
3684 self.check_tcp_checksum(p)
3685 self.check_ip_checksum(p)
# NOTE(review): presumably the failure path of an elided try/except -- confirm.
3687 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Reply from the selected server towards twice_nat_addr.
3690 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3691 IP(src=server.ip4, dst=twice_nat_addr) /
3692 TCP(sport=local_port, dport=eh_port_in))
3693 self.pg0.add_stream(p)
3694 self.pg_enable_capture(self.pg_interfaces)
3696 capture = self.pg1.get_capture(1)
# Expected on pg1: the reply appears to come from the NAT address and the
# external port, addressed to the outside host's original port.
3701 self.assertEqual(ip.src, self.nat_addr)
3702 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3703 self.assertEqual(tcp.sport, external_port)
3704 self.assertEqual(tcp.dport, eh_port_out)
3705 self.check_tcp_checksum(p)
3706 self.check_ip_checksum(p)
3708 self.logger.error(ppp("Unexpected or invalid packet:", p))
def test_twice_nat_interface_addr(self):
    """ Acquire twice NAT44 addresses from interface

    Register pg7 as a twice-NAT address source, then follow the NAT
    address pool while pg7's IPv4 address is configured and removed
    again: empty, one twice-NAT entry equal to pg7's address, empty.
    """
    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)

    # pg7 has no IPv4 address yet, so the pool must be empty
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))

    # once pg7 gets its address, exactly that address shows up in the pool
    self.pg7.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    entry = pool[0]
    self.assertEqual(entry.ip_address[0:4], self.pg7.local_ip4n)
    self.assertEqual(entry.twice_nat, 1)

    # taking the address away from pg7 empties the pool again
    self.pg7.unconfig_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))
3731 def test_ipfix_max_frags(self):
3732 """ IPFIX logging maximum fragments pending reassembly exceeded """
# Sets the fragment limit to zero, sends a single fragment and checks that
# nothing is forwarded while IPFIX packets are exported on pg3.
# NOTE(review): this listing has gaps -- p is read below without a visible
# assignment (the loops over the capture are not shown) and several argument
# lists are cut short. Confirm against the complete file before editing.
3733 self.nat44_add_address(self.nat_addr)
3734 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3735 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3737 self.vapi.nat_set_reass(max_frag=0)
# IPFIX export goes from the pg3 local address to the pg3 remote address.
3738 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3739 src_address=self.pg3.local_ip4n,
3741 template_interval=10)
3742 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3743 src_port=self.ipfix_src_port)
3745 data = "A" * 4 + "B" * 16 + "C" * 3
3746 self.tcp_port_in = random.randint(1025, 65535)
3747 pkts = self.create_stream_frag(self.pg0,
3748 self.pg1.remote_ip4,
# Only the last fragment of the stream is sent.
3752 self.pg0.add_stream(pkts[-1])
3753 self.pg_enable_capture(self.pg_interfaces)
# No packet is expected on the outside interface.
3755 frags = self.pg1.get_capture(0)
3756 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3757 capture = self.pg3.get_capture(9)
3758 ipfix = IPFIXDecoder()
3759 # first load template
# Every exported packet must be IPFIX over UDP to port 4739, sent from the
# configured source port and carrying the configured observation domain.
3761 self.assertTrue(p.haslayer(IPFIX))
3762 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3763 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3764 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3765 self.assertEqual(p[UDP].dport, 4739)
3766 self.assertEqual(p[IPFIX].observationDomainID,
3767 self.ipfix_domain_id)
3768 if p.haslayer(Template):
3769 ipfix.add_template(p.getlayer(Template))
3770 # verify events in data set
# Note: data is rebound here from the payload string to the decoded records.
3772 if p.haslayer(Data):
3773 data = ipfix.decode_data_set(p.getlayer(Set))
3774 self.verify_ipfix_max_fragments_ip4(data, 0,
3775 self.pg0.remote_ip4n)
# NOTE(review): the def line that introduces these statements is not part of
# this listing; the super() call below indicates TestNAT44.tearDown -- confirm.
# Runs the parent teardown first, then, unless VPP is marked dead, logs the
# output of several NAT "show" CLI commands and restores the default
# address/port assignment algorithm.
3778 super(TestNAT44, self).tearDown()
3779 if not self.vpp_dead:
3780 self.logger.info(self.vapi.cli("show nat44 addresses"))
3781 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3782 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3783 self.logger.info(self.vapi.cli("show nat44 interface address"))
3784 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3785 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# Undoes any algorithm change made through the CLI by an individual test.
3786 self.vapi.cli("nat addr-port-assignment-alg default")
3790 class TestNAT44Out2InDPO(MethodHolder):
3791 """ NAT44 Test Cases using out2in DPO """
# Test class that starts VPP with "out2in dpo" in its nat section, uses an
# IPv4 interface (pg0) and an IPv6 interface (pg1), and exercises IPv4 -> IPv6
# translation with and without NAT44 in front of it.
# NOTE(review): this listing has gaps -- decorators, some method headers and
# several closing arguments are not visible. Confirm against the complete
# file before changing any code.

# Appends the nat startup section to the VPP command line.
3794 def setUpConstants(cls):
3795 super(TestNAT44Out2InDPO, cls).setUpConstants()
3796 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])

# Class-level fixture: port/id constants, NAT address, two pg interfaces
# (pg0 with IPv4, pg1 with IPv6) and an IPv6 default route via pg1.
3799 def setUpClass(cls):
3800 super(TestNAT44Out2InDPO, cls).setUpClass()
# Inside and outside ports/ids are identical here.
3803 cls.tcp_port_in = 6303
3804 cls.tcp_port_out = 6303
3805 cls.udp_port_in = 6304
3806 cls.udp_port_out = 6304
3807 cls.icmp_id_in = 6305
3808 cls.icmp_id_out = 6305
3809 cls.nat_addr = '10.0.0.3'
3810 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3811 cls.dst_ip4 = '192.168.70.1'
3813 cls.create_pg_interfaces(range(2))
3816 cls.pg0.config_ip4()
3817 cls.pg0.resolve_arp()
3820 cls.pg1.config_ip6()
3821 cls.pg1.resolve_ndp()
# All-zero prefix of length 0: an IPv6 default route through pg1.
3823 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
3824 dst_address_length=0,
3825 next_hop_address=cls.pg1.remote_ip6n,
3826 next_hop_sw_if_index=cls.pg1.sw_if_index)
# NOTE(review): presumably the error path of an elided try/except around the
# setup above -- confirm.
3829 super(TestNAT44Out2InDPO, cls).tearDownClass()

# Stores two /96 IPv6 prefixes on the instance and adds a translating MAP
# domain built from them; the trailing argument(s) of that call are not
# visible in this listing.
3832 def configure_xlat(self):
3833 self.dst_ip6_pfx = '1:2:3::'
3834 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3836 self.dst_ip6_pfx_len = 96
3837 self.src_ip6_pfx = '4:5:6::'
3838 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3840 self.src_ip6_pfx_len = 96
3841 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3842 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3843 '\x00\x00\x00\x00', 0, is_translation=1,

3846 def test_464xlat_ce(self):
3847 """ Test 464XLAT CE with NAT44 """
3849 self.configure_xlat()
# NAT44 is enabled on pg0 with a single-address pool.
3851 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3852 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
# Expected IPv6 addresses on the pg1 side: the IPv4 destination embedded in
# the dst prefix, and the NAT address embedded in the src prefix.
3854 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3855 self.dst_ip6_pfx_len)
3856 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3857 self.src_ip6_pfx_len)
# IPv4 in on pg0, IPv6 expected out on pg1.
3860 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3861 self.pg0.add_stream(pkts)
3862 self.pg_enable_capture(self.pg_interfaces)
3864 capture = self.pg1.get_capture(len(pkts))
3865 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
# IPv6 in on pg1, IPv4 expected out on pg0.
3868 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3870 self.pg1.add_stream(pkts)
3871 self.pg_enable_capture(self.pg_interfaces)
3873 capture = self.pg0.get_capture(len(pkts))
3874 self.verify_capture_in(capture, self.pg0)
# Cleanup of the NAT configuration added by this test; the address range is
# removed with is_add=0.
3876 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3878 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
3879 self.nat_addr_n, is_add=0)

3881 def test_464xlat_ce_no_nat(self):
3882 """ Test 464XLAT CE without NAT44 """
3884 self.configure_xlat()
# Without NAT44 the inside host's own IPv4 address is embedded in the
# src prefix instead of the NAT address.
3886 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3887 self.dst_ip6_pfx_len)
3888 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3889 self.src_ip6_pfx_len)
3891 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3892 self.pg0.add_stream(pkts)
3893 self.pg_enable_capture(self.pg_interfaces)
3895 capture = self.pg1.get_capture(len(pkts))
# same_port=True: the verifier is told that ports are left unchanged.
3896 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3897 nat_ip=out_dst_ip6, same_port=True)
3899 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3900 self.pg1.add_stream(pkts)
3901 self.pg_enable_capture(self.pg_interfaces)
3903 capture = self.pg0.get_capture(len(pkts))
3904 self.verify_capture_in(capture, self.pg0)
3907 class TestDeterministicNAT(MethodHolder):
3908 """ Deterministic NAT Test Cases """
3911 def setUpConstants(cls):
# Extends the VPP command line with a nat section containing the single
# keyword "deterministic", after the parent class has set up its constants.
# NOTE(review): any decorator on this method is not visible in this listing.
3912 super(TestDeterministicNAT, cls).setUpConstants()
3913 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
3916 def setUpClass(cls):
# Class-level fixture: port/id constants, the NAT address, three pg
# interfaces and two remote hosts with IPv4 neighbors on pg0.
# NOTE(review): this listing has gaps -- the body of the loop over the
# interfaces and the header of the error path are not visible. Confirm
# against the complete file before changing any code.
3917 super(TestDeterministicNAT, cls).setUpClass()
# Inside and external ports are identical here.
3920 cls.tcp_port_in = 6303
3921 cls.tcp_external_port = 6303
3922 cls.udp_port_in = 6304
3923 cls.udp_external_port = 6304
3924 cls.icmp_id_in = 6305
3925 cls.nat_addr = '10.0.0.3'
3927 cls.create_pg_interfaces(range(3))
# A separate list copy of the interfaces is kept on the class.
3928 cls.interfaces = list(cls.pg_interfaces)
3930 for i in cls.interfaces:
3935 cls.pg0.generate_remote_hosts(2)
3936 cls.pg0.configure_ipv4_neighbors()
# NOTE(review): presumably the error path of an elided try/except around the
# setup above -- confirm.
3939 super(TestDeterministicNAT, cls).tearDownClass()
3942 def create_stream_in(self, in_if, out_if, ttl=64):
# Builds one TCP, one UDP and one ICMP echo-request packet, each sent from
# in_if's remote address to out_if's remote address with the given TTL.
# NOTE(review): this listing has gaps -- the docstring delimiters, the list
# the packets are collected in and the return statement are not visible.
3944 Create packet stream for inside network
3946 :param in_if: Inside interface
3947 :param out_if: Outside interface
3948 :param ttl: TTL of generated packets
# TCP: inside port -> external port.
3952 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3953 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3954 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
# UDP: inside port -> external port.
3958 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3959 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3960 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
# ICMP echo request carrying the inside ICMP id.
3964 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3965 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3966 ICMP(id=self.icmp_id_in, type='echo-request'))
3971 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
# Builds one TCP, one UDP and one ICMP echo-reply packet, each sent from
# out_if's remote address to dst_ip with the given TTL.
# The packets read self.tcp_port_out, self.udp_port_out and
# self.icmp_external_id, which verify_capture_out in this class assigns.
# NOTE(review): this listing has gaps -- the docstring delimiters, the
# condition guarding the dst_ip default, the list the packets are collected
# in and the return statement are not visible.
3973 Create packet stream for outside network
3975 :param out_if: Outside interface
3976 :param dst_ip: Destination IP address (Default use global NAT address)
3977 :param ttl: TTL of generated packets
# NOTE(review): presumably applied only when dst_ip is None -- confirm.
3980 dst_ip = self.nat_addr
# TCP: external port -> translated port.
3983 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3984 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3985 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
# UDP: external port -> translated port.
3989 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3990 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3991 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
# ICMP echo reply carrying the external ICMP id.
3995 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3996 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3997 ICMP(id=self.icmp_external_id, type='echo-reply'))
4002 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
# Checks the number of captured packets and that each one has nat_ip as its
# source address, and records the translated TCP port, UDP port and ICMP id
# on the instance.
# NOTE(review): this listing has gaps -- the docstring delimiters, the
# condition guarding the nat_ip default and the try/except/else keywords
# around the per-packet checks are not visible.
4004 Verify captured packets on outside network
4006 :param capture: Captured packets
4007 :param nat_ip: Translated IP address (Default use global NAT address)
# NOTE(review): same_port is documented on the next line but is not a
# parameter of this method.
4008 :param same_port: Sorce port number is not translated (Default False)
4009 :param packet_num: Expected number of packets (Default 3)
# NOTE(review): presumably applied only when nat_ip is None -- confirm.
4012 nat_ip = self.nat_addr
4013 self.assertEqual(packet_num, len(capture))
4014 for packet in capture:
4016 self.assertEqual(packet[IP].src, nat_ip)
# Remember what the NAT allocated so later outside -> inside packets can be
# addressed to it.
4017 if packet.haslayer(TCP):
4018 self.tcp_port_out = packet[TCP].sport
4019 elif packet.haslayer(UDP):
4020 self.udp_port_out = packet[UDP].sport
4022 self.icmp_external_id = packet[ICMP].id
4024 self.logger.error(ppp("Unexpected or invalid packet "
4025 "(outside network):", packet))
4028 def initiate_tcp_session(self, in_if, out_if):
# Drives three packets through the NAT in the order in -> out, out -> in,
# in -> out (labelled SYN, SYN + ACK and ACK below) and stores the translated
# source port of the first one in self.tcp_port_out.
# NOTE(review): this listing has gaps -- the docstring delimiters, the TCP
# flags arguments, two add_stream calls, the rebinding of p to the captured
# packet and the try/except keywords are not visible.
4030 Initiates TCP session
4032 :param in_if: Inside interface
4033 :param out_if: Outside interface
4036 # SYN packet in->out
4037 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4038 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4039 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4042 self.pg_enable_capture(self.pg_interfaces)
4044 capture = out_if.get_capture(1)
# NOTE(review): p presumably refers to the captured packet at this point, not
# the packet that was sent -- confirm.
4046 self.tcp_port_out = p[TCP].sport
4048 # SYN + ACK packet out->in
4049 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
4050 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
4051 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4053 out_if.add_stream(p)
4054 self.pg_enable_capture(self.pg_interfaces)
# The capture is only used to confirm that one packet was forwarded.
4056 in_if.get_capture(1)
4058 # ACK packet in->out
4059 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4060 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4061 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4064 self.pg_enable_capture(self.pg_interfaces)
4066 out_if.get_capture(1)
# NOTE(review): presumably the failure path of an elided try/except -- confirm.
4069 self.logger.error("TCP 3 way handshake failed")
4072 def verify_ipfix_max_entries_per_user(self, data):
# Expects exactly one decoded record and checks four of its fields.
# NOTE(review): this listing has gaps -- the docstring delimiters, the
# binding of record and some of the per-field comments are not visible.
# The keys 230, 466, 473 and 8 are presumably IPFIX information element ids
# -- confirm against the IPFIX registry.
4074 Verify IPFIX maximum entries per user exceeded event
4076 :param data: Decoded IPFIX data records
4078 self.assertEqual(1, len(data))
# Field 230 is a single character whose ordinal must be 13.
4081 self.assertEqual(ord(record[230]), 13)
4082 # natQuotaExceededEvent
# The four bytes 03 00 00 00 read as a little-endian integer are 3.
4083 self.assertEqual('\x03\x00\x00\x00', record[466])
# The four bytes e8 03 00 00 read as a little-endian integer are 1000.
4085 self.assertEqual('\xe8\x03\x00\x00', record[473])
# Field 8 must equal the packed IPv4 address of the pg0 remote host.
4087 self.assertEqual(self.pg0.remote_ip4n, record[8])
4089 def test_deterministic_mode(self):
4090 """ NAT plugin run deterministic mode """
# Adds one deterministic mapping, checks forward and reverse lookups against
# it, checks that the mapping dump reports it, then clears it again.
# NOTE(review): this listing has gaps -- in_plen and out_plen are read below
# without a visible assignment. Confirm against the complete file.
4091 in_addr = '172.16.255.0'
4092 out_addr = '172.17.255.50'
# An inside address used for the forward/reverse lookup below.
4093 in_addr_t = '172.16.255.20'
# Packed (4-byte) forms of the three addresses.
4094 in_addr_n = socket.inet_aton(in_addr)
4095 out_addr_n = socket.inet_aton(out_addr)
4096 in_addr_t_n = socket.inet_aton(in_addr_t)
# The plugin must report that it runs in deterministic mode.
4100 nat_config = self.vapi.nat_show_config()
4101 self.assertEqual(1, nat_config.deterministic)
4103 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Forward lookup must give the outside address; the reverse lookup with that
# address and the reported high port must give the inside address back.
4105 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4106 self.assertEqual(rep1.out_addr[:4], out_addr_n)
4107 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4108 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
# The dump must contain exactly the mapping that was added.
4110 deterministic_mappings = self.vapi.nat_det_map_dump()
4111 self.assertEqual(len(deterministic_mappings), 1)
4112 dsm = deterministic_mappings[0]
4113 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4114 self.assertEqual(in_plen, dsm.in_plen)
4115 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4116 self.assertEqual(out_plen, dsm.out_plen)
# After clearing, the dump must be empty.
4118 self.clear_nat_det()
4119 deterministic_mappings = self.vapi.nat_det_map_dump()
4120 self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts

    Read the four deterministic NAT timeouts, write each of them back
    increased by 10, and check that a second read reports a different
    value for every one of them.
    """
    old = self.vapi.nat_det_get_timeouts()

    # Positional order expected by the setter.
    set_order = ('udp', 'tcp_established', 'tcp_transitory', 'icmp')
    raised = [getattr(old, name) + 10 for name in set_order]
    self.vapi.nat_det_set_timeouts(*raised)

    new = self.vapi.nat_det_get_timeouts()

    # Every timeout must have moved away from its previous value.
    for name in ('udp', 'icmp', 'tcp_established', 'tcp_transitory'):
        self.assertNotEqual(getattr(old, name), getattr(new, name))
4140 def test_det_in(self):
4141 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
# Maps the pg0 remote host onto nat_ip, sends traffic in both directions and
# then inspects the three resulting sessions.
# NOTE(review): this listing has gaps -- s is read below without a visible
# assignment and two calls have missing arguments. Confirm against the
# complete file before changing any code.
4143 nat_ip = "10.0.0.10"
4145 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4147 socket.inet_aton(nat_ip),
4149 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4150 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# inside -> outside: packets must leave pg1 with nat_ip as source.
4154 pkts = self.create_stream_in(self.pg0, self.pg1)
4155 self.pg0.add_stream(pkts)
4156 self.pg_enable_capture(self.pg_interfaces)
4158 capture = self.pg1.get_capture(len(pkts))
4159 self.verify_capture_out(capture, nat_ip)
# outside -> inside: packets addressed to nat_ip must arrive on pg0.
4162 pkts = self.create_stream_out(self.pg1, nat_ip)
4163 self.pg1.add_stream(pkts)
4164 self.pg_enable_capture(self.pg_interfaces)
4166 capture = self.pg0.get_capture(len(pkts))
4167 self.verify_capture_in(capture, self.pg0)
# Three sessions are expected for the inside host.
4170 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4171 self.assertEqual(len(sessions), 3)
# Session compared against the TCP ports.
4175 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4176 self.assertEqual(s.in_port, self.tcp_port_in)
4177 self.assertEqual(s.out_port, self.tcp_port_out)
4178 self.assertEqual(s.ext_port, self.tcp_external_port)
# Session compared against the UDP ports.
4182 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4183 self.assertEqual(s.in_port, self.udp_port_in)
4184 self.assertEqual(s.out_port, self.udp_port_out)
4185 self.assertEqual(s.ext_port, self.udp_external_port)
# Session compared against the ICMP ids, which take the place of ports.
4189 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4190 self.assertEqual(s.in_port, self.icmp_id_in)
4191 self.assertEqual(s.out_port, self.icmp_external_id)
4193 def test_multiple_users(self):
4194 """ Deterministic NAT multiple users """
# Two inside hosts behind pg0 share one outside address. Each opens a TCP
# flow from the same inside port, replies are sent to each translated port,
# and finally both sessions are closed through the API.
# NOTE(review): this listing has gaps -- port_in, ip and tcp are read below
# without a visible assignment and several calls have missing arguments.
# Confirm against the complete file before changing any code.
4196 nat_ip = "10.0.0.10"
4198 external_port = 6303
4200 host0 = self.pg0.remote_hosts[0]
4201 host1 = self.pg0.remote_hosts[1]
4203 self.vapi.nat_det_add_del_map(host0.ip4n,
4205 socket.inet_aton(nat_ip),
4207 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4208 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host0 -> outside.
4212 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4213 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4214 TCP(sport=port_in, dport=external_port))
4215 self.pg0.add_stream(p)
4216 self.pg_enable_capture(self.pg_interfaces)
4218 capture = self.pg1.get_capture(1)
4223 self.assertEqual(ip.src, nat_ip)
4224 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4225 self.assertEqual(tcp.dport, external_port)
# Translated source port allocated to host0.
4226 port_out0 = tcp.sport
# NOTE(review): presumably the failure path of an elided try/except -- confirm.
4228 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1 -> outside, same inside port as host0.
4232 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4233 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4234 TCP(sport=port_in, dport=external_port))
4235 self.pg0.add_stream(p)
4236 self.pg_enable_capture(self.pg_interfaces)
4238 capture = self.pg1.get_capture(1)
4243 self.assertEqual(ip.src, nat_ip)
4244 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4245 self.assertEqual(tcp.dport, external_port)
# Translated source port allocated to host1.
4246 port_out1 = tcp.sport
4248 self.logger.error(ppp("Unexpected or invalid packet:", p))
# One mapping with two sessions is expected at this point.
4251 dms = self.vapi.nat_det_map_dump()
4252 self.assertEqual(1, len(dms))
4253 self.assertEqual(2, dms[0].ses_num)
# outside -> host0's translated port: must be delivered to host0.
4256 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4257 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4258 TCP(sport=external_port, dport=port_out0))
4259 self.pg1.add_stream(p)
4260 self.pg_enable_capture(self.pg_interfaces)
4262 capture = self.pg0.get_capture(1)
4267 self.assertEqual(ip.src, self.pg1.remote_ip4)
4268 self.assertEqual(ip.dst, host0.ip4)
4269 self.assertEqual(tcp.dport, port_in)
4270 self.assertEqual(tcp.sport, external_port)
4272 self.logger.error(ppp("Unexpected or invalid packet:", p))
# outside -> host1's translated port: must be delivered to host1.
4276 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4277 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4278 TCP(sport=external_port, dport=port_out1))
4279 self.pg1.add_stream(p)
4280 self.pg_enable_capture(self.pg_interfaces)
4282 capture = self.pg0.get_capture(1)
4287 self.assertEqual(ip.src, self.pg1.remote_ip4)
4288 self.assertEqual(ip.dst, host1.ip4)
4289 self.assertEqual(tcp.dport, port_in)
4290 self.assertEqual(tcp.sport, external_port)
4292 self.logger.error(ppp("Unexpected or invalid packet", p))
4295 # session close api test
# Closing by outside address must leave one session.
4296 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4298 self.pg1.remote_ip4n,
4300 dms = self.vapi.nat_det_map_dump()
4301 self.assertEqual(dms[0].ses_num, 1)
# Closing by inside address must leave none.
4303 self.vapi.nat_det_close_session_in(host0.ip4n,
4305 self.pg1.remote_ip4n,
4307 dms = self.vapi.nat_det_map_dump()
4308 self.assertEqual(dms[0].ses_num, 0)
4310 def test_tcp_session_close_detection_in(self):
4311 """ Deterministic NAT TCP session close from inside network """
# Opens a TCP session through the NAT, then sends the closing exchange
# starting from the inside host and checks that the mapping reports zero
# sessions afterwards.
# NOTE(review): this listing has gaps -- the TCP flags arguments, the list
# named pkts, some add_stream calls and the try/except keywords are not
# visible. Confirm against the complete file before changing any code.
4312 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4314 socket.inet_aton(self.nat_addr),
4316 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4317 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4320 self.initiate_tcp_session(self.pg0, self.pg1)
4322 # close the session from inside
4324 # FIN packet in -> out
4325 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4326 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4327 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4329 self.pg0.add_stream(p)
4330 self.pg_enable_capture(self.pg_interfaces)
4332 self.pg1.get_capture(1)
4336 # ACK packet out -> in
4337 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4338 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4339 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4343 # FIN packet out -> in
4344 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4345 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4346 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
# The two outside packets are sent as one stream; two packets are expected
# on the inside interface.
4350 self.pg1.add_stream(pkts)
4351 self.pg_enable_capture(self.pg_interfaces)
4353 self.pg0.get_capture(2)
4355 # ACK packet in -> out
4356 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4357 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4358 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4360 self.pg0.add_stream(p)
4361 self.pg_enable_capture(self.pg_interfaces)
4363 self.pg1.get_capture(1)
4365 # Check if deterministic NAT44 closed the session
4366 dms = self.vapi.nat_det_map_dump()
4367 self.assertEqual(0, dms[0].ses_num)
# NOTE(review): presumably the failure path of an elided try/except -- confirm.
4369 self.logger.error("TCP session termination failed")
# Scenario: mirror image of the "close from inside" test - the close is
# started by the outside host (pg1 side). Visible steps: add a deterministic
# mapping, enable the NAT44 feature on pg0 and pg1, open the session with
# initiate_tcp_session(), replay FIN (out -> in), two in -> out packets and
# a final ACK (out -> in), then require ses_num == 0 in the first dumped
# mapping.
# NOTE(review): the embedded listing numbers are not contiguous, so some
# call arguments (including the TCP flags) and statements are not visible.
4372 def test_tcp_session_close_detection_out(self):
4373 """ Deterministic NAT TCP session close from outside network """
4374 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4376 socket.inet_aton(self.nat_addr),
4378 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4379 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4382 self.initiate_tcp_session(self.pg0, self.pg1)
4384 # close the session from outside
4386 # FIN packet out -> in
4387 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4388 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4389 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
# The FIN is queued on pg1 and one packet is then requested from pg0.
4391 self.pg1.add_stream(p)
4392 self.pg_enable_capture(self.pg_interfaces)
4394 self.pg0.get_capture(1)
4398 # ACK packet in -> out
4399 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4400 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4401 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the next comment repeats "ACK packet in -> out". By symmetry
# with test_tcp_session_close_detection_in (ACK followed by FIN) this second
# packet is presumably the FIN; the flags are not visible here - confirm and
# correct the comment if so.
4405 # ACK packet in -> out
4406 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4407 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4408 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): 'pkts' is sent here, but the statements that build it are
# not visible in this listing; presumably it holds the two packets above.
4412 self.pg0.add_stream(pkts)
4413 self.pg_enable_capture(self.pg_interfaces)
4415 self.pg1.get_capture(2)
4417 # ACK packet out -> in
4418 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4419 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4420 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4422 self.pg1.add_stream(p)
4423 self.pg_enable_capture(self.pg_interfaces)
4425 self.pg0.get_capture(1)
4427 # Check if deterministic NAT44 closed the session
4428 dms = self.vapi.nat_det_map_dump()
4429 self.assertEqual(0, dms[0].ses_num)
# NOTE(review): the log call below is presumably the failure path of an
# exception handler whose lines are not visible here - confirm.
4431 self.logger.error("TCP session termination failed")
# Scenario (runs only when running_extended_tests() is true, see the
# skipUnless decorator): after a TCP session is opened through deterministic
# NAT, all four timeouts are set to 5 with nat_det_set_timeouts(5, 5, 5, 5),
# one more in -> out stream is sent, and the first dumped mapping is finally
# required to report ses_num == 0.
# NOTE(review): no wait is visible between the capture and the final dump
# (listing numbers 4452-4453 are absent); presumably a sleep longer than the
# configured timeouts belongs there - confirm. The unit of the timeout
# values is not shown in this listing.
4434 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4435 def test_session_timeout(self):
4436 """ Deterministic NAT session timeouts """
4437 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4439 socket.inet_aton(self.nat_addr),
4441 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4442 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4445 self.initiate_tcp_session(self.pg0, self.pg1)
4446 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4447 pkts = self.create_stream_in(self.pg0, self.pg1)
4448 self.pg0.add_stream(pkts)
4449 self.pg_enable_capture(self.pg_interfaces)
# 'capture' is not read again in the visible lines of this method.
4451 capture = self.pg1.get_capture(len(pkts))
4454 dms = self.vapi.nat_det_map_dump()
4455 self.assertEqual(0, dms[0].ses_num)
# Scenario (runs only when running_extended_tests() is true): per-user
# session limit of deterministic NAT. Visible steps:
#   1. add a deterministic mapping, enable NAT44 on pg0/pg1, point the IPFIX
#      exporter at pg2 and call nat_ipfix();
#   2. build one UDP packet per port in range(1025, 2025) - 1000 packets
#      with sport == dport - send 'pkts' on pg0 and request len(pkts)
#      packets from pg1;
#   3. send one more UDP packet (3001 -> 3002): nothing may be captured on
#      pg1, and pg0 must receive an ICMP type 3 / code 1 message whose
#      embedded UDP header carries ports 3001 / 3002;
#   4. the first dumped mapping must report ses_num == 1000;
#   5. after the "ipfix flush" CLI command, two packets are requested from
#      pg2; Template layers are loaded into an IPFIXDecoder and Data sets
#      are decoded and passed to verify_ipfix_max_entries_per_user().
# NOTE(review): the embedded listing numbers are not contiguous; the
# creation of 'pkts', the loops over 'capture' and several assignments are
# not visible here.
4457 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4458 def test_session_limit_per_user(self):
4459 """ Deterministic NAT maximum sessions per user limit """
4460 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4462 socket.inet_aton(self.nat_addr),
4464 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4465 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4467 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4468 src_address=self.pg2.local_ip4n,
4470 template_interval=10)
4471 self.vapi.nat_ipfix()
4474 for port in range(1025, 2025):
4475 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4476 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4477 UDP(sport=port, dport=port))
# NOTE(review): presumably each packet built above is appended to 'pkts'
# on a line not visible here.
4480 self.pg0.add_stream(pkts)
4481 self.pg_enable_capture(self.pg_interfaces)
4483 capture = self.pg1.get_capture(len(pkts))
# One packet beyond the 1000 sent above.
4485 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4486 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4487 UDP(sport=3001, dport=3002))
4488 self.pg0.add_stream(p)
4489 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): the value bound to 'capture' on the next line is overwritten
# by get_capture(1) below before any visible use.
4491 capture = self.pg1.assert_nothing_captured()
4493 # verify ICMP error packet
4494 capture = self.pg0.get_capture(1)
# NOTE(review): 'p' still names the packet sent above unless it is rebound
# to the captured packet; that rebinding and the assignment of 'icmp' are
# not visible in this listing.
4496 self.assertTrue(p.haslayer(ICMP))
4498 self.assertEqual(icmp.type, 3)
4499 self.assertEqual(icmp.code, 1)
4500 self.assertTrue(icmp.haslayer(IPerror))
4501 inner_ip = icmp[IPerror]
4502 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4503 self.assertEqual(inner_ip[UDPerror].dport, 3002)
4505 dms = self.vapi.nat_det_map_dump()
4507 self.assertEqual(1000, dms[0].ses_num)
4509 # verify IPFIX logging
4510 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4512 capture = self.pg2.get_capture(2)
4513 ipfix = IPFIXDecoder()
4514 # first load template
4516 self.assertTrue(p.haslayer(IPFIX))
4517 if p.haslayer(Template):
4518 ipfix.add_template(p.getlayer(Template))
4519 # verify events in data set
4521 if p.haslayer(Data):
4522 data = ipfix.decode_data_set(p.getlayer(Set))
4523 self.verify_ipfix_max_entries_per_user(data)
# Reset helper for the deterministic NAT tests. Visible actions: call
# nat_ipfix(enable=0), call nat_det_set_timeouts() with no arguments
# (presumably restoring default timeouts - confirm), then issue
# nat_det_add_del_map for every dumped deterministic mapping and
# nat44_interface_add_del_feature for every dumped NAT44 interface.
# NOTE(review): the remaining arguments of both add_del calls are not
# visible in this listing; presumably they request deletion - confirm.
4525 def clear_nat_det(self):
4527 Clear deterministic NAT configuration.
4529 self.vapi.nat_ipfix(enable=0)
4530 self.vapi.nat_det_set_timeouts()
4531 deterministic_mappings = self.vapi.nat_det_map_dump()
4532 for dsm in deterministic_mappings:
4533 self.vapi.nat_det_add_del_map(dsm.in_addr,
4539 interfaces = self.vapi.nat44_interface_dump()
4540 for intf in interfaces:
4541 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# NOTE(review): these lines start with super(...).tearDown(), so they are
# presumably the body of TestDeterministicNAT.tearDown; its 'def' line is
# not visible in this listing.
# After the parent teardown, and guarded by 'not self.vpp_dead', the CLI
# output of "show nat44 interfaces" is logged and the deterministic
# mappings / timeouts / sessions show commands are issued; clear_nat_det()
# is called last. Whether clear_nat_det() sits inside the vpp_dead guard
# cannot be told from the de-indented listing.
4546 super(TestDeterministicNAT, self).tearDown()
4547 if not self.vpp_dead:
4548 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4550 self.vapi.cli("show nat44 deterministic mappings"))
4552 self.vapi.cli("show nat44 deterministic timeouts"))
4554 self.vapi.cli("show nat44 deterministic sessions"))
4555 self.clear_nat_det()
4558 class TestNAT64(MethodHolder):
4559 """ NAT64 Test Cases """
# Runs the parent setUpConstants() and then appends a "nat { ... }" section
# to cls.vpp_cmdline containing "nat64 bib hash buckets 128" and
# "nat64 st hash buckets 256".
# NOTE(review): the method takes 'cls', so it is presumably decorated with
# @classmethod on a line not visible in this listing.
4562 def setUpConstants(cls):
4563 super(TestNAT64, cls).setUpConstants()
4564 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4565 "nat64 st hash buckets 256", "}"])
# Class-level fixture for the NAT64 tests. Visible actions:
#   - store default ports / ICMP ids (6303, 6304, 6305 for both the "in"
#     and "out" side), the pool address 10.0.0.3, the VRF1 pool address
#     10.0.10.3 (both also in packed form via inet_pton) and the IPFIX
#     source port / domain id;
#   - create five pg interfaces; pg_interfaces[0] and [2] form
#     ip6_interfaces, pg_interfaces[1] forms ip4_interfaces;
#   - create an IPv6 table for cls.vrf1_id and bind pg_interfaces[2] to it;
#   - generate two remote hosts on pg0;
#   - configure pg3 with both IPv4 (config_ip4 / resolve_arp) and IPv6
#     (config_ip6 / configure_ipv6_neighbors).
# NOTE(review): the assignment of cls.vrf1_id, most of the bodies of the
# two 'for' loops and the lines around the final tearDownClass() call are
# not visible in this listing (gaps in the embedded numbering).
4568 def setUpClass(cls):
4569 super(TestNAT64, cls).setUpClass()
4572 cls.tcp_port_in = 6303
4573 cls.tcp_port_out = 6303
4574 cls.udp_port_in = 6304
4575 cls.udp_port_out = 6304
4576 cls.icmp_id_in = 6305
4577 cls.icmp_id_out = 6305
4578 cls.nat_addr = '10.0.0.3'
4579 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4581 cls.vrf1_nat_addr = '10.0.10.3'
4582 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4584 cls.ipfix_src_port = 4739
4585 cls.ipfix_domain_id = 1
4587 cls.create_pg_interfaces(range(5))
4588 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4589 cls.ip6_interfaces.append(cls.pg_interfaces[2])
4590 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4592 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4594 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4596 cls.pg0.generate_remote_hosts(2)
4598 for i in cls.ip6_interfaces:
4601 i.configure_ipv6_neighbors()
4603 for i in cls.ip4_interfaces:
4609 cls.pg3.config_ip4()
4610 cls.pg3.resolve_arp()
4611 cls.pg3.config_ip6()
4612 cls.pg3.configure_ipv6_neighbors()
# NOTE(review): presumably the cleanup path of an exception handler whose
# lines are not visible here - confirm.
4615 super(TestNAT64, cls).tearDownClass()
# Adds the single-address range 1.2.3.4 .. 1.2.3.4 (packed with inet_pton)
# to the NAT64 pool, checks that the pool dump holds exactly one entry whose
# address equals it, removes the range again with is_add=0 and checks that
# the dump is empty.
4618 def test_pool(self):
4619 """ Add/delete address to NAT64 pool """
4620 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4622 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4624 addresses = self.vapi.nat64_pool_addr_dump()
4625 self.assertEqual(len(addresses), 1)
4626 self.assertEqual(addresses[0].address, nat_addr)
4628 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4630 addresses = self.vapi.nat64_pool_addr_dump()
4631 self.assertEqual(len(addresses), 0)
# Enables NAT64 on pg0 (no is_inside argument given) and on pg1 with
# is_inside=0, then checks that:
#   - the interface dump has two entries, pg0 reported with is_inside == 1
#     and pg1 with is_inside == 0;
#   - "show interface features" mentions 'nat64-in2out' for pg0 and
#     'nat64-out2in' for pg1;
#   - the dump is empty after both interfaces are removed with is_add=0.
# NOTE(review): pg0_found / pg1_found are asserted below, but the lines
# that initialise and set them are not visible in this listing.
4633 def test_interface(self):
4634 """ Enable/disable NAT64 feature on the interface """
4635 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4636 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4638 interfaces = self.vapi.nat64_interface_dump()
4639 self.assertEqual(len(interfaces), 2)
4642 for intf in interfaces:
4643 if intf.sw_if_index == self.pg0.sw_if_index:
4644 self.assertEqual(intf.is_inside, 1)
4646 elif intf.sw_if_index == self.pg1.sw_if_index:
4647 self.assertEqual(intf.is_inside, 0)
4649 self.assertTrue(pg0_found)
4650 self.assertTrue(pg1_found)
# str.find() returns -1 when the feature name is absent from the CLI output.
4652 features = self.vapi.cli("show interface features pg0")
4653 self.assertNotEqual(features.find('nat64-in2out'), -1)
4654 features = self.vapi.cli("show interface features pg1")
4655 self.assertNotEqual(features.find('nat64-out2in'), -1)
4657 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4658 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4660 interfaces = self.vapi.nat64_interface_dump()
4661 self.assertEqual(len(interfaces), 0)
# Adds a static NAT64 BIB entry for inside address
# 2001:db8:85a3::8a2e:370:7334 and outside address 10.1.1.3, dumps the TCP
# BIB and checks the entry's addresses and ports and that exactly one
# static entry was counted; a second nat64_add_del_static_bib call follows,
# after which the TCP BIB dump must yield a static-entry count of 0.
# NOTE(review): the assignments of in_port, out_port, static_bib_num and
# bibe, the loops over 'bib' and the remaining arguments of both
# nat64_add_del_static_bib calls (presumably is_add=0 on the second) are not
# visible in this listing.
4663 def test_static_bib(self):
4664 """ Add/delete static BIB entry """
4665 in_addr = socket.inet_pton(socket.AF_INET6,
4666 '2001:db8:85a3::8a2e:370:7334')
4667 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4670 proto = IP_PROTOS.tcp
4672 self.vapi.nat64_add_del_static_bib(in_addr,
4677 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4682 self.assertEqual(bibe.i_addr, in_addr)
4683 self.assertEqual(bibe.o_addr, out_addr)
4684 self.assertEqual(bibe.i_port, in_port)
4685 self.assertEqual(bibe.o_port, out_port)
4686 self.assertEqual(static_bib_num, 1)
4688 self.vapi.nat64_add_del_static_bib(in_addr,
4694 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4699 self.assertEqual(static_bib_num, 0)
# Reads the NAT64 timeouts and expects udp=300, icmp=60, tcp_trans=240,
# tcp_est=7440 and tcp_incoming_syn=6; then sets udp=200, icmp=30,
# tcp_trans=250, tcp_est=7450 and tcp_incoming_syn=10 and expects
# nat64_get_timeouts() to return exactly those values.
# NOTE(review): the first half only passes if no earlier test left custom
# timeouts behind in the same VPP instance - confirm the class teardown
# resets them. The unit of the values is not shown in this listing.
4701 def test_set_timeouts(self):
4702 """ Set NAT64 timeouts """
4703 # verify default values
4704 timeouts = self.vapi.nat64_get_timeouts()
4705 self.assertEqual(timeouts.udp, 300)
4706 self.assertEqual(timeouts.icmp, 60)
4707 self.assertEqual(timeouts.tcp_trans, 240)
4708 self.assertEqual(timeouts.tcp_est, 7440)
4709 self.assertEqual(timeouts.tcp_incoming_syn, 6)
4711 # set and verify custom values
4712 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4713 tcp_est=7450, tcp_incoming_syn=10)
4714 timeouts = self.vapi.nat64_get_timeouts()
4715 self.assertEqual(timeouts.udp, 200)
4716 self.assertEqual(timeouts.icmp, 30)
4717 self.assertEqual(timeouts.tcp_trans, 250)
4718 self.assertEqual(timeouts.tcp_est, 7450)
4719 self.assertEqual(timeouts.tcp_incoming_syn, 10)
# Scenario: dynamic NAT64 between IPv6 senders (pg0, later pg2) and the
# IPv4 side (pg1). Visible steps:
#   - record the session count, add self.nat_addr_n to the pool and enable
#     NAT64 on pg0 and on pg1 (pg1 with is_inside=0);
#   - twice in a row: send create_stream_in_ip6(pg0, pg1) and check the
#     packets captured on pg1 with verify_capture_out(), then send
#     create_stream_out(pg1, dst_ip=nat_addr) and check the packets captured
#     on pg0 with verify_capture_in_ip6();
#   - the session count must have grown by exactly 3;
#   - add the VRF1 pool address, enable NAT64 on pg2 and run one more
#     round trip between pg2 and pg1 using self.vrf1_nat_addr.
# NOTE(review): the embedded listing numbers are not contiguous; some call
# arguments and statements are not visible here.
4721 def test_dynamic(self):
4722 """ NAT64 dynamic translation test """
4723 self.tcp_port_in = 6303
4724 self.udp_port_in = 6304
4725 self.icmp_id_in = 6305
4727 ses_num_start = self.nat64_get_ses_num()
4729 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4731 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4732 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4735 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4736 self.pg0.add_stream(pkts)
4737 self.pg_enable_capture(self.pg_interfaces)
4739 capture = self.pg1.get_capture(len(pkts))
4740 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4741 dst_ip=self.pg1.remote_ip4)
4744 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4745 self.pg1.add_stream(pkts)
4746 self.pg_enable_capture(self.pg_interfaces)
4748 capture = self.pg0.get_capture(len(pkts))
# An IPv6 header is built only to read back, via ip[IPv6].src, the address
# formed from '64:ff9b::' + pg1's IPv4 address as scapy stores it; 'ip' is
# reused for the later checks in this method.
4749 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4750 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4753 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4754 self.pg0.add_stream(pkts)
4755 self.pg_enable_capture(self.pg_interfaces)
4757 capture = self.pg1.get_capture(len(pkts))
4758 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4759 dst_ip=self.pg1.remote_ip4)
4762 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4763 self.pg1.add_stream(pkts)
4764 self.pg_enable_capture(self.pg_interfaces)
4766 capture = self.pg0.get_capture(len(pkts))
4767 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4769 ses_num_end = self.nat64_get_ses_num()
# NOTE(review): presumably one session each for TCP, UDP and ICMP; the
# stream helpers are not visible here - confirm.
4771 self.assertEqual(ses_num_end - ses_num_start, 3)
4773 # tenant with specific VRF
4774 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4775 self.vrf1_nat_addr_n,
4776 vrf_id=self.vrf1_id)
4777 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4779 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4780 self.pg2.add_stream(pkts)
4781 self.pg_enable_capture(self.pg_interfaces)
4783 capture = self.pg1.get_capture(len(pkts))
4784 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4785 dst_ip=self.pg1.remote_ip4)
4787 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4788 self.pg1.add_stream(pkts)
4789 self.pg_enable_capture(self.pg_interfaces)
4791 capture = self.pg2.get_capture(len(pkts))
4792 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
# Scenario: NAT64 with static BIB entries. The inside and outside TCP/UDP
# ports and ICMP ids are set to the same values (60303 / 60304 / 60305),
# the pool address and the pg0 / pg1 (is_inside=0) interfaces are
# configured, and three nat64_add_del_static_bib calls are made for
# pg0.remote_ip6n. Traffic is then sent pg0 -> pg1 and checked with
# verify_capture_out(..., same_port=True), and pg1 -> pg0 and checked with
# verify_capture_in_ip6(). The session count must grow by exactly 3.
# NOTE(review): the remaining arguments of the three static BIB calls are
# not visible in this listing; presumably one entry each for TCP, UDP and
# ICMP - confirm.
4794 def test_static(self):
4795 """ NAT64 static translation test """
4796 self.tcp_port_in = 60303
4797 self.udp_port_in = 60304
4798 self.icmp_id_in = 60305
4799 self.tcp_port_out = 60303
4800 self.udp_port_out = 60304
4801 self.icmp_id_out = 60305
4803 ses_num_start = self.nat64_get_ses_num()
4805 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4807 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4808 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4810 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4815 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4820 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4827 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4828 self.pg0.add_stream(pkts)
4829 self.pg_enable_capture(self.pg_interfaces)
4831 capture = self.pg1.get_capture(len(pkts))
4832 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4833 dst_ip=self.pg1.remote_ip4, same_port=True)
4836 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4837 self.pg1.add_stream(pkts)
4838 self.pg_enable_capture(self.pg_interfaces)
4840 capture = self.pg0.get_capture(len(pkts))
# An IPv6 header is built only to read back the '64:ff9b::' + pg1 IPv4
# address in the form scapy stores it.
4841 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4842 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4844 ses_num_end = self.nat64_get_ses_num()
4846 self.assertEqual(ses_num_end - ses_num_start, 3)
# Scenario (runs only when running_extended_tests() is true): NAT64 session
# expiry. The ICMP id is set to 1234, the pool address and the pg0 / pg1
# (is_inside=0) interfaces are configured, the ICMP timeout is set to 5
# with nat64_set_timeouts(icmp=5), one in -> out stream is sent and the
# session count is sampled before and after.
# NOTE(review): no wait is visible between the two samples (listing numbers
# 4865-4867 are absent); presumably a sleep longer than the ICMP timeout
# belongs there - confirm. The final assertNotEqual only proves that the
# count changed, not that it decreased.
4848 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4849 def test_session_timeout(self):
4850 """ NAT64 session timeout """
4851 self.icmp_id_in = 1234
4852 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4854 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4855 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4856 self.vapi.nat64_set_timeouts(icmp=5)
4858 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4859 self.pg0.add_stream(pkts)
4860 self.pg_enable_capture(self.pg_interfaces)
# 'capture' is not read again in the visible lines of this method.
4862 capture = self.pg1.get_capture(len(pkts))
4864 ses_num_before_timeout = self.nat64_get_ses_num()
4868 # ICMP session after timeout
4869 ses_num_after_timeout = self.nat64_get_ses_num()
4870 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
# Scenario: translation of ICMP error messages by NAT64. Visible steps:
#   1. configure the pool address and the pg0 / pg1 (is_inside=0)
#      interfaces, then create sessions with one in -> out stream (captured
#      on pg1 as capture_ip4) and one out -> in stream (captured on pg0 as
#      capture_ip6);
#   2. in -> out: wrap every packet of capture_ip6 in an ICMPv6
#      Destination Unreachable (code=1) sent from pg0; each packet captured
#      on pg1 must be IPv4 from nat_addr to pg1.remote_ip4 with ICMP
#      type 3 / code 13, embedding a packet from pg1.remote_ip4 to nat_addr
#      whose TCP/UDP destination port or ICMP id equals the "out" value;
#   3. out -> in: wrap every packet of capture_ip4 in an ICMP type 3 /
#      code 13 sent from pg1; each packet captured on pg0 must be IPv6
#      from ip.src to pg0.remote_ip6 with ICMPv6 Destination Unreachable
#      code 1, embedding a packet from pg0.remote_ip6 to ip.src whose
#      TCP/UDP source port equals the "in" value.
# NOTE(review): ses_num_start is assigned but not read in the visible
# lines; the try/else lines around the per-packet checks are not visible
# in this listing either.
4872 def test_icmp_error(self):
4873 """ NAT64 ICMP Error message translation """
4874 self.tcp_port_in = 6303
4875 self.udp_port_in = 6304
4876 self.icmp_id_in = 6305
4878 ses_num_start = self.nat64_get_ses_num()
4880 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4882 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4883 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4885 # send some packets to create sessions
4886 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4887 self.pg0.add_stream(pkts)
4888 self.pg_enable_capture(self.pg_interfaces)
4890 capture_ip4 = self.pg1.get_capture(len(pkts))
4891 self.verify_capture_out(capture_ip4,
4892 nat_ip=self.nat_addr,
4893 dst_ip=self.pg1.remote_ip4)
4895 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4896 self.pg1.add_stream(pkts)
4897 self.pg_enable_capture(self.pg_interfaces)
4899 capture_ip6 = self.pg0.get_capture(len(pkts))
4900 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4901 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4902 self.pg0.remote_ip6)
# ICMPv6 errors from the inside host: each one carries an IPv6 packet
# previously captured on pg0 as its payload.
4905 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4906 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4907 ICMPv6DestUnreach(code=1) /
4908 packet[IPv6] for packet in capture_ip6]
4909 self.pg0.add_stream(pkts)
4910 self.pg_enable_capture(self.pg_interfaces)
4912 capture = self.pg1.get_capture(len(pkts))
4913 for packet in capture:
4915 self.assertEqual(packet[IP].src, self.nat_addr)
4916 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4917 self.assertEqual(packet[ICMP].type, 3)
4918 self.assertEqual(packet[ICMP].code, 13)
4919 inner = packet[IPerror]
4920 self.assertEqual(inner.src, self.pg1.remote_ip4)
4921 self.assertEqual(inner.dst, self.nat_addr)
4922 self.check_icmp_checksum(packet)
4923 if inner.haslayer(TCPerror):
4924 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4925 elif inner.haslayer(UDPerror):
4926 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4928 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4930 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# ICMPv4 errors from the outside host: each one carries an IPv4 packet
# previously captured on pg1 as its payload.
4934 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4935 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4936 ICMP(type=3, code=13) /
4937 packet[IP] for packet in capture_ip4]
4938 self.pg1.add_stream(pkts)
4939 self.pg_enable_capture(self.pg_interfaces)
4941 capture = self.pg0.get_capture(len(pkts))
4942 for packet in capture:
4944 self.assertEqual(packet[IPv6].src, ip.src)
4945 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4946 icmp = packet[ICMPv6DestUnreach]
4947 self.assertEqual(icmp.code, 1)
4948 inner = icmp[IPerror6]
4949 self.assertEqual(inner.src, self.pg0.remote_ip6)
4950 self.assertEqual(inner.dst, ip.src)
4951 self.check_icmpv6_checksum(packet)
4952 if inner.haslayer(TCPerror):
4953 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4954 elif inner.haslayer(UDPerror):
4955 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4957 self.assertEqual(inner[ICMPv6EchoRequest].id,
4960 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Scenario: NAT64 hairpinning - client and server are both remote hosts on
# pg0 (remote_hosts[0] and [1]) and talk to each other through the NAT
# address '64:ff9b::' + self.nat_addr. Visible steps:
#   1. configure the pool address, the pg0 / pg1 (is_inside=0) interfaces
#      and two static BIB entries for the server (TCP and UDP);
#   2. client -> server: TCP and UDP packets to the server's "out" ports;
#      packets captured back on pg0 must come from nat_addr_ip6, go to
#      server.ip6 with the server's "in" port as destination and a source
#      port different from the client's "in" port; that source port is
#      remembered as the client's "out" port;
#   3. server -> client: replies to the remembered client "out" ports;
#      packets captured on pg0 must come from nat_addr_ip6, go to client.ip6
#      with the server's "out" port as source and the client's "in" port as
#      destination;
#   4. ICMP error: every reply captured in step 3 is wrapped in an ICMPv6
#      Destination Unreachable (code=1) sent by the client; packets captured
#      on pg0 must go to server.ip6 and embed a packet from server.ip6 to
#      nat_addr_ip6 with the server "in" / client "out" ports.
# NOTE(review): the creation of 'pkts', the try/else lines around the
# per-packet checks and several call arguments are not visible in this
# listing (gaps in the embedded numbering).
4963 def test_hairpinning(self):
4964 """ NAT64 hairpinning """
4966 client = self.pg0.remote_hosts[0]
4967 server = self.pg0.remote_hosts[1]
4968 server_tcp_in_port = 22
4969 server_tcp_out_port = 4022
4970 server_udp_in_port = 23
4971 server_udp_out_port = 4023
4972 client_tcp_in_port = 1234
4973 client_udp_in_port = 1235
# Placeholders; overwritten with the ports observed in step 2.
4974 client_tcp_out_port = 0
4975 client_udp_out_port = 0
4976 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4977 nat_addr_ip6 = ip.src
4979 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4981 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4982 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4984 self.vapi.nat64_add_del_static_bib(server.ip6n,
4987 server_tcp_out_port,
4989 self.vapi.nat64_add_del_static_bib(server.ip6n,
4992 server_udp_out_port,
4997 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4998 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4999 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5001 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5002 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5003 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5005 self.pg0.add_stream(pkts)
5006 self.pg_enable_capture(self.pg_interfaces)
5008 capture = self.pg0.get_capture(len(pkts))
5009 for packet in capture:
5011 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5012 self.assertEqual(packet[IPv6].dst, server.ip6)
5013 if packet.haslayer(TCP):
5014 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5015 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5016 self.check_tcp_checksum(packet)
5017 client_tcp_out_port = packet[TCP].sport
5019 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5020 self.assertEqual(packet[UDP].dport, server_udp_in_port)
5021 self.check_udp_checksum(packet)
5022 client_udp_out_port = packet[UDP].sport
5024 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5029 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5030 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5031 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5033 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5034 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5035 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5037 self.pg0.add_stream(pkts)
5038 self.pg_enable_capture(self.pg_interfaces)
5040 capture = self.pg0.get_capture(len(pkts))
5041 for packet in capture:
5043 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5044 self.assertEqual(packet[IPv6].dst, client.ip6)
5045 if packet.haslayer(TCP):
5046 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5047 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5048 self.check_tcp_checksum(packet)
5050 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5051 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5052 self.check_udp_checksum(packet)
5054 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# 'capture' still holds the server -> client replies captured just above.
5059 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5060 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5061 ICMPv6DestUnreach(code=1) /
5062 packet[IPv6] for packet in capture]
5063 self.pg0.add_stream(pkts)
5064 self.pg_enable_capture(self.pg_interfaces)
5066 capture = self.pg0.get_capture(len(pkts))
5067 for packet in capture:
5069 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5070 self.assertEqual(packet[IPv6].dst, server.ip6)
5071 icmp = packet[ICMPv6DestUnreach]
5072 self.assertEqual(icmp.code, 1)
5073 inner = icmp[IPerror6]
5074 self.assertEqual(inner.src, server.ip6)
5075 self.assertEqual(inner.dst, nat_addr_ip6)
5076 self.check_icmpv6_checksum(packet)
5077 if inner.haslayer(TCPerror):
5078 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5079 self.assertEqual(inner[TCPerror].dport,
5080 client_tcp_out_port)
5082 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5083 self.assertEqual(inner[UDPerror].dport,
5084 client_udp_out_port)
5086 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Scenario: NAT64 with network-specific prefixes instead of the well-known
# '64:ff9b::' one. Visible steps:
#   1. configure pool addresses for the default table and for VRF1 and
#      enable NAT64 on pg0, pg1 (is_inside=0) and pg2;
#   2. add the prefix 2001:db8::/32 without a vrf_id and check that the
#      prefix dump holds exactly that entry with vrf_id == 0;
#   3. add the prefix 2001:db8:122:300::/56 with vrf_id=self.vrf1_id and
#      check that the dump now holds two entries;
#   4. run one round trip pg0 <-> pg1 using the first prefix and one round
#      trip pg2 <-> pg1 using the VRF1 prefix; the expected IPv6 source of
#      the returning packets is built with compose_ip6().
# NOTE(review): several continuation arguments (for create_stream_in_ip6,
# compose_ip6 and nat64_add_del_prefix) are not visible in this listing.
5089 def test_prefix(self):
5090 """ NAT64 Network-Specific Prefix """
5092 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5094 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5095 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5096 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5097 self.vrf1_nat_addr_n,
5098 vrf_id=self.vrf1_id)
5099 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5102 global_pref64 = "2001:db8::"
5103 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5104 global_pref64_len = 32
5105 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5107 prefix = self.vapi.nat64_prefix_dump()
5108 self.assertEqual(len(prefix), 1)
5109 self.assertEqual(prefix[0].prefix, global_pref64_n)
5110 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5111 self.assertEqual(prefix[0].vrf_id, 0)
5113 # Add tenant specific prefix
5114 vrf1_pref64 = "2001:db8:122:300::"
5115 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5116 vrf1_pref64_len = 56
5117 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5119 vrf_id=self.vrf1_id)
5120 prefix = self.vapi.nat64_prefix_dump()
5121 self.assertEqual(len(prefix), 2)
# Round trip through the prefix added without a vrf_id.
5124 pkts = self.create_stream_in_ip6(self.pg0,
5127 plen=global_pref64_len)
5128 self.pg0.add_stream(pkts)
5129 self.pg_enable_capture(self.pg_interfaces)
5131 capture = self.pg1.get_capture(len(pkts))
5132 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5133 dst_ip=self.pg1.remote_ip4)
5135 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5136 self.pg1.add_stream(pkts)
5137 self.pg_enable_capture(self.pg_interfaces)
5139 capture = self.pg0.get_capture(len(pkts))
5140 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5143 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5145 # Tenant specific prefix
5146 pkts = self.create_stream_in_ip6(self.pg2,
5149 plen=vrf1_pref64_len)
5150 self.pg2.add_stream(pkts)
5151 self.pg_enable_capture(self.pg_interfaces)
5153 capture = self.pg1.get_capture(len(pkts))
5154 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5155 dst_ip=self.pg1.remote_ip4)
5157 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5158 self.pg1.add_stream(pkts)
5159 self.pg_enable_capture(self.pg_interfaces)
5161 capture = self.pg2.get_capture(len(pkts))
5162 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5165 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
# Scenario: NAT64 handling of a payload protocol it has no port-based
# translation for (IPv6 next header / IP protocol 47). Visible steps:
#   1. configure the pool address and the pg0 / pg1 (is_inside=0)
#      interfaces; the IPv6 form of pg1's address is built with
#      compose_ip6(pg1.remote_ip4, '64:ff9b::', 96);
#   2. send one TCP packet pg0 -> pg1 first and request one packet from pg1;
#   3. in -> out: send an IPv6 packet with nh=47 carrying an inner IPv4/TCP
#      packet; the packet captured on pg1 must be IPv4 from nat_addr to
#      pg1.remote_ip4, contain a GRE layer and have a valid IP checksum;
#   4. out -> in: send the reverse IPv4 packet to nat_addr; the packet
#      captured on pg0 must be IPv6 from remote_ip6 to pg0.remote_ip6 with
#      nh == 47.
# NOTE(review): the GRE() layer lines (5187, 5207), the assignment of
# 'packet' from the capture and the try/except lines are not visible in
# this listing.
5167 def test_unknown_proto(self):
5168 """ NAT64 translate packet with unknown protocol """
5170 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5172 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5173 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5174 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
# NOTE(review): presumably this TCP packet exists to create a session
# between the two hosts before the protocol-47 packets are sent - confirm.
5177 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5178 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5179 TCP(sport=self.tcp_port_in, dport=20))
5180 self.pg0.add_stream(p)
5181 self.pg_enable_capture(self.pg_interfaces)
# 'p' is rebound from the sent packet to the capture result here.
5183 p = self.pg1.get_capture(1)
5185 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5186 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5188 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5189 TCP(sport=1234, dport=1234))
5190 self.pg0.add_stream(p)
5191 self.pg_enable_capture(self.pg_interfaces)
5193 p = self.pg1.get_capture(1)
5196 self.assertEqual(packet[IP].src, self.nat_addr)
5197 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5198 self.assertTrue(packet.haslayer(GRE))
5199 self.check_ip_checksum(packet)
5201 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5205 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5206 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5208 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5209 TCP(sport=1234, dport=1234))
5210 self.pg1.add_stream(p)
5211 self.pg_enable_capture(self.pg_interfaces)
5213 p = self.pg0.get_capture(1)
5216 self.assertEqual(packet[IPv6].src, remote_ip6)
5217 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5218 self.assertEqual(packet[IPv6].nh, 47)
5220 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Scenario: hairpinning of a GRE-protocol payload - client and server are
# both remote hosts on pg0 (remote_hosts[0] and [1]). The server is reached
# through 10.0.0.100 and the client is expected to appear as 10.0.0.110;
# both addresses are also built in IPv6 form with
# compose_ip6(<addr>, '64:ff9b::', 96). Visible steps:
#   1. add a pool range starting at server_nat_ip_n, enable NAT64 on pg0 and
#      pg1 (is_inside=0) and make three static BIB calls (two for the
#      server, one for the client);
#   2. send one TCP packet client -> server_nat_ip6 and request one packet
#      from pg0;
#   3. client -> server with nh=IP_PROTOS.gre: the packet captured on pg0
#      must come from client_nat_ip6, go to server.ip6 and keep
#      nh == IP_PROTOS.gre;
#   4. server -> client with nh=IP_PROTOS.gre: the packet captured on pg0
#      must come from server_nat_ip6, go to client.ip6 and keep
#      nh == IP_PROTOS.gre.
# NOTE(review): the end of the pool range, most static BIB arguments, the
# GRE() layer lines, the assignment of 'packet' from the capture and the
# try/except lines are not visible in this listing.
5223 def test_hairpinning_unknown_proto(self):
5224 """ NAT64 translate packet with unknown protocol - hairpinning """
5226 client = self.pg0.remote_hosts[0]
5227 server = self.pg0.remote_hosts[1]
5228 server_tcp_in_port = 22
5229 server_tcp_out_port = 4022
5230 client_tcp_in_port = 1234
5231 client_tcp_out_port = 1235
5232 server_nat_ip = "10.0.0.100"
5233 client_nat_ip = "10.0.0.110"
5234 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5235 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5236 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5237 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5239 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5241 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5242 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5244 self.vapi.nat64_add_del_static_bib(server.ip6n,
5247 server_tcp_out_port,
5250 self.vapi.nat64_add_del_static_bib(server.ip6n,
5256 self.vapi.nat64_add_del_static_bib(client.ip6n,
5259 client_tcp_out_port,
5263 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5264 IPv6(src=client.ip6, dst=server_nat_ip6) /
5265 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5266 self.pg0.add_stream(p)
5267 self.pg_enable_capture(self.pg_interfaces)
# 'p' is rebound from the sent packet to the capture result here.
5269 p = self.pg0.get_capture(1)
5271 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5272 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5274 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5275 TCP(sport=1234, dport=1234))
5276 self.pg0.add_stream(p)
5277 self.pg_enable_capture(self.pg_interfaces)
5279 p = self.pg0.get_capture(1)
5282 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5283 self.assertEqual(packet[IPv6].dst, server.ip6)
5284 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5286 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5290 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5291 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5293 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5294 TCP(sport=1234, dport=1234))
5295 self.pg0.add_stream(p)
5296 self.pg_enable_capture(self.pg_interfaces)
5298 p = self.pg0.get_capture(1)
5301 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5302 self.assertEqual(packet[IPv6].dst, client.ip6)
5303 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5305 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Scenario: "one armed" NAT64 - the same interface, pg3, is registered both
# with the default direction and with is_inside=0, so traffic enters and
# leaves on pg3. Visible steps:
#   1. build the IPv6 form of pg3's remote IPv4 address with compose_ip6()
#      and configure the pool address;
#   2. in -> out: send IPv6/TCP 12345 -> 80 to that address; the packet
#      captured on pg3 must be IPv4 from nat_addr to pg3.remote_ip4 with
#      destination port 80 and a source port different from 12345, which is
#      remembered as external_port; TCP and IP checksums are verified;
#   3. out -> in: send IPv4/TCP 80 -> external_port to nat_addr; the packet
#      captured on pg3 must be IPv6 from remote_host_ip6 to pg3.remote_ip6
#      with ports 80 -> 12345 and a valid TCP checksum.
# NOTE(review): the remaining compose_ip6 arguments, the assignments of
# 'p', 'ip' and 'tcp' from each capture and the try/except lines are not
# visible in this listing.
5308 def test_one_armed_nat64(self):
5309 """ One armed NAT64 """
5311 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5315 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5317 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5318 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5321 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5322 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5323 TCP(sport=12345, dport=80))
5324 self.pg3.add_stream(p)
5325 self.pg_enable_capture(self.pg_interfaces)
5327 capture = self.pg3.get_capture(1)
5332 self.assertEqual(ip.src, self.nat_addr)
5333 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5334 self.assertNotEqual(tcp.sport, 12345)
# Remembered so the reply below can target the translated source port.
5335 external_port = tcp.sport
5336 self.assertEqual(tcp.dport, 80)
5337 self.check_tcp_checksum(p)
5338 self.check_ip_checksum(p)
5340 self.logger.error(ppp("Unexpected or invalid packet:", p))
5344 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5345 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5346 TCP(sport=80, dport=external_port))
5347 self.pg3.add_stream(p)
5348 self.pg_enable_capture(self.pg_interfaces)
5350 capture = self.pg3.get_capture(1)
5355 self.assertEqual(ip.src, remote_host_ip6)
5356 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5357 self.assertEqual(tcp.sport, 80)
5358 self.assertEqual(tcp.dport, 12345)
5359 self.check_tcp_checksum(p)
5361 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Scenario: NAT64 translation of fragmented TCP packets that arrive in
# order. Visible steps:
#   1. pick a random inside TCP port in [1025, 65535], configure the pool
#      address and the pg0 / pg1 (is_inside=0) interfaces and record the
#      number of entries returned by nat_reass_dump();
#   2. in -> out: send the fragments from create_stream_frag_ip6() on pg0,
#      reassemble what is captured on pg1 with reass_frags_and_verify();
#      the result must have destination port 20, a source port different
#      from the inside port (stored as self.tcp_port_out) and a payload
#      equal to 'data';
#   3. out -> in: send the fragments from create_stream_frag() on pg1 with
#      the payload "A" * 4 + "b" * 16 + "C" * 3, reassemble what is captured
#      on pg0 with reass_frags_and_verify_ip6(); the result must have source
#      port 20, destination port equal to the inside port and the same
#      payload;
#   4. the number of reassembly entries must have grown by exactly 2.
# NOTE(review): the first assignment of 'data' and several continuation
# arguments are not visible in this listing (gaps in the embedded
# numbering).
5364 def test_frag_in_order(self):
5365 """ NAT64 translate fragments arriving in order """
5366 self.tcp_port_in = random.randint(1025, 65535)
5368 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5370 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5371 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5373 reass = self.vapi.nat_reass_dump()
5374 reass_n_start = len(reass)
5378 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5379 self.tcp_port_in, 20, data)
5380 self.pg0.add_stream(pkts)
5381 self.pg_enable_capture(self.pg_interfaces)
5383 frags = self.pg1.get_capture(len(pkts))
5384 p = self.reass_frags_and_verify(frags,
5386 self.pg1.remote_ip4)
5387 self.assertEqual(p[TCP].dport, 20)
5388 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Remember the translated source port observed on the outside.
5389 self.tcp_port_out = p[TCP].sport
5390 self.assertEqual(data, p[Raw].load)
5393 data = "A" * 4 + "b" * 16 + "C" * 3
5394 pkts = self.create_stream_frag(self.pg1,
5399 self.pg1.add_stream(pkts)
5400 self.pg_enable_capture(self.pg_interfaces)
5402 frags = self.pg0.get_capture(len(pkts))
5403 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5404 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5405 self.assertEqual(p[TCP].sport, 20)
5406 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5407 self.assertEqual(data, p[Raw].load)
5409 reass = self.vapi.nat_reass_dump()
5410 reass_n_end = len(reass)
# NOTE(review): presumably one reassembly entry per direction - confirm.
5412 self.assertEqual(reass_n_end - reass_n_start, 2)
5414 def test_reass_hairpinning(self):
5415 """ NAT64 fragments hairpinning """
# NOTE(review): several lines of this method are not visible in this view
# (argument continuations, the assignment of `data`); the notes below
# cover only what the visible lines show.
#
# Client and server are two different remote hosts on the same interface
# (pg0).
5417 client = self.pg0.remote_hosts[0]
5418 server = self.pg0.remote_hosts[1]
5419 server_in_port = random.randint(1025, 65535)
5420 server_out_port = random.randint(1025, 65535)
5421 client_in_port = random.randint(1025, 65535)
# The NAT address is embedded after the 64:ff9b:: prefix and read back from
# a scapy IPv6 header - presumably to obtain the textual form scapy will
# report for captured packets; TODO confirm.
5422 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5423 nat_addr_ip6 = ip.src
5425 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5427 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5428 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5430 # add static BIB entry for server
5431 self.vapi.nat64_add_del_static_bib(server.ip6n,
5437 # send packet from host to server
5438 pkts = self.create_stream_frag_ip6(self.pg0,
5443 self.pg0.add_stream(pkts)
5444 self.pg_enable_capture(self.pg_interfaces)
# Hairpinning: the fragments are sent on pg0 and also captured on pg0.
5446 frags = self.pg0.get_capture(len(pkts))
# After reassembly the packet must come from the NAT's IPv6 form, be
# addressed to the server's IPv6 address and inside port, carry a rewritten
# source port and the unchanged payload.
5447 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5448 self.assertNotEqual(p[TCP].sport, client_in_port)
5449 self.assertEqual(p[TCP].dport, server_in_port)
5450 self.assertEqual(data, p[Raw].load)
5452 def test_frag_out_of_order(self):
5453 """ NAT64 translate fragments arriving out of order """
# NOTE(review): several lines of this method are not visible in this view
# (argument continuations, the first assignment of `data`, and whatever
# step reorders the fragments before they are sent); the notes below cover
# only what the visible lines show.
5454 self.tcp_port_in = random.randint(1025, 65535)
5456 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
# pg0 is registered with the call's defaults, pg1 with is_inside=0.
5458 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5459 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPv6 -> IPv4: fragmented IPv6 stream from pg0 towards pg1's remote IPv4
# host, TCP tcp_port_in -> 20.
5463 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5464 self.tcp_port_in, 20, data)
5466 self.pg0.add_stream(pkts)
5467 self.pg_enable_capture(self.pg_interfaces)
# All fragments must still come out on pg1 and reassemble correctly.
5469 frags = self.pg1.get_capture(len(pkts))
5470 p = self.reass_frags_and_verify(frags,
5472 self.pg1.remote_ip4)
# Destination port preserved, source port rewritten (kept as tcp_port_out),
# payload intact.
5473 self.assertEqual(p[TCP].dport, 20)
5474 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5475 self.tcp_port_out = p[TCP].sport
5476 self.assertEqual(data, p[Raw].load)
# IPv4 -> IPv6: a second payload travels back as IPv4 fragments from pg1.
5479 data = "A" * 4 + "B" * 16 + "C" * 3
5480 pkts = self.create_stream_frag(self.pg1,
5486 self.pg1.add_stream(pkts)
5487 self.pg_enable_capture(self.pg_interfaces)
5489 frags = self.pg0.get_capture(len(pkts))
# Expected IPv6 source: pg1's remote IPv4 address embedded in 64:ff9b::/96.
5490 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5491 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5492 self.assertEqual(p[TCP].sport, 20)
5493 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5494 self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface """
    # pg4 is registered as the interface the NAT64 pool takes its address
    # from; the pool is then checked before pg4 gets an IPv4 address, after
    # it gets one, and after the address is removed again.
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (the NAT64 pool dump is queried here; a NAT44 address dump would not
    # reflect the NAT64 pool this test is about)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    # The assertion must look at the dump taken after unconfig_ip4(); a
    # result captured before the address was configured would always be
    # empty and could never detect a leftover pool address.
    self.assertEqual(0, len(addresses))
# NOTE(review): several lines of this method are not visible in this view
# (the definitions of max_bibs / max_sessions / pkts, argument
# continuations, and the loops that bind `p` while walking `capture`); the
# notes below cover only what the visible lines show.
#
# Skipped unless running_extended_tests() is true.
5515 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5516 def test_ipfix_max_bibs_sessions(self):
5517 """ IPFIX logging maximum session and BIB entries exceeded """
5520 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5524 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5526 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5527 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# For each of max_bibs distinct IPv6 sources (fd01:aa::<i in hex>) two TCP
# packets are built, to destination ports 80 and 22.
5531 for i in range(0, max_bibs):
5532 src = "fd01:aa::%x" % (i)
5533 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5534 IPv6(src=src, dst=remote_host_ip6) /
5535 TCP(sport=12345, dport=80))
5537 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5538 IPv6(src=src, dst=remote_host_ip6) /
5539 TCP(sport=12345, dport=22))
5541 self.pg0.add_stream(pkts)
5542 self.pg_enable_capture(self.pg_interfaces)
# max_sessions packets are expected to be forwarded to pg1.
5544 self.pg1.get_capture(max_sessions)
# IPFIX export is pointed at pg3's remote host and NAT IPFIX logging is
# enabled only now, after the initial traffic has been sent.
5546 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5547 src_address=self.pg3.local_ip4n,
5549 template_interval=10)
5550 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5551 src_port=self.ipfix_src_port)
# One more packet from the last `src` of the loop, to a new destination
# port (25): nothing may be forwarded (get_capture(0)) and, after a flush,
# 9 packets are expected on the collector interface pg3.
5553 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5554 IPv6(src=src, dst=remote_host_ip6) /
5555 TCP(sport=12345, dport=25))
5556 self.pg0.add_stream(p)
5557 self.pg_enable_capture(self.pg_interfaces)
5559 self.pg1.get_capture(0)
5560 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5561 capture = self.pg3.get_capture(9)
5562 ipfix = IPFIXDecoder()
5563 # first load template
# Every exported packet must be IPFIX over UDP from pg3's local address to
# its remote address, from the configured source port to port 4739, and
# carry the configured observation domain id.
5565 self.assertTrue(p.haslayer(IPFIX))
5566 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5567 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5568 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5569 self.assertEqual(p[UDP].dport, 4739)
5570 self.assertEqual(p[IPFIX].observationDomainID,
5571 self.ipfix_domain_id)
5572 if p.haslayer(Template):
5573 ipfix.add_template(p.getlayer(Template))
5574 # verify events in data set
5576 if p.haslayer(Data):
5577 data = ipfix.decode_data_set(p.getlayer(Set))
5578 self.verify_ipfix_max_sessions(data, max_sessions)
# Second phase: a packet from a source not used in the loop above
# (pg0.remote_ip6). Again nothing may be forwarded; this time a single
# IPFIX packet is expected and checked with verify_ipfix_max_bibs.
5580 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5581 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5582 TCP(sport=12345, dport=80))
5583 self.pg0.add_stream(p)
5584 self.pg_enable_capture(self.pg_interfaces)
5586 self.pg1.get_capture(0)
5587 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5588 capture = self.pg3.get_capture(1)
5589 # verify events in data set
5591 self.assertTrue(p.haslayer(IPFIX))
5592 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5593 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5594 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5595 self.assertEqual(p[UDP].dport, 4739)
5596 self.assertEqual(p[IPFIX].observationDomainID,
5597 self.ipfix_domain_id)
5598 if p.haslayer(Data):
5599 data = ipfix.decode_data_set(p.getlayer(Set))
5600 self.verify_ipfix_max_bibs(data, max_bibs)
5602 def test_ipfix_max_frags(self):
5603 """ IPFIX logging maximum fragments pending reassembly exceeded """
# NOTE(review): several lines of this method are not visible in this view
# (argument continuations, the assignment of `data`, the loops binding `p`
# while walking `capture`); the notes below cover only what the visible
# lines show.
5604 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5606 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5607 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# The IPv6 reassembly limit is set to max_frag=0 - presumably so that any
# fragment held for reassembly exceeds it; TODO confirm.
5608 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
# IPFIX export goes to pg3's remote host.
5609 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5610 src_address=self.pg3.local_ip4n,
5612 template_interval=10)
5613 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5614 src_port=self.ipfix_src_port)
# A fragmented IPv6 stream is built, but only its last fragment is sent.
5617 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5618 self.tcp_port_in, 20, data)
5619 self.pg0.add_stream(pkts[-1])
5620 self.pg_enable_capture(self.pg_interfaces)
# Nothing may be forwarded to pg1; after a flush, 9 packets are expected on
# the collector interface pg3.
5622 self.pg1.get_capture(0)
5623 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5624 capture = self.pg3.get_capture(9)
5625 ipfix = IPFIXDecoder()
5626 # first load template
# Every exported packet must be IPFIX over UDP from pg3's local address to
# its remote address, from the configured source port to port 4739, and
# carry the configured observation domain id.
5628 self.assertTrue(p.haslayer(IPFIX))
5629 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5630 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5631 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5632 self.assertEqual(p[UDP].dport, 4739)
5633 self.assertEqual(p[IPFIX].observationDomainID,
5634 self.ipfix_domain_id)
5635 if p.haslayer(Template):
5636 ipfix.add_template(p.getlayer(Template))
5637 # verify events in data set
# Decoded data sets are checked against a limit of 0 and pg0's remote IPv6
# address (packed form).
5639 if p.haslayer(Data):
5640 data = ipfix.decode_data_set(p.getlayer(Set))
5641 self.verify_ipfix_max_fragments_ip6(data, 0,
5642 self.pg0.remote_ip6n)
5644 def test_ipfix_bib_ses(self):
5645 """ IPFIX logging NAT64 BIB/session create and delete events """
# NOTE(review): several lines of this method are not visible in this view
# (argument continuations, the loops binding `p` while walking `capture`,
# the else-branches that lead to the error logging); the notes below cover
# only what the visible lines show.
5646 self.tcp_port_in = random.randint(1025, 65535)
5647 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5651 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5653 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5654 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPFIX export goes to pg3's remote host.
5655 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5656 src_address=self.pg3.local_ip4n,
5658 template_interval=10)
5659 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5660 src_port=self.ipfix_src_port)
# Phase 1 (create): one IPv6 TCP packet from pg0 is forwarded to pg1; the
# translated source port is remembered as tcp_port_out.
5663 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5664 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5665 TCP(sport=self.tcp_port_in, dport=25))
5666 self.pg0.add_stream(p)
5667 self.pg_enable_capture(self.pg_interfaces)
5669 p = self.pg1.get_capture(1)
5670 self.tcp_port_out = p[0][TCP].sport
5671 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# 10 packets are expected on the collector interface pg3.
5672 capture = self.pg3.get_capture(10)
5673 ipfix = IPFIXDecoder()
5674 # first load template
5676 self.assertTrue(p.haslayer(IPFIX))
5677 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5678 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5679 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5680 self.assertEqual(p[UDP].dport, 4739)
5681 self.assertEqual(p[IPFIX].observationDomainID,
5682 self.ipfix_domain_id)
5683 if p.haslayer(Template):
5684 ipfix.add_template(p.getlayer(Template))
5685 # verify events in data set
# Records are dispatched on field 230 of the first decoded record:
# 10 -> verify_ipfix_bib(..., 1, ...), 6 -> verify_ipfix_nat64_ses(...).
# NOTE(review): 230 is presumably the IPFIX natEvent element, with 10/6
# being the BIB-create / session-create codes - confirm against the IANA
# registry.
5687 if p.haslayer(Data):
5688 data = ipfix.decode_data_set(p.getlayer(Set))
5689 if ord(data[0][230]) == 10:
5690 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5691 elif ord(data[0][230]) == 6:
5692 self.verify_ipfix_nat64_ses(data,
5694 self.pg0.remote_ip6n,
5695 self.pg1.remote_ip4,
5698 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Phase 2 (delete): the pool range call is issued again (its remaining
# arguments are not visible - presumably a removal, TODO confirm), then two
# IPFIX packets are expected after a flush.
5701 self.pg_enable_capture(self.pg_interfaces)
5702 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5705 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5706 capture = self.pg3.get_capture(2)
5707 # verify events in data set
5709 self.assertTrue(p.haslayer(IPFIX))
5710 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5711 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5712 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5713 self.assertEqual(p[UDP].dport, 4739)
5714 self.assertEqual(p[IPFIX].observationDomainID,
5715 self.ipfix_domain_id)
# Same dispatch as above with the other pair of codes:
# 11 -> verify_ipfix_bib(..., 0, ...), 7 -> verify_ipfix_nat64_ses(...).
5716 if p.haslayer(Data):
5717 data = ipfix.decode_data_set(p.getlayer(Set))
5718 if ord(data[0][230]) == 11:
5719 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5720 elif ord(data[0][230]) == 7:
5721 self.verify_ipfix_nat64_ses(data,
5723 self.pg0.remote_ip6n,
5724 self.pg1.remote_ip4,
5727 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Helper that reports the number of active NAT64 sessions, based on the
# result of self.vapi.nat64_st_dump().
# NOTE(review): the docstring delimiters and the statement that turns `st`
# into the returned count are not visible in this view.
5729 def nat64_get_ses_num(self):
5731 Return number of active NAT64 sessions.
5733 st = self.vapi.nat64_st_dump()
# Resets NAT64-related state: IPFIX logging, timeouts, NAT64 interfaces,
# BIB entries, pool addresses and prefixes, in that order.
# NOTE(review): docstring delimiters, argument continuations and some loop
# headers of this method are not visible in this view; the notes below
# cover only what the visible lines show.
5736 def clear_nat64(self):
5738 Clear NAT64 configuration.
5740 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5741 domain_id=self.ipfix_domain_id)
# IPFIX logging is switched off using the values currently stored on the
# test case, which are then set back to source port 4739 and domain id 1.
5742 self.ipfix_src_port = 4739
5743 self.ipfix_domain_id = 1
# Called without arguments - presumably this restores default timeouts;
# TODO confirm against the vapi wrapper.
5745 self.vapi.nat64_set_timeouts()
# Each dumped NAT64 interface is passed to nat64_add_del_interface; entries
# with is_inside > 1 get an additional call first (arguments not visible).
5747 interfaces = self.vapi.nat64_interface_dump()
5748 for intf in interfaces:
5749 if intf.is_inside > 1:
5750 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5753 self.vapi.nat64_add_del_interface(intf.sw_if_index,
# BIB entries are dumped with argument 255 and fed to
# nat64_add_del_static_bib (loop header and filters not visible).
5757 bib = self.vapi.nat64_bib_dump(255)
5760 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
# Every address currently in the NAT64 pool is passed to
# nat64_add_del_pool_addr_range.
5768 adresses = self.vapi.nat64_pool_addr_dump()
5769 for addr in adresses:
5770 self.vapi.nat64_add_del_pool_addr_range(addr.address,
# Every configured NAT64 prefix is passed to nat64_add_del_prefix together
# with its vrf_id.
5775 prefixes = self.vapi.nat64_prefix_dump()
5776 for prefix in prefixes:
5777 self.vapi.nat64_add_del_prefix(prefix.prefix,
5779 vrf_id=prefix.vrf_id,
# Body of TestNAT64's per-test tearDown (its `def` line is not visible in
# this view): the parent tearDown runs first; then, unless VPP is marked
# dead, the output of several NAT64 "show" CLI commands is written to the
# log at info level.
# NOTE(review): further statements of this method may be hidden in the
# lines that are not visible here.
5783 super(TestNAT64, self).tearDown()
5784 if not self.vpp_dead:
5785 self.logger.info(self.vapi.cli("show nat64 pool"))
5786 self.logger.info(self.vapi.cli("show nat64 interfaces"))
5787 self.logger.info(self.vapi.cli("show nat64 prefix"))
5788 self.logger.info(self.vapi.cli("show nat64 bib all"))
5789 self.logger.info(self.vapi.cli("show nat64 session table all"))
5790 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# NOTE(review): several lines of this class are not visible in this view
# (decorators, some `def` lines, error handling, argument continuations,
# and the step that actually starts the packet generator); the notes below
# cover only what the visible lines show.
5794 class TestDSlite(MethodHolder):
5795 """ DS-Lite Test Cases """
5798 def setUpClass(cls):
5799 super(TestDSlite, cls).setUpClass()
# NAT pool address used by the test, in text and packed form.
5802 cls.nat_addr = '10.0.0.3'
5803 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
# Two packet-generator interfaces: pg0 is IPv4 (ARP resolved), pg1 is IPv6
# with two remote hosts whose neighbor entries are configured.
5805 cls.create_pg_interfaces(range(2))
5807 cls.pg0.config_ip4()
5808 cls.pg0.resolve_arp()
5810 cls.pg1.config_ip6()
5811 cls.pg1.generate_remote_hosts(2)
5812 cls.pg1.configure_ipv6_neighbors()
5815 super(TestDSlite, cls).tearDownClass()
5818 def test_dslite(self):
5819 """ Test DS-Lite """
5820 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
# AFTR tunnel endpoint addresses (IPv4 and IPv6), in text and packed form.
5822 aftr_ip4 = '192.0.0.1'
5823 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5824 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5825 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5826 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# UDP, tunnel -> IPv4: an IPv4/UDP packet (192.168.1.1:20000 -> pg0 remote
# host:10000) wrapped in IPv6 from pg1's first remote host to the AFTR
# address is injected on pg1.
5829 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5830 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
5831 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5832 UDP(sport=20000, dport=10000))
5833 self.pg1.add_stream(p)
5834 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: plain IPv4 (no IPv6 layer) sourced from the NAT address
# with a rewritten source port, which is kept for the reply.
5836 capture = self.pg0.get_capture(1)
5837 capture = capture[0]
5838 self.assertFalse(capture.haslayer(IPv6))
5839 self.assertEqual(capture[IP].src, self.nat_addr)
5840 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5841 self.assertNotEqual(capture[UDP].sport, 20000)
5842 self.assertEqual(capture[UDP].dport, 10000)
5843 self.check_ip_checksum(capture)
5844 out_port = capture[UDP].sport
# UDP, IPv4 -> tunnel: the reply to the NAT address / out_port must come
# out on pg1 wrapped in IPv6 from the AFTR address to the first remote
# host, with the inner addresses and ports restored.
5846 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5847 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5848 UDP(sport=10000, dport=out_port))
5849 self.pg0.add_stream(p)
5850 self.pg_enable_capture(self.pg_interfaces)
5852 capture = self.pg1.get_capture(1)
5853 capture = capture[0]
5854 self.assertEqual(capture[IPv6].src, aftr_ip6)
5855 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5856 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5857 self.assertEqual(capture[IP].dst, '192.168.1.1')
5858 self.assertEqual(capture[UDP].sport, 10000)
5859 self.assertEqual(capture[UDP].dport, 20000)
5860 self.check_ip_checksum(capture)
# TCP: same round trip, this time from pg1's second remote host, which
# uses the same inner IPv4 source address (192.168.1.1) as the first.
5863 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5864 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5865 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5866 TCP(sport=20001, dport=10001))
5867 self.pg1.add_stream(p)
5868 self.pg_enable_capture(self.pg_interfaces)
5870 capture = self.pg0.get_capture(1)
5871 capture = capture[0]
5872 self.assertFalse(capture.haslayer(IPv6))
5873 self.assertEqual(capture[IP].src, self.nat_addr)
5874 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5875 self.assertNotEqual(capture[TCP].sport, 20001)
5876 self.assertEqual(capture[TCP].dport, 10001)
5877 self.check_ip_checksum(capture)
5878 self.check_tcp_checksum(capture)
5879 out_port = capture[TCP].sport
5881 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5882 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5883 TCP(sport=10001, dport=out_port))
5884 self.pg0.add_stream(p)
5885 self.pg_enable_capture(self.pg_interfaces)
5887 capture = self.pg1.get_capture(1)
5888 capture = capture[0]
5889 self.assertEqual(capture[IPv6].src, aftr_ip6)
5890 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5891 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5892 self.assertEqual(capture[IP].dst, '192.168.1.1')
5893 self.assertEqual(capture[TCP].sport, 10001)
5894 self.assertEqual(capture[TCP].dport, 20001)
5895 self.check_ip_checksum(capture)
5896 self.check_tcp_checksum(capture)
# ICMP: echo request with id 4000 through the tunnel; the id must be
# rewritten on the IPv4 side and restored to 4000 on the way back.
5899 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5900 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5901 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5902 ICMP(id=4000, type='echo-request'))
5903 self.pg1.add_stream(p)
5904 self.pg_enable_capture(self.pg_interfaces)
5906 capture = self.pg0.get_capture(1)
5907 capture = capture[0]
5908 self.assertFalse(capture.haslayer(IPv6))
5909 self.assertEqual(capture[IP].src, self.nat_addr)
5910 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5911 self.assertNotEqual(capture[ICMP].id, 4000)
5912 self.check_ip_checksum(capture)
5913 self.check_icmp_checksum(capture)
5914 out_id = capture[ICMP].id
5916 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5917 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5918 ICMP(id=out_id, type='echo-reply'))
5919 self.pg0.add_stream(p)
5920 self.pg_enable_capture(self.pg_interfaces)
5922 capture = self.pg1.get_capture(1)
5923 capture = capture[0]
5924 self.assertEqual(capture[IPv6].src, aftr_ip6)
5925 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5926 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5927 self.assertEqual(capture[IP].dst, '192.168.1.1')
5928 self.assertEqual(capture[ICMP].id, 4000)
5929 self.check_ip_checksum(capture)
5930 self.check_icmp_checksum(capture)
5932 # ping DS-Lite AFTR tunnel endpoint address
# The echo request is sent on pg1 and the echo reply is expected on pg1 as
# well, sourced from the AFTR address.
5933 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5934 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
5935 ICMPv6EchoRequest())
5936 self.pg1.add_stream(p)
5937 self.pg_enable_capture(self.pg_interfaces)
5939 capture = self.pg1.get_capture(1)
5940 self.assertEqual(1, len(capture))
5941 capture = capture[0]
5942 self.assertEqual(capture[IPv6].src, aftr_ip6)
5943 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5944 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Per-test tearDown body (its `def` line is not visible here): after the
# parent tearDown, DS-Lite "show" CLI output is logged unless VPP is dead.
5947 super(TestDSlite, self).tearDown()
5948 if not self.vpp_dead:
5949 self.logger.info(self.vapi.cli("show dslite pool"))
5951 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5952 self.logger.info(self.vapi.cli("show dslite sessions"))
# NOTE(review): several lines of this class are not visible in this view
# (decorators, some `def` lines, error handling, argument continuations,
# and the step that actually starts the packet generator); the notes below
# cover only what the visible lines show.
5955 class TestDSliteCE(MethodHolder):
5956 """ DS-Lite CE Test Cases """
# The VPP command line is extended with a `nat { dslite ce }` section.
5959 def setUpConstants(cls):
5960 super(TestDSliteCE, cls).setUpConstants()
5961 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
5964 def setUpClass(cls):
5965 super(TestDSliteCE, cls).setUpClass()
# Two packet-generator interfaces: pg0 is IPv4 (ARP resolved), pg1 is IPv6
# with one remote host whose neighbor entry is configured.
5968 cls.create_pg_interfaces(range(2))
5970 cls.pg0.config_ip4()
5971 cls.pg0.resolve_arp()
5973 cls.pg1.config_ip6()
5974 cls.pg1.generate_remote_hosts(1)
5975 cls.pg1.configure_ipv6_neighbors()
5978 super(TestDSliteCE, cls).tearDownClass()
5981 def test_dslite_ce(self):
5982 """ Test DS-Lite CE """
# B4 tunnel endpoint addresses (IPv4 and IPv6), in text and packed form.
5984 b4_ip4 = '192.0.0.2'
5985 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
5986 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
5987 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
5988 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
# AFTR tunnel endpoint addresses, in text and packed form.
5990 aftr_ip4 = '192.0.0.1'
5991 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5992 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5993 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5994 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# A /128 route to the AFTR IPv6 address via pg1's remote host
# (remaining arguments of the call are not visible here).
5996 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
5997 dst_address_length=128,
5998 next_hop_address=self.pg1.remote_ip6n,
5999 next_hop_sw_if_index=self.pg1.sw_if_index,
# IPv4 -> tunnel: a plain IPv4/UDP packet injected on pg0 must come out on
# pg1 wrapped in IPv6 from the B4 address to the AFTR address, with the
# inner addresses and ports unchanged.
6003 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6004 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6005 UDP(sport=10000, dport=20000))
6006 self.pg0.add_stream(p)
6007 self.pg_enable_capture(self.pg_interfaces)
6009 capture = self.pg1.get_capture(1)
6010 capture = capture[0]
6011 self.assertEqual(capture[IPv6].src, b4_ip6)
6012 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6013 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6014 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6015 self.assertEqual(capture[UDP].sport, 10000)
6016 self.assertEqual(capture[UDP].dport, 20000)
6017 self.check_ip_checksum(capture)
# Tunnel -> IPv4: an IPv4/UDP packet wrapped in IPv6 from the AFTR address
# to the B4 address, injected on pg1, must come out on pg0 without the
# IPv6 layer and with the inner addresses and ports unchanged.
6020 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6021 IPv6(dst=b4_ip6, src=aftr_ip6) /
6022 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6023 UDP(sport=20000, dport=10000))
6024 self.pg1.add_stream(p)
6025 self.pg_enable_capture(self.pg_interfaces)
6027 capture = self.pg0.get_capture(1)
6028 capture = capture[0]
6029 self.assertFalse(capture.haslayer(IPv6))
6030 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6031 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6032 self.assertEqual(capture[UDP].sport, 20000)
6033 self.assertEqual(capture[UDP].dport, 10000)
6034 self.check_ip_checksum(capture)
6036 # ping DS-Lite B4 tunnel endpoint address
# The echo request is sent on pg1 and the echo reply is expected on pg1 as
# well, sourced from the B4 address.
6037 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6038 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6039 ICMPv6EchoRequest())
6040 self.pg1.add_stream(p)
6041 self.pg_enable_capture(self.pg_interfaces)
6043 capture = self.pg1.get_capture(1)
6044 self.assertEqual(1, len(capture))
6045 capture = capture[0]
6046 self.assertEqual(capture[IPv6].src, b4_ip6)
6047 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6048 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Per-test tearDown body (its `def` line is not visible here): after the
# parent tearDown, the tunnel endpoint "show" CLI commands are run unless
# VPP is dead.
6051 super(TestDSliteCE, self).tearDown()
6052 if not self.vpp_dead:
6054 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6056 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
# Script entry point: when this file is executed directly, run its test
# cases through unittest using the VPP-specific test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)