9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
36 def check_ip_checksum(self, pkt):
38 Check IP checksum of the packet
40 :param pkt: Packet to check IP checksum
42 new = pkt.__class__(str(pkt))
44 new = new.__class__(str(new))
45 self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
47 def check_tcp_checksum(self, pkt):
49 Check TCP checksum in IP packet
51 :param pkt: Packet to check TCP checksum
53 new = pkt.__class__(str(pkt))
55 new = new.__class__(str(new))
56 self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
58 def check_udp_checksum(self, pkt):
60 Check UDP checksum in IP packet
62 :param pkt: Packet to check UDP checksum
64 new = pkt.__class__(str(pkt))
66 new = new.__class__(str(new))
67 self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check the checksums of the packet embedded in an ICMP error message

    For each embedded layer that is present (IPerror, TCPerror, UDPerror,
    ICMPerror) the checksum field is deleted from a fresh copy of the
    packet, the copy is re-parsed from its serialized form, and the
    resulting checksum must equal the one carried by the original packet.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    embedded_layers = (
        (IPerror, 'IPerror'),
        (TCPerror, 'TCPerror'),
        (UDPerror, 'UDPerror'),
        (ICMPerror, 'ICMPerror'),
    )
    for layer, name in embedded_layers:
        if not pkt.haslayer(layer):
            continue
        # An embedded UDP header with a zero checksum is not verified
        # (presumably because zero means "no checksum computed").
        if layer is UDPerror and pkt[name].chksum == 0:
            continue
        # Always start from the original packet: previously the ICMPerror
        # check reused a copy left behind by an earlier branch and failed
        # with an unbound local when no earlier branch had run.
        new = pkt.__class__(str(pkt))
        del new[name].chksum
        new = new.__class__(str(new))
        self.assertEqual(new[name].chksum, pkt[name].chksum)
def check_icmp_checksum(self, pkt):
    """
    Verify the ICMP checksum of an IPv4 packet

    The checksum field is deleted from a copy of the packet and the copy
    is re-parsed from its serialized form; the resulting checksum must
    equal the one carried by the original packet.  When the packet
    carries an embedded IPerror layer, the embedded packet checksums are
    verified as well.

    :param pkt: Packet to check ICMP checksum
    """
    stripped = pkt.__class__(str(pkt))
    del stripped['ICMP'].chksum
    rebuilt = stripped.__class__(str(stripped))
    self.assertEqual(rebuilt['ICMP'].chksum, pkt['ICMP'].chksum)
    # Nothing more to verify unless this is an ICMP error message
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Verify the ICMPv6 checksum of an IPv6 packet

    Handles destination-unreachable, echo-request and echo-reply
    messages.  For each of these layers present in the packet, the
    checksum field is deleted from a working copy, the copy is re-parsed
    from its serialized form, and the resulting checksum must equal the
    one carried by the original packet.  For destination-unreachable
    messages the embedded packet checksums are verified too.

    :param pkt: Packet to check ICMPv6 checksum
    """
    icmpv6_layers = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach'),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
        (ICMPv6EchoReply, 'ICMPv6EchoReply'),
    )
    # A single working copy is carried across the checks, as before
    work = pkt.__class__(str(pkt))
    for layer, name in icmpv6_layers:
        if not pkt.haslayer(layer):
            continue
        del work[name].cksum
        work = work.__class__(str(work))
        self.assertEqual(work[name].cksum, pkt[name].cksum)
        if layer is ICMPv6DestUnreach:
            self.check_icmp_errror_embedded(pkt)
137 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
139 Create packet stream for inside network
141 :param in_if: Inside interface
142 :param out_if: Outside interface
143 :param dst_ip: Destination address
144 :param ttl: TTL of generated packets
147 dst_ip = out_if.remote_ip4
151 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153 TCP(sport=self.tcp_port_in, dport=20))
157 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159 UDP(sport=self.udp_port_in, dport=20))
163 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165 ICMP(id=self.icmp_id_in, type='echo-request'))
170 def compose_ip6(self, ip4, pref, plen):
172 Compose IPv4-embedded IPv6 addresses
174 :param ip4: IPv4 address
175 :param pref: IPv6 prefix
176 :param plen: IPv6 prefix length
177 :returns: IPv4-embedded IPv6 addresses
179 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
195 pref_n[10] = ip4_n[3]
199 pref_n[10] = ip4_n[2]
200 pref_n[11] = ip4_n[3]
203 pref_n[10] = ip4_n[1]
204 pref_n[11] = ip4_n[2]
205 pref_n[12] = ip4_n[3]
207 pref_n[12] = ip4_n[0]
208 pref_n[13] = ip4_n[1]
209 pref_n[14] = ip4_n[2]
210 pref_n[15] = ip4_n[3]
211 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
213 def extract_ip4(self, ip6, plen):
215 Extract IPv4 address embedded in IPv6 addresses
217 :param ip6: IPv6 address
218 :param plen: IPv6 prefix length
219 :returns: extracted IPv4 address
221 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
253 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
255 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
257 Create IPv6 packet stream for inside network
259 :param in_if: Inside interface
260 :param out_if: Outside interface
261 :param hlim: Hop Limit of generated packets
262 :param pref: NAT64 prefix
263 :param plen: NAT64 prefix length
267 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
269 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
272 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274 TCP(sport=self.tcp_port_in, dport=20))
278 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280 UDP(sport=self.udp_port_in, dport=20))
284 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286 ICMPv6EchoRequest(id=self.icmp_id_in))
291 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292 use_inside_ports=False):
294 Create packet stream for outside network
296 :param out_if: Outside interface
297 :param dst_ip: Destination IP address (Default use global NAT address)
298 :param ttl: TTL of generated packets
299 :param use_inside_ports: Use inside NAT ports as destination ports
300 instead of outside ports
303 dst_ip = self.nat_addr
304 if not use_inside_ports:
305 tcp_port = self.tcp_port_out
306 udp_port = self.udp_port_out
307 icmp_id = self.icmp_id_out
309 tcp_port = self.tcp_port_in
310 udp_port = self.udp_port_in
311 icmp_id = self.icmp_id_in
314 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316 TCP(dport=tcp_port, sport=20))
320 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322 UDP(dport=udp_port, sport=20))
326 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328 ICMP(id=icmp_id, type='echo-reply'))
333 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
335 Create packet stream for outside network
337 :param out_if: Outside interface
338 :param dst_ip: Destination IP address
339 :param hl: HL of generated packets
343 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345 TCP(dport=self.tcp_port_out, sport=20))
349 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351 UDP(dport=self.udp_port_out, sport=20))
355 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357 ICMPv6EchoReply(id=self.icmp_id_out))
362 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363 packet_num=3, dst_ip=None, is_ip6=False):
365 Verify captured packets on outside network
367 :param capture: Captured packets
368 :param nat_ip: Translated IP address (Default use global NAT address)
369 :param same_port: Source port number is not translated (Default False)
370 :param packet_num: Expected number of packets (Default 3)
371 :param dst_ip: Destination IP address (Default do not verify)
372 :param is_ip6: If L3 protocol is IPv6 (Default False)
376 ICMP46 = ICMPv6EchoRequest
381 nat_ip = self.nat_addr
382 self.assertEqual(packet_num, len(capture))
383 for packet in capture:
386 self.check_ip_checksum(packet)
387 self.assertEqual(packet[IP46].src, nat_ip)
388 if dst_ip is not None:
389 self.assertEqual(packet[IP46].dst, dst_ip)
390 if packet.haslayer(TCP):
392 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
395 packet[TCP].sport, self.tcp_port_in)
396 self.tcp_port_out = packet[TCP].sport
397 self.check_tcp_checksum(packet)
398 elif packet.haslayer(UDP):
400 self.assertEqual(packet[UDP].sport, self.udp_port_in)
403 packet[UDP].sport, self.udp_port_in)
404 self.udp_port_out = packet[UDP].sport
407 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
409 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410 self.icmp_id_out = packet[ICMP46].id
412 self.check_icmpv6_checksum(packet)
414 self.check_icmp_checksum(packet)
416 self.logger.error(ppp("Unexpected or invalid packet "
417 "(outside network):", packet))
420 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421 packet_num=3, dst_ip=None):
423 Verify captured packets on outside network
425 :param capture: Captured packets
426 :param nat_ip: Translated IP address
427 :param same_port: Source port number is not translated (Default False)
428 :param packet_num: Expected number of packets (Default 3)
429 :param dst_ip: Destination IP address (Default do not verify)
431 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
434 def verify_capture_in(self, capture, in_if, packet_num=3):
436 Verify captured packets on inside network
438 :param capture: Captured packets
439 :param in_if: Inside interface
440 :param packet_num: Expected number of packets (Default 3)
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
445 self.check_ip_checksum(packet)
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 if packet.haslayer(TCP):
448 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449 self.check_tcp_checksum(packet)
450 elif packet.haslayer(UDP):
451 self.assertEqual(packet[UDP].dport, self.udp_port_in)
453 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454 self.check_icmp_checksum(packet)
456 self.logger.error(ppp("Unexpected or invalid packet "
457 "(inside network):", packet))
460 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
462 Verify captured IPv6 packets on inside network
464 :param capture: Captured packets
465 :param src_ip: Source IP
466 :param dst_ip: Destination IP address
467 :param packet_num: Expected number of packets (Default 3)
469 self.assertEqual(packet_num, len(capture))
470 for packet in capture:
472 self.assertEqual(packet[IPv6].src, src_ip)
473 self.assertEqual(packet[IPv6].dst, dst_ip)
474 if packet.haslayer(TCP):
475 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476 self.check_tcp_checksum(packet)
477 elif packet.haslayer(UDP):
478 self.assertEqual(packet[UDP].dport, self.udp_port_in)
479 self.check_udp_checksum(packet)
481 self.assertEqual(packet[ICMPv6EchoReply].id,
483 self.check_icmpv6_checksum(packet)
485 self.logger.error(ppp("Unexpected or invalid packet "
486 "(inside network):", packet))
489 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
491 Verify captured packets that don't have to be translated
493 :param capture: Captured packets
494 :param ingress_if: Ingress interface
495 :param egress_if: Egress interface
497 for packet in capture:
499 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501 if packet.haslayer(TCP):
502 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503 elif packet.haslayer(UDP):
504 self.assertEqual(packet[UDP].sport, self.udp_port_in)
506 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
508 self.logger.error(ppp("Unexpected or invalid packet "
509 "(inside network):", packet))
512 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513 packet_num=3, icmp_type=11):
515 Verify captured packets with ICMP errors on outside network
517 :param capture: Captured packets
518 :param src_ip: Translated IP address or IP address of VPP
519 (Default use global NAT address)
520 :param packet_num: Expected number of packets (Default 3)
521 :param icmp_type: Type of error ICMP packet
522 we are expecting (Default 11)
525 src_ip = self.nat_addr
526 self.assertEqual(packet_num, len(capture))
527 for packet in capture:
529 self.assertEqual(packet[IP].src, src_ip)
530 self.assertTrue(packet.haslayer(ICMP))
532 self.assertEqual(icmp.type, icmp_type)
533 self.assertTrue(icmp.haslayer(IPerror))
534 inner_ip = icmp[IPerror]
535 if inner_ip.haslayer(TCPerror):
536 self.assertEqual(inner_ip[TCPerror].dport,
538 elif inner_ip.haslayer(UDPerror):
539 self.assertEqual(inner_ip[UDPerror].dport,
542 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
544 self.logger.error(ppp("Unexpected or invalid packet "
545 "(outside network):", packet))
548 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
551 Verify captured packets with ICMP errors on inside network
553 :param capture: Captured packets
554 :param in_if: Inside interface
555 :param packet_num: Expected number of packets (Default 3)
556 :param icmp_type: Type of error ICMP packet
557 we are expecting (Default 11)
559 self.assertEqual(packet_num, len(capture))
560 for packet in capture:
562 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563 self.assertTrue(packet.haslayer(ICMP))
565 self.assertEqual(icmp.type, icmp_type)
566 self.assertTrue(icmp.haslayer(IPerror))
567 inner_ip = icmp[IPerror]
568 if inner_ip.haslayer(TCPerror):
569 self.assertEqual(inner_ip[TCPerror].sport,
571 elif inner_ip.haslayer(UDPerror):
572 self.assertEqual(inner_ip[UDPerror].sport,
575 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
577 self.logger.error(ppp("Unexpected or invalid packet "
578 "(inside network):", packet))
581 def create_stream_frag(self, src_if, dst, sport, dport, data):
583 Create fragmented packet stream
585 :param src_if: Source interface
586 :param dst: Destination IPv4 address
587 :param sport: Source TCP port
588 :param dport: Destination TCP port
589 :param data: Payload data
592 id = random.randint(0, 65535)
593 p = (IP(src=src_if.remote_ip4, dst=dst) /
594 TCP(sport=sport, dport=dport) /
596 p = p.__class__(str(p))
597 chksum = p['TCP'].chksum
599 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601 TCP(sport=sport, dport=dport, chksum=chksum) /
604 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606 proto=IP_PROTOS.tcp) /
609 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
616 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617 pref=None, plen=0, frag_size=128):
619 Create fragmented packet stream
621 :param src_if: Source interface
622 :param dst: Destination IPv4 address
623 :param sport: Source TCP port
624 :param dport: Destination TCP port
625 :param data: Payload data
626 :param pref: NAT64 prefix
627 :param plen: NAT64 prefix length
628 :param frag_size: size of fragments
632 dst_ip6 = ''.join(['64:ff9b::', dst])
634 dst_ip6 = self.compose_ip6(dst, pref, plen)
636 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639 TCP(sport=sport, dport=dport) /
642 return fragment6(p, frag_size)
644 def reass_frags_and_verify(self, frags, src, dst):
646 Reassemble and verify fragmented packet
648 :param frags: Captured fragments
649 :param src: Source IPv4 address to verify
650 :param dst: Destination IPv4 address to verify
652 :returns: Reassembled IPv4 packet
654 buffer = StringIO.StringIO()
656 self.assertEqual(p[IP].src, src)
657 self.assertEqual(p[IP].dst, dst)
658 self.check_ip_checksum(p)
659 buffer.seek(p[IP].frag * 8)
660 buffer.write(p[IP].payload)
661 ip = frags[0].getlayer(IP)
662 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663 proto=frags[0][IP].proto)
664 if ip.proto == IP_PROTOS.tcp:
665 p = (ip / TCP(buffer.getvalue()))
666 self.check_tcp_checksum(p)
667 elif ip.proto == IP_PROTOS.udp:
668 p = (ip / UDP(buffer.getvalue()))
671 def reass_frags_and_verify_ip6(self, frags, src, dst):
673 Reassemble and verify fragmented packet
675 :param frags: Captured fragments
676 :param src: Source IPv6 address to verify
677 :param dst: Destination IPv6 address to verify
679 :returns: Reassembled IPv6 packet
681 buffer = StringIO.StringIO()
683 self.assertEqual(p[IPv6].src, src)
684 self.assertEqual(p[IPv6].dst, dst)
685 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686 buffer.write(p[IPv6ExtHdrFragment].payload)
687 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688 nh=frags[0][IPv6ExtHdrFragment].nh)
689 if ip.nh == IP_PROTOS.tcp:
690 p = (ip / TCP(buffer.getvalue()))
691 self.check_tcp_checksum(p)
692 elif ip.nh == IP_PROTOS.udp:
693 p = (ip / UDP(buffer.getvalue()))
696 def verify_ipfix_nat44_ses(self, data):
698 Verify IPFIX NAT44 session create/delete event
700 :param data: Decoded IPFIX data records
702 nat44_ses_create_num = 0
703 nat44_ses_delete_num = 0
704 self.assertEqual(6, len(data))
707 self.assertIn(ord(record[230]), [4, 5])
708 if ord(record[230]) == 4:
709 nat44_ses_create_num += 1
711 nat44_ses_delete_num += 1
713 self.assertEqual(self.pg0.remote_ip4n, record[8])
714 # postNATSourceIPv4Address
715 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
718 self.assertEqual(struct.pack("!I", 0), record[234])
719 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720 if IP_PROTOS.icmp == ord(record[4]):
721 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722 self.assertEqual(struct.pack("!H", self.icmp_id_out),
724 elif IP_PROTOS.tcp == ord(record[4]):
725 self.assertEqual(struct.pack("!H", self.tcp_port_in),
727 self.assertEqual(struct.pack("!H", self.tcp_port_out),
729 elif IP_PROTOS.udp == ord(record[4]):
730 self.assertEqual(struct.pack("!H", self.udp_port_in),
732 self.assertEqual(struct.pack("!H", self.udp_port_out),
735 self.fail("Invalid protocol")
736 self.assertEqual(3, nat44_ses_create_num)
737 self.assertEqual(3, nat44_ses_delete_num)
739 def verify_ipfix_addr_exhausted(self, data):
741 Verify IPFIX NAT addresses event
743 :param data: Decoded IPFIX data records
745 self.assertEqual(1, len(data))
748 self.assertEqual(ord(record[230]), 3)
750 self.assertEqual(struct.pack("!I", 0), record[283])
752 def verify_ipfix_max_sessions(self, data, limit):
754 Verify IPFIX maximum session entries exceeded event
756 :param data: Decoded IPFIX data records
757 :param limit: Number of maximum session entries that can be created.
759 self.assertEqual(1, len(data))
762 self.assertEqual(ord(record[230]), 13)
763 # natQuotaExceededEvent
764 self.assertEqual(struct.pack("I", 1), record[466])
766 self.assertEqual(struct.pack("I", limit), record[471])
768 def verify_ipfix_max_bibs(self, data, limit):
770 Verify IPFIX maximum BIB entries exceeded event
772 :param data: Decoded IPFIX data records
773 :param limit: Number of maximum BIB entries that can be created.
775 self.assertEqual(1, len(data))
778 self.assertEqual(ord(record[230]), 13)
779 # natQuotaExceededEvent
780 self.assertEqual(struct.pack("I", 2), record[466])
782 self.assertEqual(struct.pack("I", limit), record[472])
784 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
786 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
788 :param data: Decoded IPFIX data records
789 :param limit: Number of maximum fragments pending reassembly
790 :param src_addr: IPv6 source address
792 self.assertEqual(1, len(data))
795 self.assertEqual(ord(record[230]), 13)
796 # natQuotaExceededEvent
797 self.assertEqual(struct.pack("I", 5), record[466])
798 # maxFragmentsPendingReassembly
799 self.assertEqual(struct.pack("I", limit), record[475])
801 self.assertEqual(src_addr, record[27])
803 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
805 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
807 :param data: Decoded IPFIX data records
808 :param limit: Number of maximum fragments pending reassembly
809 :param src_addr: IPv4 source address
811 self.assertEqual(1, len(data))
814 self.assertEqual(ord(record[230]), 13)
815 # natQuotaExceededEvent
816 self.assertEqual(struct.pack("I", 5), record[466])
817 # maxFragmentsPendingReassembly
818 self.assertEqual(struct.pack("I", limit), record[475])
820 self.assertEqual(src_addr, record[8])
822 def verify_ipfix_bib(self, data, is_create, src_addr):
824 Verify IPFIX NAT64 BIB create and delete events
826 :param data: Decoded IPFIX data records
827 :param is_create: Create event if nonzero value otherwise delete event
828 :param src_addr: IPv6 source address
830 self.assertEqual(1, len(data))
834 self.assertEqual(ord(record[230]), 10)
836 self.assertEqual(ord(record[230]), 11)
838 self.assertEqual(src_addr, record[27])
839 # postNATSourceIPv4Address
840 self.assertEqual(self.nat_addr_n, record[225])
842 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
844 self.assertEqual(struct.pack("!I", 0), record[234])
845 # sourceTransportPort
846 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847 # postNAPTSourceTransportPort
848 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
850 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
853 Verify IPFIX NAT64 session create and delete events
855 :param data: Decoded IPFIX data records
856 :param is_create: Create event if nonzero value otherwise delete event
857 :param src_addr: IPv6 source address
858 :param dst_addr: IPv4 destination address
859 :param dst_port: destination TCP port
861 self.assertEqual(1, len(data))
865 self.assertEqual(ord(record[230]), 6)
867 self.assertEqual(ord(record[230]), 7)
869 self.assertEqual(src_addr, record[27])
870 # destinationIPv6Address
871 self.assertEqual(socket.inet_pton(socket.AF_INET6,
872 self.compose_ip6(dst_addr,
876 # postNATSourceIPv4Address
877 self.assertEqual(self.nat_addr_n, record[225])
878 # postNATDestinationIPv4Address
879 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
882 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
884 self.assertEqual(struct.pack("!I", 0), record[234])
885 # sourceTransportPort
886 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887 # postNAPTSourceTransportPort
888 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889 # destinationTransportPort
890 self.assertEqual(struct.pack("!H", dst_port), record[11])
891 # postNAPTDestinationTransportPort
892 self.assertEqual(struct.pack("!H", dst_port), record[228])
895 class TestNAT44(MethodHolder):
896 """ NAT44 Test Cases """
900 super(TestNAT44, cls).setUpClass()
903 cls.tcp_port_in = 6303
904 cls.tcp_port_out = 6303
905 cls.udp_port_in = 6304
906 cls.udp_port_out = 6304
907 cls.icmp_id_in = 6305
908 cls.icmp_id_out = 6305
909 cls.nat_addr = '10.0.0.3'
910 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911 cls.ipfix_src_port = 4739
912 cls.ipfix_domain_id = 1
914 cls.create_pg_interfaces(range(10))
915 cls.interfaces = list(cls.pg_interfaces[0:4])
917 for i in cls.interfaces:
922 cls.pg0.generate_remote_hosts(3)
923 cls.pg0.configure_ipv4_neighbors()
925 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926 cls.vapi.ip_table_add_del(10, is_add=1)
927 cls.vapi.ip_table_add_del(20, is_add=1)
929 cls.pg4._local_ip4 = "172.16.255.1"
930 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932 cls.pg4.set_table_ip4(10)
933 cls.pg5._local_ip4 = "172.17.255.3"
934 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936 cls.pg5.set_table_ip4(10)
937 cls.pg6._local_ip4 = "172.16.255.1"
938 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940 cls.pg6.set_table_ip4(20)
941 for i in cls.overlapping_interfaces:
949 cls.pg9.generate_remote_hosts(2)
951 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
956 cls.pg9.resolve_arp()
957 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959 cls.pg9.resolve_arp()
962 super(TestNAT44, cls).tearDownClass()
965 def clear_nat44(self):
967 Clear NAT44 configuration.
969 # I found no elegant way to do this
970 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971 dst_address_length=32,
972 next_hop_address=self.pg7.remote_ip4n,
973 next_hop_sw_if_index=self.pg7.sw_if_index,
975 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976 dst_address_length=32,
977 next_hop_address=self.pg8.remote_ip4n,
978 next_hop_sw_if_index=self.pg8.sw_if_index,
981 for intf in [self.pg7, self.pg8]:
982 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
984 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
989 if self.pg7.has_ip4_config:
990 self.pg7.unconfig_ip4()
992 self.vapi.nat44_forwarding_enable_disable(0)
994 interfaces = self.vapi.nat44_interface_addr_dump()
995 for intf in interfaces:
996 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997 twice_nat=intf.twice_nat,
1000 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001 domain_id=self.ipfix_domain_id)
1002 self.ipfix_src_port = 4739
1003 self.ipfix_domain_id = 1
1005 interfaces = self.vapi.nat44_interface_dump()
1006 for intf in interfaces:
1007 if intf.is_inside > 1:
1008 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1011 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1015 interfaces = self.vapi.nat44_interface_output_feature_dump()
1016 for intf in interfaces:
1017 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1021 static_mappings = self.vapi.nat44_static_mapping_dump()
1022 for sm in static_mappings:
1023 self.vapi.nat44_add_del_static_mapping(
1024 sm.local_ip_address,
1025 sm.external_ip_address,
1026 local_port=sm.local_port,
1027 external_port=sm.external_port,
1028 addr_only=sm.addr_only,
1030 protocol=sm.protocol,
1031 twice_nat=sm.twice_nat,
1032 out2in_only=sm.out2in_only,
1036 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1037 for lb_sm in lb_static_mappings:
1038 self.vapi.nat44_add_del_lb_static_mapping(
1039 lb_sm.external_addr,
1040 lb_sm.external_port,
1042 vrf_id=lb_sm.vrf_id,
1043 twice_nat=lb_sm.twice_nat,
1044 out2in_only=lb_sm.out2in_only,
1050 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1051 for id_m in identity_mappings:
1052 self.vapi.nat44_add_del_identity_mapping(
1053 addr_only=id_m.addr_only,
1056 sw_if_index=id_m.sw_if_index,
1058 protocol=id_m.protocol,
1061 adresses = self.vapi.nat44_address_dump()
1062 for addr in adresses:
1063 self.vapi.nat44_add_del_address_range(addr.ip_address,
1065 twice_nat=addr.twice_nat,
1068 self.vapi.nat_set_reass()
1069 self.vapi.nat_set_reass(is_ip6=1)
1071 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1072 local_port=0, external_port=0, vrf_id=0,
1073 is_add=1, external_sw_if_index=0xFFFFFFFF,
1074 proto=0, twice_nat=0, out2in_only=0, tag=""):
1076 Add/delete NAT44 static mapping
1078 :param local_ip: Local IP address
1079 :param external_ip: External IP address
1080 :param local_port: Local port number (Optional)
1081 :param external_port: External port number (Optional)
1082 :param vrf_id: VRF ID (Default 0)
1083 :param is_add: 1 if add, 0 if delete (Default add)
1084 :param external_sw_if_index: External interface instead of IP address
1085 :param proto: IP protocol (Mandatory if port specified)
1086 :param twice_nat: 1 if translate external host address and port
1087 :param out2in_only: if 1 rule is matching only out2in direction
1088 :param tag: Opaque string tag
1091 if local_port and external_port:
1093 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1094 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1095 self.vapi.nat44_add_del_static_mapping(
1098 external_sw_if_index,
1109 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1111 Add/delete NAT44 address
1113 :param ip: IP address
1114 :param is_add: 1 if add, 0 if delete (Default add)
1115 :param twice_nat: twice NAT address for external hosts
1117 nat_addr = socket.inet_pton(socket.AF_INET, ip)
1118 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1120 twice_nat=twice_nat)
1122 def test_dynamic(self):
1123 """ NAT44 dynamic translation test """
1125 self.nat44_add_address(self.nat_addr)
1126 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1127 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1131 pkts = self.create_stream_in(self.pg0, self.pg1)
1132 self.pg0.add_stream(pkts)
1133 self.pg_enable_capture(self.pg_interfaces)
1135 capture = self.pg1.get_capture(len(pkts))
1136 self.verify_capture_out(capture)
1139 pkts = self.create_stream_out(self.pg1)
1140 self.pg1.add_stream(pkts)
1141 self.pg_enable_capture(self.pg_interfaces)
1143 capture = self.pg0.get_capture(len(pkts))
1144 self.verify_capture_in(capture, self.pg0)
1146 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1147 """ NAT44 handling of client packets with TTL=1 """
1149 self.nat44_add_address(self.nat_addr)
1150 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1151 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1154 # Client side - generate traffic
1155 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1156 self.pg0.add_stream(pkts)
1157 self.pg_enable_capture(self.pg_interfaces)
1160 # Client side - verify ICMP type 11 packets
1161 capture = self.pg0.get_capture(len(pkts))
1162 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1164 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1165 """ NAT44 handling of server packets with TTL=1 """
1167 self.nat44_add_address(self.nat_addr)
1168 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1169 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1172 # Client side - create sessions
1173 pkts = self.create_stream_in(self.pg0, self.pg1)
1174 self.pg0.add_stream(pkts)
1175 self.pg_enable_capture(self.pg_interfaces)
1178 # Server side - generate traffic
1179 capture = self.pg1.get_capture(len(pkts))
1180 self.verify_capture_out(capture)
1181 pkts = self.create_stream_out(self.pg1, ttl=1)
1182 self.pg1.add_stream(pkts)
1183 self.pg_enable_capture(self.pg_interfaces)
1186 # Server side - verify ICMP type 11 packets
1187 capture = self.pg1.get_capture(len(pkts))
1188 self.verify_capture_out_with_icmp_errors(capture,
1189 src_ip=self.pg1.local_ip4)
def test_dynamic_icmp_errors_in2out_ttl_2(self):
    """ NAT44 handling of error responses to client packets with TTL=2

    Sends in2out traffic with TTL=2, answers every translated packet
    with an ICMP type 11 error from the outside, and verifies the errors
    are delivered to the client on pg0.
    """

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Client side - generate traffic
    pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - simulate ICMP type 11 response
    # Each error embeds the IP layer of the packet captured on pg1.
    capture = self.pg1.get_capture(len(pkts))
    pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
            IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
            ICMP(type=11) / packet[IP] for packet in capture]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - verify ICMP type 11 packets
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_2(self):
    """ NAT44 handling of error responses to server packets with TTL=2

    Creates sessions with in2out traffic, sends out2in traffic with
    TTL=2, answers every packet delivered to the inside with an ICMP
    type 11 error, and verifies the errors are captured on pg1.
    """

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Client side - create sessions
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - generate traffic
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    pkts = self.create_stream_out(self.pg1, ttl=2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - simulate ICMP type 11 response
    # Each error embeds the IP layer of the packet captured on pg0.
    capture = self.pg0.get_capture(len(pkts))
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
            ICMP(type=11) / packet[IP] for packet in capture]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - verify ICMP type 11 packets
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out_with_icmp_errors(capture)
def test_ping_out_interface_from_outside(self):
    """ Ping NAT44 out interface from outside network

    Sends one ICMP echo request from pg1's remote host to pg1's local
    address and verifies that a single echo reply is captured on pg1.
    """

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
         ICMP(id=self.icmp_id_out, type='echo-request'))
    pkts = [p]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.assertEqual(1, len(capture))
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.local_ip4)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        # NOTE(review): the request is sent with icmp_id_out but the
        # reply is compared against icmp_id_in - confirm this is intended.
        self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        self.assertEqual(packet[ICMP].type, 0)  # echo reply
    except Exception:
        # Dump the offending packet before propagating the failure.
        self.logger.error(ppp("Unexpected or invalid packet "
                              "(outside network):", packet))
        raise
def test_ping_internal_host_from_outside(self):
    """ Ping internal host from outside network

    With a static mapping for pg0's remote host, sends an ICMP echo
    request from the outside to the NAT address and an echo reply back
    from the inside, verifying the translated packet in each direction.
    """

    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # out2in
    pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
           IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
           ICMP(id=self.icmp_id_out, type='echo-request'))
    self.pg1.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    self.verify_capture_in(capture, self.pg0, packet_num=1)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)

    # in2out
    pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
           IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
           ICMP(id=self.icmp_id_in, type='echo-reply'))
    self.pg0.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.verify_capture_out(capture, same_port=True, packet_num=1)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
def test_forwarding(self):
    """ NAT44 forwarding test

    Enables NAT44 forwarding together with one static mapping and
    checks traffic in both directions, first for the host that matches
    the static mapping and then for a host that does not. Forwarding
    and the static mapping are removed again even if a check fails.
    """

    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_forwarding_enable_disable(1)

    real_ip = self.pg0.remote_ip4n
    alias_ip = self.nat_addr_n
    self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
                                           external_ip=alias_ip)

    try:
        # in2out - static mapping match

        pkts = self.create_stream_out(self.pg1)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, same_port=True)

        # in2out - no static mapping match

        # Temporarily make the second remote host the default one so
        # the stream helpers use an address without a static mapping.
        host0 = self.pg0.remote_hosts[0]
        self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
        try:
            pkts = self.create_stream_out(self.pg1,
                                          dst_ip=self.pg0.remote_ip4,
                                          use_inside_ports=True)
            self.pg1.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg0.get_capture(len(pkts))
            self.verify_capture_in(capture, self.pg0)

            pkts = self.create_stream_in(self.pg0, self.pg1)
            self.pg0.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg1.get_capture(len(pkts))
            self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
                                    same_port=True)
        finally:
            # Always restore the original default host.
            self.pg0.remote_hosts[0] = host0

    finally:
        # Undo the forwarding setting and the static mapping.
        self.vapi.nat44_forwarding_enable_disable(0)
        self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
                                               external_ip=alias_ip,
                                               is_add=0)
def test_static_in(self):
    """ 1:1 NAT initialized from inside network

    Adds an address-only static mapping, checks how it is reported by
    the static mapping dump, then verifies in2out followed by out2in
    traffic.
    """

    nat_ip = "10.0.0.10"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    sm = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(len(sm), 1)
    # The tag comes back NUL padded; compare only the part before it.
    self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
    self.assertEqual(sm[0].protocol, 0)
    self.assertEqual(sm[0].local_port, 0)
    self.assertEqual(sm[0].external_port, 0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_out(self):
    """ 1:1 NAT initialized from outside network

    Adds a tagged address-only static mapping, checks that the tag is
    reported by the static mapping dump, then verifies out2in followed
    by in2out traffic.
    """

    nat_ip = "10.0.0.20"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    # NOTE(review): tag value assumed; any non-empty tag exercises the
    # same dump check below - confirm against the upstream test.
    tag = "testTAG"
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    sm = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(len(sm), 1)
    # The tag comes back NUL padded; compare only the part before it.
    self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)
def test_static_with_port_in(self):
    """ 1:1 NAPT initialized from inside network

    Adds one static mapping with ports for each of TCP, UDP and ICMP,
    then verifies in2out followed by out2in traffic.
    """

    self.tcp_port_out = 3606
    self.udp_port_out = 3607
    self.icmp_id_out = 3608

    self.nat44_add_address(self.nat_addr)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.tcp_port_in, self.tcp_port_out,
                                  proto=IP_PROTOS.tcp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.udp_port_in, self.udp_port_out,
                                  proto=IP_PROTOS.udp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.icmp_id_in, self.icmp_id_out,
                                  proto=IP_PROTOS.icmp)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_with_port_out(self):
    """ 1:1 NAPT initialized from outside network

    Adds one static mapping with ports for each of TCP, UDP and ICMP,
    then verifies out2in followed by in2out traffic.
    """

    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    self.nat44_add_address(self.nat_addr)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.tcp_port_in, self.tcp_port_out,
                                  proto=IP_PROTOS.tcp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.udp_port_in, self.udp_port_out,
                                  proto=IP_PROTOS.udp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.icmp_id_in, self.icmp_id_out,
                                  proto=IP_PROTOS.icmp)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
def test_static_with_port_out2(self):
    """ 1:1 NAPT symmetrical rule

    With forwarding enabled and an out2in-only TCP static mapping,
    checks that traffic addressed to the external address/port is
    translated in both directions (including the payload of ICMP type 11
    errors), while traffic addressed directly to the inside host passes
    without translation.
    """

    # NOTE(review): port values assumed - confirm against the upstream
    # test; the checks only need them to differ from each other.
    external_port = 80
    local_port = 8080

    self.vapi.nat44_forwarding_enable_disable(1)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  local_port, external_port,
                                  proto=IP_PROTOS.tcp, out2in_only=1)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # from client to service
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         TCP(sport=12345, dport=external_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, self.pg0.remote_ip4)
        self.assertEqual(tcp.dport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # ICMP error
    # The error embeds the translated packet captured on pg0.
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         ICMP(type=11) / capture[0][IP])
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        self.assertEqual(p[IP].src, self.nat_addr)
        inner = p[IPerror]
        self.assertEqual(inner.dst, self.nat_addr)
        self.assertEqual(inner[TCPerror].dport, external_port)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12345))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(tcp.sport, external_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # ICMP error
    # The error embeds the translated packet captured on pg1.
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         ICMP(type=11) / capture[0][IP])
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
        inner = p[IPerror]
        self.assertEqual(inner.src, self.pg0.remote_ip4)
        self.assertEqual(inner[TCPerror].sport, local_port)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from client to server (no translation)
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
         TCP(sport=12346, dport=local_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, self.pg0.remote_ip4)
        self.assertEqual(tcp.dport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client (no translation)
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12346))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.pg0.remote_ip4)
        self.assertEqual(tcp.sport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_static_vrf_aware(self):
    """ 1:1 NAT VRF awareness

    Adds two VRF-bound static mappings and verifies that traffic from
    pg4 is translated while traffic from pg0 is not forwarded to pg3.
    """

    nat_ip1 = "10.0.0.30"
    nat_ip2 = "10.0.0.40"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    # NOTE(review): VRF id 10 assumed for both mappings (pg4's table,
    # per the session dumps elsewhere in this file) - confirm upstream.
    self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
                                  vrf_id=10)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
                                  vrf_id=10)
    # pg3 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)

    # inside interface VRF match NAT44 static mapping VRF
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1, True)

    # inside interface VRF don't match NAT44 static mapping VRF (packets
    # are dropped)
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg3.assert_nothing_captured()
def test_dynamic_to_static(self):
    """ Switch from dynamic translation to 1:1NAT

    Creates dynamic sessions, then adds a static mapping for the same
    inside host and verifies that the user's sessions are gone and new
    traffic is translated to the static address.
    """
    nat_ip = "10.0.0.10"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # dynamic
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # 1:1NAT
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    # No session may remain for the host once the mapping is added.
    sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
    self.assertEqual(len(sessions), 0)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)
def test_identity_nat(self):
    """ Identity NAT

    Adds an identity mapping for pg0's remote host and verifies that a
    TCP packet sent to it from the outside arrives with addresses and
    ports unchanged and with valid checksums.
    """

    self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
         TCP(sport=12345, dport=56789))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, self.pg0.remote_ip4)
        self.assertEqual(ip.src, self.pg1.remote_ip4)
        self.assertEqual(tcp.dport, 56789)
        self.assertEqual(tcp.sport, 12345)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_static_lb(self):
    """ NAT44 local service load balancing

    Maps one external address/port to two inside servers, checks the
    translation of a single flow in both directions, then sends flows
    from several clients and verifies that server1 receives more of
    them than server2.
    """
    external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
    # NOTE(review): port values assumed - confirm against the upstream
    # test; the checks only need them to differ from each other.
    external_port = 80
    local_port = 8080
    server1 = self.pg0.remote_hosts[0]
    server2 = self.pg0.remote_hosts[1]

    # NOTE(review): weights assumed 70/30; the final assertion only
    # requires server1 to be favoured over server2.
    backends = [{'addr': server1.ip4n,
                 'port': local_port,
                 'probability': 70},
                {'addr': server2.ip4n,
                 'port': local_port,
                 'probability': 30}]

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
                                              external_port,
                                              IP_PROTOS.tcp,
                                              local_num=len(backends),
                                              locals=backends)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # from client to service
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         TCP(sport=12345, dport=external_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertIn(ip.dst, [server1.ip4, server2.ip4])
        # Remember which backend was picked so the reply comes from it.
        server = server1 if ip.dst == server1.ip4 else server2
        self.assertEqual(tcp.dport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12345))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(tcp.sport, external_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # multiple clients
    server1_n = 0
    server2_n = 0
    clients = ip4_range(self.pg1.remote_ip4, 10, 20)
    pkts = []
    for client in clients:
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=client, dst=self.nat_addr) /
             TCP(sport=12345, dport=external_port))
        pkts.append(p)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for p in capture:
        if p[IP].dst == server1.ip4:
            server1_n += 1
        else:
            server2_n += 1
    self.assertGreater(server1_n, server2_n)
def test_static_lb_2(self):
    """ NAT44 local service load balancing (asymmetrical rule)

    With forwarding enabled and an out2in-only load-balanced mapping,
    checks that traffic addressed to the external address/port is
    translated in both directions, while traffic addressed directly to
    server1 passes without translation.
    """
    external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
    # NOTE(review): port values assumed - confirm against the upstream
    # test; the checks only need them to differ from each other.
    external_port = 80
    local_port = 8080
    server1 = self.pg0.remote_hosts[0]
    server2 = self.pg0.remote_hosts[1]

    # NOTE(review): weights assumed 70/30 - confirm against upstream.
    backends = [{'addr': server1.ip4n,
                 'port': local_port,
                 'probability': 70},
                {'addr': server2.ip4n,
                 'port': local_port,
                 'probability': 30}]

    self.vapi.nat44_forwarding_enable_disable(1)
    self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
                                              external_port,
                                              IP_PROTOS.tcp,
                                              out2in_only=1,
                                              local_num=len(backends),
                                              locals=backends)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # from client to service
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         TCP(sport=12345, dport=external_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertIn(ip.dst, [server1.ip4, server2.ip4])
        # Remember which backend was picked so the reply comes from it.
        server = server1 if ip.dst == server1.ip4 else server2
        self.assertEqual(tcp.dport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12345))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(tcp.sport, external_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from client to server (no translation)
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
         TCP(sport=12346, dport=local_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, server1.ip4)
        self.assertEqual(tcp.dport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client (no translation)
    p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
         IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12346))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, server1.ip4)
        self.assertEqual(tcp.sport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_multiple_inside_interfaces(self):
    """ NAT44 multiple non-overlapping address space inside interfaces

    Uses pg0 and pg1 as inside interfaces and pg3 as the outside one.
    Verifies that traffic between inside interfaces, and towards pg2
    (no NAT44 feature), is not translated, and that in2out/out2in
    translation works for each inside interface.
    """

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
    # pg3 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)

    # between two NAT44 inside interfaces (no translation)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg0, self.pg1)

    # from NAT44 inside to interface without NAT44 feature (no translation)
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg0, self.pg2)

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg1, self.pg3)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg1)
def test_inside_overlapping_interfaces(self):
    """ NAT44 multiple inside interfaces with overlapping address space

    Uses pg4, pg5 and pg6 as inside interfaces and pg3 as the outside
    one, with a static mapping for pg6's remote host. Verifies
    untranslated traffic between pg4 and pg5, hairpinning from pg4 to
    pg6, in2out/out2in translation for every inside interface, and the
    contents of the user and session dumps.
    """

    static_nat_ip = "10.0.0.10"
    self.nat44_add_address(self.nat_addr)
    # pg3 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
    # NOTE(review): VRF id 20 assumed, matching the pg6 session dump
    # below - confirm against the upstream test.
    self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
                                  vrf_id=20)

    # between NAT44 inside interfaces with same VRF (no translation)
    pkts = self.create_stream_in(self.pg4, self.pg5)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg4, self.pg5)

    # between NAT44 inside interfaces with different VRF (hairpinning)
    p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
         IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
         TCP(sport=1234, dport=5678))
    self.pg4.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, self.pg6.remote_ip4)
        self.assertNotEqual(tcp.sport, 1234)
        self.assertEqual(tcp.dport, 5678)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg4.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg4)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg5, self.pg3)
    self.pg5.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg5)

    # pg5 session dump
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
    self.assertEqual(len(sessions), 3)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg5.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)
    self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
    self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
    self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
    self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
    self.assertEqual(sessions[1].inside_port, self.udp_port_in)
    self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
    self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
    self.assertEqual(sessions[1].outside_port, self.udp_port_out)
    self.assertEqual(sessions[2].outside_port, self.icmp_id_out)

    # in2out 3rd interface
    pkts = self.create_stream_in(self.pg6, self.pg3)
    self.pg6.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, static_nat_ip, True)

    # out2in 3rd interface
    pkts = self.create_stream_out(self.pg3, static_nat_ip)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg6)

    # general user and session dump verifications
    users = self.vapi.nat44_user_dump()
    self.assertGreaterEqual(len(users), 3)
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    for user in users:
        sessions = self.vapi.nat44_user_session_dump(user.ip_address,
                                                     user.vrf_id)
        for session in sessions:
            self.assertEqual(user.ip_address, session.inside_ip_address)
            self.assertTrue(session.total_bytes > session.total_pkts > 0)
            self.assertIn(session.protocol,
                          [IP_PROTOS.tcp, IP_PROTOS.udp,
                           IP_PROTOS.icmp])

    # pg4 session dump
    sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
    self.assertGreaterEqual(len(sessions), 4)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg4.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)

    # pg6 session dump
    sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
    self.assertGreaterEqual(len(sessions), 3)
    # Compare packed addresses directly; this avoids the Python-2-only
    # map(ord, ...) list comparison.
    static_nat_ip_n = socket.inet_pton(socket.AF_INET, static_nat_ip)
    for session in sessions:
        self.assertTrue(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg6.remote_ip4n)
        self.assertEqual(session.outside_ip_address[0:4],
                         static_nat_ip_n)
        self.assertIn(session.inside_port,
                      [self.tcp_port_in, self.udp_port_in,
                       self.icmp_id_in])
def test_hairpinning(self):
    """ NAT44 hairpinning - 1:1 NAPT

    A host and a server both sit behind pg0. The host reaches the
    server through the NAT address and a static port mapping; the test
    verifies the translated request and the translated reply, both
    captured on pg0.
    """

    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): host port value assumed - confirm against the
    # upstream test; any port works for the checks below.
    host_in_port = 1234
    server_in_port = 5678
    server_out_port = 8765

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is the NAT44 outside interface
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, self.nat_addr,
                                  server_in_port, server_out_port,
                                  proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.nat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.check_tcp_checksum(p)
        # Port allocated for the host; the server replies to it.
        host_out_port = tcp.sport
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.nat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.check_tcp_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_hairpinning2(self):
    """ NAT44 hairpinning - 1:1 NAT

    A host and two statically mapped servers are all reached through pg0.
    The test sends TCP, UDP and ICMP in four directions (host -> server1,
    server1 -> host, server2 -> server1, server1 -> server2) and checks the
    addresses and ports/ids of the hairpinned packets captured on pg0.
    """
    server1_nat_ip = "10.0.0.10"
    server2_nat_ip = "10.0.0.11"
    host = self.pg0.remote_hosts[0]
    server1 = self.pg0.remote_hosts[1]
    server2 = self.pg0.remote_hosts[2]
    server_tcp_port = 22
    server_udp_port = 20

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # add static mapping for servers
    self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
    self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)

    def build_pkts(src_ip, dst_ip, tcp_ports, udp_ports,
                   icmp_id, icmp_type):
        """Build one TCP, one UDP and one ICMP packet src_ip -> dst_ip.

        tcp_ports and udp_ports are (sport, dport) pairs.
        """
        def l2_l3():
            return (Ether(dst=self.pg0.local_mac,
                          src=self.pg0.remote_mac) /
                    IP(src=src_ip, dst=dst_ip))
        return [
            l2_l3() / TCP(sport=tcp_ports[0], dport=tcp_ports[1]),
            l2_l3() / UDP(sport=udp_ports[0], dport=udp_ports[1]),
            l2_l3() / ICMP(id=icmp_id, type=icmp_type),
        ]

    def send_and_verify(pkts, verify):
        """Send pkts on pg0, capture on pg0 and run verify on each."""
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                verify(packet)
            except Exception:
                self.logger.error(
                    ppp("Unexpected or invalid packet:", packet))
                raise

    def verify_host_to_server1(packet):
        # host is translated dynamically, so source port/id must change;
        # the allocated values are remembered for the reply direction
        self.assertEqual(packet[IP].src, self.nat_addr)
        self.assertEqual(packet[IP].dst, server1.ip4)
        if packet.haslayer(TCP):
            self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
            self.assertEqual(packet[TCP].dport, server_tcp_port)
            self.tcp_port_out = packet[TCP].sport
            self.check_tcp_checksum(packet)
        elif packet.haslayer(UDP):
            self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
            self.assertEqual(packet[UDP].dport, server_udp_port)
            self.udp_port_out = packet[UDP].sport
        else:
            self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
            self.icmp_id_out = packet[ICMP].id

    def verify_server1_to_host(packet):
        self.assertEqual(packet[IP].src, server1_nat_ip)
        self.assertEqual(packet[IP].dst, host.ip4)
        if packet.haslayer(TCP):
            self.assertEqual(packet[TCP].dport, self.tcp_port_in)
            self.assertEqual(packet[TCP].sport, server_tcp_port)
            self.check_tcp_checksum(packet)
        elif packet.haslayer(UDP):
            self.assertEqual(packet[UDP].dport, self.udp_port_in)
            self.assertEqual(packet[UDP].sport, server_udp_port)
        else:
            self.assertEqual(packet[ICMP].id, self.icmp_id_in)

    def verify_server2_to_server1(packet):
        # server2 has a 1:1 mapping, so ports/ids are expected unchanged
        self.assertEqual(packet[IP].src, server2_nat_ip)
        self.assertEqual(packet[IP].dst, server1.ip4)
        if packet.haslayer(TCP):
            self.assertEqual(packet[TCP].sport, self.tcp_port_in)
            self.assertEqual(packet[TCP].dport, server_tcp_port)
            self.tcp_port_out = packet[TCP].sport
            self.check_tcp_checksum(packet)
        elif packet.haslayer(UDP):
            self.assertEqual(packet[UDP].sport, self.udp_port_in)
            self.assertEqual(packet[UDP].dport, server_udp_port)
            self.udp_port_out = packet[UDP].sport
        else:
            self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            self.icmp_id_out = packet[ICMP].id

    def verify_server1_to_server2(packet):
        self.assertEqual(packet[IP].src, server1_nat_ip)
        self.assertEqual(packet[IP].dst, server2.ip4)
        if packet.haslayer(TCP):
            self.assertEqual(packet[TCP].dport, self.tcp_port_in)
            self.assertEqual(packet[TCP].sport, server_tcp_port)
            self.check_tcp_checksum(packet)
        elif packet.haslayer(UDP):
            self.assertEqual(packet[UDP].dport, self.udp_port_in)
            self.assertEqual(packet[UDP].sport, server_udp_port)
        else:
            self.assertEqual(packet[ICMP].id, self.icmp_id_in)

    # host to server1
    send_and_verify(
        build_pkts(host.ip4, server1_nat_ip,
                   (self.tcp_port_in, server_tcp_port),
                   (self.udp_port_in, server_udp_port),
                   self.icmp_id_in, 'echo-request'),
        verify_host_to_server1)

    # server1 to host (uses the outside ports/id learned above)
    send_and_verify(
        build_pkts(server1.ip4, self.nat_addr,
                   (server_tcp_port, self.tcp_port_out),
                   (server_udp_port, self.udp_port_out),
                   self.icmp_id_out, 'echo-reply'),
        verify_server1_to_host)

    # server2 to server1
    send_and_verify(
        build_pkts(server2.ip4, server1_nat_ip,
                   (self.tcp_port_in, server_tcp_port),
                   (self.udp_port_in, server_udp_port),
                   self.icmp_id_in, 'echo-request'),
        verify_server2_to_server1)

    # server1 to server2
    send_and_verify(
        build_pkts(server1.ip4, server2_nat_ip,
                   (server_tcp_port, self.tcp_port_out),
                   (server_udp_port, self.udp_port_out),
                   self.icmp_id_out, 'echo-reply'),
        verify_server1_to_server2)
def test_max_translations_per_user(self):
    """ MAX translations per user - recycle the least recently used

    Sends more TCP flows from one inside host than the configured per-user
    translation limit and expects every packet to be captured on pg1.
    """
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # get maximum number of translations per user
    nat44_config = self.vapi.nat_show_config()

    # send more than maximum number of translations per user packets;
    # each packet uses a distinct source port so each needs its own session
    pkts_num = nat44_config.max_translations_per_user + 5
    pkts = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=1025 + offset))
            for offset in range(pkts_num)]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # verify number of translated packet
    self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire NAT44 addresses from interface

    The NAT pool is expected to follow pg7's IPv4 address: empty while the
    interface has no address, one entry once configured, empty again after
    the address is removed.
    """
    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)

    # no address in NAT pool
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT address pool
    self.pg7.config_ip4()
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(addresses))
    # only the first 4 bytes of the dumped address are compared
    self.assertEqual(addresses[0].ip_address[0:4], self.pg7.local_ip4n)

    # remove interface address and check NAT address pool
    self.pg7.unconfig_ip4()
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(addresses))
def test_interface_addr_static_mapping(self):
    """ Static mapping with addresses from interface

    A static mapping bound to pg7 (rather than to a fixed external
    address) is expected to resolve to pg7's address once one is
    configured and to disappear when the address is removed.
    """
    # NOTE(review): tag value and local address were not visible in the
    # reviewed excerpt; confirm against the upstream test.
    tag = "testTAG"

    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
    self.nat44_add_static_mapping(
        '1.2.3.4',
        external_sw_if_index=self.pg7.sw_if_index,
        tag=tag)

    # static mappings with external interface
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(1, len(static_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     static_mappings[0].external_sw_if_index)
    # the dumped tag is NUL padded; compare only up to the first NUL
    self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)

    # configure interface address and check static mappings
    self.pg7.config_ip4()
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(1, len(static_mappings))
    self.assertEqual(static_mappings[0].external_ip_address[0:4],
                     self.pg7.local_ip4n)
    self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
    self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)

    # remove interface address and check static mappings
    self.pg7.unconfig_ip4()
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(0, len(static_mappings))
def test_interface_addr_identity_nat(self):
    """ Identity NAT with addresses from interface

    An identity mapping bound to pg7 is expected to resolve to pg7's
    address once one is configured and to disappear when it is removed.
    """
    # NOTE(review): port value and addr_only flag were not visible in the
    # reviewed excerpt; confirm against the upstream test.
    port = 53053

    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
    self.vapi.nat44_add_del_identity_mapping(
        sw_if_index=self.pg7.sw_if_index,
        port=port,
        protocol=IP_PROTOS.tcp,
        addr_only=0)

    # identity mappings with external interface
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(1, len(identity_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     identity_mappings[0].sw_if_index)

    # configure interface address and check identity mappings
    self.pg7.config_ip4()
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(1, len(identity_mappings))
    self.assertEqual(identity_mappings[0].ip_address,
                     self.pg7.local_ip4n)
    self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
    self.assertEqual(port, identity_mappings[0].port)
    self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)

    # remove interface address and check identity mappings
    self.pg7.unconfig_ip4()
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(0, len(identity_mappings))
def test_ipfix_nat44_sess(self):
    """ IPFIX logging NAT44 session created/deleted

    Creates sessions with in2out traffic, removes the NAT address, then
    flushes IPFIX and verifies the records exported towards pg3.
    """
    self.ipfix_domain_id = 10
    self.ipfix_src_port = 20202
    collector_port = 30303
    # teach scapy to dissect UDP to the collector port as IPFIX
    bind_layers(UDP, IPFIX, dport=collector_port)
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    # NOTE(review): path_mtu was not visible in the reviewed excerpt;
    # confirm the value against the upstream test.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10,
                                 collector_port=collector_port)
    self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
                        src_port=self.ipfix_src_port)

    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    self.nat44_add_address(self.nat_addr, is_add=0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(9)
    ipfix = IPFIXDecoder()
    # first load template; data sets cannot be decoded without it
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, collector_port)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_nat44_ses(data)
def test_ipfix_addr_exhausted(self):
    """ IPFIX logging NAT addresses exhausted

    No NAT address is added, so the in2out packet is expected not to be
    forwarded; the test then verifies the IPFIX records sent towards pg3.
    """
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    # NOTE(review): path_mtu and the TCP source port below were not
    # visible in the reviewed excerpt; confirm against the upstream test.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
                        src_port=self.ipfix_src_port)

    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=3025))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    # nothing may be translated without a pool address
    self.pg1.get_capture(0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(9)
    ipfix = IPFIXDecoder()
    # first load template; data sets cannot be decoded without it
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_addr_exhausted(data)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_ipfix_max_sessions(self):
    """ IPFIX logging maximum session entries exceeded

    Fills the session table with one flow per distinct source address,
    then sends one more packet that is expected to be dropped and checks
    the IPFIX records exported towards pg3.
    """
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    nat44_config = self.vapi.nat_show_config()
    max_sessions = 10 * nat44_config.translation_buckets

    # NOTE(review): the TCP source ports and path_mtu below were not
    # visible in the reviewed excerpt; confirm against the upstream test.
    # The session index is spread over the two low octets of the source.
    pkts = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src="10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF),
                dst=self.pg1.remote_ip4) /
             TCP(sport=1025))
            for i in range(max_sessions)]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    self.pg1.get_capture(max_sessions)
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
                        src_port=self.ipfix_src_port)

    # one more flow than the table can hold
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=1025))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(9)
    ipfix = IPFIXDecoder()
    # first load template; data sets cannot be decoded without it
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_max_sessions(data, max_sessions)
def test_pool_addr_fib(self):
    """ NAT44 add pool addresses to FIB

    ARP requests for the pool address and for a 1:1 mapped address are
    expected to be answered on the outside interface while the addresses
    are configured, and to go unanswered after they are removed.
    """
    static_addr = '10.0.0.10'
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)

    def arp_request(pg, target_ip):
        """Build a broadcast ARP who-has for target_ip sent from pg."""
        return (Ether(src=pg.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
                ARP(op=ARP.who_has, pdst=target_ip,
                    psrc=pg.remote_ip4, hwsrc=pg.remote_mac))

    def send(pg, target_ip):
        pg.add_stream(arp_request(pg, target_ip))
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    # NAT44 address
    send(self.pg1, self.nat_addr)
    capture = self.pg1.get_capture(1)
    self.assertTrue(capture[0].haslayer(ARP))
    # assertEqual, not assertTrue: the second argument of assertTrue is
    # only a failure message, so the opcode was never actually compared
    self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # 1:1 NAT address
    send(self.pg1, static_addr)
    capture = self.pg1.get_capture(1)
    self.assertTrue(capture[0].haslayer(ARP))
    self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # send ARP to non-NAT44 interface
    # NOTE(review): the request goes in on pg2 but the empty capture is
    # checked on pg1 - confirm whether pg2 was intended.
    send(self.pg2, self.nat_addr)
    self.pg1.get_capture(0)

    # remove addresses and verify
    self.nat44_add_address(self.nat_addr, is_add=0)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                  is_add=0)

    send(self.pg1, self.nat_addr)
    self.pg1.get_capture(0)

    send(self.pg1, static_addr)
    self.pg1.get_capture(0)
def test_vrf_mode(self):
    """ NAT44 tenant VRF aware address pool mode

    pg0 and pg1 are moved into separate VRFs, each with its own NAT
    address; traffic from each is expected to be translated to the
    address that belongs to its VRF.
    """
    # NOTE(review): the VRF ids were not visible in the reviewed excerpt;
    # confirm against the upstream test.
    vrf_id1 = 1
    vrf_id2 = 2
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    self.pg0.unconfig_ip4()
    self.pg1.unconfig_ip4()
    self.vapi.ip_table_add_del(vrf_id1, is_add=1)
    self.vapi.ip_table_add_del(vrf_id2, is_add=1)
    try:
        self.pg0.set_table_ip4(vrf_id1)
        self.pg1.set_table_ip4(vrf_id2)
        self.pg0.config_ip4()
        self.pg1.config_ip4()

        self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
        self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
        self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
                                                  is_inside=0)

        # first VRF
        pkts = self.create_stream_in(self.pg0, self.pg2)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip1)

        # second VRF
        pkts = self.create_stream_in(self.pg1, self.pg2)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip2)
    finally:
        # always move the interfaces back to the default table so a
        # failed assertion does not leak VRF state into later tests
        self.pg0.unconfig_ip4()
        self.pg1.unconfig_ip4()
        self.pg0.set_table_ip4(0)
        self.pg1.set_table_ip4(0)
        self.vapi.ip_table_add_del(vrf_id1, is_add=0)
        self.vapi.ip_table_add_del(vrf_id2, is_add=0)
def test_vrf_feature_independent(self):
    """ NAT44 tenant VRF independent address pool mode

    With pg0 and pg1 left in their existing tables, traffic from both is
    expected to use nat_ip1; nat_ip2 is bound to VRF 99 and must not be
    picked.
    """
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    self.nat44_add_address(nat_ip1)
    self.nat44_add_address(nat_ip2, vrf_id=99)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
                                              is_inside=0)

    # both inside interfaces must be translated to the same address
    for inside_if in (self.pg0, self.pg1):
        pkts = self.create_stream_in(inside_if, self.pg2)
        inside_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg2.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip1)
def test_dynamic_ipless_interfaces(self):
    """ NAT44 interfaces without configured IP address

    pg7 (inside) and pg8 (outside) get only a neighbor entry and a /32
    route to the peer; dynamic translation is then checked in both
    directions.
    """
    ipless_ifs = (self.pg7, self.pg8)

    # NOTE(review): the is_static argument was not visible in the
    # reviewed excerpt; confirm against the upstream test.
    for pg in ipless_ifs:
        self.vapi.ip_neighbor_add_del(pg.sw_if_index,
                                      mactobinary(pg.remote_mac),
                                      pg.remote_ip4n,
                                      is_static=1)

    for pg in ipless_ifs:
        self.vapi.ip_add_del_route(dst_address=pg.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg.remote_ip4n,
                                   next_hop_sw_if_index=pg.sw_if_index)

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
                                              is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg8, self.nat_addr)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)
def test_static_ipless_interfaces(self):
    """ NAT44 interfaces without configured IP address - 1:1 NAT

    pg7 (inside) and pg8 (outside) get only a neighbor entry and a /32
    route to the peer; a 1:1 static mapping is then checked out2in first
    and in2out second.
    """
    ipless_ifs = (self.pg7, self.pg8)

    # NOTE(review): the is_static argument was not visible in the
    # reviewed excerpt; confirm against the upstream test.
    for pg in ipless_ifs:
        self.vapi.ip_neighbor_add_del(pg.sw_if_index,
                                      mactobinary(pg.remote_mac),
                                      pg.remote_ip4n,
                                      is_static=1)

    for pg in ipless_ifs:
        self.vapi.ip_add_del_route(dst_address=pg.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg.remote_ip4n,
                                   next_hop_sw_if_index=pg.sw_if_index)

    self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
                                              is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg8)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture, self.nat_addr, True)
def test_static_with_port_ipless_interfaces(self):
    """ NAT44 interfaces without configured IP address - 1:1 NAPT

    pg7 (inside) and pg8 (outside) get only a neighbor entry and a /32
    route to the peer; per-protocol static mappings with ports are then
    checked out2in first and in2out second.
    """
    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    ipless_ifs = (self.pg7, self.pg8)

    # NOTE(review): the is_static argument was not visible in the
    # reviewed excerpt; confirm against the upstream test.
    for pg in ipless_ifs:
        self.vapi.ip_neighbor_add_del(pg.sw_if_index,
                                      mactobinary(pg.remote_mac),
                                      pg.remote_ip4n,
                                      is_static=1)

    for pg in ipless_ifs:
        self.vapi.ip_add_del_route(dst_address=pg.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=pg.remote_ip4n,
                                   next_hop_sw_if_index=pg.sw_if_index)

    self.nat44_add_address(self.nat_addr)
    # one static mapping per protocol: (inside port/id, outside port/id)
    mappings = (
        (self.tcp_port_in, self.tcp_port_out, IP_PROTOS.tcp),
        (self.udp_port_in, self.udp_port_out, IP_PROTOS.udp),
        (self.icmp_id_in, self.icmp_id_out, IP_PROTOS.icmp),
    )
    for port_in, port_out, proto in mappings:
        self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
                                      port_in, port_out,
                                      proto=proto)
    self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
                                              is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg8)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture)
def test_static_unknown_proto(self):
    """ 1:1 NAT translate packet with unknown protocol

    Sends a GRE-encapsulated packet through a 1:1 static mapping in both
    directions and checks that only the outer IP addresses are rewritten
    and the IP checksum is valid.
    """
    nat_ip = "10.0.0.10"
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # in2out
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, nat_ip)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # out2in
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=nat_ip) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
        self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_hairpinning_static_unknown_proto(self):
    """ 1:1 NAT translate packet with unknown protocol - hairpinning

    Host and server both sit behind pg0 and both have 1:1 mappings; a
    GRE-encapsulated packet is hairpinned in each direction and the outer
    addresses and IP checksum are verified.
    """
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]

    host_nat_ip = "10.0.0.10"
    server_nat_ip = "10.0.0.11"

    self.nat44_add_static_mapping(host.ip4, host_nat_ip)
    self.nat44_add_static_mapping(server.ip4, server_nat_ip)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # host to server
    p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, host_nat_ip)
        self.assertEqual(packet[IP].dst, server.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # server to host
    p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
         IP(src=server.ip4, dst=host_nat_ip) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, server_nat_ip)
        self.assertEqual(packet[IP].dst, host.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_unknown_proto(self):
    """ NAT44 translate packet with unknown protocol

    After an initial TCP packet from the inside host, a GRE-encapsulated
    packet is sent in each direction and the outer addresses and IP
    checksum of the translated packet are verified.
    """
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # in2out
    # a TCP packet goes first; only its arrival on pg1 is checked
    # NOTE(review): presumably this establishes the host's translation
    # before the GRE packet is sent - confirm.
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=self.tcp_port_in, dport=20))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(1)

    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.nat_addr)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # out2in
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
        self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_hairpinning_unknown_proto(self):
    """ NAT44 translate packet with unknown protocol - hairpinning

    Host (dynamically translated) and server (1:1 mapped) both sit behind
    pg0.  After an initial TCP packet, a GRE-encapsulated packet is
    hairpinned in each direction and its outer addresses and IP checksum
    are verified.
    """
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): host_in_port was not visible in the reviewed excerpt;
    # confirm the value against the upstream test.
    host_in_port = 1234
    server_out_port = 8765
    server_nat_ip = "10.0.0.11"

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, server_nat_ip)

    # host to server
    # a TCP packet goes first; only its arrival back on pg0 is checked
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg0.get_capture(1)

    p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.nat_addr)
        self.assertEqual(packet[IP].dst, server.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # server to host
    p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
         IP(src=server.ip4, dst=self.nat_addr) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, server_nat_ip)
        self.assertEqual(packet[IP].dst, host.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_output_feature(self):
    """ NAT44 interface output feature (in2out postrouting)

    Enables the output feature on pg0/pg1 (inside) and pg3 (outside),
    checks translation in both directions, and checks that traffic from
    pg2, which has no NAT feature, reaches pg0 untranslated.
    """
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
                                                     is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # from non-NAT interface to NAT inside interface
    pkts = self.create_stream_in(self.pg2, self.pg0)
    self.pg2.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg2, self.pg0)
def test_output_feature_vrf_aware(self):
    """ NAT44 interface output feature VRF aware (in2out postrouting)

    pg4 and pg6 are inside interfaces and pg3 is outside.  A route to
    pg3's peer is added in tables 10 and 20, each table gets its own NAT
    address, and translation is checked in both directions per VRF.
    """
    nat_ip_vrf10 = "10.0.0.10"
    nat_ip_vrf20 = "10.0.0.20"

    # make the outside peer reachable from both tenant tables
    for table_id in (10, 20):
        self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=self.pg3.remote_ip4n,
                                   next_hop_sw_if_index=self.pg3.sw_if_index,
                                   table_id=table_id)

    self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
    self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
    self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
                                                     is_inside=0)

    # in2out VRF 10
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)

    # out2in VRF 10
    pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg4.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg4)

    # in2out VRF 20
    pkts = self.create_stream_in(self.pg6, self.pg3)
    self.pg6.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)

    # out2in VRF 20
    pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg6)
# Test: hairpinning with the NAT44 output feature.
# Both host and server are remote hosts on pg0. The host addresses the server
# through the NAT address and a static TCP mapping
# (server.ip4:server_in_port <-> nat_addr:server_out_port); both directions
# are captured on pg0 again.
# NOTE(review): host_in_port, ip and tcp are used below but their assignments
# are not visible in this listing (the embedded numbering skips lines such as
# 3215-3216 and 3238-3241); the try/except framing around the assertions is
# presumably among the missing lines as well -- confirm in the full file.
3211 def test_output_feature_hairpinning(self):
3212 """ NAT44 interface output feature hairpinning (in2out postrouting) """
3213 host = self.pg0.remote_hosts[0]
3214 server = self.pg0.remote_hosts[1]
3217 server_in_port = 5678
3218 server_out_port = 8765
3220 self.nat44_add_address(self.nat_addr)
3221 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3222 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3225 # add static mapping for server
3226 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3227 server_in_port, server_out_port,
3228 proto=IP_PROTOS.tcp)
3230 # send packet from host to server
3231 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3232 IP(src=host.ip4, dst=self.nat_addr) /
3233 TCP(sport=host_in_port, dport=server_out_port))
3234 self.pg0.add_stream(p)
3235 self.pg_enable_capture(self.pg_interfaces)
3237 capture = self.pg0.get_capture(1)
# Expected on the hairpinned packet: source rewritten to the NAT address with
# a new source port, destination rewritten to the server's real address/port.
3242 self.assertEqual(ip.src, self.nat_addr)
3243 self.assertEqual(ip.dst, server.ip4)
3244 self.assertNotEqual(tcp.sport, host_in_port)
3245 self.assertEqual(tcp.dport, server_in_port)
3246 self.check_tcp_checksum(p)
# Remember the allocated port so the server's reply can target it.
3247 host_out_port = tcp.sport
3249 self.logger.error(ppp("Unexpected or invalid packet:", p))
3252 # send reply from server to host
3253 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3254 IP(src=server.ip4, dst=self.nat_addr) /
3255 TCP(sport=server_in_port, dport=host_out_port))
3256 self.pg0.add_stream(p)
3257 self.pg_enable_capture(self.pg_interfaces)
3259 capture = self.pg0.get_capture(1)
# Expected on the reply: it appears to come from nat_addr:server_out_port and
# is delivered to the host's original address and port.
3264 self.assertEqual(ip.src, self.nat_addr)
3265 self.assertEqual(ip.dst, host.ip4)
3266 self.assertEqual(tcp.sport, server_out_port)
3267 self.assertEqual(tcp.dport, host_in_port)
3268 self.check_tcp_checksum(p)
3270 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: one-armed NAT44 -- pg9 is registered with the NAT feature twice (the
# second call carries extra arguments that are cut off in this listing), and
# both the local and the remote host sit behind that single interface.
# NOTE(review): ip/tcp assignments and the try/except framing are on lines
# missing from this listing (numbering gaps 3292-3295, 3303, 3315-3318).
3273 def test_one_armed_nat44(self):
3274 """ One armed NAT44 """
3275 remote_host = self.pg9.remote_hosts[0]
3276 local_host = self.pg9.remote_hosts[1]
3279 self.nat44_add_address(self.nat_addr)
3280 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3281 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local_host -> remote_host, TCP 12345 -> 80, sent into and captured on pg9.
3285 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3286 IP(src=local_host.ip4, dst=remote_host.ip4) /
3287 TCP(sport=12345, dport=80))
3288 self.pg9.add_stream(p)
3289 self.pg_enable_capture(self.pg_interfaces)
3291 capture = self.pg9.get_capture(1)
# Expected: source becomes the NAT address with a different source port;
# destination address and port are unchanged.
3296 self.assertEqual(ip.src, self.nat_addr)
3297 self.assertEqual(ip.dst, remote_host.ip4)
3298 self.assertNotEqual(tcp.sport, 12345)
# Keep the allocated port for the return packet below.
3299 external_port = tcp.sport
3300 self.assertEqual(tcp.dport, 80)
3301 self.check_tcp_checksum(p)
3302 self.check_ip_checksum(p)
3304 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host -> nat_addr:external_port, i.e. the reply direction.
3308 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3309 IP(src=remote_host.ip4, dst=self.nat_addr) /
3310 TCP(sport=80, dport=external_port))
3311 self.pg9.add_stream(p)
3312 self.pg_enable_capture(self.pg_interfaces)
3314 capture = self.pg9.get_capture(1)
# Expected: destination restored to local_host:12345, source untouched.
3319 self.assertEqual(ip.src, remote_host.ip4)
3320 self.assertEqual(ip.dst, local_host.ip4)
3321 self.assertEqual(tcp.sport, 80)
3322 self.assertEqual(tcp.dport, 12345)
3323 self.check_tcp_checksum(p)
3324 self.check_ip_checksum(p)
3326 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: one-armed NAT44 combined with a static TCP mapping that is added with
# out2in_only=1 and a NAT address added with twice_nat=1; forwarding is
# enabled via nat44_forwarding_enable_disable(1).
# NOTE(review): local_port and external_port are used below but assigned on
# lines that are missing from this listing (numbering gap 3333-3336); the
# static-mapping call and the second feature call are also cut off, as are
# the ip/tcp assignments and try/except framing.
3329 def test_one_armed_nat44_static(self):
3330 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3331 remote_host = self.pg9.remote_hosts[0]
3332 local_host = self.pg9.remote_hosts[1]
3337 self.vapi.nat44_forwarding_enable_disable(1)
3338 self.nat44_add_address(self.nat_addr, twice_nat=1)
3339 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3340 local_port, external_port,
3341 proto=IP_PROTOS.tcp, out2in_only=1,
3343 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3344 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3347 # from client to service
3348 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3349 IP(src=remote_host.ip4, dst=self.nat_addr) /
3350 TCP(sport=12345, dport=external_port))
3351 self.pg9.add_stream(p)
3352 self.pg_enable_capture(self.pg_interfaces)
3354 capture = self.pg9.get_capture(1)
# Expected: destination mapped to local_host:local_port AND source rewritten
# to the NAT address with a new port (both ends translated).
3360 self.assertEqual(ip.dst, local_host.ip4)
3361 self.assertEqual(ip.src, self.nat_addr)
3362 self.assertEqual(tcp.dport, local_port)
3363 self.assertNotEqual(tcp.sport, 12345)
# Port the service sees for the client; used as dport of the reply.
3364 eh_port_in = tcp.sport
3365 self.check_tcp_checksum(p)
3366 self.check_ip_checksum(p)
3368 self.logger.error(ppp("Unexpected or invalid packet:", p))
3371 # from service back to client
3372 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3373 IP(src=local_host.ip4, dst=self.nat_addr) /
3374 TCP(sport=local_port, dport=eh_port_in))
3375 self.pg9.add_stream(p)
3376 self.pg_enable_capture(self.pg_interfaces)
3378 capture = self.pg9.get_capture(1)
# Expected: reply appears to come from nat_addr:external_port and reaches the
# client's original address and port.
3383 self.assertEqual(ip.src, self.nat_addr)
3384 self.assertEqual(ip.dst, remote_host.ip4)
3385 self.assertEqual(tcp.sport, external_port)
3386 self.assertEqual(tcp.dport, 12345)
3387 self.check_tcp_checksum(p)
3388 self.check_ip_checksum(p)
3390 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: deleting NAT44 sessions through the API.
# Sessions are created by sending an inside stream pg0 -> pg1, then two of
# them are deleted -- one addressed by its inside address/port, one by its
# outside address/port -- and the per-user session count must drop by 2.
# NOTE(review): indexing sessions[0] and sessions[1] assumes the dump returns
# at least two sessions; the second nat44_del_session call and the pg1
# feature call are cut off in this listing (numbering gaps).
3393 def test_del_session(self):
3394 """ Delete NAT44 session """
3395 self.nat44_add_address(self.nat_addr)
3396 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3397 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3400 pkts = self.create_stream_in(self.pg0, self.pg1)
3401 self.pg0.add_stream(pkts)
3402 self.pg_enable_capture(self.pg_interfaces)
# The capture itself is not inspected; it only confirms the packets got out.
3404 capture = self.pg1.get_capture(len(pkts))
# Baseline session count for the pg0 remote host.
3406 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3407 nsessions = len(sessions)
3409 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3410 sessions[0].inside_port,
3411 sessions[0].protocol)
3412 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3413 sessions[1].outside_port,
3414 sessions[1].protocol,
3417 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3418 self.assertEqual(nsessions - len(sessions), 2)
def test_set_get_reass(self):
    """ NAT44 set/get virtual fragmentation reassembly """
    # Read the current virtual-reassembly settings and derive new values
    # that are guaranteed to differ from them.
    initial = self.vapi.nat_get_reass()
    wanted_timeout = initial.ip4_timeout + 5
    wanted_max_reass = initial.ip4_max_reass * 2
    wanted_max_frag = initial.ip4_max_frag * 2

    self.vapi.nat_set_reass(timeout=wanted_timeout,
                            max_reass=wanted_max_reass,
                            max_frag=wanted_max_frag)

    # A fresh read must return exactly what was just written.
    updated = self.vapi.nat_get_reass()
    self.assertEqual(wanted_timeout, updated.ip4_timeout)
    self.assertEqual(wanted_max_reass, updated.ip4_max_reass)
    self.assertEqual(wanted_max_frag, updated.ip4_max_frag)

    # Setting drop_frag must be reflected in the next read as well.
    self.vapi.nat_set_reass(drop_frag=1)
    drop_frag_state = self.vapi.nat_get_reass().ip4_drop_frag
    self.assertTrue(drop_frag_state)
# Test: NAT44 translation of a fragmented TCP payload whose fragments arrive
# in order, in both directions (pg0 -> pg1, then pg1 -> pg0). The reassembled
# packet must carry the original payload, and the number of reassembly
# entries reported by nat_reass_dump must grow by 2 over the test.
# NOTE(review): several argument lines of the create_stream_frag and
# reass_frags_and_verify calls are missing from this listing (numbering gaps
# 3453-3455, 3461, 3470-3473), so the exact arguments cannot be read here.
3437 def test_frag_in_order(self):
3438 """ NAT44 translate fragments arriving in order """
3439 self.nat44_add_address(self.nat_addr)
3440 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3441 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# 23-byte payload with three distinguishable sections.
3444 data = "A" * 4 + "B" * 16 + "C" * 3
# Randomised inside source port, stored on the instance for later checks.
3445 self.tcp_port_in = random.randint(1025, 65535)
# Baseline count of reassembly entries.
3447 reass = self.vapi.nat_reass_dump()
3448 reass_n_start = len(reass)
3451 pkts = self.create_stream_frag(self.pg0,
3452 self.pg1.remote_ip4,
3456 self.pg0.add_stream(pkts)
3457 self.pg_enable_capture(self.pg_interfaces)
3459 frags = self.pg1.get_capture(len(pkts))
3460 p = self.reass_frags_and_verify(frags,
3462 self.pg1.remote_ip4)
# in2out: destination port stays 20, source port must have been translated.
3463 self.assertEqual(p[TCP].dport, 20)
3464 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3465 self.tcp_port_out = p[TCP].sport
3466 self.assertEqual(data, p[Raw].load)
3469 pkts = self.create_stream_frag(self.pg1,
3474 self.pg1.add_stream(pkts)
3475 self.pg_enable_capture(self.pg_interfaces)
3477 frags = self.pg0.get_capture(len(pkts))
3478 p = self.reass_frags_and_verify(frags,
3479 self.pg1.remote_ip4,
3480 self.pg0.remote_ip4)
# out2in: destination port must be restored to the original inside port.
3481 self.assertEqual(p[TCP].sport, 20)
3482 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3483 self.assertEqual(data, p[Raw].load)
3485 reass = self.vapi.nat_reass_dump()
3486 reass_n_end = len(reass)
# presumably one new entry per direction -- confirm against the plugin.
3488 self.assertEqual(reass_n_end - reass_n_start, 2)
# Test: hairpinning of a fragmented TCP payload. Host and server are both
# remote hosts on pg0; the server is reachable through a static TCP mapping
# on the NAT address. Fragments are sent into pg0 and captured on pg0.
# NOTE(review): the argument lines of create_stream_frag and
# reass_frags_and_verify are missing from this listing (numbering gaps
# 3511-3514, 3520-3521), as is line 3495.
3490 def test_reass_hairpinning(self):
3491 """ NAT44 fragments hairpinning """
3492 host = self.pg0.remote_hosts[0]
3493 server = self.pg0.remote_hosts[1]
# All three ports are randomised per run.
3494 host_in_port = random.randint(1025, 65535)
3496 server_in_port = random.randint(1025, 65535)
3497 server_out_port = random.randint(1025, 65535)
3498 data = "A" * 4 + "B" * 16 + "C" * 3
3500 self.nat44_add_address(self.nat_addr)
3501 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3502 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3504 # add static mapping for server
3505 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3506 server_in_port, server_out_port,
3507 proto=IP_PROTOS.tcp)
3509 # send packet from host to server
3510 pkts = self.create_stream_frag(self.pg0,
3515 self.pg0.add_stream(pkts)
3516 self.pg_enable_capture(self.pg_interfaces)
3518 frags = self.pg0.get_capture(len(pkts))
3519 p = self.reass_frags_and_verify(frags,
# Expected: source port translated, destination port mapped to the server's
# inside port, payload intact.
3522 self.assertNotEqual(p[TCP].sport, host_in_port)
3523 self.assertEqual(p[TCP].dport, server_in_port)
3524 self.assertEqual(data, p[Raw].load)
# Test: NAT44 translation of a fragmented TCP payload in both directions,
# per the docstring with fragments arriving out of order.
# NOTE(review): the statements that reorder the fragments and several call
# arguments are on lines missing from this listing (numbering gaps 3539-3542,
# 3548, 3557-3561), so the reordering itself cannot be seen here.
3526 def test_frag_out_of_order(self):
3527 """ NAT44 translate fragments arriving out of order """
3528 self.nat44_add_address(self.nat_addr)
3529 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3530 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3533 data = "A" * 4 + "B" * 16 + "C" * 3
# NOTE(review): the result of this call is discarded. The in-order variant of
# this test assigns the same expression to self.tcp_port_in, and the
# assertions below read self.tcp_port_in -- so as written this test relies on
# whatever value self.tcp_port_in already holds. Looks like a missing
# assignment target; confirm and fix in the full file.
3534 random.randint(1025, 65535)
3537 pkts = self.create_stream_frag(self.pg0,
3538 self.pg1.remote_ip4,
3543 self.pg0.add_stream(pkts)
3544 self.pg_enable_capture(self.pg_interfaces)
3546 frags = self.pg1.get_capture(len(pkts))
3547 p = self.reass_frags_and_verify(frags,
3549 self.pg1.remote_ip4)
# in2out: destination port stays 20, source port must have been translated.
3550 self.assertEqual(p[TCP].dport, 20)
3551 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3552 self.tcp_port_out = p[TCP].sport
3553 self.assertEqual(data, p[Raw].load)
3556 pkts = self.create_stream_frag(self.pg1,
3562 self.pg1.add_stream(pkts)
3563 self.pg_enable_capture(self.pg_interfaces)
3565 frags = self.pg0.get_capture(len(pkts))
3566 p = self.reass_frags_and_verify(frags,
3567 self.pg1.remote_ip4,
3568 self.pg0.remote_ip4)
# out2in: destination port must be restored to the inside port.
3569 self.assertEqual(p[TCP].sport, 20)
3570 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3571 self.assertEqual(data, p[Raw].load)
# Test: port-restricted NAT44. The address/port assignment algorithm is
# switched via CLI to "map-e" with psid 10, psid-offset 6, psid-len 6, then a
# single TCP packet pg0 -> pg1 is translated and its allocated source port is
# checked against the configured PSID.
# NOTE(review): ip/tcp assignments and try/except framing are on lines
# missing from this listing (numbering gaps 3589-3592, 3600).
3573 def test_port_restricted(self):
3574 """ Port restricted NAT44 (MAP-E CE) """
3575 self.nat44_add_address(self.nat_addr)
3576 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3577 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3579 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3580 "psid-offset 6 psid-len 6")
3582 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3583 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3584 TCP(sport=4567, dport=22))
3585 self.pg0.add_stream(p)
3586 self.pg_enable_capture(self.pg_interfaces)
3588 capture = self.pg1.get_capture(1)
3593 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3594 self.assertEqual(ip.src, self.nat_addr)
3595 self.assertEqual(tcp.dport, 22)
3596 self.assertNotEqual(tcp.sport, 4567)
# Bits 6..11 of the allocated source port (shift by 6, mask 6 bits) must
# equal 10, the psid configured in the CLI command above.
3597 self.assertEqual((tcp.sport >> 6) & 63, 10)
3598 self.check_tcp_checksum(p)
3599 self.check_ip_checksum(p)
3601 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: twice-NAT. A second NAT address is added with twice_nat=1 and a
# static TCP mapping is added for pg0's remote host. A packet from the
# outside (pg1) to nat_addr:port_out must reach pg0 with BOTH addresses
# rewritten: destination to the inside host, source to twice_nat_addr.
# NOTE(review): the docstring line and the assignments of port_in, port_out
# and eh_port_out are on lines missing from this listing (numbering gaps
# 3605, 3607-3610); the static-mapping call is cut off after its proto
# argument; ip/tcp assignments and try/except framing are missing too.
3604 def test_twice_nat(self):
3606 twice_nat_addr = '10.0.1.3'
3611 self.nat44_add_address(self.nat_addr)
3612 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3613 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3614 port_in, port_out, proto=IP_PROTOS.tcp,
3616 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3617 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# outside -> inside
3620 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3621 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3622 TCP(sport=eh_port_out, dport=port_out))
3623 self.pg1.add_stream(p)
3624 self.pg_enable_capture(self.pg_interfaces)
3626 capture = self.pg0.get_capture(1)
3631 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3632 self.assertEqual(ip.src, twice_nat_addr)
3633 self.assertEqual(tcp.dport, port_in)
3634 self.assertNotEqual(tcp.sport, eh_port_out)
# Source port the inside host sees for the external host; reply targets it.
3635 eh_port_in = tcp.sport
3636 self.check_tcp_checksum(p)
3637 self.check_ip_checksum(p)
3639 self.logger.error(ppp("Unexpected or invalid packet:", p))
# inside -> outside (reply addressed to the twice-NAT address)
3642 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3643 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3644 TCP(sport=port_in, dport=eh_port_in))
3645 self.pg0.add_stream(p)
3646 self.pg_enable_capture(self.pg_interfaces)
3648 capture = self.pg1.get_capture(1)
# Expected: both translations undone -- the external host sees the reply
# coming from nat_addr:port_out to its own original address and port.
3653 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3654 self.assertEqual(ip.src, self.nat_addr)
3655 self.assertEqual(tcp.dport, eh_port_out)
3656 self.assertEqual(tcp.sport, port_out)
3657 self.check_tcp_checksum(p)
3658 self.check_ip_checksum(p)
3660 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: twice-NAT combined with a load-balanced static mapping over two
# local servers (both remote hosts on pg0). An outside packet to
# nat_addr:external_port may land on either server; the reply from that
# server must be translated back towards the external host.
# NOTE(review): external_port, local_port, eh_port_out and server are used
# below but assigned on lines missing from this listing (numbering gaps
# 3667-3670, 3709-3711); the `locals` list entries and the
# nat44_add_del_lb_static_mapping call are only partially visible, as are
# the ip/tcp assignments and try/except framing.
3663 def test_twice_nat_lb(self):
3664 """ Twice NAT44 local service load balancing """
3665 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3666 twice_nat_addr = '10.0.1.3'
3671 server1 = self.pg0.remote_hosts[0]
3672 server2 = self.pg0.remote_hosts[1]
# NOTE(review): the name `locals` shadows the Python builtin of the same
# name within this method.
3674 locals = [{'addr': server1.ip4n,
3677 {'addr': server2.ip4n,
3681 self.nat44_add_address(self.nat_addr)
3682 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3684 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3688 local_num=len(locals),
3690 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3691 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# outside -> one of the local servers
3694 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3695 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3696 TCP(sport=eh_port_out, dport=external_port))
3697 self.pg1.add_stream(p)
3698 self.pg_enable_capture(self.pg_interfaces)
3700 capture = self.pg0.get_capture(1)
3706 self.assertEqual(ip.src, twice_nat_addr)
# Either backend is an acceptable destination.
3707 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
# presumably selects `server` for the reply; the branch bodies are on lines
# missing from this listing -- verify against the full file.
3708 if ip.dst == server1.ip4:
3712 self.assertNotEqual(tcp.sport, eh_port_out)
3713 eh_port_in = tcp.sport
3714 self.assertEqual(tcp.dport, local_port)
3715 self.check_tcp_checksum(p)
3716 self.check_ip_checksum(p)
3718 self.logger.error(ppp("Unexpected or invalid packet:", p))
# chosen server -> outside (reply addressed to the twice-NAT address)
3721 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3722 IP(src=server.ip4, dst=twice_nat_addr) /
3723 TCP(sport=local_port, dport=eh_port_in))
3724 self.pg0.add_stream(p)
3725 self.pg_enable_capture(self.pg_interfaces)
3727 capture = self.pg1.get_capture(1)
3732 self.assertEqual(ip.src, self.nat_addr)
3733 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3734 self.assertEqual(tcp.sport, external_port)
3735 self.assertEqual(tcp.dport, eh_port_out)
3736 self.check_tcp_checksum(p)
3737 self.check_ip_checksum(p)
3739 self.logger.error(ppp("Unexpected or invalid packet:", p))
def test_twice_nat_interface_addr(self):
    """ Acquire twice NAT44 addresses from interface """
    # Bind the twice-NAT pool to pg7 while pg7 has no IPv4 address yet.
    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)

    # Nothing to acquire yet, so the pool must be empty.
    pool_before = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool_before))

    # Once pg7 gets an address, exactly that address must show up in the
    # pool, flagged as a twice-NAT address.
    self.pg7.config_ip4()
    pool_configured = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool_configured))
    acquired = pool_configured[0]
    self.assertEqual(acquired.ip_address[0:4], self.pg7.local_ip4n)
    self.assertEqual(acquired.twice_nat, 1)

    # Removing the interface address must empty the pool again.
    self.pg7.unconfig_ip4()
    pool_after = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool_after))
# Test: IPFIX logging when the limit of fragments pending reassembly is
# exceeded. max_frag is set to 0, an IPFIX exporter is pointed at pg3's
# remote address, and only the LAST fragment of a fragmented stream is sent,
# so nothing may be forwarded to pg1 while IPFIX packets must show up on pg3.
# NOTE(review): the loop headers over `capture`, some exporter arguments and
# create_stream_frag arguments are on lines missing from this listing
# (numbering gaps 3771, 3780-3782, 3791, 3802).
3762 def test_ipfix_max_frags(self):
3763 """ IPFIX logging maximum fragments pending reassembly exceeded """
3764 self.nat44_add_address(self.nat_addr)
3765 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3766 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3768 self.vapi.nat_set_reass(max_frag=0)
3769 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3770 src_address=self.pg3.local_ip4n,
3772 template_interval=10)
3773 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3774 src_port=self.ipfix_src_port)
3776 data = "A" * 4 + "B" * 16 + "C" * 3
3777 self.tcp_port_in = random.randint(1025, 65535)
3778 pkts = self.create_stream_frag(self.pg0,
3779 self.pg1.remote_ip4,
# Only the final fragment is injected.
3783 self.pg0.add_stream(pkts[-1])
3784 self.pg_enable_capture(self.pg_interfaces)
# Expect zero packets on pg1; the returned value is not used afterwards.
3786 frags = self.pg1.get_capture(0)
3787 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# NOTE(review): the expected count of 9 IPFIX packets is hard-coded; its
# derivation is not visible in this listing.
3788 capture = self.pg3.get_capture(9)
3789 ipfix = IPFIXDecoder()
3790 # first load template
3792 self.assertTrue(p.haslayer(IPFIX))
3793 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3794 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3795 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3796 self.assertEqual(p[UDP].dport, 4739)
3797 self.assertEqual(p[IPFIX].observationDomainID,
3798 self.ipfix_domain_id)
3799 if p.haslayer(Template):
3800 ipfix.add_template(p.getlayer(Template))
3801 # verify events in data set
3803 if p.haslayer(Data):
# NOTE(review): this rebinds `data`, which held the payload string above.
3804 data = ipfix.decode_data_set(p.getlayer(Set))
3805 self.verify_ipfix_max_fragments_ip4(data, 0,
3806 self.pg0.remote_ip4n)
# Per-test cleanup of TestNAT44 (the call below is super(...).tearDown()).
# NOTE(review): the `def` line of this method is missing from this listing
# (embedded numbering jumps from 3806 to 3809), as are lines 3818-3820.
# After the base-class teardown, and only while VPP is still alive, the NAT
# state is dumped to the log and the address/port assignment algorithm is
# reset to "default" via CLI.
3809 super(TestNAT44, self).tearDown()
3810 if not self.vpp_dead:
3811 self.logger.info(self.vapi.cli("show nat44 addresses"))
3812 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3813 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3814 self.logger.info(self.vapi.cli("show nat44 interface address"))
3815 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3816 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# Undo the "map-e" setting that test_port_restricted applies.
3817 self.vapi.cli("nat addr-port-assignment-alg default")
# Test class: NAT44 started with "nat { out2in dpo }" on the VPP command
# line, exercised together with a translation MAP domain (464XLAT CE).
# pg0 is the IPv4 side, pg1 the IPv6 side.
# NOTE(review): the numbering embedded in this listing has gaps; decorators,
# at least one `def` line (the tearDownClass header before line 3860) and
# the tails of several multi-line calls are not visible here. Consult the
# complete file before editing any of them.
3821 class TestNAT44Out2InDPO(MethodHolder):
3822 """ NAT44 Test Cases using out2in DPO """
# Extends the base command line with the NAT startup configuration tokens.
3825 def setUpConstants(cls):
3826 super(TestNAT44Out2InDPO, cls).setUpConstants()
3827 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
# One-time fixture: ports/ids/addresses used by the stream helpers, two pg
# interfaces (pg0 IPv4, pg1 IPv6) and an IPv6 default route via pg1.
3830 def setUpClass(cls):
3831 super(TestNAT44Out2InDPO, cls).setUpClass()
3834 cls.tcp_port_in = 6303
3835 cls.tcp_port_out = 6303
3836 cls.udp_port_in = 6304
3837 cls.udp_port_out = 6304
3838 cls.icmp_id_in = 6305
3839 cls.icmp_id_out = 6305
3840 cls.nat_addr = '10.0.0.3'
3841 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3842 cls.dst_ip4 = '192.168.70.1'
3844 cls.create_pg_interfaces(range(2))
3847 cls.pg0.config_ip4()
3848 cls.pg0.resolve_arp()
3851 cls.pg1.config_ip6()
3852 cls.pg1.resolve_ndp()
# All-zero prefix of length 0, i.e. the IPv6 default route, via pg1.
3854 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
3855 dst_address_length=0,
3856 next_hop_address=cls.pg1.remote_ip6n,
3857 next_hop_sw_if_index=cls.pg1.sw_if_index)
3860 super(TestNAT44Out2InDPO, cls).tearDownClass()
# Stores the two /96 IPv6 prefixes on the instance and adds a MAP domain in
# translation mode from them. The last argument(s) of map_add_domain and the
# second argument of each inet_pton call are cut off in this listing.
3863 def configure_xlat(self):
3864 self.dst_ip6_pfx = '1:2:3::'
3865 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3867 self.dst_ip6_pfx_len = 96
3868 self.src_ip6_pfx = '4:5:6::'
3869 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3871 self.src_ip6_pfx_len = 96
3872 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3873 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3874 '\x00\x00\x00\x00', 0, is_translation=1,
# IPv4 traffic from pg0 is NATed to nat_addr and must appear on pg1 as IPv6
# whose addresses are composed from the configured prefixes; the IPv6 reply
# stream must come back on pg0 as IPv4. The NAT feature and address range
# are removed again at the end of the test.
3877 def test_464xlat_ce(self):
3878 """ Test 464XLAT CE with NAT44 """
3880 self.configure_xlat()
3882 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3883 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
3885 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3886 self.dst_ip6_pfx_len)
3887 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3888 self.src_ip6_pfx_len)
3891 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3892 self.pg0.add_stream(pkts)
3893 self.pg_enable_capture(self.pg_interfaces)
3895 capture = self.pg1.get_capture(len(pkts))
3896 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
3899 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3901 self.pg1.add_stream(pkts)
3902 self.pg_enable_capture(self.pg_interfaces)
3904 capture = self.pg0.get_capture(len(pkts))
3905 self.verify_capture_in(capture, self.pg0)
# Cleanup of what this test configured.
3907 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3909 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
3910 self.nat_addr_n, is_add=0)
# Same flow without any NAT44 configuration: the IPv6 address on the pg1 side
# is composed from pg0's own remote IPv4 address and ports must be preserved
# (same_port=True).
3912 def test_464xlat_ce_no_nat(self):
3913 """ Test 464XLAT CE without NAT44 """
3915 self.configure_xlat()
3917 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3918 self.dst_ip6_pfx_len)
3919 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3920 self.src_ip6_pfx_len)
3922 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3923 self.pg0.add_stream(pkts)
3924 self.pg_enable_capture(self.pg_interfaces)
3926 capture = self.pg1.get_capture(len(pkts))
3927 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3928 nat_ip=out_dst_ip6, same_port=True)
3930 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3931 self.pg1.add_stream(pkts)
3932 self.pg_enable_capture(self.pg_interfaces)
3934 capture = self.pg0.get_capture(len(pkts))
3935 self.verify_capture_in(capture, self.pg0)
3938 class TestDeterministicNAT(MethodHolder):
3939 """ Deterministic NAT Test Cases """
def setUpConstants(cls):
    # Let the base class assemble its command line first, then append the
    # tokens that start the NAT plugin in deterministic mode.
    super(TestDeterministicNAT, cls).setUpConstants()
    nat_startup_tokens = ["nat", "{", "deterministic", "}"]
    cls.vpp_cmdline.extend(nat_startup_tokens)
# One-time fixture for the deterministic NAT tests: default ports/ids and NAT
# address used by the stream helpers, three pg interfaces, and two remote
# hosts with IPv4 neighbors on pg0.
# NOTE(review): the body of the `for i in cls.interfaces:` loop is on lines
# missing from this listing (numbering gap 3962-3965) -- presumably the usual
# per-interface bring-up; verify against the full file.
3947 def setUpClass(cls):
3948 super(TestDeterministicNAT, cls).setUpClass()
3951 cls.tcp_port_in = 6303
3952 cls.tcp_external_port = 6303
3953 cls.udp_port_in = 6304
3954 cls.udp_external_port = 6304
3955 cls.icmp_id_in = 6305
3956 cls.nat_addr = '10.0.0.3'
3958 cls.create_pg_interfaces(range(3))
# Copy of the interface list, iterated below.
3959 cls.interfaces = list(cls.pg_interfaces)
3961 for i in cls.interfaces:
3966 cls.pg0.generate_remote_hosts(2)
3967 cls.pg0.configure_ipv4_neighbors()
# Delegates class-level teardown to the base class.
# NOTE(review): the `def` line this statement belongs to is missing from this
# listing (embedded numbering jumps from 3967 to 3970).
3970 super(TestDeterministicNAT, cls).tearDownClass()
# Builds one TCP, one UDP and one ICMP echo-request packet from in_if's
# remote address towards out_if's remote address, using the class-level
# inside ports / ICMP id and the given TTL.
# NOTE(review): the docstring delimiters, the list that collects the packets
# and the return statement are on lines missing from this listing (numbering
# gaps such as 3974, 3980-3982, 3986-3988, 3998-4001).
3973 def create_stream_in(self, in_if, out_if, ttl=64):
3975 Create packet stream for inside network
3977 :param in_if: Inside interface
3978 :param out_if: Outside interface
3979 :param ttl: TTL of generated packets
# TCP
3983 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3984 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3985 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
# UDP
3989 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3990 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3991 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
# ICMP echo request
3995 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3996 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3997 ICMP(id=self.icmp_id_in, type='echo-request'))
# Builds one TCP, one UDP and one ICMP echo-reply packet from out_if's remote
# address towards dst_ip, using the translated ports / ICMP id stored on the
# instance (tcp_port_out, udp_port_out, icmp_external_id -- these are set by
# verify_capture_out, so that helper has to run first).
# NOTE(review): the docstring delimiters, the condition guarding the
# `dst_ip = self.nat_addr` default, the packet list and the return statement
# are on lines missing from this listing (numbering gaps such as 4003,
# 4009-4010, 4012-4013, 4029-4032).
4002 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
4004 Create packet stream for outside network
4006 :param out_if: Outside interface
4007 :param dst_ip: Destination IP address (Default use global NAT address)
4008 :param ttl: TTL of generated packets
4011 dst_ip = self.nat_addr
# TCP
4014 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4015 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4016 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
# UDP
4020 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4021 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4022 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
# ICMP echo reply
4026 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4027 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4028 ICMP(id=self.icmp_external_id, type='echo-reply'))
# Checks packets captured on the outside network: the capture must contain
# packet_num packets and each must have nat_ip as its IP source. As a side
# effect the translated TCP/UDP source port or ICMP id of each packet is
# stored on the instance (tcp_port_out, udp_port_out, icmp_external_id) for
# use by create_stream_out.
# NOTE(review): the docstring documents a `same_port` parameter that is not
# in this signature (and spells "Source" as "Sorce"); fix when the docstring
# is next touched.
# NOTE(review): the docstring delimiters, the condition guarding the nat_ip
# default, and the try/else/except framing are on lines missing from this
# listing (numbering gaps such as 4034, 4041-4042, 4046, 4052, 4054, 4057).
4033 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4035 Verify captured packets on outside network
4037 :param capture: Captured packets
4038 :param nat_ip: Translated IP address (Default use global NAT address)
4039 :param same_port: Sorce port number is not translated (Default False)
4040 :param packet_num: Expected number of packets (Default 3)
4043 nat_ip = self.nat_addr
4044 self.assertEqual(packet_num, len(capture))
4045 for packet in capture:
4047 self.assertEqual(packet[IP].src, nat_ip)
4048 if packet.haslayer(TCP):
4049 self.tcp_port_out = packet[TCP].sport
4050 elif packet.haslayer(UDP):
4051 self.udp_port_out = packet[UDP].sport
4053 self.icmp_external_id = packet[ICMP].id
4055 self.logger.error(ppp("Unexpected or invalid packet "
4056 "(outside network):", packet))
# Drives a TCP three-way handshake through the NAT: SYN in->out, SYN+ACK
# out->in (addressed to self.nat_addr and the translated port), ACK in->out.
# The translated source port is stored in self.tcp_port_out.
# NOTE(review): the TCP flags arguments, two add_stream calls, the line that
# takes the captured packet, and the try/except framing are on lines missing
# from this listing (numbering gaps such as 4071-4072, 4076, 4083,
# 4093-4094, 4098-4099).
4059 def initiate_tcp_session(self, in_if, out_if):
4061 Initiates TCP session
4063 :param in_if: Inside interface
4064 :param out_if: Outside interface
4067 # SYN packet in->out
4068 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4069 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4070 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4073 self.pg_enable_capture(self.pg_interfaces)
4075 capture = out_if.get_capture(1)
# presumably p was rebound to the captured packet on the missing line 4076;
# otherwise this would read the port of the packet that was sent -- verify.
4077 self.tcp_port_out = p[TCP].sport
4079 # SYN + ACK packet out->in
4080 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
4081 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
4082 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4084 out_if.add_stream(p)
4085 self.pg_enable_capture(self.pg_interfaces)
4087 in_if.get_capture(1)
4089 # ACK packet in->out
4090 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4091 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4092 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4095 self.pg_enable_capture(self.pg_interfaces)
4097 out_if.get_capture(1)
4100 self.logger.error("TCP 3 way handshake failed")
# Checks a decoded IPFIX data set for a "maximum entries per user exceeded"
# event: exactly one record, whose fields 230, 466, 473 and 8 must hold the
# expected values (field 8 is compared with pg0's remote IPv4 address).
# NOTE(review): `record` is used below but its assignment is on a line
# missing from this listing (numbering gap 4110-4111), as are the docstring
# delimiters and the comments that labelled fields 230, 473 and 8.
4103 def verify_ipfix_max_entries_per_user(self, data):
4105 Verify IPFIX maximum entries per user exceeded event
4107 :param data: Decoded IPFIX data records
4109 self.assertEqual(1, len(data))
# Field 230 is a single byte compared numerically (ord) with 13.
4112 self.assertEqual(ord(record[230]), 13)
4113 # natQuotaExceededEvent
4114 self.assertEqual('\x03\x00\x00\x00', record[466])
# '\xe8\x03\x00\x00' is 1000 when read as a little-endian 32-bit integer.
4116 self.assertEqual('\xe8\x03\x00\x00', record[473])
4118 self.assertEqual(self.pg0.remote_ip4n, record[8])
# Test: the plugin reports deterministic mode, a deterministic mapping can be
# added, the forward and reverse lookups agree with each other, the mapping
# dump returns exactly what was added, and clear_nat_det() removes it.
# NOTE(review): in_plen and out_plen are used below but assigned on lines
# missing from this listing (numbering gap 4128-4130).
4120 def test_deterministic_mode(self):
4121 """ NAT plugin run deterministic mode """
4122 in_addr = '172.16.255.0'
4123 out_addr = '172.17.255.50'
# An address that the forward lookup below is exercised with.
4124 in_addr_t = '172.16.255.20'
# Packed 4-byte forms, as passed to the API calls below.
4125 in_addr_n = socket.inet_aton(in_addr)
4126 out_addr_n = socket.inet_aton(out_addr)
4127 in_addr_t_n = socket.inet_aton(in_addr_t)
4131 nat_config = self.vapi.nat_show_config()
4132 self.assertEqual(1, nat_config.deterministic)
4134 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Forward lookup must give the outside address; reversing it with the upper
# bound of the returned port range must give the inside address back.
# Only the first 4 bytes of the returned address fields are compared.
4136 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4137 self.assertEqual(rep1.out_addr[:4], out_addr_n)
4138 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4139 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
4141 deterministic_mappings = self.vapi.nat_det_map_dump()
4142 self.assertEqual(len(deterministic_mappings), 1)
4143 dsm = deterministic_mappings[0]
4144 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4145 self.assertEqual(in_plen, dsm.in_plen)
4146 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4147 self.assertEqual(out_plen, dsm.out_plen)
4149 self.clear_nat_det()
4150 deterministic_mappings = self.vapi.nat_det_map_dump()
4151 self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts """
    # Snapshot the current timeouts, then raise every one of them by 10.
    before = self.vapi.nat_det_get_timeouts()
    bump = 10
    self.vapi.nat_det_set_timeouts(before.udp + bump,
                                   before.tcp_established + bump,
                                   before.tcp_transitory + bump,
                                   before.icmp + bump)

    # Every timeout read back afterwards must differ from its old value.
    after = self.vapi.nat_det_get_timeouts()
    checked_fields = ("udp", "icmp", "tcp_established", "tcp_transitory")
    for field in checked_fields:
        self.assertNotEqual(getattr(before, field), getattr(after, field))
# Test: deterministic NAT translation of TCP, UDP and ICMP in both
# directions (pg0 -> pg1 and back), followed by a check of the three
# sessions reported for pg0's remote host.
# NOTE(review): the prefix-length arguments of nat_det_add_del_map, the
# statements that pick `s` out of `sessions`, and the tail of the pg1
# feature call are on lines missing from this listing (numbering gaps such
# as 4177, 4179, 4182, 4203-4205, 4210-4212, 4217-4219).
4171 def test_det_in(self):
4172 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4174 nat_ip = "10.0.0.10"
4176 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4178 socket.inet_aton(nat_ip),
4180 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4181 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# in2out: verify_capture_out also records the translated ports / ICMP id.
4185 pkts = self.create_stream_in(self.pg0, self.pg1)
4186 self.pg0.add_stream(pkts)
4187 self.pg_enable_capture(self.pg_interfaces)
4189 capture = self.pg1.get_capture(len(pkts))
4190 self.verify_capture_out(capture, nat_ip)
# out2in
4193 pkts = self.create_stream_out(self.pg1, nat_ip)
4194 self.pg1.add_stream(pkts)
4195 self.pg_enable_capture(self.pg_interfaces)
4197 capture = self.pg0.get_capture(len(pkts))
4198 self.verify_capture_in(capture, self.pg0)
# One session per protocol is expected.
4201 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4202 self.assertEqual(len(sessions), 3)
# TCP session
4206 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4207 self.assertEqual(s.in_port, self.tcp_port_in)
4208 self.assertEqual(s.out_port, self.tcp_port_out)
4209 self.assertEqual(s.ext_port, self.tcp_external_port)
# UDP session
4213 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4214 self.assertEqual(s.in_port, self.udp_port_in)
4215 self.assertEqual(s.out_port, self.udp_port_out)
4216 self.assertEqual(s.ext_port, self.udp_external_port)
# ICMP session (ids take the place of ports)
4220 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4221 self.assertEqual(s.in_port, self.icmp_id_in)
4222 self.assertEqual(s.out_port, self.icmp_external_id)
# Test: two inside hosts (both on pg0) share one deterministic NAT address.
# Each opens a TCP flow from the same inside port; both must be translated to
# nat_ip, the mapping must report 2 sessions, replies sent to each allocated
# outside port must reach the right host, and closing the sessions through
# the API must bring the session count to 1 and then 0.
# NOTE(review): port_in is used below but assigned on a line missing from
# this listing (numbering gap 4228); prefix-length and port arguments of the
# map and close-session calls, the ip/tcp assignments and the try/except
# framing are missing as well.
4224 def test_multiple_users(self):
4225 """ Deterministic NAT multiple users """
4227 nat_ip = "10.0.0.10"
4229 external_port = 6303
4231 host0 = self.pg0.remote_hosts[0]
4232 host1 = self.pg0.remote_hosts[1]
4234 self.vapi.nat_det_add_del_map(host0.ip4n,
4236 socket.inet_aton(nat_ip),
4238 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4239 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host0 -> outside
4243 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4244 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4245 TCP(sport=port_in, dport=external_port))
4246 self.pg0.add_stream(p)
4247 self.pg_enable_capture(self.pg_interfaces)
4249 capture = self.pg1.get_capture(1)
4254 self.assertEqual(ip.src, nat_ip)
4255 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4256 self.assertEqual(tcp.dport, external_port)
# Outside port allocated to host0.
4257 port_out0 = tcp.sport
4259 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1 -> outside (same inside port as host0)
4263 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4264 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4265 TCP(sport=port_in, dport=external_port))
4266 self.pg0.add_stream(p)
4267 self.pg_enable_capture(self.pg_interfaces)
4269 capture = self.pg1.get_capture(1)
4274 self.assertEqual(ip.src, nat_ip)
4275 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4276 self.assertEqual(tcp.dport, external_port)
# Outside port allocated to host1.
4277 port_out1 = tcp.sport
4279 self.logger.error(ppp("Unexpected or invalid packet:", p))
# One mapping carrying both sessions.
4282 dms = self.vapi.nat_det_map_dump()
4283 self.assertEqual(1, len(dms))
4284 self.assertEqual(2, dms[0].ses_num)
# outside -> host0 (via port_out0)
4287 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4288 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4289 TCP(sport=external_port, dport=port_out0))
4290 self.pg1.add_stream(p)
4291 self.pg_enable_capture(self.pg_interfaces)
4293 capture = self.pg0.get_capture(1)
4298 self.assertEqual(ip.src, self.pg1.remote_ip4)
4299 self.assertEqual(ip.dst, host0.ip4)
4300 self.assertEqual(tcp.dport, port_in)
4301 self.assertEqual(tcp.sport, external_port)
4303 self.logger.error(ppp("Unexpected or invalid packet:", p))
# outside -> host1 (via port_out1)
4307 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4308 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4309 TCP(sport=external_port, dport=port_out1))
4310 self.pg1.add_stream(p)
4311 self.pg_enable_capture(self.pg_interfaces)
4313 capture = self.pg0.get_capture(1)
4318 self.assertEqual(ip.src, self.pg1.remote_ip4)
4319 self.assertEqual(ip.dst, host1.ip4)
4320 self.assertEqual(tcp.dport, port_in)
4321 self.assertEqual(tcp.sport, external_port)
# NOTE(review): this message lacks the trailing colon used by the other
# "Unexpected or invalid packet:" log calls in this method.
4323 self.logger.error(ppp("Unexpected or invalid packet", p))
4326 # session close api test
# Close one session addressed from the outside side ...
4327 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4329 self.pg1.remote_ip4n,
4331 dms = self.vapi.nat_det_map_dump()
4332 self.assertEqual(dms[0].ses_num, 1)
# ... and the remaining one addressed from the inside side (host0).
4334 self.vapi.nat_det_close_session_in(host0.ip4n,
4336 self.pg1.remote_ip4n,
4338 dms = self.vapi.nat_det_map_dump()
4339 self.assertEqual(dms[0].ses_num, 0)
4341 def test_tcp_session_close_detection_in(self):
4342 """ Deterministic NAT TCP session close from inside network """
# Steps visible in this listing:
#   1. add a deterministic NAT mapping from pg0.remote_ip4n to self.nat_addr
#      and enable the NAT44 feature on pg0 and pg1;
#   2. open a TCP session with initiate_tcp_session(pg0, pg1);
#   3. replay the closing exchange: one packet sent on pg0, a list of
#      packets sent on pg1 (two captures expected on pg0), one more on pg0;
#   4. assert that the first dumped mapping reports ses_num == 0.
# NOTE(review): the embedded line numbers skip values, so the tails of
# several multi-line calls (e.g. the TCP flags) and some whole statements
# are not shown here - confirm against the complete source.
4343 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4345 socket.inet_aton(self.nat_addr),
4347 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4348 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4351 self.initiate_tcp_session(self.pg0, self.pg1)
4353 # close the session from inside
4355 # FIN packet in -> out
4356 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4357 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4358 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4360 self.pg0.add_stream(p)
4361 self.pg_enable_capture(self.pg_interfaces)
4363 self.pg1.get_capture(1)
4367 # ACK packet out -> in
4368 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4369 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4370 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4374 # FIN packet out -> in
4375 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4376 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4377 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
# pkts is built on lines not shown; presumably it holds the ACK and FIN
# packets created above, matching the two captures expected on pg0.
4381 self.pg1.add_stream(pkts)
4382 self.pg_enable_capture(self.pg_interfaces)
4384 self.pg0.get_capture(2)
4386 # ACK packet in -> out
4387 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4388 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4389 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4391 self.pg0.add_stream(p)
4392 self.pg_enable_capture(self.pg_interfaces)
4394 self.pg1.get_capture(1)
4396 # Check if deterministic NAT44 closed the session
4397 dms = self.vapi.nat_det_map_dump()
4398 self.assertEqual(0, dms[0].ses_num)
# Presumably the error branch of a try/except guarding the steps above
# (the try/except lines themselves are not shown) - TODO confirm.
4400 self.logger.error("TCP session termination failed")
4403 def test_tcp_session_close_detection_out(self):
4404 """ Deterministic NAT TCP session close from outside network """
# Mirror image of the inside-initiated close test: same mapping and NAT44
# setup, same initiate_tcp_session(pg0, pg1), but the first closing packet
# is sent on pg1, then a list of packets on pg0 (two captures expected on
# pg1), then a final packet on pg1.  The test ends by asserting that the
# first dumped mapping reports ses_num == 0.
# NOTE(review): the TCP flag arguments and some statements are on lines
# missing from this listing, so the packet roles are known only from the
# inline comments below - confirm against the complete source.
4405 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4407 socket.inet_aton(self.nat_addr),
4409 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4410 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4413 self.initiate_tcp_session(self.pg0, self.pg1)
4415 # close the session from outside
4417 # FIN packet out -> in
4418 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4419 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4420 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4422 self.pg1.add_stream(p)
4423 self.pg_enable_capture(self.pg_interfaces)
4425 self.pg0.get_capture(1)
4429 # ACK packet in -> out
4430 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4431 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4432 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the next comment repeats "ACK"; by symmetry with the
# inside-initiated test this second packet is presumably the FIN - the
# flags line is not shown, so verify before changing the comment.
4436 # ACK packet in -> out
4437 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4438 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4439 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4443 self.pg0.add_stream(pkts)
4444 self.pg_enable_capture(self.pg_interfaces)
4446 self.pg1.get_capture(2)
4448 # ACK packet out -> in
4449 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4450 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4451 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4453 self.pg1.add_stream(p)
4454 self.pg_enable_capture(self.pg_interfaces)
4456 self.pg0.get_capture(1)
4458 # Check if deterministic NAT44 closed the session
4459 dms = self.vapi.nat_det_map_dump()
4460 self.assertEqual(0, dms[0].ses_num)
# Presumably the error branch of a try/except guarding the steps above
# (the try/except lines themselves are not shown) - TODO confirm.
4462 self.logger.error("TCP session termination failed")
4465 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4466 def test_session_timeout(self):
4467 """ Deterministic NAT session timeouts """
# Skipped unless running_extended_tests() is true (see the decorator).
# Sets up the deterministic mapping and NAT44 on pg0/pg1, opens a TCP
# session, sets all four deterministic NAT timeouts to 5, sends a stream
# from pg0 and captures it on pg1, then expects the first dumped mapping
# to report ses_num == 0.
# NOTE(review): the wait that lets the sessions expire is presumably on a
# line missing from this listing (between 4482 and 4485) - confirm.
4468 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4470 socket.inet_aton(self.nat_addr),
4472 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4473 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4476 self.initiate_tcp_session(self.pg0, self.pg1)
4477 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4478 pkts = self.create_stream_in(self.pg0, self.pg1)
4479 self.pg0.add_stream(pkts)
4480 self.pg_enable_capture(self.pg_interfaces)
4482 capture = self.pg1.get_capture(len(pkts))
4485 dms = self.vapi.nat_det_map_dump()
4486 self.assertEqual(0, dms[0].ses_num)
4488 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4489 def test_session_limit_per_user(self):
4490 """ Deterministic NAT maximum sessions per user limit """
# Skipped unless running_extended_tests() is true (see the decorator).
# Steps visible in this listing:
#   1. deterministic mapping + NAT44 on pg0/pg1, IPFIX exporter pointed at
#      pg2 and NAT IPFIX logging switched on;
#   2. one UDP packet per port in range(1025, 2025) is sent from pg0 and
#      all of them are expected on pg1;
#   3. one extra UDP packet (3001 -> 3002) must not reach pg1 and must
#      instead produce an ICMP type 3 / code 1 packet on pg0 embedding it;
#   4. the mapping must report 1000 sessions (the size of the port range);
#   5. after "ipfix flush", two packets are captured on pg2, templates are
#      loaded and data sets are checked by
#      verify_ipfix_max_entries_per_user.
# NOTE(review): several statements (list setup, loop headers, local
# assignments such as icmp) are on lines missing from this listing.
4491 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4493 socket.inet_aton(self.nat_addr),
4495 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4496 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4498 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4499 src_address=self.pg2.local_ip4n,
4501 template_interval=10)
4502 self.vapi.nat_ipfix()
# Same value is used for source and destination port of every packet.
4505 for port in range(1025, 2025):
4506 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4507 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4508 UDP(sport=port, dport=port))
4511 self.pg0.add_stream(pkts)
4512 self.pg_enable_capture(self.pg_interfaces)
4514 capture = self.pg1.get_capture(len(pkts))
# The packet that is expected to exceed the per-user session limit.
4516 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4517 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4518 UDP(sport=3001, dport=3002))
4519 self.pg0.add_stream(p)
4520 self.pg_enable_capture(self.pg_interfaces)
4522 capture = self.pg1.assert_nothing_captured()
4524 # verify ICMP error packet
4525 capture = self.pg0.get_capture(1)
4527 self.assertTrue(p.haslayer(ICMP))
4529 self.assertEqual(icmp.type, 3)
4530 self.assertEqual(icmp.code, 1)
4531 self.assertTrue(icmp.haslayer(IPerror))
4532 inner_ip = icmp[IPerror]
4533 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4534 self.assertEqual(inner_ip[UDPerror].dport, 3002)
4536 dms = self.vapi.nat_det_map_dump()
4538 self.assertEqual(1000, dms[0].ses_num)
4540 # verify IPFIX logging
4541 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4543 capture = self.pg2.get_capture(2)
4544 ipfix = IPFIXDecoder()
4545 # first load template
4547 self.assertTrue(p.haslayer(IPFIX))
4548 if p.haslayer(Template):
4549 ipfix.add_template(p.getlayer(Template))
4550 # verify events in data set
4552 if p.haslayer(Data):
4553 data = ipfix.decode_data_set(p.getlayer(Set))
4554 self.verify_ipfix_max_entries_per_user(data)
4556 def clear_nat_det(self):
4558 Clear deterministic NAT configuration.
# Visible steps: turn NAT IPFIX logging off (enable=0), call
# nat_det_set_timeouts() with no arguments (presumably restoring the
# defaults - verify), then issue nat_det_add_del_map for every dumped
# deterministic mapping and nat44_interface_add_del_feature for every
# dumped NAT44 interface.
# NOTE(review): the remaining arguments of both add_del calls (presumably
# the "delete" flags) are on lines missing from this listing.
4560 self.vapi.nat_ipfix(enable=0)
4561 self.vapi.nat_det_set_timeouts()
4562 deterministic_mappings = self.vapi.nat_det_map_dump()
4563 for dsm in deterministic_mappings:
4564 self.vapi.nat_det_add_del_map(dsm.in_addr,
4570 interfaces = self.vapi.nat44_interface_dump()
4571 for intf in interfaces:
4572 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# Presumably the body of TestDeterministicNAT.tearDown (its def line is not
# present in this listing).  After the parent tearDown, and only while VPP
# is still alive, NAT44 state is logged and the deterministic NAT
# configuration is removed with clear_nat_det().
4577 super(TestDeterministicNAT, self).tearDown()
4578 if not self.vpp_dead:
4579 self.logger.info(self.vapi.cli("show nat44 interfaces"))
# The three CLI calls below are arguments of calls whose opening lines are
# missing from the listing (presumably self.logger.info, as above).
4581 self.vapi.cli("show nat44 deterministic mappings"))
4583 self.vapi.cli("show nat44 deterministic timeouts"))
4585 self.vapi.cli("show nat44 deterministic sessions"))
4586 self.clear_nat_det()
4589 class TestNAT64(MethodHolder):
4590 """ NAT64 Test Cases """
4593 def setUpConstants(cls):
# Extends the inherited VPP command line with a "nat { ... }" section that
# sets "nat64 bib hash buckets 128" and "nat64 st hash buckets 256".
# NOTE(review): takes cls, so it is presumably decorated with @classmethod
# on a line not shown in this listing.
4594 super(TestNAT64, cls).setUpConstants()
4595 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4596 "nat64 st hash buckets 256", "}"])
4599 def setUpClass(cls):
# Class-wide fixture for the NAT64 tests.
# NOTE(review): takes cls, so presumably a @classmethod; the decorator,
# the assignment of cls.vrf1_id, the loop bodies and any try/except
# wrapper are on lines missing from this listing.
4600 super(TestNAT64, cls).setUpClass()
# Default inside/outside ports and ICMP ids (inside == outside here).
4603 cls.tcp_port_in = 6303
4604 cls.tcp_port_out = 6303
4605 cls.udp_port_in = 6304
4606 cls.udp_port_out = 6304
4607 cls.icmp_id_in = 6305
4608 cls.icmp_id_out = 6305
# NAT pool address for the default table, in text and packed form.
4609 cls.nat_addr = '10.0.0.3'
4610 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
# NAT pool address used by the tests together with vrf_id=cls.vrf1_id.
4612 cls.vrf1_nat_addr = '10.0.10.3'
4613 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4615 cls.ipfix_src_port = 4739
4616 cls.ipfix_domain_id = 1
# Five packet-generator interfaces: pg_interfaces[0] and [2] go into the
# IPv6 list, pg_interfaces[1] into the IPv4 list.
4618 cls.create_pg_interfaces(range(5))
4619 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4620 cls.ip6_interfaces.append(cls.pg_interfaces[2])
4621 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
# Create the IPv6 table vrf1_id and bind pg_interfaces[2] to it.
4623 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4625 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4627 cls.pg0.generate_remote_hosts(2)
4629 for i in cls.ip6_interfaces:
4632 i.configure_ipv6_neighbors()
4634 for i in cls.ip4_interfaces:
# pg3 is configured for both IPv4 and IPv6; test_one_armed_nat64 sends and
# receives both address families on it.
4640 cls.pg3.config_ip4()
4641 cls.pg3.resolve_arp()
4642 cls.pg3.config_ip6()
4643 cls.pg3.configure_ipv6_neighbors()
# Only this super() call of (presumably) TestNAT64.tearDownClass is present
# in the listing; its def line and any other statements are not shown.
4646 super(TestNAT64, cls).tearDownClass()
4649 def test_pool(self):
4650 """ Add/delete address to NAT64 pool """
# Adds the single-address range 1.2.3.4 - 1.2.3.4 to the NAT64 pool, checks
# that the pool dump contains exactly that (packed) address, removes the
# range again with is_add=0 and checks that the dump is empty.
4651 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4653 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4655 addresses = self.vapi.nat64_pool_addr_dump()
4656 self.assertEqual(len(addresses), 1)
4657 self.assertEqual(addresses[0].address, nat_addr)
4659 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4661 addresses = self.vapi.nat64_pool_addr_dump()
4662 self.assertEqual(len(addresses), 0)
4664 def test_interface(self):
4665 """ Enable/disable NAT64 feature on the interface """
# Enables NAT64 on pg0 (default direction) and on pg1 with is_inside=0,
# then checks: the interface dump has two entries with is_inside 1 for pg0
# and 0 for pg1; the CLI feature listing contains 'nat64-in2out' for pg0
# and 'nat64-out2in' for pg1.  Finally both are removed (is_add=0) and the
# dump must be empty.
# NOTE(review): pg0_found / pg1_found are asserted below, but the lines
# that initialise and set them are missing from this listing.
4666 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4667 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4669 interfaces = self.vapi.nat64_interface_dump()
4670 self.assertEqual(len(interfaces), 2)
4673 for intf in interfaces:
4674 if intf.sw_if_index == self.pg0.sw_if_index:
4675 self.assertEqual(intf.is_inside, 1)
4677 elif intf.sw_if_index == self.pg1.sw_if_index:
4678 self.assertEqual(intf.is_inside, 0)
4680 self.assertTrue(pg0_found)
4681 self.assertTrue(pg1_found)
# str.find returns -1 when the feature name is absent from the CLI output.
4683 features = self.vapi.cli("show interface features pg0")
4684 self.assertNotEqual(features.find('nat64-in2out'), -1)
4685 features = self.vapi.cli("show interface features pg1")
4686 self.assertNotEqual(features.find('nat64-out2in'), -1)
4688 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4689 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4691 interfaces = self.vapi.nat64_interface_dump()
4692 self.assertEqual(len(interfaces), 0)
4694 def test_static_bib(self):
4695 """ Add/delete static BIB entry """
# Registers a static BIB entry for IPv6 2001:db8:85a3::8a2e:370:7334 and
# IPv4 10.1.1.3 over TCP, checks that a dumped entry carries the same
# addresses and ports and that static_bib_num is 1, then issues a second
# nat64_add_del_static_bib call and expects static_bib_num to be 0.
# NOTE(review): in_port, out_port, static_bib_num, bibe, the loops over
# the dump and the remaining call arguments (presumably is_add=0 on the
# second call) are on lines missing from this listing.
4696 in_addr = socket.inet_pton(socket.AF_INET6,
4697 '2001:db8:85a3::8a2e:370:7334')
4698 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4701 proto = IP_PROTOS.tcp
4703 self.vapi.nat64_add_del_static_bib(in_addr,
4708 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4713 self.assertEqual(bibe.i_addr, in_addr)
4714 self.assertEqual(bibe.o_addr, out_addr)
4715 self.assertEqual(bibe.i_port, in_port)
4716 self.assertEqual(bibe.o_port, out_port)
4717 self.assertEqual(static_bib_num, 1)
4719 self.vapi.nat64_add_del_static_bib(in_addr,
4725 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4730 self.assertEqual(static_bib_num, 0)
4732 def test_set_timeouts(self):
4733 """ Set NAT64 timeouts """
# Reads the NAT64 timeouts and checks the expected defaults (udp 300,
# icmp 60, tcp_trans 240, tcp_est 7440, tcp_incoming_syn 6), then sets a
# custom value for each of the five fields and checks that a fresh read
# returns exactly those values.
4734 # verify default values
4735 timeouts = self.vapi.nat64_get_timeouts()
4736 self.assertEqual(timeouts.udp, 300)
4737 self.assertEqual(timeouts.icmp, 60)
4738 self.assertEqual(timeouts.tcp_trans, 240)
4739 self.assertEqual(timeouts.tcp_est, 7440)
4740 self.assertEqual(timeouts.tcp_incoming_syn, 6)
4742 # set and verify custom values
4743 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4744 tcp_est=7450, tcp_incoming_syn=10)
4745 timeouts = self.vapi.nat64_get_timeouts()
4746 self.assertEqual(timeouts.udp, 200)
4747 self.assertEqual(timeouts.icmp, 30)
4748 self.assertEqual(timeouts.tcp_trans, 250)
4749 self.assertEqual(timeouts.tcp_est, 7450)
4750 self.assertEqual(timeouts.tcp_incoming_syn, 10)
4752 def test_dynamic(self):
4753 """ NAT64 dynamic translation test """
# Steps visible in this listing:
#   1. reset the inside ports/id and record the current session count;
#   2. add self.nat_addr_n to the NAT64 pool, enable NAT64 on pg0 and on
#      pg1 (is_inside=0);
#   3. run an in2out stream (pg0 -> pg1) and an out2in stream (pg1 -> pg0),
#      twice each, verifying every capture;
#   4. expect the session count to have grown by exactly 3;
#   5. add the vrf1 pool address with vrf_id=self.vrf1_id, enable NAT64 on
#      pg2 and run one in2out / out2in round through pg2.
# NOTE(review): self.pg_start() calls and the tails of some multi-line
# calls are presumably on lines missing from this listing.
4754 self.tcp_port_in = 6303
4755 self.udp_port_in = 6304
4756 self.icmp_id_in = 6305
4758 ses_num_start = self.nat64_get_ses_num()
4760 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4762 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4763 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4766 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4767 self.pg0.add_stream(pkts)
4768 self.pg_enable_capture(self.pg_interfaces)
4770 capture = self.pg1.get_capture(len(pkts))
4771 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4772 dst_ip=self.pg1.remote_ip4)
4775 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4776 self.pg1.add_stream(pkts)
4777 self.pg_enable_capture(self.pg_interfaces)
4779 capture = self.pg0.get_capture(len(pkts))
# ip is a scratch IPv6 header: its src ('64:ff9b::' followed by pg1's IPv4
# address) is reused below as the expected IPv6 source address.
4780 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4781 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4784 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4785 self.pg0.add_stream(pkts)
4786 self.pg_enable_capture(self.pg_interfaces)
4788 capture = self.pg1.get_capture(len(pkts))
4789 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4790 dst_ip=self.pg1.remote_ip4)
4793 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4794 self.pg1.add_stream(pkts)
4795 self.pg_enable_capture(self.pg_interfaces)
4797 capture = self.pg0.get_capture(len(pkts))
4798 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4800 ses_num_end = self.nat64_get_ses_num()
# Presumably one session per protocol in the generated stream - the stream
# helper is defined outside this listing, TODO confirm.
4802 self.assertEqual(ses_num_end - ses_num_start, 3)
4804 # tenant with specific VRF
4805 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4806 self.vrf1_nat_addr_n,
4807 vrf_id=self.vrf1_id)
4808 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4810 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4811 self.pg2.add_stream(pkts)
4812 self.pg_enable_capture(self.pg_interfaces)
4814 capture = self.pg1.get_capture(len(pkts))
4815 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4816 dst_ip=self.pg1.remote_ip4)
4818 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4819 self.pg1.add_stream(pkts)
4820 self.pg_enable_capture(self.pg_interfaces)
4822 capture = self.pg2.get_capture(len(pkts))
4823 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4825 def test_static(self):
4826 """ NAT64 static translation test """
# Uses identical inside and outside ports/ids (60303, 60304, 60305), adds
# the NAT64 pool address and the pg0/pg1 interfaces, then issues three
# nat64_add_del_static_bib calls for pg0.remote_ip6n.  The in2out capture
# is verified with same_port=True, the out2in capture is verified on pg0,
# and the session count must have grown by exactly 3.
# NOTE(review): the remaining arguments of the static BIB calls and the
# self.pg_start() calls are presumably on lines missing from this listing.
4827 self.tcp_port_in = 60303
4828 self.udp_port_in = 60304
4829 self.icmp_id_in = 60305
4830 self.tcp_port_out = 60303
4831 self.udp_port_out = 60304
4832 self.icmp_id_out = 60305
4834 ses_num_start = self.nat64_get_ses_num()
4836 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4838 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4839 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4841 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4846 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4851 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4858 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4859 self.pg0.add_stream(pkts)
4860 self.pg_enable_capture(self.pg_interfaces)
4862 capture = self.pg1.get_capture(len(pkts))
4863 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4864 dst_ip=self.pg1.remote_ip4, same_port=True)
4867 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4868 self.pg1.add_stream(pkts)
4869 self.pg_enable_capture(self.pg_interfaces)
4871 capture = self.pg0.get_capture(len(pkts))
# Scratch IPv6 header whose src is the expected translated source address.
4872 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4873 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4875 ses_num_end = self.nat64_get_ses_num()
4877 self.assertEqual(ses_num_end - ses_num_start, 3)
4879 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4880 def test_session_timeout(self):
4881 """ NAT64 session timeout """
# Skipped unless running_extended_tests() is true (see the decorator).
# Sets the NAT64 ICMP timeout to 5, pushes an in2out stream from pg0 to
# pg1, records the session count, and later asserts that a second reading
# of the session count differs from the first.
# NOTE(review): the wait between the two readings is presumably on a line
# missing from this listing (between 4895 and 4899) - confirm.
4882 self.icmp_id_in = 1234
4883 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4885 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4886 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4887 self.vapi.nat64_set_timeouts(icmp=5)
4889 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4890 self.pg0.add_stream(pkts)
4891 self.pg_enable_capture(self.pg_interfaces)
4893 capture = self.pg1.get_capture(len(pkts))
4895 ses_num_before_timeout = self.nat64_get_ses_num()
4899 # ICMP session after timeout
4900 ses_num_after_timeout = self.nat64_get_ses_num()
# Only checks that the count changed, not by how much.
4901 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4903 def test_icmp_error(self):
4904 """ NAT64 ICMP Error message translation """
# Steps visible in this listing:
#   1. create sessions with one in2out and one out2in stream, keeping both
#      captures (capture_ip4 from pg1, capture_ip6 from pg0);
#   2. in2out error: wrap every captured IPv6 packet in
#      ICMPv6DestUnreach(code=1) sent from pg0; on pg1 expect ICMP type 3 /
#      code 13 from nat_addr to pg1.remote_ip4 whose embedded header has
#      src pg1.remote_ip4 and dst nat_addr;
#   3. out2in error: wrap every captured IPv4 packet in
#      ICMP(type=3, code=13) sent from pg1; on pg0 expect
#      ICMPv6DestUnreach code 1 whose embedded IPv6 header has
#      src pg0.remote_ip6 and dst ip.src.
# NOTE(review): try/except/else keywords and self.pg_start() calls are
# presumably on lines missing from this listing.
4905 self.tcp_port_in = 6303
4906 self.udp_port_in = 6304
4907 self.icmp_id_in = 6305
4909 ses_num_start = self.nat64_get_ses_num()
4911 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4913 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4914 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4916 # send some packets to create sessions
4917 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4918 self.pg0.add_stream(pkts)
4919 self.pg_enable_capture(self.pg_interfaces)
4921 capture_ip4 = self.pg1.get_capture(len(pkts))
4922 self.verify_capture_out(capture_ip4,
4923 nat_ip=self.nat_addr,
4924 dst_ip=self.pg1.remote_ip4)
4926 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4927 self.pg1.add_stream(pkts)
4928 self.pg_enable_capture(self.pg_interfaces)
4930 capture_ip6 = self.pg0.get_capture(len(pkts))
# Scratch IPv6 header whose src is the IPv6 form of pg1's IPv4 address.
4931 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4932 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4933 self.pg0.remote_ip6)
# One ICMPv6 error per packet previously received on pg0.
4936 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4937 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4938 ICMPv6DestUnreach(code=1) /
4939 packet[IPv6] for packet in capture_ip6]
4940 self.pg0.add_stream(pkts)
4941 self.pg_enable_capture(self.pg_interfaces)
4943 capture = self.pg1.get_capture(len(pkts))
4944 for packet in capture:
4946 self.assertEqual(packet[IP].src, self.nat_addr)
4947 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4948 self.assertEqual(packet[ICMP].type, 3)
4949 self.assertEqual(packet[ICMP].code, 13)
4950 inner = packet[IPerror]
4951 self.assertEqual(inner.src, self.pg1.remote_ip4)
4952 self.assertEqual(inner.dst, self.nat_addr)
4953 self.check_icmp_checksum(packet)
4954 if inner.haslayer(TCPerror):
4955 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4956 elif inner.haslayer(UDPerror):
4957 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4959 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4961 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# One ICMPv4 error per packet previously received on pg1.
4965 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4966 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4967 ICMP(type=3, code=13) /
4968 packet[IP] for packet in capture_ip4]
4969 self.pg1.add_stream(pkts)
4970 self.pg_enable_capture(self.pg_interfaces)
4972 capture = self.pg0.get_capture(len(pkts))
4973 for packet in capture:
4975 self.assertEqual(packet[IPv6].src, ip.src)
4976 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4977 icmp = packet[ICMPv6DestUnreach]
4978 self.assertEqual(icmp.code, 1)
4979 inner = icmp[IPerror6]
4980 self.assertEqual(inner.src, self.pg0.remote_ip6)
4981 self.assertEqual(inner.dst, ip.src)
4982 self.check_icmpv6_checksum(packet)
4983 if inner.haslayer(TCPerror):
4984 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4985 elif inner.haslayer(UDPerror):
4986 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4988 self.assertEqual(inner[ICMPv6EchoRequest].id,
4991 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4994 def test_hairpinning(self):
4995 """ NAT64 hairpinning """
# Client and server are two remote hosts on pg0, so every packet both
# enters and leaves through pg0.
# Steps visible in this listing:
#   1. static BIB entries for the server (TCP and UDP calls);
#   2. client -> server: packets addressed to nat_addr_ip6 on the server's
#      outside ports must come back with src nat_addr_ip6, dst server.ip6,
#      the server's inside port and a source port different from the
#      client's inside port; that source port is remembered;
#   3. server -> client: replies to the remembered ports must come back
#      with src nat_addr_ip6, dst client.ip6 and the original ports;
#   4. the client sends ICMPv6DestUnreach(code=1) wrapping each packet of
#      the last capture; the translated errors must reach the server with
#      the embedded header showing src server.ip6 and dst nat_addr_ip6.
# NOTE(review): try/except/else keywords, pkts list handling,
# self.pg_start() calls and some call arguments are presumably on lines
# missing from this listing.
4997 client = self.pg0.remote_hosts[0]
4998 server = self.pg0.remote_hosts[1]
4999 server_tcp_in_port = 22
5000 server_tcp_out_port = 4022
5001 server_udp_in_port = 23
5002 server_udp_out_port = 4023
5003 client_tcp_in_port = 1234
5004 client_udp_in_port = 1235
# Placeholders; overwritten with the ports observed in the first capture.
5005 client_tcp_out_port = 0
5006 client_udp_out_port = 0
# Scratch IPv6 header used to obtain the IPv6 text form of the NAT address.
5007 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5008 nat_addr_ip6 = ip.src
5010 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5012 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5013 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5015 self.vapi.nat64_add_del_static_bib(server.ip6n,
5018 server_tcp_out_port,
5020 self.vapi.nat64_add_del_static_bib(server.ip6n,
5023 server_udp_out_port,
5028 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5029 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5030 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5032 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5033 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5034 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5036 self.pg0.add_stream(pkts)
5037 self.pg_enable_capture(self.pg_interfaces)
5039 capture = self.pg0.get_capture(len(pkts))
5040 for packet in capture:
5042 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5043 self.assertEqual(packet[IPv6].dst, server.ip6)
5044 if packet.haslayer(TCP):
5045 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5046 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5047 self.check_tcp_checksum(packet)
5048 client_tcp_out_port = packet[TCP].sport
5050 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5051 self.assertEqual(packet[UDP].dport, server_udp_in_port)
5052 self.check_udp_checksum(packet)
5053 client_udp_out_port = packet[UDP].sport
5055 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5060 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5061 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5062 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5064 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5065 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5066 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5068 self.pg0.add_stream(pkts)
5069 self.pg_enable_capture(self.pg_interfaces)
5071 capture = self.pg0.get_capture(len(pkts))
5072 for packet in capture:
5074 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5075 self.assertEqual(packet[IPv6].dst, client.ip6)
5076 if packet.haslayer(TCP):
5077 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5078 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5079 self.check_tcp_checksum(packet)
5081 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5082 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5083 self.check_udp_checksum(packet)
5085 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Errors are built from the server -> client capture taken just above.
5090 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5091 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5092 ICMPv6DestUnreach(code=1) /
5093 packet[IPv6] for packet in capture]
5094 self.pg0.add_stream(pkts)
5095 self.pg_enable_capture(self.pg_interfaces)
5097 capture = self.pg0.get_capture(len(pkts))
5098 for packet in capture:
5100 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5101 self.assertEqual(packet[IPv6].dst, server.ip6)
5102 icmp = packet[ICMPv6DestUnreach]
5103 self.assertEqual(icmp.code, 1)
5104 inner = icmp[IPerror6]
5105 self.assertEqual(inner.src, server.ip6)
5106 self.assertEqual(inner.dst, nat_addr_ip6)
5107 self.check_icmpv6_checksum(packet)
5108 if inner.haslayer(TCPerror):
5109 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5110 self.assertEqual(inner[TCPerror].dport,
5111 client_tcp_out_port)
5113 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5114 self.assertEqual(inner[UDPerror].dport,
5115 client_udp_out_port)
5117 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5120 def test_prefix(self):
5121 """ NAT64 Network-Specific Prefix """
# Steps visible in this listing:
#   1. pool address + NAT64 on pg0/pg1 for the default table, and the vrf1
#      pool address + NAT64 on pg2 for vrf_id=self.vrf1_id;
#   2. add prefix 2001:db8::/32 and check the prefix dump holds exactly
#      that entry with vrf_id 0;
#   3. add prefix 2001:db8:122:300::/56 with vrf_id=self.vrf1_id and check
#      the dump now has two entries;
#   4. run in2out / out2in rounds through pg0 (global prefix) and pg2
#      (tenant prefix); the expected IPv6 address on the way back is built
#      with compose_ip6 from pg1.remote_ip4.
# NOTE(review): some call arguments (e.g. the pref= arguments) and
# self.pg_start() calls are presumably on lines missing from this listing.
5123 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5125 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5126 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5127 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5128 self.vrf1_nat_addr_n,
5129 vrf_id=self.vrf1_id)
5130 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5133 global_pref64 = "2001:db8::"
5134 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5135 global_pref64_len = 32
5136 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5138 prefix = self.vapi.nat64_prefix_dump()
5139 self.assertEqual(len(prefix), 1)
5140 self.assertEqual(prefix[0].prefix, global_pref64_n)
5141 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5142 self.assertEqual(prefix[0].vrf_id, 0)
5144 # Add tenant specific prefix
5145 vrf1_pref64 = "2001:db8:122:300::"
5146 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5147 vrf1_pref64_len = 56
5148 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5150 vrf_id=self.vrf1_id)
5151 prefix = self.vapi.nat64_prefix_dump()
5152 self.assertEqual(len(prefix), 2)
5155 pkts = self.create_stream_in_ip6(self.pg0,
5158 plen=global_pref64_len)
5159 self.pg0.add_stream(pkts)
5160 self.pg_enable_capture(self.pg_interfaces)
5162 capture = self.pg1.get_capture(len(pkts))
5163 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5164 dst_ip=self.pg1.remote_ip4)
5166 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5167 self.pg1.add_stream(pkts)
5168 self.pg_enable_capture(self.pg_interfaces)
5170 capture = self.pg0.get_capture(len(pkts))
# Despite its name, dst_ip is passed as the second argument of
# verify_capture_in_ip6, the position where other tests in this listing
# pass the expected IPv6 source address.
5171 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5174 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5176 # Tenant specific prefix
5177 pkts = self.create_stream_in_ip6(self.pg2,
5180 plen=vrf1_pref64_len)
5181 self.pg2.add_stream(pkts)
5182 self.pg_enable_capture(self.pg_interfaces)
5184 capture = self.pg1.get_capture(len(pkts))
5185 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5186 dst_ip=self.pg1.remote_ip4)
5188 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5189 self.pg1.add_stream(pkts)
5190 self.pg_enable_capture(self.pg_interfaces)
5192 capture = self.pg2.get_capture(len(pkts))
5193 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5196 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5198 def test_unknown_proto(self):
5199 """ NAT64 translate packet with unknown protocol """
# Steps visible in this listing:
#   1. pool address + NAT64 on pg0 (inside) and pg1 (is_inside=0);
#   2. a TCP packet from pg0 to remote_ip6 is sent first and one packet is
#      captured on pg1 (presumably to create the session the next packet
#      relies on - verify);
#   3. in2out: an IPv6 packet with nh=47 is sent from pg0; the packet seen
#      on pg1 must have src nat_addr, dst pg1.remote_ip4, a GRE layer and
#      a valid IP checksum;
#   4. out2in: the matching IPv4 packet is sent from pg1; the packet seen
#      on pg0 must have src remote_ip6, dst pg0.remote_ip6 and nh == 47.
# NOTE(review): the layer between the outer and inner IP headers
# (presumably GRE), the "packet = p[0]"-style assignments, try/except
# keywords and self.pg_start() calls are on lines missing from this
# listing.
5201 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5203 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5204 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5205 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5208 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5209 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5210 TCP(sport=self.tcp_port_in, dport=20))
5211 self.pg0.add_stream(p)
5212 self.pg_enable_capture(self.pg_interfaces)
5214 p = self.pg1.get_capture(1)
# nh=47 here; the hairpinning variant of this test uses IP_PROTOS.gre in
# the same position.
5216 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5217 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5219 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5220 TCP(sport=1234, dport=1234))
5221 self.pg0.add_stream(p)
5222 self.pg_enable_capture(self.pg_interfaces)
5224 p = self.pg1.get_capture(1)
5227 self.assertEqual(packet[IP].src, self.nat_addr)
5228 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5229 self.assertTrue(packet.haslayer(GRE))
5230 self.check_ip_checksum(packet)
5232 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5236 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5237 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5239 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5240 TCP(sport=1234, dport=1234))
5241 self.pg1.add_stream(p)
5242 self.pg_enable_capture(self.pg_interfaces)
5244 p = self.pg0.get_capture(1)
5247 self.assertEqual(packet[IPv6].src, remote_ip6)
5248 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5249 self.assertEqual(packet[IPv6].nh, 47)
5251 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5254 def test_hairpinning_unknown_proto(self):
5255 """ NAT64 translate packet with unknown protocol - hairpinning """
# Client and server are two remote hosts on pg0; the server is reached via
# 10.0.0.100 and the client via 10.0.0.110, each also expressed as an IPv6
# address under 64:ff9b::/96 through compose_ip6.
# Steps visible in this listing:
#   1. pool range starting at server_nat_ip_n, NAT64 on pg0/pg1 and three
#      static BIB calls (two for the server, one for the client);
#   2. a TCP packet client -> server_nat_ip6 is sent and one packet is
#      captured on pg0;
#   3. client -> server with nh=IP_PROTOS.gre: the packet captured on pg0
#      must have src client_nat_ip6, dst server.ip6 and the same nh;
#   4. server -> client with nh=IP_PROTOS.gre: the packet captured on pg0
#      must have src server_nat_ip6, dst client.ip6 and the same nh.
# NOTE(review): remaining call arguments, the layer between the outer and
# inner IP headers (presumably GRE), "packet = ..." assignments,
# try/except keywords and self.pg_start() calls are on lines missing from
# this listing.
5257 client = self.pg0.remote_hosts[0]
5258 server = self.pg0.remote_hosts[1]
5259 server_tcp_in_port = 22
5260 server_tcp_out_port = 4022
5261 client_tcp_in_port = 1234
5262 client_tcp_out_port = 1235
5263 server_nat_ip = "10.0.0.100"
5264 client_nat_ip = "10.0.0.110"
5265 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5266 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5267 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5268 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5270 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5272 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5273 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5275 self.vapi.nat64_add_del_static_bib(server.ip6n,
5278 server_tcp_out_port,
5281 self.vapi.nat64_add_del_static_bib(server.ip6n,
5287 self.vapi.nat64_add_del_static_bib(client.ip6n,
5290 client_tcp_out_port,
5294 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5295 IPv6(src=client.ip6, dst=server_nat_ip6) /
5296 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5297 self.pg0.add_stream(p)
5298 self.pg_enable_capture(self.pg_interfaces)
5300 p = self.pg0.get_capture(1)
5302 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5303 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5305 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5306 TCP(sport=1234, dport=1234))
5307 self.pg0.add_stream(p)
5308 self.pg_enable_capture(self.pg_interfaces)
5310 p = self.pg0.get_capture(1)
5313 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5314 self.assertEqual(packet[IPv6].dst, server.ip6)
5315 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5317 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5321 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5322 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5324 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5325 TCP(sport=1234, dport=1234))
5326 self.pg0.add_stream(p)
5327 self.pg_enable_capture(self.pg_interfaces)
5329 p = self.pg0.get_capture(1)
5332 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5333 self.assertEqual(packet[IPv6].dst, client.ip6)
5334 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5336 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5339 def test_one_armed_nat64(self):
5340 """ One armed NAT64 """
# pg3 is registered for NAT64 twice: once with the default direction and
# once with is_inside=0, so both translation directions run on the same
# interface.
# Steps visible in this listing:
#   1. in2out: a TCP packet (12345 -> 80) from pg3.remote_ip6 to
#      remote_host_ip6 is sent on pg3; the packet captured on pg3 must
#      have src nat_addr, dst pg3.remote_ip4, dport 80 and a source port
#      other than 12345, which is remembered as external_port; TCP and IP
#      checksums are checked;
#   2. out2in: a TCP packet (80 -> external_port) from pg3.remote_ip4 to
#      nat_addr is sent on pg3; the packet captured on pg3 must have
#      src remote_host_ip6, dst pg3.remote_ip6, sport 80 and dport 12345.
# NOTE(review): the ip/tcp local assignments, try/except keywords,
# self.pg_start() calls and the remaining compose_ip6 arguments are on
# lines missing from this listing.
5342 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5346 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5348 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5349 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5352 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5353 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5354 TCP(sport=12345, dport=80))
5355 self.pg3.add_stream(p)
5356 self.pg_enable_capture(self.pg_interfaces)
5358 capture = self.pg3.get_capture(1)
5363 self.assertEqual(ip.src, self.nat_addr)
5364 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5365 self.assertNotEqual(tcp.sport, 12345)
5366 external_port = tcp.sport
5367 self.assertEqual(tcp.dport, 80)
5368 self.check_tcp_checksum(p)
5369 self.check_ip_checksum(p)
5371 self.logger.error(ppp("Unexpected or invalid packet:", p))
5375 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5376 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5377 TCP(sport=80, dport=external_port))
5378 self.pg3.add_stream(p)
5379 self.pg_enable_capture(self.pg_interfaces)
5381 capture = self.pg3.get_capture(1)
5386 self.assertEqual(ip.src, remote_host_ip6)
5387 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5388 self.assertEqual(tcp.sport, 80)
5389 self.assertEqual(tcp.dport, 12345)
5390 self.check_tcp_checksum(p)
5392 self.logger.error(ppp("Unexpected or invalid packet:", p))
5395 def test_frag_in_order(self):
5396 """ NAT64 translate fragments arriving in order """
# A random inside TCP port is chosen per run; pg0 is added with the default
# arguments and pg1 with is_inside=0.
# NOTE(review): several lines of this method are not visible in this extract
# (call continuations and the first assignment of `data`).
5397 self.tcp_port_in = random.randint(1025, 65535)
5399 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5401 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5402 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Snapshot the number of reassembly entries so the delta can be asserted at
# the end of the test.
5404 reass = self.vapi.nat_reass_dump()
5405 reass_n_start = len(reass)
# pg0 -> pg1: fragmented IPv6 stream, captured on pg1 and reassembled.
5409 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5410 self.tcp_port_in, 20, data)
5411 self.pg0.add_stream(pkts)
5412 self.pg_enable_capture(self.pg_interfaces)
5414 frags = self.pg1.get_capture(len(pkts))
5415 p = self.reass_frags_and_verify(frags,
5417 self.pg1.remote_ip4)
# The reassembled packet keeps dport 20 and the payload, but must carry a
# rewritten source port, which is stored for the return direction.
5418 self.assertEqual(p[TCP].dport, 20)
5419 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5420 self.tcp_port_out = p[TCP].sport
5421 self.assertEqual(data, p[Raw].load)
# pg1 -> pg0: fragmented stream with a fresh payload, captured on pg0.
5424 data = "A" * 4 + "b" * 16 + "C" * 3
5425 pkts = self.create_stream_frag(self.pg1,
5430 self.pg1.add_stream(pkts)
5431 self.pg_enable_capture(self.pg_interfaces)
5433 frags = self.pg0.get_capture(len(pkts))
# Expected IPv6 source is pg1's remote IPv4 address embedded in 64:ff9b::/96.
5434 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5435 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5436 self.assertEqual(p[TCP].sport, 20)
5437 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5438 self.assertEqual(data, p[Raw].load)
# Exactly two more reassembly entries than in the initial snapshot are
# expected after both directions have been exercised.
5440 reass = self.vapi.nat_reass_dump()
5441 reass_n_end = len(reass)
5443 self.assertEqual(reass_n_end - reass_n_start, 2)
5445 def test_reass_hairpinning(self):
5446 """ NAT64 fragments hairpinning """
# Client and server are both remote hosts on pg0, so the fragmented stream is
# injected on pg0 and the translated fragments are captured on pg0 as well.
# NOTE(review): several lines of this method are not visible in this extract
# (call continuations and the assignment of `data`).
5448 client = self.pg0.remote_hosts[0]
5449 server = self.pg0.remote_hosts[1]
5450 server_in_port = random.randint(1025, 65535)
5451 server_out_port = random.randint(1025, 65535)
5452 client_in_port = random.randint(1025, 65535)
# Build the IPv6 form of the NAT address by letting scapy parse
# '64:ff9b::' + self.nat_addr and reading the src field back.
5453 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5454 nat_addr_ip6 = ip.src
5456 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5458 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5459 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5461 # add static BIB entry for server
5462 self.vapi.nat64_add_del_static_bib(server.ip6n,
5468 # send packet from host to server
5469 pkts = self.create_stream_frag_ip6(self.pg0,
5474 self.pg0.add_stream(pkts)
5475 self.pg_enable_capture(self.pg_interfaces)
5477 frags = self.pg0.get_capture(len(pkts))
# After reassembly the packet must come from the NAT IPv6 address towards the
# server's IPv6 address, with a rewritten client port, the server's inside
# port as destination and an unchanged payload.
5478 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5479 self.assertNotEqual(p[TCP].sport, client_in_port)
5480 self.assertEqual(p[TCP].dport, server_in_port)
5481 self.assertEqual(data, p[Raw].load)
5483 def test_frag_out_of_order(self):
5484 """ NAT64 translate fragments arriving out of order """
# Same flow as the in-order fragment test: pg0 -> pg1 and then pg1 -> pg0.
# NOTE(review): the step that actually reorders the fragments is not visible
# in this extract; presumably it sits on one of the missing lines between
# building `pkts` and add_stream() -- confirm against the full file.
5485 self.tcp_port_in = random.randint(1025, 65535)
5487 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5489 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5490 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# pg0 -> pg1: fragmented IPv6 stream, captured on pg1 and reassembled.
5494 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5495 self.tcp_port_in, 20, data)
5497 self.pg0.add_stream(pkts)
5498 self.pg_enable_capture(self.pg_interfaces)
5500 frags = self.pg1.get_capture(len(pkts))
5501 p = self.reass_frags_and_verify(frags,
5503 self.pg1.remote_ip4)
# dport and payload are preserved, the source port must be rewritten and is
# stored for the return direction.
5504 self.assertEqual(p[TCP].dport, 20)
5505 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5506 self.tcp_port_out = p[TCP].sport
5507 self.assertEqual(data, p[Raw].load)
# pg1 -> pg0: fragmented stream with a fresh payload, captured on pg0.
5510 data = "A" * 4 + "B" * 16 + "C" * 3
5511 pkts = self.create_stream_frag(self.pg1,
5517 self.pg1.add_stream(pkts)
5518 self.pg_enable_capture(self.pg_interfaces)
5520 frags = self.pg0.get_capture(len(pkts))
# Expected IPv6 source is pg1's remote IPv4 address embedded in 64:ff9b::/96.
5521 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5522 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5523 self.assertEqual(p[TCP].sport, 20)
5524 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5525 self.assertEqual(data, p[Raw].load)
5527 def test_interface_addr(self):
5528 """ Acquire NAT64 pool addresses from interface """
5529 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5531 # no address in NAT64 pool
5532 adresses = self.vapi.nat44_address_dump()
5533 self.assertEqual(0, len(adresses))
5535 # configure interface address and check NAT64 address pool
5536 self.pg4.config_ip4()
5537 addresses = self.vapi.nat64_pool_addr_dump()
5538 self.assertEqual(len(addresses), 1)
5539 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5541 # remove interface address and check NAT64 address pool
5542 self.pg4.unconfig_ip4()
5543 addresses = self.vapi.nat64_pool_addr_dump()
5544 self.assertEqual(0, len(adresses))
5546 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5547 def test_ipfix_max_bibs_sessions(self):
5548 """ IPFIX logging maximum session and BIB entries exceeded """
# Only runs when running_extended_tests() is true (see the decorator above).
# NOTE(review): several lines of this method are not visible in this extract
# (the definitions of max_bibs, max_sessions and pkts, call continuations,
# and the `for p in capture:` style loop headers around the IPFIX checks).
5551 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5555 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5557 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5558 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# One TCP packet (dport 80) per distinct IPv6 source fd01:aa::<i>, for
# max_bibs sources.
5562 for i in range(0, max_bibs):
5563 src = "fd01:aa::%x" % (i)
5564 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5565 IPv6(src=src, dst=remote_host_ip6) /
5566 TCP(sport=12345, dport=80))
# One more packet to dport 22; `src` still holds the last address produced by
# the loop above.
5568 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5569 IPv6(src=src, dst=remote_host_ip6) /
5570 TCP(sport=12345, dport=22))
5572 self.pg0.add_stream(pkts)
5573 self.pg_enable_capture(self.pg_interfaces)
# max_sessions translated packets are expected on pg1.
5575 self.pg1.get_capture(max_sessions)
# Enable IPFIX export towards the pg3 remote host and NAT IPFIX logging.
5577 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5578 src_address=self.pg3.local_ip4n,
5580 template_interval=10)
5581 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5582 src_port=self.ipfix_src_port)
# Another packet (dport 25) from the same source: nothing may be forwarded to
# pg1, and after the flush 9 IPFIX packets are expected on pg3.
5584 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5585 IPv6(src=src, dst=remote_host_ip6) /
5586 TCP(sport=12345, dport=25))
5587 self.pg0.add_stream(p)
5588 self.pg_enable_capture(self.pg_interfaces)
5590 self.pg1.get_capture(0)
5591 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5592 capture = self.pg3.get_capture(9)
5593 ipfix = IPFIXDecoder()
5594 # first load template
5596 self.assertTrue(p.haslayer(IPFIX))
5597 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5598 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5599 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5600 self.assertEqual(p[UDP].dport, 4739)
5601 self.assertEqual(p[IPFIX].observationDomainID,
5602 self.ipfix_domain_id)
5603 if p.haslayer(Template):
5604 ipfix.add_template(p.getlayer(Template))
5605 # verify events in data set
5607 if p.haslayer(Data):
5608 data = ipfix.decode_data_set(p.getlayer(Set))
5609 self.verify_ipfix_max_sessions(data, max_sessions)
# A packet from a source not used so far (pg0.remote_ip6): again nothing may
# reach pg1, and a single IPFIX packet is expected, checked with
# verify_ipfix_max_bibs().
5611 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5612 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5613 TCP(sport=12345, dport=80))
5614 self.pg0.add_stream(p)
5615 self.pg_enable_capture(self.pg_interfaces)
5617 self.pg1.get_capture(0)
5618 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5619 capture = self.pg3.get_capture(1)
5620 # verify events in data set
5622 self.assertTrue(p.haslayer(IPFIX))
5623 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5624 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5625 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5626 self.assertEqual(p[UDP].dport, 4739)
5627 self.assertEqual(p[IPFIX].observationDomainID,
5628 self.ipfix_domain_id)
5629 if p.haslayer(Data):
5630 data = ipfix.decode_data_set(p.getlayer(Set))
5631 self.verify_ipfix_max_bibs(data, max_bibs)
5633 def test_ipfix_max_frags(self):
5634 """ IPFIX logging maximum fragments pending reassembly exceeded """
# NOTE(review): several lines of this method are not visible in this extract
# (call continuations, the assignment of `data`, and the loop headers around
# the IPFIX checks).
5635 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5637 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5638 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Set the IPv6 reassembly limit to max_frag=0 before enabling IPFIX export
# towards the pg3 remote host.
5639 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5640 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5641 src_address=self.pg3.local_ip4n,
5643 template_interval=10)
5644 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5645 src_port=self.ipfix_src_port)
# Only the last fragment of the stream is injected; nothing may be forwarded
# to pg1 and, after the flush, 9 IPFIX packets are expected on pg3.
5648 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5649 self.tcp_port_in, 20, data)
5650 self.pg0.add_stream(pkts[-1])
5651 self.pg_enable_capture(self.pg_interfaces)
5653 self.pg1.get_capture(0)
5654 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5655 capture = self.pg3.get_capture(9)
5656 ipfix = IPFIXDecoder()
5657 # first load template
5659 self.assertTrue(p.haslayer(IPFIX))
5660 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5661 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5662 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5663 self.assertEqual(p[UDP].dport, 4739)
5664 self.assertEqual(p[IPFIX].observationDomainID,
5665 self.ipfix_domain_id)
5666 if p.haslayer(Template):
5667 ipfix.add_template(p.getlayer(Template))
5668 # verify events in data set
5670 if p.haslayer(Data):
5671 data = ipfix.decode_data_set(p.getlayer(Set))
# The decoded records are checked against a limit of 0 and the IPv6 address of
# the pg0 remote host.
5672 self.verify_ipfix_max_fragments_ip6(data, 0,
5673 self.pg0.remote_ip6n)
5675 def test_ipfix_bib_ses(self):
5676 """ IPFIX logging NAT64 BIB/session create and delete events """
# NOTE(review): several lines of this method are not visible in this extract
# (call continuations, the `else:` branches before the logger.error calls and
# the loop headers around the IPFIX checks).
5677 self.tcp_port_in = random.randint(1025, 65535)
5678 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5682 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5684 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5685 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5686 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5687 src_address=self.pg3.local_ip4n,
5689 template_interval=10)
5690 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5691 src_port=self.ipfix_src_port)
# Create events: one TCP packet pg0 -> pg1; its translated source port is kept
# in self.tcp_port_out.
5694 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5695 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5696 TCP(sport=self.tcp_port_in, dport=25))
5697 self.pg0.add_stream(p)
5698 self.pg_enable_capture(self.pg_interfaces)
5700 p = self.pg1.get_capture(1)
5701 self.tcp_port_out = p[0][TCP].sport
5702 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5703 capture = self.pg3.get_capture(10)
5704 ipfix = IPFIXDecoder()
5705 # first load template
5707 self.assertTrue(p.haslayer(IPFIX))
5708 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5709 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5710 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5711 self.assertEqual(p[UDP].dport, 4739)
5712 self.assertEqual(p[IPFIX].observationDomainID,
5713 self.ipfix_domain_id)
5714 if p.haslayer(Template):
5715 ipfix.add_template(p.getlayer(Template))
5716 # verify events in data set
5718 if p.haslayer(Data):
5719 data = ipfix.decode_data_set(p.getlayer(Set))
# Field 230 of the first decoded record selects the check: value 10 goes to
# verify_ipfix_bib(..., 1, ...), value 6 to verify_ipfix_nat64_ses().
# Presumably 230 is the IPFIX natEvent element (10 = NAT64 BIB create,
# 6 = NAT64 session create) -- confirm against RFC 8158.
5720 if ord(data[0][230]) == 10:
5721 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5722 elif ord(data[0][230]) == 6:
5723 self.verify_ipfix_nat64_ses(data,
5725 self.pg0.remote_ip6n,
5726 self.pg1.remote_ip4,
5729 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Delete events: the pool address range call below is expected to produce two
# more IPFIX packets after the flush.
# NOTE(review): its remaining arguments are not visible here; presumably
# is_add=0 -- confirm.
5732 self.pg_enable_capture(self.pg_interfaces)
5733 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5736 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5737 capture = self.pg3.get_capture(2)
5738 # verify events in data set
5740 self.assertTrue(p.haslayer(IPFIX))
5741 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5742 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5743 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5744 self.assertEqual(p[UDP].dport, 4739)
5745 self.assertEqual(p[IPFIX].observationDomainID,
5746 self.ipfix_domain_id)
5747 if p.haslayer(Data):
5748 data = ipfix.decode_data_set(p.getlayer(Set))
# Same dispatch as above with values 11 and 7 (presumably the matching
# NAT64 BIB delete / session delete events); verify_ipfix_bib() is called
# with 0 instead of 1 here.
5749 if ord(data[0][230]) == 11:
5750 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5751 elif ord(data[0][230]) == 7:
5752 self.verify_ipfix_nat64_ses(data,
5754 self.pg0.remote_ip6n,
5755 self.pg1.remote_ip4,
5758 self.logger.error(ppp("Unexpected or invalid packet: ", p))
5760 def nat64_get_ses_num(self):
5762 Return number of active NAT64 sessions.
# The session table is read with nat64_st_dump().
# NOTE(review): the return statement is not visible in this extract;
# presumably the length of `st` is returned -- confirm against the full file.
5764 st = self.vapi.nat64_st_dump()
5767 def clear_nat64(self):
5769 Clear NAT64 configuration.
# NOTE(review): several lines of this method are not visible in this extract
# (call continuations and the loop header that binds `bibe`).
# Disable NAT IPFIX logging using the current port/domain, then reset both
# attributes to 4739 and 1.
5771 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5772 domain_id=self.ipfix_domain_id)
5773 self.ipfix_src_port = 4739
5774 self.ipfix_domain_id = 1
# Called without arguments; presumably this restores the default timeouts.
5776 self.vapi.nat64_set_timeouts()
# Remove every NAT64 interface. Entries with is_inside > 1 get an additional
# nat64_add_del_interface() call before the common one.
# NOTE(review): presumably is_inside > 1 marks an interface that is both
# inside and outside (one-armed) -- confirm.
5778 interfaces = self.vapi.nat64_interface_dump()
5779 for intf in interfaces:
5780 if intf.is_inside > 1:
5781 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5784 self.vapi.nat64_add_del_interface(intf.sw_if_index,
# Dump the BIB with argument 255 and call nat64_add_del_static_bib() for the
# entries; the arguments of that call are not visible here.
5788 bib = self.vapi.nat64_bib_dump(255)
5791 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
# Walk the NAT64 pool addresses and issue nat64_add_del_pool_addr_range() for
# each.
5799 adresses = self.vapi.nat64_pool_addr_dump()
5800 for addr in adresses:
5801 self.vapi.nat64_add_del_pool_addr_range(addr.address,
# Walk the NAT64 prefixes and issue nat64_add_del_prefix() for each, passing
# the prefix's own vrf_id.
5806 prefixes = self.vapi.nat64_prefix_dump()
5807 for prefix in prefixes:
5808 self.vapi.nat64_add_del_prefix(prefix.prefix,
5810 vrf_id=prefix.vrf_id,
5814 super(TestNAT64, self).tearDown()
# NOTE(review): the `def` line of this method is not visible in this extract.
# While VPP is still alive, log the output of the NAT64 "show" commands
# (pool, interfaces, prefix, BIB, session table, virtual reassembly).
5815 if not self.vpp_dead:
5816 self.logger.info(self.vapi.cli("show nat64 pool"))
5817 self.logger.info(self.vapi.cli("show nat64 interfaces"))
5818 self.logger.info(self.vapi.cli("show nat64 prefix"))
5819 self.logger.info(self.vapi.cli("show nat64 bib all"))
5820 self.logger.info(self.vapi.cli("show nat64 session table all"))
5821 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
5825 class TestDSlite(MethodHolder):
5826 """ DS-Lite Test Cases """
# NOTE(review): several lines of this class are not visible in this extract
# (decorators, try/except scaffolding, call continuations and the `def` line
# of tearDown).
5829 def setUpClass(cls):
5830 super(TestDSlite, cls).setUpClass()
# NAT pool address in text and packed binary form.
5833 cls.nat_addr = '10.0.0.3'
5834 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
# Two packet-generator interfaces: pg0 is configured for IPv4, pg1 for IPv6
# with two generated remote hosts.
5836 cls.create_pg_interfaces(range(2))
5838 cls.pg0.config_ip4()
5839 cls.pg0.resolve_arp()
5841 cls.pg1.config_ip6()
5842 cls.pg1.generate_remote_hosts(2)
5843 cls.pg1.configure_ipv6_neighbors()
5846 super(TestDSlite, cls).tearDownClass()
5849 def test_dslite(self):
5850 """ Test DS-Lite """
# Configure the DS-Lite pool and the AFTR IPv4/IPv6 addresses (the API takes
# the packed binary forms).
5851 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
5853 aftr_ip4 = '192.0.0.1'
5854 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5855 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5856 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5857 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# UDP, pg1 -> pg0: IPv4-in-IPv6 packet from remote host 0 addressed to the
# AFTR; the inner IPv4 source is 192.168.1.1.
5860 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5861 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
5862 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5863 UDP(sport=20000, dport=10000))
5864 self.pg1.add_stream(p)
5865 self.pg_enable_capture(self.pg_interfaces)
# Expect a plain IPv4 packet sourced from the NAT address with a rewritten
# UDP source port, which is kept for the reply.
5867 capture = self.pg0.get_capture(1)
5868 capture = capture[0]
5869 self.assertFalse(capture.haslayer(IPv6))
5870 self.assertEqual(capture[IP].src, self.nat_addr)
5871 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5872 self.assertNotEqual(capture[UDP].sport, 20000)
5873 self.assertEqual(capture[UDP].dport, 10000)
5874 self.check_ip_checksum(capture)
5875 out_port = capture[UDP].sport
# UDP, pg0 -> pg1: reply to the NAT address and translated port.
5877 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5878 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5879 UDP(sport=10000, dport=out_port))
5880 self.pg0.add_stream(p)
5881 self.pg_enable_capture(self.pg_interfaces)
# Expect it encapsulated in IPv6 from the AFTR to remote host 0, with the
# original inner address and port restored.
5883 capture = self.pg1.get_capture(1)
5884 capture = capture[0]
5885 self.assertEqual(capture[IPv6].src, aftr_ip6)
5886 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5887 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5888 self.assertEqual(capture[IP].dst, '192.168.1.1')
5889 self.assertEqual(capture[UDP].sport, 10000)
5890 self.assertEqual(capture[UDP].dport, 20000)
5891 self.check_ip_checksum(capture)
# TCP, pg1 -> pg0: same inner source 192.168.1.1 but sent from remote host 1.
5894 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5895 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5896 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5897 TCP(sport=20001, dport=10001))
5898 self.pg1.add_stream(p)
5899 self.pg_enable_capture(self.pg_interfaces)
5901 capture = self.pg0.get_capture(1)
5902 capture = capture[0]
5903 self.assertFalse(capture.haslayer(IPv6))
5904 self.assertEqual(capture[IP].src, self.nat_addr)
5905 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5906 self.assertNotEqual(capture[TCP].sport, 20001)
5907 self.assertEqual(capture[TCP].dport, 10001)
5908 self.check_ip_checksum(capture)
5909 self.check_tcp_checksum(capture)
5910 out_port = capture[TCP].sport
# TCP, pg0 -> pg1: reply must be tunnelled back to remote host 1.
5912 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5913 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5914 TCP(sport=10001, dport=out_port))
5915 self.pg0.add_stream(p)
5916 self.pg_enable_capture(self.pg_interfaces)
5918 capture = self.pg1.get_capture(1)
5919 capture = capture[0]
5920 self.assertEqual(capture[IPv6].src, aftr_ip6)
5921 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5922 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5923 self.assertEqual(capture[IP].dst, '192.168.1.1')
5924 self.assertEqual(capture[TCP].sport, 10001)
5925 self.assertEqual(capture[TCP].dport, 20001)
5926 self.check_ip_checksum(capture)
5927 self.check_tcp_checksum(capture)
# ICMP, pg1 -> pg0: echo request with id 4000; the id is expected to be
# rewritten and is kept for the reply.
5930 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5931 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5932 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5933 ICMP(id=4000, type='echo-request'))
5934 self.pg1.add_stream(p)
5935 self.pg_enable_capture(self.pg_interfaces)
5937 capture = self.pg0.get_capture(1)
5938 capture = capture[0]
5939 self.assertFalse(capture.haslayer(IPv6))
5940 self.assertEqual(capture[IP].src, self.nat_addr)
5941 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5942 self.assertNotEqual(capture[ICMP].id, 4000)
5943 self.check_ip_checksum(capture)
5944 self.check_icmp_checksum(capture)
5945 out_id = capture[ICMP].id
# ICMP, pg0 -> pg1: echo reply with the translated id; id 4000 must be
# restored inside the tunnel.
5947 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5948 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5949 ICMP(id=out_id, type='echo-reply'))
5950 self.pg0.add_stream(p)
5951 self.pg_enable_capture(self.pg_interfaces)
5953 capture = self.pg1.get_capture(1)
5954 capture = capture[0]
5955 self.assertEqual(capture[IPv6].src, aftr_ip6)
5956 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5957 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5958 self.assertEqual(capture[IP].dst, '192.168.1.1')
5959 self.assertEqual(capture[ICMP].id, 4000)
5960 self.check_ip_checksum(capture)
5961 self.check_icmp_checksum(capture)
5963 # ping DS-Lite AFTR tunnel endpoint address
5964 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5965 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
5966 ICMPv6EchoRequest())
5967 self.pg1.add_stream(p)
5968 self.pg_enable_capture(self.pg_interfaces)
# The echo reply is expected back on pg1, sourced from the AFTR address.
5970 capture = self.pg1.get_capture(1)
5971 self.assertEqual(1, len(capture))
5972 capture = capture[0]
5973 self.assertEqual(capture[IPv6].src, aftr_ip6)
5974 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5975 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# tearDown: while VPP is still alive, log the DS-Lite "show" command output.
5978 super(TestDSlite, self).tearDown()
5979 if not self.vpp_dead:
5980 self.logger.info(self.vapi.cli("show dslite pool"))
5982 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5983 self.logger.info(self.vapi.cli("show dslite sessions"))
5986 class TestDSliteCE(MethodHolder):
5987 """ DS-Lite CE Test Cases """
# NOTE(review): several lines of this class are not visible in this extract
# (decorators, try/except scaffolding, call continuations and the `def` line
# of tearDown).
5990 def setUpConstants(cls):
5991 super(TestDSliteCE, cls).setUpConstants()
# Append the `nat { dslite ce }` stanza to the VPP command line.
5992 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
5995 def setUpClass(cls):
5996 super(TestDSliteCE, cls).setUpClass()
# Two packet-generator interfaces: pg0 is configured for IPv4, pg1 for IPv6
# with a single generated remote host.
5999 cls.create_pg_interfaces(range(2))
6001 cls.pg0.config_ip4()
6002 cls.pg0.resolve_arp()
6004 cls.pg1.config_ip6()
6005 cls.pg1.generate_remote_hosts(1)
6006 cls.pg1.configure_ipv6_neighbors()
6009 super(TestDSliteCE, cls).tearDownClass()
6012 def test_dslite_ce(self):
6013 """ Test DS-Lite CE """
# Configure the B4 and AFTR tunnel endpoint addresses (the API takes the
# packed binary forms).
6015 b4_ip4 = '192.0.0.2'
6016 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
6017 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
6018 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
6019 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
6021 aftr_ip4 = '192.0.0.1'
6022 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6023 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6024 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6025 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# Host route (/128) for the AFTR IPv6 address via the pg1 remote host.
6027 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
6028 dst_address_length=128,
6029 next_hop_address=self.pg1.remote_ip6n,
6030 next_hop_sw_if_index=self.pg1.sw_if_index,
# pg0 -> pg1: plain IPv4 UDP packet; it must leave pg1 encapsulated in IPv6
# from the B4 address to the AFTR address with the inner packet unchanged.
6034 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6035 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6036 UDP(sport=10000, dport=20000))
6037 self.pg0.add_stream(p)
6038 self.pg_enable_capture(self.pg_interfaces)
6040 capture = self.pg1.get_capture(1)
6041 capture = capture[0]
6042 self.assertEqual(capture[IPv6].src, b4_ip6)
6043 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6044 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6045 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6046 self.assertEqual(capture[UDP].sport, 10000)
6047 self.assertEqual(capture[UDP].dport, 20000)
6048 self.check_ip_checksum(capture)
# pg1 -> pg0: IPv4-in-IPv6 packet from the AFTR to the B4 address; it must
# leave pg0 as plain IPv4 with the inner addresses and ports unchanged.
6051 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6052 IPv6(dst=b4_ip6, src=aftr_ip6) /
6053 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6054 UDP(sport=20000, dport=10000))
6055 self.pg1.add_stream(p)
6056 self.pg_enable_capture(self.pg_interfaces)
6058 capture = self.pg0.get_capture(1)
6059 capture = capture[0]
6060 self.assertFalse(capture.haslayer(IPv6))
6061 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6062 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6063 self.assertEqual(capture[UDP].sport, 20000)
6064 self.assertEqual(capture[UDP].dport, 10000)
6065 self.check_ip_checksum(capture)
6067 # ping DS-Lite B4 tunnel endpoint address
6068 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6069 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6070 ICMPv6EchoRequest())
6071 self.pg1.add_stream(p)
6072 self.pg_enable_capture(self.pg_interfaces)
# The echo reply is expected back on pg1, sourced from the B4 address.
6074 capture = self.pg1.get_capture(1)
6075 self.assertEqual(1, len(capture))
6076 capture = capture[0]
6077 self.assertEqual(capture[IPv6].src, b4_ip6)
6078 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6079 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# tearDown: while VPP is still alive, run the two DS-Lite tunnel endpoint
# "show" commands.
6082 super(TestDSliteCE, self).tearDown()
6083 if not self.vpp_dead:
6085 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6087 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
6089 if __name__ == '__main__':
# When executed as a script, run the tests in this module through unittest
# with VppTestRunner as the test runner.
6090 unittest.main(testRunner=VppTestRunner)