9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is re-parsed from its wire form, the IP checksum field is
    dropped so that it is recalculated when the packet is built again, and
    the recalculated value is compared with the one carried by ``pkt``.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Without clearing the parsed value the rebuilt packet would just echo
    # it back and the assertion below could never fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is re-parsed from its wire form, the TCP checksum field is
    dropped so that it is recalculated when the packet is built again, and
    the recalculated value is compared with the one carried by ``pkt``.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Clear the parsed checksum; otherwise the rebuild keeps the captured
    # value and the comparison is meaningless.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is re-parsed from its wire form, the UDP checksum field is
    dropped so that it is recalculated when the packet is built again, and
    the recalculated value is compared with the one carried by ``pkt``.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Clear the parsed checksum; otherwise the rebuild keeps the captured
    # value and the comparison is meaningless.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    Each embedded layer present in the packet (IPerror, TCPerror, UDPerror,
    ICMPerror) is verified on its own fresh copy of the packet: the copy's
    checksum field is dropped, the packet is rebuilt and the recalculated
    checksum is compared with the one carried by ``pkt``.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    embedded_layers = ((IPerror, 'IPerror'),
                       (TCPerror, 'TCPerror'),
                       (UDPerror, 'UDPerror'),
                       (ICMPerror, 'ICMPerror'))
    for layer, name in embedded_layers:
        if not pkt.haslayer(layer):
            continue
        # An embedded UDP checksum of 0 is not verified.
        if layer is UDPerror and pkt[name].chksum == 0:
            continue
        # Always start from a fresh parse of the captured packet so that no
        # branch depends on a copy left behind (or never created) by an
        # earlier one.
        new = pkt.__class__(str(pkt))
        del new[name].chksum
        new = new.__class__(str(new))
        self.assertEqual(new[name].chksum, pkt[name].chksum)
def check_icmp_checksum(self, pkt):
    """
    Check ICMP checksum in IPv4 packet

    When the packet also carries an embedded IPerror layer, the checksums
    of the embedded packet are verified as well.

    :param pkt: Packet to check ICMP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    # Dropping the field makes the next build recalculate it.
    del rebuilt['ICMP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    self.assertEqual(rebuilt['ICMP'].chksum, pkt['ICMP'].chksum)
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Check ICMPv6 checksum in IPv6 packet

    Handles destination-unreachable, echo-request and echo-reply messages.
    For destination-unreachable the embedded packet is verified as well.

    :param pkt: Packet to check ICMPv6 checksum
    """
    icmpv6_layers = ((ICMPv6DestUnreach, 'ICMPv6DestUnreach'),
                     (ICMPv6EchoRequest, 'ICMPv6EchoRequest'),
                     (ICMPv6EchoReply, 'ICMPv6EchoReply'))
    rebuilt = pkt.__class__(str(pkt))
    for layer, name in icmpv6_layers:
        if not pkt.haslayer(layer):
            continue
        # Dropping the field makes the next build recalculate it.
        del rebuilt[name].cksum
        rebuilt = rebuilt.__class__(str(rebuilt))
        self.assertEqual(rebuilt[name].cksum, pkt[name].cksum)
        if layer is ICMPv6DestUnreach:
            self.check_icmp_errror_embedded(pkt)
def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param dst_ip: Destination address (Default use remote address of the
                   outside interface)
    :param ttl: TTL of generated packets
    :returns: List with one TCP, one UDP and one ICMP echo-request packet
    """
    # Only fall back to the outside host when the caller gave no address.
    if dst_ip is None:
        dst_ip = out_if.remote_ip4

    l4_layers = (TCP(sport=self.tcp_port_in, dport=20),
                 UDP(sport=self.udp_port_in, dport=20),
                 ICMP(id=self.icmp_id_in, type='echo-request'))
    return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
            IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
            l4
            for l4 in l4_layers]
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96); for any
                 other value the prefix is returned without an embedded
                 IPv4 address
    :returns: IPv4-embedded IPv6 addresses
    """
    # Byte offsets inside the 16-byte IPv6 address that receive the four
    # IPv4 octets, per prefix length (layout of RFC 6052 section 2.2).
    # Offset 8 is never used, so the octets are split around it.
    octet_offsets = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    # bytearray keeps the byte handling identical on Python 2 and 3.
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    for offset, octet in zip(octet_offsets.get(plen, ()), ip4_n):
        pref_n[offset] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def extract_ip4(self, ip6, plen):
    """
    Extract IPv4 address embedded in IPv6 addresses

    :param ip6: IPv6 address
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96)
    :returns: extracted IPv4 address
    :raises ValueError: if plen is not one of the supported lengths
    """
    # NOTE(review): the per-length extraction was not visible in the
    # reviewed text; this mirrors the byte layout used by compose_ip6
    # (RFC 6052 section 2.2) - confirm against the original branches.
    octet_offsets = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    if plen not in octet_offsets:
        raise ValueError("Unsupported IPv6 prefix length: %r" % (plen,))
    # bytearray keeps the byte handling identical on Python 2 and 3.
    ip6_n = bytearray(socket.inet_pton(socket.AF_INET6, ip6))
    ip4_n = bytearray(ip6_n[offset] for offset in octet_offsets[plen])
    return socket.inet_ntop(socket.AF_INET, bytes(ip4_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix (Default use 64:ff9b::)
    :param plen: NAT64 prefix length
    :returns: List with one TCP, one UDP and one ICMPv6 echo-request packet
    """
    if pref is None:
        # No prefix given: append the IPv4 address to 64:ff9b::
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
    else:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)

    l4_layers = (TCP(sport=self.tcp_port_in, dport=20),
                 UDP(sport=self.udp_port_in, dport=20),
                 ICMPv6EchoRequest(id=self.icmp_id_in))
    return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
            IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
            l4
            for l4 in l4_layers]
def create_stream_out(self, out_if, dst_ip=None, ttl=64,
                      use_inside_ports=False):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :param use_inside_ports: Use inside NAT ports as destination ports
           instead of outside ports
    :returns: List with one TCP, one UDP and one ICMP echo-reply packet
    """
    if dst_ip is None:
        dst_ip = self.nat_addr
    if not use_inside_ports:
        tcp_port = self.tcp_port_out
        udp_port = self.udp_port_out
        icmp_id = self.icmp_id_out
    else:
        tcp_port = self.tcp_port_in
        udp_port = self.udp_port_in
        icmp_id = self.icmp_id_in

    l4_layers = (TCP(dport=tcp_port, sport=20),
                 UDP(dport=udp_port, sport=20),
                 ICMP(id=icmp_id, type='echo-reply'))
    return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
            IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
            l4
            for l4 in l4_layers]
def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
    """
    Create IPv6 packet stream for outside network

    :param out_if: Outside interface
    :param src_ip: Source IPv6 address
    :param dst_ip: Destination IPv6 address
    :param hl: HL of generated packets
    :returns: List with one TCP, one UDP and one ICMPv6 echo-reply packet
    """
    l4_layers = (TCP(dport=self.tcp_port_out, sport=20),
                 UDP(dport=self.udp_port_out, sport=20),
                 ICMPv6EchoReply(id=self.icmp_id_out))
    return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
            IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
            l4
            for l4 in l4_layers]
362 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363 packet_num=3, dst_ip=None, is_ip6=False):
365 Verify captured packets on outside network
367 :param capture: Captured packets
368 :param nat_ip: Translated IP address (Default use global NAT address)
369 :param same_port: Source port number is not translated (Default False)
370 :param packet_num: Expected number of packets (Default 3)
371 :param dst_ip: Destination IP address (Default do not verify)
372 :param is_ip6: If L3 protocol is IPv6 (Default False)
376 ICMP46 = ICMPv6EchoRequest
381 nat_ip = self.nat_addr
382 self.assertEqual(packet_num, len(capture))
383 for packet in capture:
386 self.check_ip_checksum(packet)
387 self.assertEqual(packet[IP46].src, nat_ip)
388 if dst_ip is not None:
389 self.assertEqual(packet[IP46].dst, dst_ip)
390 if packet.haslayer(TCP):
392 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
395 packet[TCP].sport, self.tcp_port_in)
396 self.tcp_port_out = packet[TCP].sport
397 self.check_tcp_checksum(packet)
398 elif packet.haslayer(UDP):
400 self.assertEqual(packet[UDP].sport, self.udp_port_in)
403 packet[UDP].sport, self.udp_port_in)
404 self.udp_port_out = packet[UDP].sport
407 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
409 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410 self.icmp_id_out = packet[ICMP46].id
412 self.check_icmpv6_checksum(packet)
414 self.check_icmp_checksum(packet)
416 self.logger.error(ppp("Unexpected or invalid packet "
417 "(outside network):", packet))
420 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421 packet_num=3, dst_ip=None):
423 Verify captured packets on outside network
425 :param capture: Captured packets
426 :param nat_ip: Translated IP address
427 :param same_port: Source port number is not translated (Default False)
428 :param packet_num: Expected number of packets (Default 3)
429 :param dst_ip: Destination IP address (Default do not verify)
431 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
434 def verify_capture_in(self, capture, in_if, packet_num=3):
436 Verify captured packets on inside network
438 :param capture: Captured packets
439 :param in_if: Inside interface
440 :param packet_num: Expected number of packets (Default 3)
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
445 self.check_ip_checksum(packet)
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 if packet.haslayer(TCP):
448 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449 self.check_tcp_checksum(packet)
450 elif packet.haslayer(UDP):
451 self.assertEqual(packet[UDP].dport, self.udp_port_in)
453 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454 self.check_icmp_checksum(packet)
456 self.logger.error(ppp("Unexpected or invalid packet "
457 "(inside network):", packet))
460 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
462 Verify captured IPv6 packets on inside network
464 :param capture: Captured packets
465 :param src_ip: Source IP
466 :param dst_ip: Destination IP address
467 :param packet_num: Expected number of packets (Default 3)
469 self.assertEqual(packet_num, len(capture))
470 for packet in capture:
472 self.assertEqual(packet[IPv6].src, src_ip)
473 self.assertEqual(packet[IPv6].dst, dst_ip)
474 if packet.haslayer(TCP):
475 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476 self.check_tcp_checksum(packet)
477 elif packet.haslayer(UDP):
478 self.assertEqual(packet[UDP].dport, self.udp_port_in)
479 self.check_udp_checksum(packet)
481 self.assertEqual(packet[ICMPv6EchoReply].id,
483 self.check_icmpv6_checksum(packet)
485 self.logger.error(ppp("Unexpected or invalid packet "
486 "(inside network):", packet))
489 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
491 Verify captured packet that don't have to be translated
493 :param capture: Captured packets
494 :param ingress_if: Ingress interface
495 :param egress_if: Egress interface
497 for packet in capture:
499 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501 if packet.haslayer(TCP):
502 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503 elif packet.haslayer(UDP):
504 self.assertEqual(packet[UDP].sport, self.udp_port_in)
506 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
508 self.logger.error(ppp("Unexpected or invalid packet "
509 "(inside network):", packet))
512 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513 packet_num=3, icmp_type=11):
515 Verify captured packets with ICMP errors on outside network
517 :param capture: Captured packets
518 :param src_ip: Translated IP address or IP address of VPP
519 (Default use global NAT address)
520 :param packet_num: Expected number of packets (Default 3)
521 :param icmp_type: Type of error ICMP packet
522 we are expecting (Default 11)
525 src_ip = self.nat_addr
526 self.assertEqual(packet_num, len(capture))
527 for packet in capture:
529 self.assertEqual(packet[IP].src, src_ip)
530 self.assertTrue(packet.haslayer(ICMP))
532 self.assertEqual(icmp.type, icmp_type)
533 self.assertTrue(icmp.haslayer(IPerror))
534 inner_ip = icmp[IPerror]
535 if inner_ip.haslayer(TCPerror):
536 self.assertEqual(inner_ip[TCPerror].dport,
538 elif inner_ip.haslayer(UDPerror):
539 self.assertEqual(inner_ip[UDPerror].dport,
542 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
544 self.logger.error(ppp("Unexpected or invalid packet "
545 "(outside network):", packet))
548 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
551 Verify captured packets with ICMP errors on inside network
553 :param capture: Captured packets
554 :param in_if: Inside interface
555 :param packet_num: Expected number of packets (Default 3)
556 :param icmp_type: Type of error ICMP packet
557 we are expecting (Default 11)
559 self.assertEqual(packet_num, len(capture))
560 for packet in capture:
562 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563 self.assertTrue(packet.haslayer(ICMP))
565 self.assertEqual(icmp.type, icmp_type)
566 self.assertTrue(icmp.haslayer(IPerror))
567 inner_ip = icmp[IPerror]
568 if inner_ip.haslayer(TCPerror):
569 self.assertEqual(inner_ip[TCPerror].sport,
571 elif inner_ip.haslayer(UDPerror):
572 self.assertEqual(inner_ip[UDPerror].sport,
575 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
577 self.logger.error(ppp("Unexpected or invalid packet "
578 "(inside network):", packet))
581 def create_stream_frag(self, src_if, dst, sport, dport, data):
583 Create fragmented packet stream
585 :param src_if: Source interface
586 :param dst: Destination IPv4 address
587 :param sport: Source TCP port
588 :param dport: Destination TCP port
589 :param data: Payload data
592 id = random.randint(0, 65535)
593 p = (IP(src=src_if.remote_ip4, dst=dst) /
594 TCP(sport=sport, dport=dport) /
596 p = p.__class__(str(p))
597 chksum = p['TCP'].chksum
599 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601 TCP(sport=sport, dport=dport, chksum=chksum) /
604 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606 proto=IP_PROTOS.tcp) /
609 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
616 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617 pref=None, plen=0, frag_size=128):
619 Create fragmented packet stream
621 :param src_if: Source interface
622 :param dst: Destination IPv4 address
623 :param sport: Source TCP port
624 :param dport: Destination TCP port
625 :param data: Payload data
626 :param pref: NAT64 prefix
627 :param plen: NAT64 prefix length
628 :param frag_size: size of fragments
632 dst_ip6 = ''.join(['64:ff9b::', dst])
634 dst_ip6 = self.compose_ip6(dst, pref, plen)
636 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639 TCP(sport=sport, dport=dport) /
642 return fragment6(p, frag_size)
644 def reass_frags_and_verify(self, frags, src, dst):
646 Reassemble and verify fragmented packet
648 :param frags: Captured fragments
649 :param src: Source IPv4 address to verify
650 :param dst: Destination IPv4 address to verify
652 :returns: Reassembled IPv4 packet
654 buffer = StringIO.StringIO()
656 self.assertEqual(p[IP].src, src)
657 self.assertEqual(p[IP].dst, dst)
658 self.check_ip_checksum(p)
659 buffer.seek(p[IP].frag * 8)
660 buffer.write(p[IP].payload)
661 ip = frags[0].getlayer(IP)
662 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663 proto=frags[0][IP].proto)
664 if ip.proto == IP_PROTOS.tcp:
665 p = (ip / TCP(buffer.getvalue()))
666 self.check_tcp_checksum(p)
667 elif ip.proto == IP_PROTOS.udp:
668 p = (ip / UDP(buffer.getvalue()))
671 def reass_frags_and_verify_ip6(self, frags, src, dst):
673 Reassemble and verify fragmented packet
675 :param frags: Captured fragments
676 :param src: Source IPv6 address to verify
677 :param dst: Destination IPv6 address to verify
679 :returns: Reassembled IPv6 packet
681 buffer = StringIO.StringIO()
683 self.assertEqual(p[IPv6].src, src)
684 self.assertEqual(p[IPv6].dst, dst)
685 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686 buffer.write(p[IPv6ExtHdrFragment].payload)
687 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688 nh=frags[0][IPv6ExtHdrFragment].nh)
689 if ip.nh == IP_PROTOS.tcp:
690 p = (ip / TCP(buffer.getvalue()))
691 self.check_tcp_checksum(p)
692 elif ip.nh == IP_PROTOS.udp:
693 p = (ip / UDP(buffer.getvalue()))
696 def verify_ipfix_nat44_ses(self, data):
698 Verify IPFIX NAT44 session create/delete event
700 :param data: Decoded IPFIX data records
702 nat44_ses_create_num = 0
703 nat44_ses_delete_num = 0
704 self.assertEqual(6, len(data))
707 self.assertIn(ord(record[230]), [4, 5])
708 if ord(record[230]) == 4:
709 nat44_ses_create_num += 1
711 nat44_ses_delete_num += 1
713 self.assertEqual(self.pg0.remote_ip4n, record[8])
714 # postNATSourceIPv4Address
715 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
718 self.assertEqual(struct.pack("!I", 0), record[234])
719 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720 if IP_PROTOS.icmp == ord(record[4]):
721 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722 self.assertEqual(struct.pack("!H", self.icmp_id_out),
724 elif IP_PROTOS.tcp == ord(record[4]):
725 self.assertEqual(struct.pack("!H", self.tcp_port_in),
727 self.assertEqual(struct.pack("!H", self.tcp_port_out),
729 elif IP_PROTOS.udp == ord(record[4]):
730 self.assertEqual(struct.pack("!H", self.udp_port_in),
732 self.assertEqual(struct.pack("!H", self.udp_port_out),
735 self.fail("Invalid protocol")
736 self.assertEqual(3, nat44_ses_create_num)
737 self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    Exactly one record is expected; fields are looked up by their IPFIX
    information element ID.

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 3)
    # natPoolID
    self.assertEqual(struct.pack("!I", 0), record[283])
def verify_ipfix_max_sessions(self, data, limit):
    """
    Verify IPFIX maximum session entries exceeded event

    Exactly one record is expected; fields are looked up by their IPFIX
    information element ID.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum session entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    self.assertEqual(struct.pack("I", 1), record[466])
    # maxSessionEntries
    self.assertEqual(struct.pack("I", limit), record[471])
def verify_ipfix_max_bibs(self, data, limit):
    """
    Verify IPFIX maximum BIB entries exceeded event

    Exactly one record is expected; fields are looked up by their IPFIX
    information element ID.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum BIB entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    self.assertEqual(struct.pack("I", 2), record[466])
    # maxBIBEntries
    self.assertEqual(struct.pack("I", limit), record[472])
def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event

    Exactly one record is expected; fields are looked up by their IPFIX
    information element ID.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event

    Exactly one record is expected; fields are looked up by their IPFIX
    information element ID.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv4 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv4Address
    self.assertEqual(src_addr, record[8])
def verify_ipfix_bib(self, data, is_create, src_addr):
    """
    Verify IPFIX NAT64 BIB create and delete events

    Exactly one record is expected; fields are looked up by their IPFIX
    information element ID.

    :param data: Decoded IPFIX data records
    :param is_create: Create event if nonzero value otherwise delete event
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent: 10 is expected for a create event, 11 for a delete event
    if is_create:
        self.assertEqual(ord(record[230]), 10)
    else:
        self.assertEqual(ord(record[230]), 11)
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
    # postNATSourceIPv4Address
    self.assertEqual(self.nat_addr_n, record[225])
    # protocolIdentifier
    self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
    # ingressVRFID
    self.assertEqual(struct.pack("!I", 0), record[234])
    # sourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
    # postNAPTSourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
850 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
853 Verify IPFIX NAT64 session create and delete events
855 :param data: Decoded IPFIX data records
856 :param is_create: Create event if nonzero value otherwise delete event
857 :param src_addr: IPv6 source address
858 :param dst_addr: IPv4 destination address
859 :param dst_port: destination TCP port
861 self.assertEqual(1, len(data))
865 self.assertEqual(ord(record[230]), 6)
867 self.assertEqual(ord(record[230]), 7)
869 self.assertEqual(src_addr, record[27])
870 # destinationIPv6Address
871 self.assertEqual(socket.inet_pton(socket.AF_INET6,
872 self.compose_ip6(dst_addr,
876 # postNATSourceIPv4Address
877 self.assertEqual(self.nat_addr_n, record[225])
878 # postNATDestinationIPv4Address
879 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
882 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
884 self.assertEqual(struct.pack("!I", 0), record[234])
885 # sourceTransportPort
886 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887 # postNAPTSourceTransportPort
888 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889 # destinationTransportPort
890 self.assertEqual(struct.pack("!H", dst_port), record[11])
891 # postNAPTDestinationTransportPort
892 self.assertEqual(struct.pack("!H", dst_port), record[228])
895 class TestNAT44(MethodHolder):
896 """ NAT44 Test Cases """
900 super(TestNAT44, cls).setUpClass()
903 cls.tcp_port_in = 6303
904 cls.tcp_port_out = 6303
905 cls.udp_port_in = 6304
906 cls.udp_port_out = 6304
907 cls.icmp_id_in = 6305
908 cls.icmp_id_out = 6305
909 cls.nat_addr = '10.0.0.3'
910 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911 cls.ipfix_src_port = 4739
912 cls.ipfix_domain_id = 1
914 cls.create_pg_interfaces(range(10))
915 cls.interfaces = list(cls.pg_interfaces[0:4])
917 for i in cls.interfaces:
922 cls.pg0.generate_remote_hosts(3)
923 cls.pg0.configure_ipv4_neighbors()
925 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926 cls.vapi.ip_table_add_del(10, is_add=1)
927 cls.vapi.ip_table_add_del(20, is_add=1)
929 cls.pg4._local_ip4 = "172.16.255.1"
930 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932 cls.pg4.set_table_ip4(10)
933 cls.pg5._local_ip4 = "172.17.255.3"
934 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936 cls.pg5.set_table_ip4(10)
937 cls.pg6._local_ip4 = "172.16.255.1"
938 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940 cls.pg6.set_table_ip4(20)
941 for i in cls.overlapping_interfaces:
949 cls.pg9.generate_remote_hosts(2)
951 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
956 cls.pg9.resolve_arp()
957 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959 cls.pg9.resolve_arp()
962 super(TestNAT44, cls).tearDownClass()
965 def clear_nat44(self):
967 Clear NAT44 configuration.
969 # I found no elegant way to do this
970 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971 dst_address_length=32,
972 next_hop_address=self.pg7.remote_ip4n,
973 next_hop_sw_if_index=self.pg7.sw_if_index,
975 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976 dst_address_length=32,
977 next_hop_address=self.pg8.remote_ip4n,
978 next_hop_sw_if_index=self.pg8.sw_if_index,
981 for intf in [self.pg7, self.pg8]:
982 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
984 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
989 if self.pg7.has_ip4_config:
990 self.pg7.unconfig_ip4()
992 interfaces = self.vapi.nat44_interface_addr_dump()
993 for intf in interfaces:
994 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
995 twice_nat=intf.twice_nat,
998 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
999 domain_id=self.ipfix_domain_id)
1000 self.ipfix_src_port = 4739
1001 self.ipfix_domain_id = 1
1003 interfaces = self.vapi.nat44_interface_dump()
1004 for intf in interfaces:
1005 if intf.is_inside > 1:
1006 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1009 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1013 interfaces = self.vapi.nat44_interface_output_feature_dump()
1014 for intf in interfaces:
1015 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1019 static_mappings = self.vapi.nat44_static_mapping_dump()
1020 for sm in static_mappings:
1021 self.vapi.nat44_add_del_static_mapping(
1022 sm.local_ip_address,
1023 sm.external_ip_address,
1024 local_port=sm.local_port,
1025 external_port=sm.external_port,
1026 addr_only=sm.addr_only,
1028 protocol=sm.protocol,
1029 twice_nat=sm.twice_nat,
1032 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1033 for lb_sm in lb_static_mappings:
1034 self.vapi.nat44_add_del_lb_static_mapping(
1035 lb_sm.external_addr,
1036 lb_sm.external_port,
1038 vrf_id=lb_sm.vrf_id,
1039 twice_nat=lb_sm.twice_nat,
1044 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1045 for id_m in identity_mappings:
1046 self.vapi.nat44_add_del_identity_mapping(
1047 addr_only=id_m.addr_only,
1050 sw_if_index=id_m.sw_if_index,
1052 protocol=id_m.protocol,
1055 adresses = self.vapi.nat44_address_dump()
1056 for addr in adresses:
1057 self.vapi.nat44_add_del_address_range(addr.ip_address,
1059 twice_nat=addr.twice_nat,
1062 self.vapi.nat_set_reass()
1063 self.vapi.nat_set_reass(is_ip6=1)
1065 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1066 local_port=0, external_port=0, vrf_id=0,
1067 is_add=1, external_sw_if_index=0xFFFFFFFF,
1068 proto=0, twice_nat=0):
1070 Add/delete NAT44 static mapping
1072 :param local_ip: Local IP address
1073 :param external_ip: External IP address
1074 :param local_port: Local port number (Optional)
1075 :param external_port: External port number (Optional)
1076 :param vrf_id: VRF ID (Default 0)
1077 :param is_add: 1 if add, 0 if delete (Default add)
1078 :param external_sw_if_index: External interface instead of IP address
1079 :param proto: IP protocol (Mandatory if port specified)
1080 :param twice_nat: 1 if translate external host address and port
1083 if local_port and external_port:
1085 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1086 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1087 self.vapi.nat44_add_del_static_mapping(
1090 external_sw_if_index,
1099 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1101 Add/delete NAT44 address
1103 :param ip: IP address
1104 :param is_add: 1 if add, 0 if delete (Default add)
1105 :param twice_nat: twice NAT address for external hosts
1107 nat_addr = socket.inet_pton(socket.AF_INET, ip)
1108 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1110 twice_nat=twice_nat)
1112 def test_dynamic(self):
1113 """ NAT44 dynamic translation test """
1115 self.nat44_add_address(self.nat_addr)
1116 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1117 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1121 pkts = self.create_stream_in(self.pg0, self.pg1)
1122 self.pg0.add_stream(pkts)
1123 self.pg_enable_capture(self.pg_interfaces)
1125 capture = self.pg1.get_capture(len(pkts))
1126 self.verify_capture_out(capture)
1129 pkts = self.create_stream_out(self.pg1)
1130 self.pg1.add_stream(pkts)
1131 self.pg_enable_capture(self.pg_interfaces)
1133 capture = self.pg0.get_capture(len(pkts))
1134 self.verify_capture_in(capture, self.pg0)
1136 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1137 """ NAT44 handling of client packets with TTL=1 """
1139 self.nat44_add_address(self.nat_addr)
1140 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1141 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1144 # Client side - generate traffic
1145 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1146 self.pg0.add_stream(pkts)
1147 self.pg_enable_capture(self.pg_interfaces)
1150 # Client side - verify ICMP type 11 packets
1151 capture = self.pg0.get_capture(len(pkts))
1152 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1154 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1155 """ NAT44 handling of server packets with TTL=1 """
1157 self.nat44_add_address(self.nat_addr)
1158 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1159 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1162 # Client side - create sessions
1163 pkts = self.create_stream_in(self.pg0, self.pg1)
1164 self.pg0.add_stream(pkts)
1165 self.pg_enable_capture(self.pg_interfaces)
1168 # Server side - generate traffic
1169 capture = self.pg1.get_capture(len(pkts))
1170 self.verify_capture_out(capture)
1171 pkts = self.create_stream_out(self.pg1, ttl=1)
1172 self.pg1.add_stream(pkts)
1173 self.pg_enable_capture(self.pg_interfaces)
1176 # Server side - verify ICMP type 11 packets
1177 capture = self.pg1.get_capture(len(pkts))
1178 self.verify_capture_out_with_icmp_errors(capture,
1179 src_ip=self.pg1.local_ip4)
1181 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1182 """ NAT44 handling of error responses to client packets with TTL=2 """
# Flow: enable NAT44 on pg0/pg1 with self.nat_addr, send a ttl=2 stream in on
# pg0, answer every packet captured on pg1 with a hand-built ICMP type 11
# message, and verify what is then captured on pg0.
# NOTE(review): the trailing argument(s) of the second feature call are not
# visible in this listing.
1184 self.nat44_add_address(self.nat_addr)
1185 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1186 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1189 # Client side - generate traffic
1190 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1191 self.pg0.add_stream(pkts)
1192 self.pg_enable_capture(self.pg_interfaces)
1195 # Server side - simulate ICMP type 11 response
# Each reply is sent from pg1's remote host to self.nat_addr and carries the
# IP layer of the captured packet as the ICMP payload.
1196 capture = self.pg1.get_capture(len(pkts))
1197 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1198 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1199 ICMP(type=11) / packet[IP] for packet in capture]
1200 self.pg1.add_stream(pkts)
1201 self.pg_enable_capture(self.pg_interfaces)
1204 # Client side - verify ICMP type 11 packets
# One packet is expected on pg0 for every ICMP message injected on pg1.
1205 capture = self.pg0.get_capture(len(pkts))
1206 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1208 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1209 """ NAT44 handling of error responses to server packets with TTL=2 """
# Flow: enable NAT44 on pg0/pg1 with self.nat_addr, create sessions with a
# stream from pg0, send a ttl=2 stream in on pg1, answer every packet captured
# on pg0 with a hand-built ICMP type 11 message, and verify the capture on
# pg1.
# NOTE(review): the trailing argument(s) of the second feature call are not
# visible in this listing.
1211 self.nat44_add_address(self.nat_addr)
1212 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1213 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1216 # Client side - create sessions
1217 pkts = self.create_stream_in(self.pg0, self.pg1)
1218 self.pg0.add_stream(pkts)
1219 self.pg_enable_capture(self.pg_interfaces)
1222 # Server side - generate traffic
# The session-creating stream is checked on pg1 before pkts is rebound to the
# ttl=2 stream injected on pg1.
1223 capture = self.pg1.get_capture(len(pkts))
1224 self.verify_capture_out(capture)
1225 pkts = self.create_stream_out(self.pg1, ttl=2)
1226 self.pg1.add_stream(pkts)
1227 self.pg_enable_capture(self.pg_interfaces)
1230 # Client side - simulate ICMP type 11 response
# Each reply goes from pg0's remote host to pg1's remote host and carries the
# IP layer of the packet captured on pg0 as the ICMP payload.
1231 capture = self.pg0.get_capture(len(pkts))
1232 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1233 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1234 ICMP(type=11) / packet[IP] for packet in capture]
1235 self.pg0.add_stream(pkts)
1236 self.pg_enable_capture(self.pg_interfaces)
1239 # Server side - verify ICMP type 11 packets
# No src_ip is passed here, so the verifier's own default applies.
1240 capture = self.pg1.get_capture(len(pkts))
1241 self.verify_capture_out_with_icmp_errors(capture)
1243 def test_ping_out_interface_from_outside(self):
1244 """ Ping NAT44 out interface from outside network """
# Sends one ICMP echo request from pg1's remote host to pg1's own address
# (pg1.local_ip4) and checks that a single echo reply comes back on pg1.
# NOTE(review): `pkts` and `packet` are bound on lines that are not visible in
# this listing, as are the trailing argument(s) of the second feature call.
1246 self.nat44_add_address(self.nat_addr)
1247 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1248 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Echo request addressed to the interface itself, using icmp_id_out as ICMP id.
1251 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1252 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1253 ICMP(id=self.icmp_id_out, type='echo-request'))
1255 self.pg1.add_stream(pkts)
1256 self.pg_enable_capture(self.pg_interfaces)
1258 capture = self.pg1.get_capture(len(pkts))
1259 self.assertEqual(1, len(capture))
# The reply must have source and destination swapped relative to the request.
# NOTE(review): the request carries icmp_id_out but the reply is compared with
# icmp_id_in -- this only holds if both attributes have the same value; confirm.
1262 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1263 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1264 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1265 self.assertEqual(packet[ICMP].type, 0) # echo reply
# Dump the offending packet to the log when the checks above fail.
1267 self.logger.error(ppp("Unexpected or invalid packet "
1268 "(outside network):", packet))
1271 def test_ping_internal_host_from_outside(self):
1272 """ Ping internal host from outside network """
# A static mapping between pg0's remote host and self.nat_addr is installed,
# then an echo request is sent from outside (pg1) to self.nat_addr and the
# matching echo reply is sent back from the internal host (pg0).
# NOTE(review): the trailing argument(s) of the second feature call are not
# visible in this listing.
1274 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1275 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1276 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Echo request from pg1's remote host to the mapped external address.
1280 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1281 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1282 ICMP(id=self.icmp_id_out, type='echo-request'))
1283 self.pg1.add_stream(pkt)
1284 self.pg_enable_capture(self.pg_interfaces)
# Exactly one packet is expected on pg0, and it must still be ICMP.
1286 capture = self.pg0.get_capture(1)
1287 self.verify_capture_in(capture, self.pg0, packet_num=1)
1288 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Echo reply from the internal host back to pg1's remote host.
1291 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1292 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1293 ICMP(id=self.icmp_id_in, type='echo-reply'))
1294 self.pg0.add_stream(pkt)
1295 self.pg_enable_capture(self.pg_interfaces)
# Exactly one packet is expected on pg1; it is verified with same_port=True.
1297 capture = self.pg1.get_capture(1)
1298 self.verify_capture_out(capture, same_port=True, packet_num=1)
1299 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1301 def test_forwarding(self):
1302 """ NAT44 forwarding test """
# Enables NAT44 on pg0/pg1 with forwarding switched on and a static mapping
# between pg0's remote host and the NAT address. Traffic is exercised in both
# directions, first for the mapped host and then for a second host that has
# no mapping.
# NOTE(review): several call continuations in this method are not visible in
# this listing.
1304 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1305 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1307 self.vapi.nat44_forwarding_enable_disable(1)
# Binary forms of the addresses; kept so the same mapping call can be repeated
# at the end of the test.
1309 real_ip = self.pg0.remote_ip4n
1310 alias_ip = self.nat_addr_n
1311 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1312 external_ip=alias_ip)
1315 # in2out - static mapping match
# pg1 -> pg0 first, then pg0 -> pg1 verified with same_port=True.
1317 pkts = self.create_stream_out(self.pg1)
1318 self.pg1.add_stream(pkts)
1319 self.pg_enable_capture(self.pg_interfaces)
1321 capture = self.pg0.get_capture(len(pkts))
1322 self.verify_capture_in(capture, self.pg0)
1324 pkts = self.create_stream_in(self.pg0, self.pg1)
1325 self.pg0.add_stream(pkts)
1326 self.pg_enable_capture(self.pg_interfaces)
1328 capture = self.pg1.get_capture(len(pkts))
1329 self.verify_capture_out(capture, same_port=True)
1331 # in2out - no static mapping match
# Temporarily put remote_hosts[1] in slot 0; the original entry is saved in
# host0 and written back further down.
# NOTE(review): presumably pg0.remote_ip4 then resolves to the second host --
# confirm against the interface class.
1333 host0 = self.pg0.remote_hosts[0]
1334 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1336 pkts = self.create_stream_out(self.pg1,
1337 dst_ip=self.pg0.remote_ip4,
1338 use_inside_ports=True)
1339 self.pg1.add_stream(pkts)
1340 self.pg_enable_capture(self.pg_interfaces)
1342 capture = self.pg0.get_capture(len(pkts))
1343 self.verify_capture_in(capture, self.pg0)
1345 pkts = self.create_stream_in(self.pg0, self.pg1)
1346 self.pg0.add_stream(pkts)
1347 self.pg_enable_capture(self.pg_interfaces)
# The verifier is told to expect pg0.remote_ip4 as the NAT address here.
1349 capture = self.pg1.get_capture(len(pkts))
1350 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
# Restore the original first remote host.
1353 self.pg0.remote_hosts[0] = host0
# Switch forwarding back off and call the static mapping API again with the
# same addresses (its last argument is not visible in this listing).
1356 self.vapi.nat44_forwarding_enable_disable(0)
1357 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1358 external_ip=alias_ip,
1361 def test_static_in(self):
1362 """ 1:1 NAT initialized from inside network """
# Maps pg0's remote host to 10.0.0.10 with a static mapping and sends the
# first stream from the inside (pg0 -> pg1), then the reverse direction.
# NOTE(review): the trailing argument(s) of the second feature call are not
# visible in this listing.
1364 nat_ip = "10.0.0.10"
# Overwrite the outside ports/id stored on the test instance for this test.
1365 self.tcp_port_out = 6303
1366 self.udp_port_out = 6304
1367 self.icmp_id_out = 6305
1369 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1370 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1371 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Inside -> outside: the capture on pg1 is verified against nat_ip, with True
# as the third positional argument of verify_capture_out.
1375 pkts = self.create_stream_in(self.pg0, self.pg1)
1376 self.pg0.add_stream(pkts)
1377 self.pg_enable_capture(self.pg_interfaces)
1379 capture = self.pg1.get_capture(len(pkts))
1380 self.verify_capture_out(capture, nat_ip, True)
# Outside -> inside: the stream is addressed to nat_ip and verified on pg0.
1383 pkts = self.create_stream_out(self.pg1, nat_ip)
1384 self.pg1.add_stream(pkts)
1385 self.pg_enable_capture(self.pg_interfaces)
1387 capture = self.pg0.get_capture(len(pkts))
1388 self.verify_capture_in(capture, self.pg0)
1390 def test_static_out(self):
1391 """ 1:1 NAT initialized from outside network """
# Maps pg0's remote host to 10.0.0.20 with a static mapping and sends the
# first stream from the outside (pg1 -> pg0), then the reverse direction.
# NOTE(review): the trailing argument(s) of the second feature call are not
# visible in this listing.
1393 nat_ip = "10.0.0.20"
# Overwrite the outside ports/id stored on the test instance for this test.
1394 self.tcp_port_out = 6303
1395 self.udp_port_out = 6304
1396 self.icmp_id_out = 6305
1398 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1399 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1400 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside -> inside: the stream is addressed to nat_ip and verified on pg0.
1404 pkts = self.create_stream_out(self.pg1, nat_ip)
1405 self.pg1.add_stream(pkts)
1406 self.pg_enable_capture(self.pg_interfaces)
1408 capture = self.pg0.get_capture(len(pkts))
1409 self.verify_capture_in(capture, self.pg0)
# Inside -> outside: the capture on pg1 is verified against nat_ip, with True
# as the third positional argument of verify_capture_out.
1412 pkts = self.create_stream_in(self.pg0, self.pg1)
1413 self.pg0.add_stream(pkts)
1414 self.pg_enable_capture(self.pg_interfaces)
1416 capture = self.pg1.get_capture(len(pkts))
1417 self.verify_capture_out(capture, nat_ip, True)
1419 def test_static_with_port_in(self):
1420 """ 1:1 NAPT initialized from inside network """
# Installs one port-level static mapping per protocol (TCP, UDP, ICMP) between
# pg0's remote host and self.nat_addr, then sends traffic inside -> outside
# followed by outside -> inside.
# NOTE(review): the trailing argument(s) of the second feature call are not
# visible in this listing.
# Outside ports/id used by the three mappings below.
1422 self.tcp_port_out = 3606
1423 self.udp_port_out = 3607
1424 self.icmp_id_out = 3608
1426 self.nat44_add_address(self.nat_addr)
# Each mapping pairs the inside port/id with the outside one for its protocol.
1427 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1428 self.tcp_port_in, self.tcp_port_out,
1429 proto=IP_PROTOS.tcp)
1430 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1431 self.udp_port_in, self.udp_port_out,
1432 proto=IP_PROTOS.udp)
1433 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1434 self.icmp_id_in, self.icmp_id_out,
1435 proto=IP_PROTOS.icmp)
1436 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1437 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Inside -> outside, verified on pg1.
1441 pkts = self.create_stream_in(self.pg0, self.pg1)
1442 self.pg0.add_stream(pkts)
1443 self.pg_enable_capture(self.pg_interfaces)
1445 capture = self.pg1.get_capture(len(pkts))
1446 self.verify_capture_out(capture)
# Outside -> inside, verified on pg0.
1449 pkts = self.create_stream_out(self.pg1)
1450 self.pg1.add_stream(pkts)
1451 self.pg_enable_capture(self.pg_interfaces)
1453 capture = self.pg0.get_capture(len(pkts))
1454 self.verify_capture_in(capture, self.pg0)
1456 def test_static_with_port_out(self):
1457 """ 1:1 NAPT initialized from outside network """
# Installs one port-level static mapping per protocol (TCP, UDP, ICMP) between
# pg0's remote host and self.nat_addr, then sends traffic outside -> inside
# followed by inside -> outside.
# NOTE(review): the trailing argument(s) of the second feature call are not
# visible in this listing.
# Outside ports/id used by the three mappings below.
1459 self.tcp_port_out = 30606
1460 self.udp_port_out = 30607
1461 self.icmp_id_out = 30608
1463 self.nat44_add_address(self.nat_addr)
# Each mapping pairs the inside port/id with the outside one for its protocol.
1464 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1465 self.tcp_port_in, self.tcp_port_out,
1466 proto=IP_PROTOS.tcp)
1467 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1468 self.udp_port_in, self.udp_port_out,
1469 proto=IP_PROTOS.udp)
1470 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1471 self.icmp_id_in, self.icmp_id_out,
1472 proto=IP_PROTOS.icmp)
1473 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1474 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside -> inside, verified on pg0.
1478 pkts = self.create_stream_out(self.pg1)
1479 self.pg1.add_stream(pkts)
1480 self.pg_enable_capture(self.pg_interfaces)
1482 capture = self.pg0.get_capture(len(pkts))
1483 self.verify_capture_in(capture, self.pg0)
# Inside -> outside, verified on pg1.
1486 pkts = self.create_stream_in(self.pg0, self.pg1)
1487 self.pg0.add_stream(pkts)
1488 self.pg_enable_capture(self.pg_interfaces)
1490 capture = self.pg1.get_capture(len(pkts))
1491 self.verify_capture_out(capture)
1493 def test_static_vrf_aware(self):
1494 """ 1:1 NAT VRF awareness """
# Two static mappings are created, one for pg4's remote host (10.0.0.30) and
# one for pg0's remote host (10.0.0.40). Traffic from pg4 towards pg3 must be
# captured on pg3, while traffic from pg0 towards pg3 must not appear there.
# NOTE(review): the trailing arguments of both mapping calls and of the pg3
# feature call are not visible in this listing; presumably they carry the VRF
# id and the inside/outside flag -- confirm.
1496 nat_ip1 = "10.0.0.30"
1497 nat_ip2 = "10.0.0.40"
1498 self.tcp_port_out = 6303
1499 self.udp_port_out = 6304
1500 self.icmp_id_out = 6305
1502 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1504 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1506 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1508 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1509 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1511 # inside interface VRF match NAT44 static mapping VRF
# pg4 -> pg3 is verified against nat_ip1.
1512 pkts = self.create_stream_in(self.pg4, self.pg3)
1513 self.pg4.add_stream(pkts)
1514 self.pg_enable_capture(self.pg_interfaces)
1516 capture = self.pg3.get_capture(len(pkts))
1517 self.verify_capture_out(capture, nat_ip1, True)
1519 # inside interface VRF don't match NAT44 static mapping VRF (packets
# pg0 -> pg3: nothing at all may be captured on pg3.
1521 pkts = self.create_stream_in(self.pg0, self.pg3)
1522 self.pg0.add_stream(pkts)
1523 self.pg_enable_capture(self.pg_interfaces)
1525 self.pg3.assert_nothing_captured()
1527 def test_identity_nat(self):
1528 """ Identity NAT """
# Installs an identity mapping for pg0's remote host, sends one TCP packet
# from pg1's remote host straight to that address and checks that addresses
# and ports are unchanged when it is captured on pg0.
# NOTE(review): `ip` and `tcp` are bound on lines that are not visible in this
# listing, as are the trailing argument(s) of the second feature call.
1530 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1531 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1532 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Single TCP packet 12345 -> 56789 addressed to the inside host's real address.
1535 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1536 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1537 TCP(sport=12345, dport=56789))
1538 self.pg1.add_stream(p)
1539 self.pg_enable_capture(self.pg_interfaces)
1541 capture = self.pg0.get_capture(1)
# Addresses and ports must equal the values that were sent.
1546 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1547 self.assertEqual(ip.src, self.pg1.remote_ip4)
1548 self.assertEqual(tcp.dport, 56789)
1549 self.assertEqual(tcp.sport, 12345)
# NOTE(review): the checksum helpers receive `p`; whether `p` has been rebound
# to the captured packet by this point cannot be seen here -- confirm.
1550 self.check_tcp_checksum(p)
1551 self.check_ip_checksum(p)
# Dump the packet to the log when a check fails.
1553 self.logger.error(ppp("Unexpected or invalid packet:", p))
1556 def test_static_lb(self):
1557 """ NAT44 local service load balancing """
# Configures a load-balancing static mapping from self.nat_addr to two local
# servers (pg0 remote hosts 0 and 1), checks one client -> service packet and
# the reply, and finally sends packets from a range of client addresses and
# compares how many landed on each server.
# NOTE(review): many short lines of this method are not visible in this
# listing; external_port, local_port, server, pkts, server1_n and server2_n
# are all bound on such lines.
# Binary form of the external address for the API call.
1558 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1561 server1 = self.pg0.remote_hosts[0]
1562 server2 = self.pg0.remote_hosts[1]
# NOTE(review): `locals` shadows the Python builtin of the same name.
1564 locals = [{'addr': server1.ip4n,
1567 {'addr': server2.ip4n,
1571 self.nat44_add_address(self.nat_addr)
1572 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1575 local_num=len(locals),
1577 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1578 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1581 # from client to service
1582 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1583 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1584 TCP(sport=12345, dport=external_port))
1585 self.pg1.add_stream(p)
1586 self.pg_enable_capture(self.pg_interfaces)
1588 capture = self.pg0.get_capture(1)
# The packet may be delivered to either server; the destination port must be
# the local service port.
1594 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1595 if ip.dst == server1.ip4:
1599 self.assertEqual(tcp.dport, local_port)
1600 self.check_tcp_checksum(p)
1601 self.check_ip_checksum(p)
1603 self.logger.error(ppp("Unexpected or invalid packet:", p))
1606 # from service back to client
# The reply is sent from whichever host `server` refers to.
1607 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1608 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1609 TCP(sport=local_port, dport=12345))
1610 self.pg0.add_stream(p)
1611 self.pg_enable_capture(self.pg_interfaces)
1613 capture = self.pg1.get_capture(1)
# On the way out the source must be the external address and port again.
1618 self.assertEqual(ip.src, self.nat_addr)
1619 self.assertEqual(tcp.sport, external_port)
1620 self.check_tcp_checksum(p)
1621 self.check_ip_checksum(p)
1623 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Client addresses are produced by ip4_range from pg1's remote address and the
# bounds 10 and 20; one TCP packet to the service is built per client.
1629 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1631 for client in clients:
1632 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1633 IP(src=client, dst=self.nat_addr) /
1634 TCP(sport=12345, dport=external_port))
1636 self.pg1.add_stream(pkts)
1637 self.pg_enable_capture(self.pg_interfaces)
1639 capture = self.pg0.get_capture(len(pkts))
1641 if p[IP].dst == server1.ip4:
# server1 is required to have received strictly more packets than server2.
1645 self.assertTrue(server1_n > server2_n)
1647 def test_multiple_inside_interfaces(self):
1648 """ NAT44 multiple non-overlapping address space inside interfaces """
# NAT44 is enabled on pg0, pg1 and pg3 (pg2 gets no NAT44 feature). The test
# checks untranslated forwarding pg0 -> pg1 and pg0 -> pg2, then translation
# in both directions between each of pg0/pg1 and pg3.
# NOTE(review): the trailing argument(s) of the pg3 feature call are not
# visible in this listing; the later stream directions suggest pg3 is the
# outside interface -- confirm.
1650 self.nat44_add_address(self.nat_addr)
1651 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1652 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1653 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1656 # between two NAT44 inside interfaces (no translation)
1657 pkts = self.create_stream_in(self.pg0, self.pg1)
1658 self.pg0.add_stream(pkts)
1659 self.pg_enable_capture(self.pg_interfaces)
1661 capture = self.pg1.get_capture(len(pkts))
1662 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1664 # from NAT44 inside to interface without NAT44 feature (no translation)
1665 pkts = self.create_stream_in(self.pg0, self.pg2)
1666 self.pg0.add_stream(pkts)
1667 self.pg_enable_capture(self.pg_interfaces)
1669 capture = self.pg2.get_capture(len(pkts))
1670 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1672 # in2out 1st interface
1673 pkts = self.create_stream_in(self.pg0, self.pg3)
1674 self.pg0.add_stream(pkts)
1675 self.pg_enable_capture(self.pg_interfaces)
1677 capture = self.pg3.get_capture(len(pkts))
1678 self.verify_capture_out(capture)
1680 # out2in 1st interface
1681 pkts = self.create_stream_out(self.pg3)
1682 self.pg3.add_stream(pkts)
1683 self.pg_enable_capture(self.pg_interfaces)
1685 capture = self.pg0.get_capture(len(pkts))
1686 self.verify_capture_in(capture, self.pg0)
1688 # in2out 2nd interface
1689 pkts = self.create_stream_in(self.pg1, self.pg3)
1690 self.pg1.add_stream(pkts)
1691 self.pg_enable_capture(self.pg_interfaces)
1693 capture = self.pg3.get_capture(len(pkts))
1694 self.verify_capture_out(capture)
1696 # out2in 2nd interface
# The same outside stream is now expected on pg1 rather than pg0, following
# the pg1 -> pg3 traffic sent just above.
1697 pkts = self.create_stream_out(self.pg3)
1698 self.pg3.add_stream(pkts)
1699 self.pg_enable_capture(self.pg_interfaces)
1701 capture = self.pg1.get_capture(len(pkts))
1702 self.verify_capture_in(capture, self.pg1)
1704 def test_inside_overlapping_interfaces(self):
1705 """ NAT44 multiple inside interfaces with overlapping address space """
# NAT44 is enabled on pg3, pg4, pg5 and pg6, with a static mapping from pg6's
# remote host to 10.0.0.10. The test covers untranslated pg4 -> pg5 traffic,
# a pg4 -> pg6 packet sent via the static address, translation between each
# of pg4/pg5/pg6 and pg3, and finally the contents of the user and session
# dumps.
# NOTE(review): several short lines (call continuations, loop headers, the
# bindings of `ip`, `tcp` and `user`) are not visible in this listing.
1707 static_nat_ip = "10.0.0.10"
1708 self.nat44_add_address(self.nat_addr)
1709 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1711 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1712 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1713 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1714 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1717 # between NAT44 inside interfaces with same VRF (no translation)
1718 pkts = self.create_stream_in(self.pg4, self.pg5)
1719 self.pg4.add_stream(pkts)
1720 self.pg_enable_capture(self.pg_interfaces)
1722 capture = self.pg5.get_capture(len(pkts))
1723 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1725 # between NAT44 inside interfaces with different VRF (hairpinning)
# One TCP packet from pg4's remote host to the static NAT address.
1726 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1727 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1728 TCP(sport=1234, dport=5678))
1729 self.pg4.add_stream(p)
1730 self.pg_enable_capture(self.pg_interfaces)
1732 capture = self.pg6.get_capture(1)
# On pg6 the source must be the NAT pool address with a different source port,
# while the destination is pg6's remote host on the unchanged port 5678.
1737 self.assertEqual(ip.src, self.nat_addr)
1738 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1739 self.assertNotEqual(tcp.sport, 1234)
1740 self.assertEqual(tcp.dport, 5678)
1742 self.logger.error(ppp("Unexpected or invalid packet:", p))
1745 # in2out 1st interface
1746 pkts = self.create_stream_in(self.pg4, self.pg3)
1747 self.pg4.add_stream(pkts)
1748 self.pg_enable_capture(self.pg_interfaces)
1750 capture = self.pg3.get_capture(len(pkts))
1751 self.verify_capture_out(capture)
1753 # out2in 1st interface
1754 pkts = self.create_stream_out(self.pg3)
1755 self.pg3.add_stream(pkts)
1756 self.pg_enable_capture(self.pg_interfaces)
1758 capture = self.pg4.get_capture(len(pkts))
1759 self.verify_capture_in(capture, self.pg4)
1761 # in2out 2nd interface
1762 pkts = self.create_stream_in(self.pg5, self.pg3)
1763 self.pg5.add_stream(pkts)
1764 self.pg_enable_capture(self.pg_interfaces)
1766 capture = self.pg3.get_capture(len(pkts))
1767 self.verify_capture_out(capture)
1769 # out2in 2nd interface
1770 pkts = self.create_stream_out(self.pg3)
1771 self.pg3.add_stream(pkts)
1772 self.pg_enable_capture(self.pg_interfaces)
1774 capture = self.pg5.get_capture(len(pkts))
1775 self.verify_capture_in(capture, self.pg5)
# Session dump for pg5's remote host: exactly one pool address and exactly
# three non-static sessions are expected.
# NOTE(review): the second dump argument (10) is presumably a VRF id -- confirm.
1778 addresses = self.vapi.nat44_address_dump()
1779 self.assertEqual(len(addresses), 1)
1780 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1781 self.assertEqual(len(sessions), 3)
1782 for session in sessions:
1783 self.assertFalse(session.is_static)
1784 self.assertEqual(session.inside_ip_address[0:4],
1785 self.pg5.remote_ip4n)
1786 self.assertEqual(session.outside_ip_address,
1787 addresses[0].ip_address)
# The dump is expected in the fixed order TCP, UDP, ICMP, with the inside and
# outside ports/ids stored on the test instance.
1788 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1789 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1790 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1791 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1792 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1793 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1794 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1795 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1796 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1798 # in2out 3rd interface
# pg6's remote host has the static mapping, so the capture is verified against
# static_nat_ip.
1799 pkts = self.create_stream_in(self.pg6, self.pg3)
1800 self.pg6.add_stream(pkts)
1801 self.pg_enable_capture(self.pg_interfaces)
1803 capture = self.pg3.get_capture(len(pkts))
1804 self.verify_capture_out(capture, static_nat_ip, True)
1806 # out2in 3rd interface
1807 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1808 self.pg3.add_stream(pkts)
1809 self.pg_enable_capture(self.pg_interfaces)
1811 capture = self.pg6.get_capture(len(pkts))
1812 self.verify_capture_in(capture, self.pg6)
1814 # general user and session dump verifications
1815 users = self.vapi.nat44_user_dump()
1816 self.assertTrue(len(users) >= 3)
1817 addresses = self.vapi.nat44_address_dump()
1818 self.assertEqual(len(addresses), 1)
# Every session returned for a user must carry that user's inside address, a
# positive packet count and a byte count larger than the packet count.
1820 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1822 for session in sessions:
1823 self.assertEqual(user.ip_address, session.inside_ip_address)
1824 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1825 self.assertTrue(session.protocol in
1826 [IP_PROTOS.tcp, IP_PROTOS.udp,
# pg4's remote host: at least four sessions, none of them static, all using
# the single pool address.
1830 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1831 self.assertTrue(len(sessions) >= 4)
1832 for session in sessions:
1833 self.assertFalse(session.is_static)
1834 self.assertEqual(session.inside_ip_address[0:4],
1835 self.pg4.remote_ip4n)
1836 self.assertEqual(session.outside_ip_address,
1837 addresses[0].ip_address)
# pg6's remote host (dumped with 20 as second argument): at least three
# sessions, all static, with static_nat_ip as the outside address.
1840 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1841 self.assertTrue(len(sessions) >= 3)
1842 for session in sessions:
1843 self.assertTrue(session.is_static)
1844 self.assertEqual(session.inside_ip_address[0:4],
1845 self.pg6.remote_ip4n)
# Compares the first four address bytes, converted with ord(), against the
# dotted-quad octets. NOTE(review): comparing two map() results like this
# relies on Python 2, where map() returns lists.
1846 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1847 map(int, static_nat_ip.split('.')))
1848 self.assertTrue(session.inside_port in
1849 [self.tcp_port_in, self.udp_port_in,
1852 def test_hairpinning(self):
1853 """ NAT44 hairpinning - 1:1 NAPT """
1855 host = self.pg0.remote_hosts[0]
1856 server = self.pg0.remote_hosts[1]
1859 server_in_port = 5678
1860 server_out_port = 8765
1862 self.nat44_add_address(self.nat_addr)
1863 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1864 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1866 # add static mapping for server
1867 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1868 server_in_port, server_out_port,
1869 proto=IP_PROTOS.tcp)
1871 # send packet from host to server
1872 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1873 IP(src=host.ip4, dst=self.nat_addr) /
1874 TCP(sport=host_in_port, dport=server_out_port))
1875 self.pg0.add_stream(p)
1876 self.pg_enable_capture(self.pg_interfaces)
1878 capture = self.pg0.get_capture(1)
1883 self.assertEqual(ip.src, self.nat_addr)
1884 self.assertEqual(ip.dst, server.ip4)
1885 self.assertNotEqual(tcp.sport, host_in_port)
1886 self.assertEqual(tcp.dport, server_in_port)
1887 self.check_tcp_checksum(p)
1888 host_out_port = tcp.sport
1890 self.logger.error(ppp("Unexpected or invalid packet:", p))
1893 # send reply from server to host
1894 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1895 IP(src=server.ip4, dst=self.nat_addr) /
1896 TCP(sport=server_in_port, dport=host_out_port))
1897 self.pg0.add_stream(p)
1898 self.pg_enable_capture(self.pg_interfaces)
1900 capture = self.pg0.get_capture(1)
1905 self.assertEqual(ip.src, self.nat_addr)
1906 self.assertEqual(ip.dst, host.ip4)
1907 self.assertEqual(tcp.sport, server_out_port)
1908 self.assertEqual(tcp.dport, host_in_port)
1909 self.check_tcp_checksum(p)
1911 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1914 def test_hairpinning2(self):
1915 """ NAT44 hairpinning - 1:1 NAT"""
# Three hosts sit behind pg0: a plain host and two servers that each have an
# address-only static mapping (10.0.0.10 and 10.0.0.11). Four exchanges are
# checked, each with a TCP, a UDP and an ICMP packet: host -> server1 and its
# reply, then server2 -> server1 and its reply.
# NOTE(review): the lines that collect the packets into `pkts` and the
# try/else/except scaffolding around the checks are not visible in this
# listing, as are the trailing argument(s) of the second feature call.
1917 server1_nat_ip = "10.0.0.10"
1918 server2_nat_ip = "10.0.0.11"
1919 host = self.pg0.remote_hosts[0]
1920 server1 = self.pg0.remote_hosts[1]
1921 server2 = self.pg0.remote_hosts[2]
1922 server_tcp_port = 22
1923 server_udp_port = 20
1925 self.nat44_add_address(self.nat_addr)
1926 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1927 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1930 # add static mapping for servers
1931 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1932 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# Host -> server1: all three packets are addressed to server1's NAT address.
1936 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1937 IP(src=host.ip4, dst=server1_nat_ip) /
1938 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1940 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1941 IP(src=host.ip4, dst=server1_nat_ip) /
1942 UDP(sport=self.udp_port_in, dport=server_udp_port))
1944 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1945 IP(src=host.ip4, dst=server1_nat_ip) /
1946 ICMP(id=self.icmp_id_in, type='echo-request'))
1948 self.pg0.add_stream(pkts)
1949 self.pg_enable_capture(self.pg_interfaces)
1951 capture = self.pg0.get_capture(len(pkts))
# The host has no static mapping, so its packets must arrive with the pool
# address as source and a changed port/id; the new values are stored on the
# test instance for the replies below.
1952 for packet in capture:
1954 self.assertEqual(packet[IP].src, self.nat_addr)
1955 self.assertEqual(packet[IP].dst, server1.ip4)
1956 if packet.haslayer(TCP):
1957 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1958 self.assertEqual(packet[TCP].dport, server_tcp_port)
1959 self.tcp_port_out = packet[TCP].sport
1960 self.check_tcp_checksum(packet)
1961 elif packet.haslayer(UDP):
1962 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1963 self.assertEqual(packet[UDP].dport, server_udp_port)
1964 self.udp_port_out = packet[UDP].sport
1966 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1967 self.icmp_id_out = packet[ICMP].id
1969 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Server1 -> host: replies are addressed to the pool address and the
# translated ports/id recorded above.
1974 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1975 IP(src=server1.ip4, dst=self.nat_addr) /
1976 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1978 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1979 IP(src=server1.ip4, dst=self.nat_addr) /
1980 UDP(sport=server_udp_port, dport=self.udp_port_out))
1982 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1983 IP(src=server1.ip4, dst=self.nat_addr) /
1984 ICMP(id=self.icmp_id_out, type='echo-reply'))
1986 self.pg0.add_stream(pkts)
1987 self.pg_enable_capture(self.pg_interfaces)
1989 capture = self.pg0.get_capture(len(pkts))
# At the host the source must be server1's NAT address and the destination
# port/id must be the host's original value again.
1990 for packet in capture:
1992 self.assertEqual(packet[IP].src, server1_nat_ip)
1993 self.assertEqual(packet[IP].dst, host.ip4)
1994 if packet.haslayer(TCP):
1995 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1996 self.assertEqual(packet[TCP].sport, server_tcp_port)
1997 self.check_tcp_checksum(packet)
1998 elif packet.haslayer(UDP):
1999 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2000 self.assertEqual(packet[UDP].sport, server_udp_port)
2002 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2004 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2007 # server2 to server1
2009 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2010 IP(src=server2.ip4, dst=server1_nat_ip) /
2011 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2013 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2014 IP(src=server2.ip4, dst=server1_nat_ip) /
2015 UDP(sport=self.udp_port_in, dport=server_udp_port))
2017 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2018 IP(src=server2.ip4, dst=server1_nat_ip) /
2019 ICMP(id=self.icmp_id_in, type='echo-request'))
2021 self.pg0.add_stream(pkts)
2022 self.pg_enable_capture(self.pg_interfaces)
2024 capture = self.pg0.get_capture(len(pkts))
# Unlike the host case above, server2 has its own address-only mapping: the
# source becomes server2's NAT address and ports/id must stay unchanged.
2025 for packet in capture:
2027 self.assertEqual(packet[IP].src, server2_nat_ip)
2028 self.assertEqual(packet[IP].dst, server1.ip4)
2029 if packet.haslayer(TCP):
2030 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2031 self.assertEqual(packet[TCP].dport, server_tcp_port)
2032 self.tcp_port_out = packet[TCP].sport
2033 self.check_tcp_checksum(packet)
2034 elif packet.haslayer(UDP):
2035 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2036 self.assertEqual(packet[UDP].dport, server_udp_port)
2037 self.udp_port_out = packet[UDP].sport
2039 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2040 self.icmp_id_out = packet[ICMP].id
2042 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2045 # server1 to server2
# Replies are addressed to server2's NAT address and the ports/id recorded in
# the previous loop.
2047 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2048 IP(src=server1.ip4, dst=server2_nat_ip) /
2049 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2051 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2052 IP(src=server1.ip4, dst=server2_nat_ip) /
2053 UDP(sport=server_udp_port, dport=self.udp_port_out))
2055 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2056 IP(src=server1.ip4, dst=server2_nat_ip) /
2057 ICMP(id=self.icmp_id_out, type='echo-reply'))
2059 self.pg0.add_stream(pkts)
2060 self.pg_enable_capture(self.pg_interfaces)
2062 capture = self.pg0.get_capture(len(pkts))
# At server2 the source must be server1's NAT address and the destination must
# be server2's real address.
2063 for packet in capture:
2065 self.assertEqual(packet[IP].src, server1_nat_ip)
2066 self.assertEqual(packet[IP].dst, server2.ip4)
2067 if packet.haslayer(TCP):
2068 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2069 self.assertEqual(packet[TCP].sport, server_tcp_port)
2070 self.check_tcp_checksum(packet)
2071 elif packet.haslayer(UDP):
2072 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2073 self.assertEqual(packet[UDP].sport, server_udp_port)
2075 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2077 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2080 def test_max_translations_per_user(self):
2081 """ MAX translations per user - recycle the least recently used """
# Reads the per-user translation limit from the running configuration, sends
# five more TCP packets than that limit from a single inside host (each with
# its own source port) and expects every one of them to be captured on pg1.
# NOTE(review): the lines that collect the packets into `pkts` and the
# trailing argument(s) of the second feature call are not visible in this
# listing.
2083 self.nat44_add_address(self.nat_addr)
2084 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2085 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2088 # get maximum number of translations per user
2089 nat44_config = self.vapi.nat_show_config()
2091 # send more than maximum number of translations per user packets
2092 pkts_num = nat44_config.max_translations_per_user + 5
# Source ports start at 1025 and increase by one per packet.
2094 for port in range(0, pkts_num):
2095 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2096 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2097 TCP(sport=1025 + port))
2099 self.pg0.add_stream(pkts)
2100 self.pg_enable_capture(self.pg_interfaces)
2103 # verify number of translated packet
# Only the count is checked here; the captured packets are not inspected.
2104 self.pg1.get_capture(pkts_num)
2106 def test_interface_addr(self):
2107 """ Acquire NAT44 addresses from interface """
# Registers pg7 as a source of NAT44 addresses and checks that the address
# dump follows the interface's IPv4 configuration: empty before an address is
# configured, one entry while it is configured, and empty again afterwards.
2108 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2110 # no address in NAT pool
2111 adresses = self.vapi.nat44_address_dump()
2112 self.assertEqual(0, len(adresses))
2114 # configure interface address and check NAT address pool
2115 self.pg7.config_ip4()
2116 adresses = self.vapi.nat44_address_dump()
2117 self.assertEqual(1, len(adresses))
# Only the first four bytes of the dumped address are compared with pg7's
# binary IPv4 address.
2118 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2120 # remove interface address and check NAT address pool
2121 self.pg7.unconfig_ip4()
2122 adresses = self.vapi.nat44_address_dump()
2123 self.assertEqual(0, len(adresses))
2125 def test_interface_addr_static_mapping(self):
2126 """ Static mapping with addresses from interface """
# Creates a static mapping whose external side is given as an interface (pg7)
# instead of an address, and checks how the static mapping dump changes as
# pg7's IPv4 address is configured and removed again.
# NOTE(review): the first argument line of the nat44_add_static_mapping call
# is not visible in this listing.
2127 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2128 self.nat44_add_static_mapping(
2130 external_sw_if_index=self.pg7.sw_if_index)
2132 # static mappings with external interface
# Before any address exists the single entry still refers to pg7 by index.
2133 static_mappings = self.vapi.nat44_static_mapping_dump()
2134 self.assertEqual(1, len(static_mappings))
2135 self.assertEqual(self.pg7.sw_if_index,
2136 static_mappings[0].external_sw_if_index)
2138 # configure interface address and check static mappings
2139 self.pg7.config_ip4()
2140 static_mappings = self.vapi.nat44_static_mapping_dump()
2141 self.assertEqual(1, len(static_mappings))
2142 self.assertEqual(static_mappings[0].external_ip_address[0:4],
2143 self.pg7.local_ip4n)
# NOTE(review): 0xFFFFFFFF is presumably the "no interface" value once the
# mapping carries a concrete address -- confirm against the API definition.
2144 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2146 # remove interface address and check static mappings
2147 self.pg7.unconfig_ip4()
2148 static_mappings = self.vapi.nat44_static_mapping_dump()
2149 self.assertEqual(0, len(static_mappings))
2151 def test_interface_addr_identity_nat(self):
2152 """ Identity NAT with addresses from interface """
# Creates a TCP identity mapping that names pg7 by interface index and checks
# how the identity mapping dump changes as pg7's IPv4 address is configured
# and removed again.
# NOTE(review): `port` is bound on a line that is not visible in this listing,
# as are some arguments of the nat44_add_del_identity_mapping call.
2155 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2156 self.vapi.nat44_add_del_identity_mapping(
2157 sw_if_index=self.pg7.sw_if_index,
2159 protocol=IP_PROTOS.tcp,
2162 # identity mappings with external interface
# Before any address exists the single entry still refers to pg7 by index.
2163 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2164 self.assertEqual(1, len(identity_mappings))
2165 self.assertEqual(self.pg7.sw_if_index,
2166 identity_mappings[0].sw_if_index)
2168 # configure interface address and check identity mappings
2169 self.pg7.config_ip4()
2170 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2171 self.assertEqual(1, len(identity_mappings))
2172 self.assertEqual(identity_mappings[0].ip_address,
2173 self.pg7.local_ip4n)
# NOTE(review): 0xFFFFFFFF is presumably the "no interface" value once the
# mapping carries a concrete address -- confirm against the API definition.
2174 self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2175 self.assertEqual(port, identity_mappings[0].port)
2176 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2178 # remove interface address and check identity mappings
2179 self.pg7.unconfig_ip4()
2180 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2181 self.assertEqual(0, len(identity_mappings))
2183 def test_ipfix_nat44_sess(self):
2184 """ IPFIX logging NAT44 session created/delted """
# Configures an IPFIX exporter towards pg3 with a non-default collector port,
# sends the in2out stream pg0 -> pg1, removes the NAT address, flushes IPFIX
# and validates the packets captured on pg3 with verify_ipfix_nat44_ses().
2185 self.ipfix_domain_id = 10
2186 self.ipfix_src_port = 20202
2187 colector_port = 30303
# Tell scapy to dissect UDP payloads with destination port 30303 as IPFIX.
# The literal duplicates the value of colector_port above.
2188 bind_layers(UDP, IPFIX, dport=30303)
2189 self.nat44_add_address(self.nat_addr)
2190 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the trailing argument of the next two calls is not visible in
# this listing.
2191 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2193 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2194 src_address=self.pg3.local_ip4n,
2196 template_interval=10,
2197 collector_port=colector_port)
2198 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2199 src_port=self.ipfix_src_port)
2201 pkts = self.create_stream_in(self.pg0, self.pg1)
2202 self.pg0.add_stream(pkts)
2203 self.pg_enable_capture(self.pg_interfaces)
2205 capture = self.pg1.get_capture(len(pkts))
2206 self.verify_capture_out(capture)
2207 self.nat44_add_address(self.nat_addr, is_add=0)
2208 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2209 capture = self.pg3.get_capture(9)
2210 ipfix = IPFIXDecoder()
2211 # first load template
# NOTE(review): `p` below is presumably the variable of a loop over `capture`
# whose header line is not visible here - confirm.
2213 self.assertTrue(p.haslayer(IPFIX))
2214 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2215 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2216 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2217 self.assertEqual(p[UDP].dport, colector_port)
2218 self.assertEqual(p[IPFIX].observationDomainID,
2219 self.ipfix_domain_id)
2220 if p.haslayer(Template):
2221 ipfix.add_template(p.getlayer(Template))
2222 # verify events in data set
2224 if p.haslayer(Data):
2225 data = ipfix.decode_data_set(p.getlayer(Set))
2226 self.verify_ipfix_nat44_ses(data)
2228 def test_ipfix_addr_exhausted(self):
2229 """ IPFIX logging NAT addresses exhausted """
# No NAT address is added in the visible lines. The test sends one packet from
# pg0, expects nothing on pg1 (get_capture(0)), then flushes IPFIX and checks
# the packets captured on pg3 with verify_ipfix_addr_exhausted().
# NOTE(review): self.ipfix_domain_id and self.ipfix_src_port are read but not
# assigned in this method - confirm they are set elsewhere before this runs.
2230 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2231 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# No collector_port is passed here; the assertions below expect UDP
# destination port 4739.
2233 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2234 src_address=self.pg3.local_ip4n,
2236 template_interval=10)
2237 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2238 src_port=self.ipfix_src_port)
# NOTE(review): the layer following IP(...) in this packet is not visible in
# this listing.
2240 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2241 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2243 self.pg0.add_stream(p)
2244 self.pg_enable_capture(self.pg_interfaces)
2246 capture = self.pg1.get_capture(0)
2247 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2248 capture = self.pg3.get_capture(9)
2249 ipfix = IPFIXDecoder()
2250 # first load template
# NOTE(review): from here on `p` presumably refers to a captured packet (loop
# header not visible), shadowing the packet built above - confirm.
2252 self.assertTrue(p.haslayer(IPFIX))
2253 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2254 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2255 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2256 self.assertEqual(p[UDP].dport, 4739)
2257 self.assertEqual(p[IPFIX].observationDomainID,
2258 self.ipfix_domain_id)
2259 if p.haslayer(Template):
2260 ipfix.add_template(p.getlayer(Template))
2261 # verify events in data set
2263 if p.haslayer(Data):
2264 data = ipfix.decode_data_set(p.getlayer(Set))
2265 self.verify_ipfix_addr_exhausted(data)
# Skipped unless running_extended_tests() returns a true value.
2267 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2268 def test_ipfix_max_sessions(self):
2269 """ IPFIX logging maximum session entries exceeded """
# Sends max_sessions packets from distinct source addresses, then enables
# IPFIX, sends one more packet that is expected to produce no output on pg1,
# and checks the IPFIX packets on pg3 with verify_ipfix_max_sessions().
2270 self.nat44_add_address(self.nat_addr)
2271 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2272 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# The session limit used by the test is derived from the running
# configuration: ten times the reported translation_buckets.
2275 nat44_config = self.vapi.nat_show_config()
2276 max_sessions = 10 * nat44_config.translation_buckets
# NOTE(review): initialisation of `pkts` and the append inside the loop are
# not visible in this listing.
2279 for i in range(0, max_sessions):
# High and low byte of the loop index become the 3rd and 4th octet, giving a
# different 10.10.x.y source address per iteration.
2280 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2281 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2282 IP(src=src, dst=self.pg1.remote_ip4) /
2285 self.pg0.add_stream(pkts)
2286 self.pg_enable_capture(self.pg_interfaces)
2289 self.pg1.get_capture(max_sessions)
2290 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2291 src_address=self.pg3.local_ip4n,
2293 template_interval=10)
# NOTE(review): self.ipfix_domain_id / self.ipfix_src_port are not assigned
# in this method - confirm where they come from.
2294 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2295 src_port=self.ipfix_src_port)
2297 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2298 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2300 self.pg0.add_stream(p)
2301 self.pg_enable_capture(self.pg_interfaces)
2303 self.pg1.get_capture(0)
2304 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2305 capture = self.pg3.get_capture(9)
2306 ipfix = IPFIXDecoder()
2307 # first load template
2309 self.assertTrue(p.haslayer(IPFIX))
2310 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2311 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2312 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2313 self.assertEqual(p[UDP].dport, 4739)
2314 self.assertEqual(p[IPFIX].observationDomainID,
2315 self.ipfix_domain_id)
2316 if p.haslayer(Template):
2317 ipfix.add_template(p.getlayer(Template))
2318 # verify events in data set
2320 if p.haslayer(Data):
2321 data = ipfix.decode_data_set(p.getlayer(Set))
2322 self.verify_ipfix_max_sessions(data, max_sessions)
2324 def test_pool_addr_fib(self):
2325 """ NAT44 add pool addresses to FIB """
# Sends broadcast ARP who-has requests for the NAT pool address and for the
# external address of a static mapping. One ARP packet is expected back on
# pg1 for each while they are configured; nothing is expected on pg1 when the
# request arrives on pg2 or after the addresses are removed.
2326 static_addr = '10.0.0.10'
2327 self.nat44_add_address(self.nat_addr)
2328 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2329 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2331 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
# ARP request for the pool address, received on pg1.
2334 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2335 ARP(op=ARP.who_has, pdst=self.nat_addr,
2336 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2337 self.pg1.add_stream(p)
2338 self.pg_enable_capture(self.pg_interfaces)
2340 capture = self.pg1.get_capture(1)
2341 self.assertTrue(capture[0].haslayer(ARP))
# NOTE(review): assertTrue() treats its second argument as the failure
# message, so this only checks that op is truthy; assertEqual(op, ARP.is_at)
# looks like the intent.
2342 self.assertTrue(capture[0][ARP].op, ARP.is_at)
# ARP request for the static mapping's external address, received on pg1.
2345 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2346 ARP(op=ARP.who_has, pdst=static_addr,
2347 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2348 self.pg1.add_stream(p)
2349 self.pg_enable_capture(self.pg_interfaces)
2351 capture = self.pg1.get_capture(1)
2352 self.assertTrue(capture[0].haslayer(ARP))
# NOTE(review): same two-argument assertTrue pattern as above.
2353 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2355 # send ARP to non-NAT44 interface
2356 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2357 ARP(op=ARP.who_has, pdst=self.nat_addr,
2358 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2359 self.pg2.add_stream(p)
2360 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): the request was injected on pg2 but the empty capture is
# checked on pg1 - confirm this is the interface that should be inspected.
2362 capture = self.pg1.get_capture(0)
2364 # remove addresses and verify
2365 self.nat44_add_address(self.nat_addr, is_add=0)
# NOTE(review): the trailing argument of this call is not visible here
# (presumably the flag that turns it into a delete - confirm).
2366 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2369 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2370 ARP(op=ARP.who_has, pdst=self.nat_addr,
2371 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2372 self.pg1.add_stream(p)
2373 self.pg_enable_capture(self.pg_interfaces)
2375 capture = self.pg1.get_capture(0)
2377 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2378 ARP(op=ARP.who_has, pdst=static_addr,
2379 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2380 self.pg1.add_stream(p)
2381 self.pg_enable_capture(self.pg_interfaces)
2383 capture = self.pg1.get_capture(0)
2385 def test_vrf_mode(self):
2386 """ NAT44 tenant VRF aware address pool mode """
# Places pg0 and pg1 into two separate IP tables, adds one NAT address per
# table, and expects traffic from pg0 to leave pg2 translated to nat_ip1 and
# traffic from pg1 to leave pg2 translated to nat_ip2.
# NOTE(review): the assignments of vrf_id1 and vrf_id2 are not visible in
# this listing.
2390 nat_ip1 = "10.0.0.10"
2391 nat_ip2 = "10.0.0.11"
# Addresses are removed before the table change and configured again after.
2393 self.pg0.unconfig_ip4()
2394 self.pg1.unconfig_ip4()
2395 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2396 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2397 self.pg0.set_table_ip4(vrf_id1)
2398 self.pg1.set_table_ip4(vrf_id2)
2399 self.pg0.config_ip4()
2400 self.pg1.config_ip4()
2402 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2403 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2404 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2405 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2406 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# Traffic entering on pg0 is expected to use nat_ip1.
2410 pkts = self.create_stream_in(self.pg0, self.pg2)
2411 self.pg0.add_stream(pkts)
2412 self.pg_enable_capture(self.pg_interfaces)
2414 capture = self.pg2.get_capture(len(pkts))
2415 self.verify_capture_out(capture, nat_ip1)
# Traffic entering on pg1 is expected to use nat_ip2.
2418 pkts = self.create_stream_in(self.pg1, self.pg2)
2419 self.pg1.add_stream(pkts)
2420 self.pg_enable_capture(self.pg_interfaces)
2422 capture = self.pg2.get_capture(len(pkts))
2423 self.verify_capture_out(capture, nat_ip2)
# Move both interfaces back to table 0 and delete the two tables.
# NOTE(review): no config_ip4() follows set_table_ip4(0) in the visible
# lines - confirm the addresses are restored elsewhere.
2425 self.pg0.unconfig_ip4()
2426 self.pg1.unconfig_ip4()
2427 self.pg0.set_table_ip4(0)
2428 self.pg1.set_table_ip4(0)
2429 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2430 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2432 def test_vrf_feature_independent(self):
2433 """ NAT44 tenant VRF independent address pool mode """
# nat_ip1 is added without a vrf_id and nat_ip2 with vrf_id=99. Traffic from
# both pg0 and pg1 towards pg2 is expected to be translated to nat_ip1.
2435 nat_ip1 = "10.0.0.10"
2436 nat_ip2 = "10.0.0.11"
2438 self.nat44_add_address(nat_ip1)
2439 self.nat44_add_address(nat_ip2, vrf_id=99)
2440 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2441 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
# NOTE(review): the trailing argument of the pg2 call is not visible here.
2442 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# Stream from pg0.
2446 pkts = self.create_stream_in(self.pg0, self.pg2)
2447 self.pg0.add_stream(pkts)
2448 self.pg_enable_capture(self.pg_interfaces)
2450 capture = self.pg2.get_capture(len(pkts))
2451 self.verify_capture_out(capture, nat_ip1)
# Stream from pg1; the expected translated address is again nat_ip1.
2454 pkts = self.create_stream_in(self.pg1, self.pg2)
2455 self.pg1.add_stream(pkts)
2456 self.pg_enable_capture(self.pg_interfaces)
2458 capture = self.pg2.get_capture(len(pkts))
2459 self.verify_capture_out(capture, nat_ip1)
2461 def test_dynamic_ipless_interfaces(self):
2462 """ NAT44 interfaces without configured IP address """
# Installs neighbor entries and /32 routes for the remote hosts of pg7 and
# pg8 through the API, then checks dynamic translation pg7 -> pg8 and the
# return direction pg8 -> pg7.
# NOTE(review): the last argument of each ip_neighbor_add_del call is not
# visible in this listing.
2464 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2465 mactobinary(self.pg7.remote_mac),
2466 self.pg7.remote_ip4n,
2468 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2469 mactobinary(self.pg8.remote_mac),
2470 self.pg8.remote_ip4n,
# Host routes (prefix length 32) whose next hop is the remote host itself.
2473 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2474 dst_address_length=32,
2475 next_hop_address=self.pg7.remote_ip4n,
2476 next_hop_sw_if_index=self.pg7.sw_if_index)
2477 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2478 dst_address_length=32,
2479 next_hop_address=self.pg8.remote_ip4n,
2480 next_hop_sw_if_index=self.pg8.sw_if_index)
2482 self.nat44_add_address(self.nat_addr)
2483 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2484 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8 direction.
2488 pkts = self.create_stream_in(self.pg7, self.pg8)
2489 self.pg7.add_stream(pkts)
2490 self.pg_enable_capture(self.pg_interfaces)
2492 capture = self.pg8.get_capture(len(pkts))
2493 self.verify_capture_out(capture)
# pg8 -> pg7 direction, addressed to the NAT address.
2496 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2497 self.pg8.add_stream(pkts)
2498 self.pg_enable_capture(self.pg_interfaces)
2500 capture = self.pg7.get_capture(len(pkts))
2501 self.verify_capture_in(capture, self.pg7)
2503 def test_static_ipless_interfaces(self):
2504 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Installs neighbor entries and /32 routes for the remote hosts of pg7 and
# pg8, adds a static mapping pg7.remote_ip4 <-> nat_addr (no ports), and
# checks pg8 -> pg7 first, then pg7 -> pg8.
# NOTE(review): the last argument of each ip_neighbor_add_del call is not
# visible in this listing.
2506 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2507 mactobinary(self.pg7.remote_mac),
2508 self.pg7.remote_ip4n,
2510 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2511 mactobinary(self.pg8.remote_mac),
2512 self.pg8.remote_ip4n,
2515 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2516 dst_address_length=32,
2517 next_hop_address=self.pg7.remote_ip4n,
2518 next_hop_sw_if_index=self.pg7.sw_if_index)
2519 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2520 dst_address_length=32,
2521 next_hop_address=self.pg8.remote_ip4n,
2522 next_hop_sw_if_index=self.pg8.sw_if_index)
# Only a static mapping is configured; no pool address is added in this test.
2524 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2525 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2526 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 direction is exercised first.
2530 pkts = self.create_stream_out(self.pg8)
2531 self.pg8.add_stream(pkts)
2532 self.pg_enable_capture(self.pg_interfaces)
2534 capture = self.pg7.get_capture(len(pkts))
2535 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8 direction.
# NOTE(review): the meaning of the third positional argument (True) is
# defined by verify_capture_out, which is outside this view.
2538 pkts = self.create_stream_in(self.pg7, self.pg8)
2539 self.pg7.add_stream(pkts)
2540 self.pg_enable_capture(self.pg_interfaces)
2542 capture = self.pg8.get_capture(len(pkts))
2543 self.verify_capture_out(capture, self.nat_addr, True)
2545 def test_static_with_port_ipless_interfaces(self):
2546 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Installs neighbor entries and /32 routes for the remote hosts of pg7 and
# pg8, then adds three static mappings with ports (TCP, UDP, ICMP id) from
# pg7.remote_ip4 to nat_addr and checks traffic in both directions.
# Fixed external ports / ICMP id used by the three mappings below.
2548 self.tcp_port_out = 30606
2549 self.udp_port_out = 30607
2550 self.icmp_id_out = 30608
# NOTE(review): the last argument of each ip_neighbor_add_del call is not
# visible in this listing.
2552 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2553 mactobinary(self.pg7.remote_mac),
2554 self.pg7.remote_ip4n,
2556 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2557 mactobinary(self.pg8.remote_mac),
2558 self.pg8.remote_ip4n,
2561 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2562 dst_address_length=32,
2563 next_hop_address=self.pg7.remote_ip4n,
2564 next_hop_sw_if_index=self.pg7.sw_if_index)
2565 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2566 dst_address_length=32,
2567 next_hop_address=self.pg8.remote_ip4n,
2568 next_hop_sw_if_index=self.pg8.sw_if_index)
2570 self.nat44_add_address(self.nat_addr)
# One mapping per protocol, each pairing the inside port/id with the fixed
# outside value assigned at the top of the method.
2571 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2572 self.tcp_port_in, self.tcp_port_out,
2573 proto=IP_PROTOS.tcp)
2574 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2575 self.udp_port_in, self.udp_port_out,
2576 proto=IP_PROTOS.udp)
2577 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2578 self.icmp_id_in, self.icmp_id_out,
2579 proto=IP_PROTOS.icmp)
2580 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2581 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 direction.
2585 pkts = self.create_stream_out(self.pg8)
2586 self.pg8.add_stream(pkts)
2587 self.pg_enable_capture(self.pg_interfaces)
2589 capture = self.pg7.get_capture(len(pkts))
2590 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8 direction.
2593 pkts = self.create_stream_in(self.pg7, self.pg8)
2594 self.pg7.add_stream(pkts)
2595 self.pg_enable_capture(self.pg_interfaces)
2597 capture = self.pg8.get_capture(len(pkts))
2598 self.verify_capture_out(capture)
2600 def test_static_unknown_proto(self):
2601 """ 1:1 NAT translate packet with unknown protocol """
# With a static mapping pg0.remote_ip4 <-> nat_ip, sends a packet carrying an
# inner IP/TCP header in each direction and checks that only the outer IP
# addresses are rewritten, that a GRE layer is present and that the IP
# checksum is valid.
# NOTE(review): in this listing the layer between the outer and inner IP
# headers, the binding of `packet`, and the try/except around the assertions
# are not visible (the assertions imply GRE and a captured packet - confirm).
2602 nat_ip = "10.0.0.10"
2603 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2604 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2605 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1: source is expected to become nat_ip.
2609 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2610 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2612 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2613 TCP(sport=1234, dport=1234))
2614 self.pg0.add_stream(p)
2615 self.pg_enable_capture(self.pg_interfaces)
2617 p = self.pg1.get_capture(1)
2620 self.assertEqual(packet[IP].src, nat_ip)
2621 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2622 self.assertTrue(packet.haslayer(GRE))
2623 self.check_ip_checksum(packet)
2625 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0: destination nat_ip is expected to become pg0.remote_ip4.
2629 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2630 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2632 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2633 TCP(sport=1234, dport=1234))
2634 self.pg1.add_stream(p)
2635 self.pg_enable_capture(self.pg_interfaces)
2637 p = self.pg0.get_capture(1)
2640 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2641 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2642 self.assertTrue(packet.haslayer(GRE))
2643 self.check_ip_checksum(packet)
2645 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2648 def test_hairpinning_static_unknown_proto(self):
2649 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# Both endpoints are remote hosts on pg0 and each has its own static mapping.
# A packet sent to the peer's NAT address is expected back on pg0 with the
# sender's NAT address as source and the peer's real address as destination.
# NOTE(review): the layer between the outer and inner IP headers, the binding
# of `packet`, and the try/except around the assertions are not visible in
# this listing.
2651 host = self.pg0.remote_hosts[0]
2652 server = self.pg0.remote_hosts[1]
2654 host_nat_ip = "10.0.0.10"
2655 server_nat_ip = "10.0.0.11"
2657 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2658 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2659 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2660 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server (addressed to the server's NAT address).
2664 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2665 IP(src=host.ip4, dst=server_nat_ip) /
2667 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2668 TCP(sport=1234, dport=1234))
2669 self.pg0.add_stream(p)
2670 self.pg_enable_capture(self.pg_interfaces)
2672 p = self.pg0.get_capture(1)
2675 self.assertEqual(packet[IP].src, host_nat_ip)
2676 self.assertEqual(packet[IP].dst, server.ip4)
2677 self.assertTrue(packet.haslayer(GRE))
2678 self.check_ip_checksum(packet)
2680 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host (addressed to the host's NAT address).
2684 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2685 IP(src=server.ip4, dst=host_nat_ip) /
2687 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2688 TCP(sport=1234, dport=1234))
2689 self.pg0.add_stream(p)
2690 self.pg_enable_capture(self.pg_interfaces)
2692 p = self.pg0.get_capture(1)
2695 self.assertEqual(packet[IP].src, server_nat_ip)
2696 self.assertEqual(packet[IP].dst, host.ip4)
2697 self.assertTrue(packet.haslayer(GRE))
2698 self.check_ip_checksum(packet)
2700 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2703 def test_unknown_proto(self):
2704 """ NAT44 translate packet with unknown protocol """
# With a dynamic NAT address configured, first sends a TCP packet pg0 -> pg1,
# then sends a packet carrying an inner IP/TCP header in each direction and
# checks the outer addresses, the presence of GRE and the IP checksum.
# NOTE(review): the layer between the outer and inner IP headers, the binding
# of `packet`, and the try/except around the assertions are not visible in
# this listing.
2705 self.nat44_add_address(self.nat_addr)
2706 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2707 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Plain TCP packet sent before the tunnelled one; its capture is fetched but
# not inspected (presumably it only establishes a session - confirm).
2711 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2712 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2713 TCP(sport=self.tcp_port_in, dport=20))
2714 self.pg0.add_stream(p)
2715 self.pg_enable_capture(self.pg_interfaces)
2717 p = self.pg1.get_capture(1)
# pg0 -> pg1: source is expected to become nat_addr.
2719 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2720 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2722 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2723 TCP(sport=1234, dport=1234))
2724 self.pg0.add_stream(p)
2725 self.pg_enable_capture(self.pg_interfaces)
2727 p = self.pg1.get_capture(1)
2730 self.assertEqual(packet[IP].src, self.nat_addr)
2731 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2732 self.assertTrue(packet.haslayer(GRE))
2733 self.check_ip_checksum(packet)
2735 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0: destination nat_addr is expected to become pg0.remote_ip4.
2739 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2740 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2742 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2743 TCP(sport=1234, dport=1234))
2744 self.pg1.add_stream(p)
2745 self.pg_enable_capture(self.pg_interfaces)
2747 p = self.pg0.get_capture(1)
2750 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2751 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2752 self.assertTrue(packet.haslayer(GRE))
2753 self.check_ip_checksum(packet)
2755 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2758 def test_hairpinning_unknown_proto(self):
2759 """ NAT44 translate packet with unknown protocol - hairpinning """
# Host and server are both remote hosts on pg0. The host uses the dynamic NAT
# address, the server has a static mapping to server_nat_ip. After one TCP
# packet host -> server, a packet with an inner IP/TCP header is sent in each
# direction and the outer addresses, GRE presence and IP checksum are checked.
# NOTE(review): the assignment of `host_in_port`, the layer between the outer
# and inner IP headers, the binding of `packet`, and the try/except around the
# assertions are not visible in this listing.
2760 host = self.pg0.remote_hosts[0]
2761 server = self.pg0.remote_hosts[1]
# NOTE(review): server_in_port is not used in the visible lines.
2764 server_in_port = 5678
2765 server_out_port = 8765
2766 server_nat_ip = "10.0.0.11"
2768 self.nat44_add_address(self.nat_addr)
2769 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2770 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2773 # add static mapping for server
2774 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# Plain TCP packet host -> server; the capture is fetched but not inspected.
2777 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2778 IP(src=host.ip4, dst=server_nat_ip) /
2779 TCP(sport=host_in_port, dport=server_out_port))
2780 self.pg0.add_stream(p)
2781 self.pg_enable_capture(self.pg_interfaces)
2783 capture = self.pg0.get_capture(1)
# host -> server with the tunnelled payload.
2785 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2786 IP(src=host.ip4, dst=server_nat_ip) /
2788 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2789 TCP(sport=1234, dport=1234))
2790 self.pg0.add_stream(p)
2791 self.pg_enable_capture(self.pg_interfaces)
2793 p = self.pg0.get_capture(1)
2796 self.assertEqual(packet[IP].src, self.nat_addr)
2797 self.assertEqual(packet[IP].dst, server.ip4)
2798 self.assertTrue(packet.haslayer(GRE))
2799 self.check_ip_checksum(packet)
2801 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host, addressed to the dynamic NAT address.
2805 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2806 IP(src=server.ip4, dst=self.nat_addr) /
2808 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2809 TCP(sport=1234, dport=1234))
2810 self.pg0.add_stream(p)
2811 self.pg_enable_capture(self.pg_interfaces)
2813 p = self.pg0.get_capture(1)
2816 self.assertEqual(packet[IP].src, server_nat_ip)
2817 self.assertEqual(packet[IP].dst, host.ip4)
2818 self.assertTrue(packet.haslayer(GRE))
2819 self.check_ip_checksum(packet)
2821 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2824 def test_output_feature(self):
2825 """ NAT44 interface output feature (in2out postrouting) """
# Enables the NAT44 output feature on pg0, pg1 and pg3 and checks three
# flows: pg0 -> pg3 (translated), pg3 -> pg0 (translated back), and
# pg2 -> pg0, which is expected to pass without translation.
2826 self.nat44_add_address(self.nat_addr)
2827 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2828 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
# NOTE(review): the trailing argument of the pg3 call is not visible here.
2829 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3.
2833 pkts = self.create_stream_in(self.pg0, self.pg3)
2834 self.pg0.add_stream(pkts)
2835 self.pg_enable_capture(self.pg_interfaces)
2837 capture = self.pg3.get_capture(len(pkts))
2838 self.verify_capture_out(capture)
# pg3 -> pg0.
2841 pkts = self.create_stream_out(self.pg3)
2842 self.pg3.add_stream(pkts)
2843 self.pg_enable_capture(self.pg_interfaces)
2845 capture = self.pg0.get_capture(len(pkts))
2846 self.verify_capture_in(capture, self.pg0)
2848 # from non-NAT interface to NAT inside interface
2849 pkts = self.create_stream_in(self.pg2, self.pg0)
2850 self.pg2.add_stream(pkts)
2851 self.pg_enable_capture(self.pg_interfaces)
2853 capture = self.pg0.get_capture(len(pkts))
2854 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2856 def test_output_feature_vrf_aware(self):
2857 """ NAT44 interface output feature VRF aware (in2out postrouting) """
# Adds one NAT address for vrf_id 10 and one for vrf_id 20, enables the
# output feature on pg4, pg6 and pg3, and expects pg4 traffic to be
# translated to nat_ip_vrf10 and pg6 traffic to nat_ip_vrf20, in both
# directions.
2858 nat_ip_vrf10 = "10.0.0.10"
2859 nat_ip_vrf20 = "10.0.0.20"
# Two /32 routes to pg3's remote host; the argument that distinguishes them
# is not visible in this listing (presumably the table id - confirm).
2861 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2862 dst_address_length=32,
2863 next_hop_address=self.pg3.remote_ip4n,
2864 next_hop_sw_if_index=self.pg3.sw_if_index,
2866 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2867 dst_address_length=32,
2868 next_hop_address=self.pg3.remote_ip4n,
2869 next_hop_sw_if_index=self.pg3.sw_if_index,
2872 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2873 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2874 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2875 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2876 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3, expected source nat_ip_vrf10.
2880 pkts = self.create_stream_in(self.pg4, self.pg3)
2881 self.pg4.add_stream(pkts)
2882 self.pg_enable_capture(self.pg_interfaces)
2884 capture = self.pg3.get_capture(len(pkts))
2885 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> pg4, addressed to nat_ip_vrf10.
2888 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2889 self.pg3.add_stream(pkts)
2890 self.pg_enable_capture(self.pg_interfaces)
2892 capture = self.pg4.get_capture(len(pkts))
2893 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3, expected source nat_ip_vrf20.
2896 pkts = self.create_stream_in(self.pg6, self.pg3)
2897 self.pg6.add_stream(pkts)
2898 self.pg_enable_capture(self.pg_interfaces)
2900 capture = self.pg3.get_capture(len(pkts))
2901 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> pg6, addressed to nat_ip_vrf20.
2904 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2905 self.pg3.add_stream(pkts)
2906 self.pg_enable_capture(self.pg_interfaces)
2908 capture = self.pg6.get_capture(len(pkts))
2909 self.verify_capture_in(capture, self.pg6)
2911 def test_output_feature_hairpinning(self):
2912 """ NAT44 interface output feature hairpinning (in2out postrouting) """
# Host and server are both remote hosts on pg0. The server is reachable via
# a TCP static mapping nat_addr:server_out_port -> server:server_in_port.
# The test sends host -> server and the reply server -> host and checks the
# translated addresses, ports and TCP checksum of the packets seen on pg0.
# NOTE(review): the assignment of `host_in_port`, the extraction of `ip` and
# `tcp` from the captured packet, and the try/except around the assertions
# are not visible in this listing.
2913 host = self.pg0.remote_hosts[0]
2914 server = self.pg0.remote_hosts[1]
2917 server_in_port = 5678
2918 server_out_port = 8765
2920 self.nat44_add_address(self.nat_addr)
2921 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2922 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2925 # add static mapping for server
2926 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2927 server_in_port, server_out_port,
2928 proto=IP_PROTOS.tcp)
2930 # send packet from host to server
2931 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2932 IP(src=host.ip4, dst=self.nat_addr) /
2933 TCP(sport=host_in_port, dport=server_out_port))
2934 self.pg0.add_stream(p)
2935 self.pg_enable_capture(self.pg_interfaces)
2937 capture = self.pg0.get_capture(1)
2942 self.assertEqual(ip.src, self.nat_addr)
2943 self.assertEqual(ip.dst, server.ip4)
2944 self.assertNotEqual(tcp.sport, host_in_port)
2945 self.assertEqual(tcp.dport, server_in_port)
2946 self.check_tcp_checksum(p)
# Remember the translated source port; the server's reply is sent to it.
2947 host_out_port = tcp.sport
2949 self.logger.error(ppp("Unexpected or invalid packet:", p))
2952 # send reply from server to host
2953 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2954 IP(src=server.ip4, dst=self.nat_addr) /
2955 TCP(sport=server_in_port, dport=host_out_port))
2956 self.pg0.add_stream(p)
2957 self.pg_enable_capture(self.pg_interfaces)
2959 capture = self.pg0.get_capture(1)
2964 self.assertEqual(ip.src, self.nat_addr)
2965 self.assertEqual(ip.dst, host.ip4)
2966 self.assertEqual(tcp.sport, server_out_port)
2967 self.assertEqual(tcp.dport, host_in_port)
2968 self.check_tcp_checksum(p)
# NOTE(review): here ppp() receives only the message and `p` is passed to
# logger.error as a second argument, unlike the earlier error path in this
# method where `p` is passed to ppp() - looks like a misplaced parenthesis.
2970 self.logger.error(ppp("Unexpected or invalid packet:"), p)
2973 def test_one_armed_nat44(self):
2974 """ One armed NAT44 """
# The NAT44 feature is enabled twice on the same interface (pg9); the second
# call has a trailing argument that is not visible in this listing. Traffic
# between two remote hosts of pg9 is sent in through pg9 and captured on pg9
# again, in both directions.
# NOTE(review): the extraction of `ip` and `tcp` from the captured packet and
# the try/except around the assertions are not visible in this listing.
2975 remote_host = self.pg9.remote_hosts[0]
2976 local_host = self.pg9.remote_hosts[1]
2979 self.nat44_add_address(self.nat_addr)
2980 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2981 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local_host -> remote_host: source is expected to become nat_addr with a
# source port different from 12345.
2985 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2986 IP(src=local_host.ip4, dst=remote_host.ip4) /
2987 TCP(sport=12345, dport=80))
2988 self.pg9.add_stream(p)
2989 self.pg_enable_capture(self.pg_interfaces)
2991 capture = self.pg9.get_capture(1)
2996 self.assertEqual(ip.src, self.nat_addr)
2997 self.assertEqual(ip.dst, remote_host.ip4)
2998 self.assertNotEqual(tcp.sport, 12345)
# Saved so the reply below can be addressed to the translated port.
2999 external_port = tcp.sport
3000 self.assertEqual(tcp.dport, 80)
3001 self.check_tcp_checksum(p)
3002 self.check_ip_checksum(p)
3004 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host -> nat_addr:external_port: expected to be delivered to
# local_host with the original port 12345 restored.
3008 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3009 IP(src=remote_host.ip4, dst=self.nat_addr) /
3010 TCP(sport=80, dport=external_port))
3011 self.pg9.add_stream(p)
3012 self.pg_enable_capture(self.pg_interfaces)
3014 capture = self.pg9.get_capture(1)
3019 self.assertEqual(ip.src, remote_host.ip4)
3020 self.assertEqual(ip.dst, local_host.ip4)
3021 self.assertEqual(tcp.sport, 80)
3022 self.assertEqual(tcp.dport, 12345)
3023 self.check_tcp_checksum(p)
3024 self.check_ip_checksum(p)
3026 self.logger.error(ppp("Unexpected or invalid packet:", p))
3029 def test_del_session(self):
3030 """ Delete NAT44 session """
# Creates sessions by sending the in2out stream pg0 -> pg1, deletes two of
# them through nat44_del_session and expects the per-user session dump to
# shrink by exactly two entries.
3031 self.nat44_add_address(self.nat_addr)
3032 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3033 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3036 pkts = self.create_stream_in(self.pg0, self.pg1)
3037 self.pg0.add_stream(pkts)
3038 self.pg_enable_capture(self.pg_interfaces)
3040 capture = self.pg1.get_capture(len(pkts))
# Session count for pg0's remote host before the deletions.
3042 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3043 nsessions = len(sessions)
# First deletion identifies the session by its inside address/port.
3045 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3046 sessions[0].inside_port,
3047 sessions[0].protocol)
# Second deletion identifies the session by its outside address/port.
# NOTE(review): the trailing argument of this call is not visible here
# (presumably the flag selecting the outside key - confirm).
3048 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3049 sessions[1].outside_port,
3050 sessions[1].protocol,
3053 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3054 self.assertEqual(nsessions - len(sessions), 2)
3056 def test_set_get_reass(self):
3057 """ NAT44 set/get virtual fragmentation reassembly """
# Reads the current reassembly settings, writes back modified values
# (timeout + 5, max_reass * 2, max_frag * 2) and checks that a second read
# returns exactly those values. Finally sets drop_frag=1 and checks that
# ip4_drop_frag is reported as true.
3058 reas_cfg1 = self.vapi.nat_get_reass()
3060 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3061 max_reass=reas_cfg1.ip4_max_reass * 2,
3062 max_frag=reas_cfg1.ip4_max_frag * 2)
3064 reas_cfg2 = self.vapi.nat_get_reass()
3066 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3067 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3068 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
# NOTE(review): the modified settings are not restored in the visible lines -
# confirm that teardown resets them.
3070 self.vapi.nat_set_reass(drop_frag=1)
3071 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3073 def test_frag_in_order(self):
3074 """ NAT44 translate fragments arriving in order """
# Sends a fragmented TCP payload pg0 -> pg1 and back, reassembles the
# captured fragments with reass_frags_and_verify() and checks ports and
# payload. Also expects the reassembly dump to grow by two entries.
# NOTE(review): several argument lines of the create_stream_frag and
# reass_frags_and_verify calls are not visible in this listing.
# 23-byte payload made of three distinguishable runs.
3080 self.nat44_add_address(self.nat_addr)
3076 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3077 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3080 data = "A" * 4 + "B" * 16 + "C" * 3
3081 self.tcp_port_in = random.randint(1025, 65535)
# Number of reassembly entries before any fragment is sent.
3083 reass = self.vapi.nat_reass_dump()
3084 reass_n_start = len(reass)
3087 pkts = self.create_stream_frag(self.pg0,
3088 self.pg1.remote_ip4,
3092 self.pg0.add_stream(pkts)
3093 self.pg_enable_capture(self.pg_interfaces)
3095 frags = self.pg1.get_capture(len(pkts))
3096 p = self.reass_frags_and_verify(frags,
3098 self.pg1.remote_ip4)
3099 self.assertEqual(p[TCP].dport, 20)
3100 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# The translated source port is stored on the instance.
3101 self.tcp_port_out = p[TCP].sport
3102 self.assertEqual(data, p[Raw].load)
3105 pkts = self.create_stream_frag(self.pg1,
3110 self.pg1.add_stream(pkts)
3111 self.pg_enable_capture(self.pg_interfaces)
3113 frags = self.pg0.get_capture(len(pkts))
3114 p = self.reass_frags_and_verify(frags,
3115 self.pg1.remote_ip4,
3116 self.pg0.remote_ip4)
3117 self.assertEqual(p[TCP].sport, 20)
3118 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3119 self.assertEqual(data, p[Raw].load)
3121 reass = self.vapi.nat_reass_dump()
3122 reass_n_end = len(reass)
# One new reassembly entry is expected per direction.
3124 self.assertEqual(reass_n_end - reass_n_start, 2)
3126 def test_reass_hairpinning(self):
3127 """ NAT44 fragments hairpinning """
# Host and server are both remote hosts on pg0; the server is reachable via
# a TCP static mapping on nat_addr. A fragmented payload is sent from pg0
# and captured on pg0 again; after reassembly the source port must differ
# from host_in_port, the destination port must be server_in_port and the
# payload must be unchanged.
# NOTE(review): most argument lines of the create_stream_frag and
# reass_frags_and_verify calls are not visible in this listing.
3128 host = self.pg0.remote_hosts[0]
3129 server = self.pg0.remote_hosts[1]
# Ports are chosen randomly on every run.
3130 host_in_port = random.randint(1025, 65535)
3132 server_in_port = random.randint(1025, 65535)
3133 server_out_port = random.randint(1025, 65535)
3134 data = "A" * 4 + "B" * 16 + "C" * 3
3136 self.nat44_add_address(self.nat_addr)
3137 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3138 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3140 # add static mapping for server
3141 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3142 server_in_port, server_out_port,
3143 proto=IP_PROTOS.tcp)
3145 # send packet from host to server
3146 pkts = self.create_stream_frag(self.pg0,
3151 self.pg0.add_stream(pkts)
3152 self.pg_enable_capture(self.pg_interfaces)
3154 frags = self.pg0.get_capture(len(pkts))
3155 p = self.reass_frags_and_verify(frags,
3158 self.assertNotEqual(p[TCP].sport, host_in_port)
3159 self.assertEqual(p[TCP].dport, server_in_port)
3160 self.assertEqual(data, p[Raw].load)
3162 def test_frag_out_of_order(self):
3163 """ NAT44 translate fragments arriving out of order """
# Sends a fragmented TCP payload pg0 -> pg1 and back, reassembles the
# captured fragments with reass_frags_and_verify() and checks ports and
# payload.
# NOTE(review): the statements that reorder the fragments and several
# argument lines of the create_stream_frag / reass_frags_and_verify calls
# are not visible in this listing.
3164 self.nat44_add_address(self.nat_addr)
3165 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3166 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3169 data = "A" * 4 + "B" * 16 + "C" * 3
# NOTE(review): the result of this call is discarded, so self.tcp_port_in
# keeps whatever value it had before; an assignment to self.tcp_port_in was
# probably intended - confirm.
3170 random.randint(1025, 65535)
3173 pkts = self.create_stream_frag(self.pg0,
3174 self.pg1.remote_ip4,
3179 self.pg0.add_stream(pkts)
3180 self.pg_enable_capture(self.pg_interfaces)
3182 frags = self.pg1.get_capture(len(pkts))
3183 p = self.reass_frags_and_verify(frags,
3185 self.pg1.remote_ip4)
3186 self.assertEqual(p[TCP].dport, 20)
3187 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# The translated source port is stored on the instance.
3188 self.tcp_port_out = p[TCP].sport
3189 self.assertEqual(data, p[Raw].load)
3192 pkts = self.create_stream_frag(self.pg1,
3198 self.pg1.add_stream(pkts)
3199 self.pg_enable_capture(self.pg_interfaces)
3201 frags = self.pg0.get_capture(len(pkts))
3202 p = self.reass_frags_and_verify(frags,
3203 self.pg1.remote_ip4,
3204 self.pg0.remote_ip4)
3205 self.assertEqual(p[TCP].sport, 20)
3206 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3207 self.assertEqual(data, p[Raw].load)
# Switch the NAT port-assignment algorithm to MAP-E via the CLI, send one TCP
# packet pg0 -> pg1 and verify the translated source port on pg1.
# NOTE(review): the lines that unpack the captured packet (and the
# surrounding try/except) are elided in this listing.
3209 def test_port_restricted(self):
3210 """ Port restricted NAT44 (MAP-E CE) """
3211 self.nat44_add_address(self.nat_addr)
3212 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3213 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3215 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3216 "psid-offset 6 psid-len 6")
3218 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3219 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3220 TCP(sport=4567, dport=22))
3221 self.pg0.add_stream(p)
3222 self.pg_enable_capture(self.pg_interfaces)
3224 capture = self.pg1.get_capture(1)
3229 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3230 self.assertEqual(ip.src, self.nat_addr)
3231 self.assertEqual(tcp.dport, 22)
3232 self.assertNotEqual(tcp.sport, 4567)
# Take the 6 bits starting at bit 6 of the translated source port; they must
# equal 10, the psid value given on the CLI above.
3233 self.assertEqual((tcp.sport >> 6) & 63, 10)
3234 self.check_tcp_checksum(p)
3235 self.check_ip_checksum(p)
3237 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Twice-NAT test: a packet from pg1 to the static mapping on nat_addr must
# arrive on pg0 with its source rewritten to twice_nat_addr, and the reply
# from pg0 must arrive on pg1 with both translations reverted.
# NOTE(review): the docstring, the port definitions (port_in, port_out,
# eh_port_out) and the capture-unpacking lines are elided in this listing.
3240 def test_twice_nat(self):
3242 twice_nat_addr = '10.0.1.3'
3247 self.nat44_add_address(self.nat_addr)
# Second pool address is registered with the twice_nat flag set.
3248 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3249 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3250 port_in, port_out, proto=IP_PROTOS.tcp,
3252 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3253 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Direction pg1 -> pg0, addressed to the mapped address and port.
3256 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3257 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3258 TCP(sport=eh_port_out, dport=port_out))
3259 self.pg1.add_stream(p)
3260 self.pg_enable_capture(self.pg_interfaces)
3262 capture = self.pg0.get_capture(1)
3267 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3268 self.assertEqual(ip.src, twice_nat_addr)
3269 self.assertEqual(tcp.dport, port_in)
3270 self.assertNotEqual(tcp.sport, eh_port_out)
# Keep the translated source port; the reply below is sent to it.
3271 eh_port_in = tcp.sport
3272 self.check_tcp_checksum(p)
3273 self.check_ip_checksum(p)
3275 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Direction pg0 -> pg1: reply addressed to twice_nat_addr:eh_port_in.
3278 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3279 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3280 TCP(sport=port_in, dport=eh_port_in))
3281 self.pg0.add_stream(p)
3282 self.pg_enable_capture(self.pg_interfaces)
3284 capture = self.pg1.get_capture(1)
3289 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3290 self.assertEqual(ip.src, self.nat_addr)
3291 self.assertEqual(tcp.dport, eh_port_out)
3292 self.assertEqual(tcp.sport, port_out)
3293 self.check_tcp_checksum(p)
3294 self.check_ip_checksum(p)
3296 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Twice-NAT combined with a load-balanced static mapping over two servers on
# pg0: a packet from pg1 must reach one of the two servers with its source
# rewritten to twice_nat_addr, and that server's reply must return to pg1
# with the external address and port restored.
# NOTE(review): the port definitions, most fields of the `locals` entries,
# the remaining nat44_add_del_lb_static_mapping arguments and the branch that
# selects `server` are elided in this listing.
3299 def test_twice_nat_lb(self):
3300 """ Twice NAT44 local service load balancing """
3301 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3302 twice_nat_addr = '10.0.1.3'
3307 server1 = self.pg0.remote_hosts[0]
3308 server2 = self.pg0.remote_hosts[1]
3310 locals = [{'addr': server1.ip4n,
3313 {'addr': server2.ip4n,
3317 self.nat44_add_address(self.nat_addr)
3318 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3320 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3324 local_num=len(locals),
3326 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3327 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Direction pg1 -> pg0, addressed to the external address and port.
3330 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3331 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3332 TCP(sport=eh_port_out, dport=external_port))
3333 self.pg1.add_stream(p)
3334 self.pg_enable_capture(self.pg_interfaces)
3336 capture = self.pg0.get_capture(1)
3342 self.assertEqual(ip.src, twice_nat_addr)
# Either backend is acceptable as the destination.
3343 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3344 if ip.dst == server1.ip4:
3348 self.assertNotEqual(tcp.sport, eh_port_out)
3349 eh_port_in = tcp.sport
3350 self.assertEqual(tcp.dport, local_port)
3351 self.check_tcp_checksum(p)
3352 self.check_ip_checksum(p)
3354 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Direction pg0 -> pg1: reply from the selected server.
3357 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3358 IP(src=server.ip4, dst=twice_nat_addr) /
3359 TCP(sport=local_port, dport=eh_port_in))
3360 self.pg0.add_stream(p)
3361 self.pg_enable_capture(self.pg_interfaces)
3363 capture = self.pg1.get_capture(1)
3368 self.assertEqual(ip.src, self.nat_addr)
3369 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3370 self.assertEqual(tcp.sport, external_port)
3371 self.assertEqual(tcp.dport, eh_port_out)
3372 self.check_tcp_checksum(p)
3373 self.check_ip_checksum(p)
3375 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Register pg7 as the source of a twice-NAT pool address, then check that the
# NAT address dump is empty while pg7 has no IPv4 address, contains exactly
# pg7's address (flagged twice_nat) once one is configured, and is empty
# again after the address is removed.
3378 def test_twice_nat_interface_addr(self):
3379 """ Acquire twice NAT44 addresses from interface """
3380 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3382 # no address in NAT pool
3383 adresses = self.vapi.nat44_address_dump()
3384 self.assertEqual(0, len(adresses))
3386 # configure interface address and check NAT address pool
3387 self.pg7.config_ip4()
3388 adresses = self.vapi.nat44_address_dump()
3389 self.assertEqual(1, len(adresses))
# Only the first four bytes of the dumped address field are compared.
3390 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3391 self.assertEqual(adresses[0].twice_nat, 1)
3393 # remove interface address and check NAT address pool
3394 self.pg7.unconfig_ip4()
3395 adresses = self.vapi.nat44_address_dump()
3396 self.assertEqual(0, len(adresses))
# With reassembly limited to max_frag=0 and IPFIX export towards pg3 enabled,
# send only the last fragment of a fragmented stream, expect nothing to be
# forwarded on pg1, then flush IPFIX and verify the exported records on pg3.
# NOTE(review): the loop headers over `capture` and some call arguments are
# elided in this listing.
3398 def test_ipfix_max_frags(self):
3399 """ IPFIX logging maximum fragments pending reassembly exceeded """
3400 self.nat44_add_address(self.nat_addr)
3401 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3402 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3404 self.vapi.nat_set_reass(max_frag=0)
3405 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3406 src_address=self.pg3.local_ip4n,
3408 template_interval=10)
3409 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3410 src_port=self.ipfix_src_port)
3412 data = "A" * 4 + "B" * 16 + "C" * 3
3413 self.tcp_port_in = random.randint(1025, 65535)
3414 pkts = self.create_stream_frag(self.pg0,
3415 self.pg1.remote_ip4,
# Only the final fragment of the generated stream is injected.
3419 self.pg0.add_stream(pkts[-1])
3420 self.pg_enable_capture(self.pg_interfaces)
# Zero packets are expected on pg1; `frags` is not used afterwards.
3422 frags = self.pg1.get_capture(0)
3423 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3424 capture = self.pg3.get_capture(9)
3425 ipfix = IPFIXDecoder()
3426 # first load template
3428 self.assertTrue(p.haslayer(IPFIX))
3429 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3430 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3431 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3432 self.assertEqual(p[UDP].dport, 4739)
3433 self.assertEqual(p[IPFIX].observationDomainID,
3434 self.ipfix_domain_id)
3435 if p.haslayer(Template):
3436 ipfix.add_template(p.getlayer(Template))
3437 # verify events in data set
3439 if p.haslayer(Data):
# NOTE(review): this rebinds `data`, which held the payload string above.
3440 data = ipfix.decode_data_set(p.getlayer(Set))
3441 self.verify_ipfix_max_fragments_ip4(data, 0,
3442 self.pg0.remote_ip4n)
3445 super(TestNAT44, self).tearDown()
3446 if not self.vpp_dead:
3447 self.logger.info(self.vapi.cli("show nat44 verbose"))
3448 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3449 self.vapi.cli("nat addr-port-assignment-alg default")
# Test class that starts VPP with the "out2in dpo" NAT option and exercises
# NAT44 together with a MAP translation domain: pg0 is the IPv4 side, pg1 the
# IPv6 side.
# NOTE(review): this listing omits some lines (decorators, the tearDownClass
# def line, call-argument continuations).
3453 class TestNAT44Out2InDPO(MethodHolder):
3454 """ NAT44 Test Cases using out2in DPO """
# Extend the VPP command line with a nat { out2in dpo } section.
3457 def setUpConstants(cls):
3458 super(TestNAT44Out2InDPO, cls).setUpConstants()
3459 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
# Class-wide fixture: fixed ports/ids, the NAT pool address, two pg
# interfaces (pg0 with IPv4, pg1 with IPv6) and an IPv6 default route via pg1.
3462 def setUpClass(cls):
3463 super(TestNAT44Out2InDPO, cls).setUpClass()
3466 cls.tcp_port_in = 6303
3467 cls.tcp_port_out = 6303
3468 cls.udp_port_in = 6304
3469 cls.udp_port_out = 6304
3470 cls.icmp_id_in = 6305
3471 cls.icmp_id_out = 6305
3472 cls.nat_addr = '10.0.0.3'
3473 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3474 cls.dst_ip4 = '192.168.70.1'
3476 cls.create_pg_interfaces(range(2))
3479 cls.pg0.config_ip4()
3480 cls.pg0.resolve_arp()
3483 cls.pg1.config_ip6()
3484 cls.pg1.resolve_ndp()
# All-zero prefix of length 0, i.e. the IPv6 default route, via pg1.
3486 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
3487 dst_address_length=0,
3488 next_hop_address=cls.pg1.remote_ip6n,
3489 next_hop_sw_if_index=cls.pg1.sw_if_index)
3492 super(TestNAT44Out2InDPO, cls).tearDownClass()
# Store the two /96 IPv6 prefixes (text and packed form) on the instance and
# create a MAP domain in translation mode from them.
3495 def configure_xlat(self):
3496 self.dst_ip6_pfx = '1:2:3::'
3497 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3499 self.dst_ip6_pfx_len = 96
3500 self.src_ip6_pfx = '4:5:6::'
3501 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3503 self.src_ip6_pfx_len = 96
3504 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3505 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3506 '\x00\x00\x00\x00', 0, is_translation=1,
# NAT44 enabled on pg0 plus translation: IPv4 packets from pg0 must leave pg1
# as IPv6 with the NAT address embedded in the source prefix, and the IPv6
# return stream must arrive on pg0 as IPv4. The NAT feature and pool address
# are removed again at the end of the test.
3509 def test_464xlat_ce(self):
3510 """ Test 464XLAT CE with NAT44 """
3512 self.configure_xlat()
3514 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3515 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
# Expected IPv6 addresses: IPv4 destination and NAT address composed with the
# respective prefixes.
3517 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3518 self.dst_ip6_pfx_len)
3519 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3520 self.src_ip6_pfx_len)
3523 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3524 self.pg0.add_stream(pkts)
3525 self.pg_enable_capture(self.pg_interfaces)
3527 capture = self.pg1.get_capture(len(pkts))
3528 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
3531 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3533 self.pg1.add_stream(pkts)
3534 self.pg_enable_capture(self.pg_interfaces)
3536 capture = self.pg0.get_capture(len(pkts))
3537 self.verify_capture_in(capture, self.pg0)
3539 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3541 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
3542 self.nat_addr_n, is_add=0)
# Same flow without enabling NAT44: the expected IPv6 source embeds the
# host's own address (pg0.remote_ip4) and ports are expected unchanged
# (same_port=True).
3544 def test_464xlat_ce_no_nat(self):
3545 """ Test 464XLAT CE without NAT44 """
3547 self.configure_xlat()
3549 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3550 self.dst_ip6_pfx_len)
3551 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3552 self.src_ip6_pfx_len)
3554 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3555 self.pg0.add_stream(pkts)
3556 self.pg_enable_capture(self.pg_interfaces)
3558 capture = self.pg1.get_capture(len(pkts))
3559 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3560 nat_ip=out_dst_ip6, same_port=True)
3562 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3563 self.pg1.add_stream(pkts)
3564 self.pg_enable_capture(self.pg_interfaces)
3566 capture = self.pg0.get_capture(len(pkts))
3567 self.verify_capture_in(capture, self.pg0)
# Test class that starts VPP with the "deterministic" NAT option and covers
# mapping configuration, translation of TCP/UDP/ICMP, multiple users, TCP
# session close detection, session timeouts and the per-user session limit.
# NOTE(review): this listing omits lines throughout (decorators, docstring
# delimiters, try/except, loop headers, the tearDownClass/tearDown def lines
# and many call-argument continuations); comments below describe only what
# the visible lines show.
3570 class TestDeterministicNAT(MethodHolder):
3571 """ Deterministic NAT Test Cases """
# Extend the VPP command line with a nat { deterministic } section.
3574 def setUpConstants(cls):
3575 super(TestDeterministicNAT, cls).setUpConstants()
3576 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
# Class-wide fixture: fixed inside/external ports, ICMP id, NAT address,
# three pg interfaces and two remote hosts on pg0.
3579 def setUpClass(cls):
3580 super(TestDeterministicNAT, cls).setUpClass()
3583 cls.tcp_port_in = 6303
3584 cls.tcp_external_port = 6303
3585 cls.udp_port_in = 6304
3586 cls.udp_external_port = 6304
3587 cls.icmp_id_in = 6305
3588 cls.nat_addr = '10.0.0.3'
3590 cls.create_pg_interfaces(range(3))
3591 cls.interfaces = list(cls.pg_interfaces)
3593 for i in cls.interfaces:
3598 cls.pg0.generate_remote_hosts(2)
3599 cls.pg0.configure_ipv4_neighbors()
3602 super(TestDeterministicNAT, cls).tearDownClass()
# Build one TCP, one UDP and one ICMP echo-request packet from the inside
# host towards out_if's remote address, using the class-level ports and id.
3605 def create_stream_in(self, in_if, out_if, ttl=64):
3607 Create packet stream for inside network
3609 :param in_if: Inside interface
3610 :param out_if: Outside interface
3611 :param ttl: TTL of generated packets
3615 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3616 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3617 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3621 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3622 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3623 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3627 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3628 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3629 ICMP(id=self.icmp_id_in, type='echo-request'))
# Build the matching return packets (TCP, UDP, ICMP echo-reply) from the
# outside host. Reads self.tcp_port_out, self.udp_port_out and
# self.icmp_external_id, which this class assigns in verify_capture_out
# (and, for TCP, in initiate_tcp_session).
3634 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3636 Create packet stream for outside network
3638 :param out_if: Outside interface
3639 :param dst_ip: Destination IP address (Default use global NAT address)
3640 :param ttl: TTL of generated packets
3643 dst_ip = self.nat_addr
3646 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3647 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3648 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3652 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3653 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3654 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3658 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3659 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3660 ICMP(id=self.icmp_external_id, type='echo-reply'))
# Check packet count and translated source address of an outside capture and
# record the translated TCP/UDP source port and ICMP id on the instance.
# NOTE(review): the docstring below documents a `same_port` parameter that
# this signature does not have.
3665 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3667 Verify captured packets on outside network
3669 :param capture: Captured packets
3670 :param nat_ip: Translated IP address (Default use global NAT address)
3671 :param same_port: Sorce port number is not translated (Default False)
3672 :param packet_num: Expected number of packets (Default 3)
3675 nat_ip = self.nat_addr
3676 self.assertEqual(packet_num, len(capture))
3677 for packet in capture:
3679 self.assertEqual(packet[IP].src, nat_ip)
3680 if packet.haslayer(TCP):
3681 self.tcp_port_out = packet[TCP].sport
3682 elif packet.haslayer(UDP):
3683 self.udp_port_out = packet[UDP].sport
3685 self.icmp_external_id = packet[ICMP].id
3687 self.logger.error(ppp("Unexpected or invalid packet "
3688 "(outside network):", packet))
# Drive a three-packet TCP exchange (in->out, out->in, in->out) through the
# NAT and record the translated source port in self.tcp_port_out.
# NOTE(review): the TCP flags arguments and some add_stream calls are elided.
3691 def initiate_tcp_session(self, in_if, out_if):
3693 Initiates TCP session
3695 :param in_if: Inside interface
3696 :param out_if: Outside interface
3699 # SYN packet in->out
3700 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3701 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3702 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3705 self.pg_enable_capture(self.pg_interfaces)
3707 capture = out_if.get_capture(1)
3709 self.tcp_port_out = p[TCP].sport
3711 # SYN + ACK packet out->in
3712 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3713 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3714 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3716 out_if.add_stream(p)
3717 self.pg_enable_capture(self.pg_interfaces)
3719 in_if.get_capture(1)
3721 # ACK packet in->out
3722 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3723 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3724 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3727 self.pg_enable_capture(self.pg_interfaces)
3729 out_if.get_capture(1)
3732 self.logger.error("TCP 3 way handshake failed")
# Check the single decoded IPFIX record of a quota-exceeded event. The keys
# 230, 466, 473 and 8 are presumably IPFIX information element ids (the
# comments naming most of them are elided) - TODO confirm.
3735 def verify_ipfix_max_entries_per_user(self, data):
3737 Verify IPFIX maximum entries per user exceeded event
3739 :param data: Decoded IPFIX data records
3741 self.assertEqual(1, len(data))
3744 self.assertEqual(ord(record[230]), 13)
3745 # natQuotaExceededEvent
3746 self.assertEqual('\x03\x00\x00\x00', record[466])
# '\xe8\x03\x00\x00' is the value 1000 in little-endian byte order.
3748 self.assertEqual('\xe8\x03\x00\x00', record[473])
3750 self.assertEqual(self.pg0.remote_ip4n, record[8])
# Confirm the plugin reports deterministic mode, add one in/out mapping,
# check the forward and reverse lookups, the mapping dump, and that
# clear_nat_det removes the mapping.
# NOTE(review): the in_plen / out_plen assignments are elided.
3752 def test_deterministic_mode(self):
3753 """ NAT plugin run deterministic mode """
3754 in_addr = '172.16.255.0'
3755 out_addr = '172.17.255.50'
3756 in_addr_t = '172.16.255.20'
3757 in_addr_n = socket.inet_aton(in_addr)
3758 out_addr_n = socket.inet_aton(out_addr)
3759 in_addr_t_n = socket.inet_aton(in_addr_t)
3763 nat_config = self.vapi.nat_show_config()
3764 self.assertEqual(1, nat_config.deterministic)
3766 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Forward lookup of an inside address, then reverse lookup using the upper
# port returned by the forward lookup.
3768 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
3769 self.assertEqual(rep1.out_addr[:4], out_addr_n)
3770 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
3771 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
3773 deterministic_mappings = self.vapi.nat_det_map_dump()
3774 self.assertEqual(len(deterministic_mappings), 1)
3775 dsm = deterministic_mappings[0]
3776 self.assertEqual(in_addr_n, dsm.in_addr[:4])
3777 self.assertEqual(in_plen, dsm.in_plen)
3778 self.assertEqual(out_addr_n, dsm.out_addr[:4])
3779 self.assertEqual(out_plen, dsm.out_plen)
3781 self.clear_nat_det()
3782 deterministic_mappings = self.vapi.nat_det_map_dump()
3783 self.assertEqual(len(deterministic_mappings), 0)
# Raise each of the four timeouts by 10 and check that the values read back
# differ from the ones read before.
3785 def test_set_timeouts(self):
3786 """ Set deterministic NAT timeouts """
3787 timeouts_before = self.vapi.nat_det_get_timeouts()
3789 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
3790 timeouts_before.tcp_established + 10,
3791 timeouts_before.tcp_transitory + 10,
3792 timeouts_before.icmp + 10)
3794 timeouts_after = self.vapi.nat_det_get_timeouts()
3796 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
3797 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
3798 self.assertNotEqual(timeouts_before.tcp_established,
3799 timeouts_after.tcp_established)
3800 self.assertNotEqual(timeouts_before.tcp_transitory,
3801 timeouts_after.tcp_transitory)
# Map pg0's host to nat_ip, translate TCP/UDP/ICMP pg0 -> pg1 and back, then
# check the three sessions reported for the host.
# NOTE(review): prefix lengths and the lines selecting each session `s` are
# elided.
3803 def test_det_in(self):
3804 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3806 nat_ip = "10.0.0.10"
3808 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3810 socket.inet_aton(nat_ip),
3812 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3813 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3817 pkts = self.create_stream_in(self.pg0, self.pg1)
3818 self.pg0.add_stream(pkts)
3819 self.pg_enable_capture(self.pg_interfaces)
3821 capture = self.pg1.get_capture(len(pkts))
3822 self.verify_capture_out(capture, nat_ip)
3825 pkts = self.create_stream_out(self.pg1, nat_ip)
3826 self.pg1.add_stream(pkts)
3827 self.pg_enable_capture(self.pg_interfaces)
3829 capture = self.pg0.get_capture(len(pkts))
3830 self.verify_capture_in(capture, self.pg0)
3833 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3834 self.assertEqual(len(sessions), 3)
3838 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3839 self.assertEqual(s.in_port, self.tcp_port_in)
3840 self.assertEqual(s.out_port, self.tcp_port_out)
3841 self.assertEqual(s.ext_port, self.tcp_external_port)
3845 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3846 self.assertEqual(s.in_port, self.udp_port_in)
3847 self.assertEqual(s.out_port, self.udp_port_out)
3848 self.assertEqual(s.ext_port, self.udp_external_port)
3852 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3853 self.assertEqual(s.in_port, self.icmp_id_in)
3854 self.assertEqual(s.out_port, self.icmp_external_id)
# Two inside hosts send from the same source port through one mapping; each
# must be translated to nat_ip, replies to each recorded outside port must
# reach the right host, and closing the sessions through the API must drop
# the session count from 2 to 1 to 0.
# NOTE(review): port_in, prefix lengths and the port arguments of the close
# calls are elided.
3856 def test_multiple_users(self):
3857 """ Deterministic NAT multiple users """
3859 nat_ip = "10.0.0.10"
3861 external_port = 6303
3863 host0 = self.pg0.remote_hosts[0]
3864 host1 = self.pg0.remote_hosts[1]
3866 self.vapi.nat_det_add_del_map(host0.ip4n,
3868 socket.inet_aton(nat_ip),
3870 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3871 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host0 -> outside
3875 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3876 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3877 TCP(sport=port_in, dport=external_port))
3878 self.pg0.add_stream(p)
3879 self.pg_enable_capture(self.pg_interfaces)
3881 capture = self.pg1.get_capture(1)
3886 self.assertEqual(ip.src, nat_ip)
3887 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3888 self.assertEqual(tcp.dport, external_port)
3889 port_out0 = tcp.sport
3891 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1 -> outside
3895 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
3896 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
3897 TCP(sport=port_in, dport=external_port))
3898 self.pg0.add_stream(p)
3899 self.pg_enable_capture(self.pg_interfaces)
3901 capture = self.pg1.get_capture(1)
3906 self.assertEqual(ip.src, nat_ip)
3907 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3908 self.assertEqual(tcp.dport, external_port)
3909 port_out1 = tcp.sport
3911 self.logger.error(ppp("Unexpected or invalid packet:", p))
3914 dms = self.vapi.nat_det_map_dump()
3915 self.assertEqual(1, len(dms))
3916 self.assertEqual(2, dms[0].ses_num)
# outside -> host0, addressed to the port recorded for host0
3919 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3920 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3921 TCP(sport=external_port, dport=port_out0))
3922 self.pg1.add_stream(p)
3923 self.pg_enable_capture(self.pg_interfaces)
3925 capture = self.pg0.get_capture(1)
3930 self.assertEqual(ip.src, self.pg1.remote_ip4)
3931 self.assertEqual(ip.dst, host0.ip4)
3932 self.assertEqual(tcp.dport, port_in)
3933 self.assertEqual(tcp.sport, external_port)
3935 self.logger.error(ppp("Unexpected or invalid packet:", p))
# outside -> host1, addressed to the port recorded for host1
3939 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3940 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3941 TCP(sport=external_port, dport=port_out1))
3942 self.pg1.add_stream(p)
3943 self.pg_enable_capture(self.pg_interfaces)
3945 capture = self.pg0.get_capture(1)
3950 self.assertEqual(ip.src, self.pg1.remote_ip4)
3951 self.assertEqual(ip.dst, host1.ip4)
3952 self.assertEqual(tcp.dport, port_in)
3953 self.assertEqual(tcp.sport, external_port)
3955 self.logger.error(ppp("Unexpected or invalid packet", p))
3958 # session close api test
3959 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
3961 self.pg1.remote_ip4n,
3963 dms = self.vapi.nat_det_map_dump()
3964 self.assertEqual(dms[0].ses_num, 1)
3966 self.vapi.nat_det_close_session_in(host0.ip4n,
3968 self.pg1.remote_ip4n,
3970 dms = self.vapi.nat_det_map_dump()
3971 self.assertEqual(dms[0].ses_num, 0)
# Open a TCP session, run a close sequence that starts from the inside host
# and check that the mapping reports zero sessions afterwards.
# NOTE(review): TCP flags and the `pkts` list handling are elided.
3973 def test_tcp_session_close_detection_in(self):
3974 """ Deterministic NAT TCP session close from inside network """
3975 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3977 socket.inet_aton(self.nat_addr),
3979 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3980 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3983 self.initiate_tcp_session(self.pg0, self.pg1)
3985 # close the session from inside
3987 # FIN packet in -> out
3988 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3989 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3990 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3992 self.pg0.add_stream(p)
3993 self.pg_enable_capture(self.pg_interfaces)
3995 self.pg1.get_capture(1)
3999 # ACK packet out -> in
4000 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4001 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4002 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4006 # FIN packet out -> in
4007 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4008 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4009 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4013 self.pg1.add_stream(pkts)
4014 self.pg_enable_capture(self.pg_interfaces)
4016 self.pg0.get_capture(2)
4018 # ACK packet in -> out
4019 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4020 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4021 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4023 self.pg0.add_stream(p)
4024 self.pg_enable_capture(self.pg_interfaces)
4026 self.pg1.get_capture(1)
4028 # Check if deterministic NAT44 closed the session
4029 dms = self.vapi.nat_det_map_dump()
4030 self.assertEqual(0, dms[0].ses_num)
4032 self.logger.error("TCP session termination failed")
# Same as above with the close sequence started from the outside host.
# NOTE(review): the comment "ACK packet in -> out" appears twice in a row
# below; by symmetry with test_tcp_session_close_detection_in the second
# packet is presumably the FIN - confirm against the elided TCP flags.
4035 def test_tcp_session_close_detection_out(self):
4036 """ Deterministic NAT TCP session close from outside network """
4037 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4039 socket.inet_aton(self.nat_addr),
4041 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4042 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4045 self.initiate_tcp_session(self.pg0, self.pg1)
4047 # close the session from outside
4049 # FIN packet out -> in
4050 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4051 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4052 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4054 self.pg1.add_stream(p)
4055 self.pg_enable_capture(self.pg_interfaces)
4057 self.pg0.get_capture(1)
4061 # ACK packet in -> out
4062 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4063 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4064 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4068 # ACK packet in -> out
4069 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4070 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4071 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4075 self.pg0.add_stream(pkts)
4076 self.pg_enable_capture(self.pg_interfaces)
4078 self.pg1.get_capture(2)
4080 # ACK packet out -> in
4081 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4082 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4083 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4085 self.pg1.add_stream(p)
4086 self.pg_enable_capture(self.pg_interfaces)
4088 self.pg0.get_capture(1)
4090 # Check if deterministic NAT44 closed the session
4091 dms = self.vapi.nat_det_map_dump()
4092 self.assertEqual(0, dms[0].ses_num)
4094 self.logger.error("TCP session termination failed")
# Extended test: open sessions, set all four timeouts to 5, and expect the
# mapping to report zero sessions afterwards.
# NOTE(review): the wait between sending traffic and the final check is not
# visible in this listing.
4097 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4098 def test_session_timeout(self):
4099 """ Deterministic NAT session timeouts """
4100 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4102 socket.inet_aton(self.nat_addr),
4104 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4105 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4108 self.initiate_tcp_session(self.pg0, self.pg1)
4109 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4110 pkts = self.create_stream_in(self.pg0, self.pg1)
4111 self.pg0.add_stream(pkts)
4112 self.pg_enable_capture(self.pg_interfaces)
4114 capture = self.pg1.get_capture(len(pkts))
4117 dms = self.vapi.nat_det_map_dump()
4118 self.assertEqual(0, dms[0].ses_num)
# Extended test: create 1000 UDP sessions from one host (source ports
# 1025..2024), then send one more packet and expect it to be dropped, an ICMP
# type 3 / code 1 error quoting it to be returned on pg0, the session count
# to stay at 1000, and a matching IPFIX record to be exported on pg2.
4120 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4121 def test_session_limit_per_user(self):
4122 """ Deterministic NAT maximum sessions per user limit """
4123 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4125 socket.inet_aton(self.nat_addr),
4127 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4128 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4130 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4131 src_address=self.pg2.local_ip4n,
4133 template_interval=10)
4134 self.vapi.nat_ipfix()
4137 for port in range(1025, 2025):
4138 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4139 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4140 UDP(sport=port, dport=port))
4143 self.pg0.add_stream(pkts)
4144 self.pg_enable_capture(self.pg_interfaces)
4146 capture = self.pg1.get_capture(len(pkts))
# One packet beyond the 1000 sessions created above.
4148 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4149 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4150 UDP(sport=3001, dport=3002))
4151 self.pg0.add_stream(p)
4152 self.pg_enable_capture(self.pg_interfaces)
# `capture` is reassigned from pg0 just below before being read.
4154 capture = self.pg1.assert_nothing_captured()
4156 # verify ICMP error packet
4157 capture = self.pg0.get_capture(1)
4159 self.assertTrue(p.haslayer(ICMP))
4161 self.assertEqual(icmp.type, 3)
4162 self.assertEqual(icmp.code, 1)
4163 self.assertTrue(icmp.haslayer(IPerror))
4164 inner_ip = icmp[IPerror]
4165 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4166 self.assertEqual(inner_ip[UDPerror].dport, 3002)
4168 dms = self.vapi.nat_det_map_dump()
4170 self.assertEqual(1000, dms[0].ses_num)
4172 # verify IPFIX logging
4173 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4175 capture = self.pg2.get_capture(2)
4176 ipfix = IPFIXDecoder()
4177 # first load template
4179 self.assertTrue(p.haslayer(IPFIX))
4180 if p.haslayer(Template):
4181 ipfix.add_template(p.getlayer(Template))
4182 # verify events in data set
4184 if p.haslayer(Data):
4185 data = ipfix.decode_data_set(p.getlayer(Set))
4186 self.verify_ipfix_max_entries_per_user(data)
# Reset state between tests: disable NAT IPFIX logging, call
# nat_det_set_timeouts with no arguments, then walk the dumped deterministic
# mappings and NAT44 interfaces and issue an add/del call for each.
# NOTE(review): the remaining arguments of the add/del calls are elided.
4188 def clear_nat_det(self):
4190 Clear deterministic NAT configuration.
4192 self.vapi.nat_ipfix(enable=0)
4193 self.vapi.nat_det_set_timeouts()
4194 deterministic_mappings = self.vapi.nat_det_map_dump()
4195 for dsm in deterministic_mappings:
4196 self.vapi.nat_det_add_del_map(dsm.in_addr,
4202 interfaces = self.vapi.nat44_interface_dump()
4203 for intf in interfaces:
4204 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4209 super(TestDeterministicNAT, self).tearDown()
4210 if not self.vpp_dead:
4211 self.logger.info(self.vapi.cli("show nat44 detail"))
4212 self.clear_nat_det()
4215 class TestNAT64(MethodHolder):
4216 """ NAT64 Test Cases """
# Extend the VPP command line with a nat { ... } section that sets the NAT64
# BIB hash to 128 buckets and the session-table hash to 256 buckets.
4219 def setUpConstants(cls):
4220 super(TestNAT64, cls).setUpConstants()
4221 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4222 "nat64 st hash buckets 256", "}"])
# Class-wide fixture for the NAT64 tests: fixed ports/ids, NAT pool addresses
# (default and VRF1), IPFIX parameters and five pg interfaces split into IPv6
# (pg0, pg2) and IPv4 (pg1) groups; pg3 gets both address families.
# NOTE(review): the assignment of cls.vrf1_id and the bodies of the two
# interface loops are partly elided in this listing.
4225 def setUpClass(cls):
4226 super(TestNAT64, cls).setUpClass()
4229 cls.tcp_port_in = 6303
4230 cls.tcp_port_out = 6303
4231 cls.udp_port_in = 6304
4232 cls.udp_port_out = 6304
4233 cls.icmp_id_in = 6305
4234 cls.icmp_id_out = 6305
4235 cls.nat_addr = '10.0.0.3'
4236 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4238 cls.vrf1_nat_addr = '10.0.10.3'
4239 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4241 cls.ipfix_src_port = 4739
4242 cls.ipfix_domain_id = 1
4244 cls.create_pg_interfaces(range(5))
# IPv6 group is pg0 plus pg2; IPv4 group is pg1.
4245 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4246 cls.ip6_interfaces.append(cls.pg_interfaces[2])
4247 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
# Create the IPv6 table for VRF1 and bind pg2 to it.
4249 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4251 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4253 cls.pg0.generate_remote_hosts(2)
4255 for i in cls.ip6_interfaces:
4258 i.configure_ipv6_neighbors()
4260 for i in cls.ip4_interfaces:
4266 cls.pg3.config_ip4()
4267 cls.pg3.resolve_arp()
4268 cls.pg3.config_ip6()
4269 cls.pg3.configure_ipv6_neighbors()
4272 super(TestNAT64, cls).tearDownClass()
# Add a single address (start == end of range) to the NAT64 pool, check it is
# the only dumped entry, then delete it and check the pool dump is empty.
4275 def test_pool(self):
4276 """ Add/delete address to NAT64 pool """
4277 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4279 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4281 addresses = self.vapi.nat64_pool_addr_dump()
4282 self.assertEqual(len(addresses), 1)
4283 self.assertEqual(addresses[0].address, nat_addr)
4285 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4287 addresses = self.vapi.nat64_pool_addr_dump()
4288 self.assertEqual(len(addresses), 0)
# Enable NAT64 on pg0 (default direction) and on pg1 (is_inside=0), verify
# both the API dump and the CLI feature listing, then disable both and check
# the dump is empty.
# NOTE(review): the initialisation and setting of pg0_found / pg1_found are
# elided in this listing.
4290 def test_interface(self):
4291 """ Enable/disable NAT64 feature on the interface """
4292 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4293 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4295 interfaces = self.vapi.nat64_interface_dump()
4296 self.assertEqual(len(interfaces), 2)
4299 for intf in interfaces:
4300 if intf.sw_if_index == self.pg0.sw_if_index:
4301 self.assertEqual(intf.is_inside, 1)
4303 elif intf.sw_if_index == self.pg1.sw_if_index:
4304 self.assertEqual(intf.is_inside, 0)
4306 self.assertTrue(pg0_found)
4307 self.assertTrue(pg1_found)
# The CLI output must mention the in2out node on pg0 and out2in on pg1.
4309 features = self.vapi.cli("show interface features pg0")
4310 self.assertNotEqual(features.find('nat64-in2out'), -1)
4311 features = self.vapi.cli("show interface features pg1")
4312 self.assertNotEqual(features.find('nat64-out2in'), -1)
4314 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4315 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4317 interfaces = self.vapi.nat64_interface_dump()
4318 self.assertEqual(len(interfaces), 0)
4320 def test_static_bib(self):
4321 """ Add/delete static BIB entry """
4322 in_addr = socket.inet_pton(socket.AF_INET6,
4323 '2001:db8:85a3::8a2e:370:7334')
4324 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4327 proto = IP_PROTOS.tcp
4329 self.vapi.nat64_add_del_static_bib(in_addr,
4334 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4339 self.assertEqual(bibe.i_addr, in_addr)
4340 self.assertEqual(bibe.o_addr, out_addr)
4341 self.assertEqual(bibe.i_port, in_port)
4342 self.assertEqual(bibe.o_port, out_port)
4343 self.assertEqual(static_bib_num, 1)
4345 self.vapi.nat64_add_del_static_bib(in_addr,
4351 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4356 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    def verify(expected):
        # Fetch the current timeouts once and compare every field, in the
        # listed order, against the expected value.
        reply = self.vapi.nat64_get_timeouts()
        for field, value in expected:
            self.assertEqual(getattr(reply, field), value)

    # verify default values
    verify((('udp', 300),
            ('icmp', 60),
            ('tcp_trans', 240),
            ('tcp_est', 7440),
            ('tcp_incoming_syn', 6)))

    # set and verify custom values
    custom = (('udp', 200),
              ('icmp', 30),
              ('tcp_trans', 250),
              ('tcp_est', 7450),
              ('tcp_incoming_syn', 10))
    self.vapi.nat64_set_timeouts(**dict(custom))
    verify(custom)
4378 def test_dynamic(self):
4379 """ NAT64 dynamic translation test """
4380 self.tcp_port_in = 6303
4381 self.udp_port_in = 6304
4382 self.icmp_id_in = 6305
4384 ses_num_start = self.nat64_get_ses_num()
4386 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4388 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4389 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4392 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4393 self.pg0.add_stream(pkts)
4394 self.pg_enable_capture(self.pg_interfaces)
4396 capture = self.pg1.get_capture(len(pkts))
4397 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4398 dst_ip=self.pg1.remote_ip4)
4401 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4402 self.pg1.add_stream(pkts)
4403 self.pg_enable_capture(self.pg_interfaces)
4405 capture = self.pg0.get_capture(len(pkts))
4406 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4407 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4410 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4411 self.pg0.add_stream(pkts)
4412 self.pg_enable_capture(self.pg_interfaces)
4414 capture = self.pg1.get_capture(len(pkts))
4415 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4416 dst_ip=self.pg1.remote_ip4)
4419 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4420 self.pg1.add_stream(pkts)
4421 self.pg_enable_capture(self.pg_interfaces)
4423 capture = self.pg0.get_capture(len(pkts))
4424 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4426 ses_num_end = self.nat64_get_ses_num()
4428 self.assertEqual(ses_num_end - ses_num_start, 3)
4430 # tenant with specific VRF
4431 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4432 self.vrf1_nat_addr_n,
4433 vrf_id=self.vrf1_id)
4434 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4436 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4437 self.pg2.add_stream(pkts)
4438 self.pg_enable_capture(self.pg_interfaces)
4440 capture = self.pg1.get_capture(len(pkts))
4441 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4442 dst_ip=self.pg1.remote_ip4)
4444 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4445 self.pg1.add_stream(pkts)
4446 self.pg_enable_capture(self.pg_interfaces)
4448 capture = self.pg2.get_capture(len(pkts))
4449 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4451 def test_static(self):
4452 """ NAT64 static translation test """
4453 self.tcp_port_in = 60303
4454 self.udp_port_in = 60304
4455 self.icmp_id_in = 60305
4456 self.tcp_port_out = 60303
4457 self.udp_port_out = 60304
4458 self.icmp_id_out = 60305
4460 ses_num_start = self.nat64_get_ses_num()
4462 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4464 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4465 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4467 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4472 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4477 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4484 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4485 self.pg0.add_stream(pkts)
4486 self.pg_enable_capture(self.pg_interfaces)
4488 capture = self.pg1.get_capture(len(pkts))
4489 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4490 dst_ip=self.pg1.remote_ip4, same_port=True)
4493 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4494 self.pg1.add_stream(pkts)
4495 self.pg_enable_capture(self.pg_interfaces)
4497 capture = self.pg0.get_capture(len(pkts))
4498 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4499 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4501 ses_num_end = self.nat64_get_ses_num()
4503 self.assertEqual(ses_num_end - ses_num_start, 3)
4505 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4506 def test_session_timeout(self):
4507 """ NAT64 session timeout """
4508 self.icmp_id_in = 1234
4509 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4511 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4512 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4513 self.vapi.nat64_set_timeouts(icmp=5)
4515 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4516 self.pg0.add_stream(pkts)
4517 self.pg_enable_capture(self.pg_interfaces)
4519 capture = self.pg1.get_capture(len(pkts))
4521 ses_num_before_timeout = self.nat64_get_ses_num()
4525 # ICMP session after timeout
4526 ses_num_after_timeout = self.nat64_get_ses_num()
4527 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4529 def test_icmp_error(self):
4530 """ NAT64 ICMP Error message translation """
4531 self.tcp_port_in = 6303
4532 self.udp_port_in = 6304
4533 self.icmp_id_in = 6305
4535 ses_num_start = self.nat64_get_ses_num()
4537 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4539 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4540 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4542 # send some packets to create sessions
4543 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4544 self.pg0.add_stream(pkts)
4545 self.pg_enable_capture(self.pg_interfaces)
4547 capture_ip4 = self.pg1.get_capture(len(pkts))
4548 self.verify_capture_out(capture_ip4,
4549 nat_ip=self.nat_addr,
4550 dst_ip=self.pg1.remote_ip4)
4552 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4553 self.pg1.add_stream(pkts)
4554 self.pg_enable_capture(self.pg_interfaces)
4556 capture_ip6 = self.pg0.get_capture(len(pkts))
4557 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4558 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4559 self.pg0.remote_ip6)
4562 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4563 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4564 ICMPv6DestUnreach(code=1) /
4565 packet[IPv6] for packet in capture_ip6]
4566 self.pg0.add_stream(pkts)
4567 self.pg_enable_capture(self.pg_interfaces)
4569 capture = self.pg1.get_capture(len(pkts))
4570 for packet in capture:
4572 self.assertEqual(packet[IP].src, self.nat_addr)
4573 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4574 self.assertEqual(packet[ICMP].type, 3)
4575 self.assertEqual(packet[ICMP].code, 13)
4576 inner = packet[IPerror]
4577 self.assertEqual(inner.src, self.pg1.remote_ip4)
4578 self.assertEqual(inner.dst, self.nat_addr)
4579 self.check_icmp_checksum(packet)
4580 if inner.haslayer(TCPerror):
4581 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4582 elif inner.haslayer(UDPerror):
4583 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4585 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4587 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4591 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4592 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4593 ICMP(type=3, code=13) /
4594 packet[IP] for packet in capture_ip4]
4595 self.pg1.add_stream(pkts)
4596 self.pg_enable_capture(self.pg_interfaces)
4598 capture = self.pg0.get_capture(len(pkts))
4599 for packet in capture:
4601 self.assertEqual(packet[IPv6].src, ip.src)
4602 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4603 icmp = packet[ICMPv6DestUnreach]
4604 self.assertEqual(icmp.code, 1)
4605 inner = icmp[IPerror6]
4606 self.assertEqual(inner.src, self.pg0.remote_ip6)
4607 self.assertEqual(inner.dst, ip.src)
4608 self.check_icmpv6_checksum(packet)
4609 if inner.haslayer(TCPerror):
4610 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4611 elif inner.haslayer(UDPerror):
4612 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4614 self.assertEqual(inner[ICMPv6EchoRequest].id,
4617 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4620 def test_hairpinning(self):
4621 """ NAT64 hairpinning """
4623 client = self.pg0.remote_hosts[0]
4624 server = self.pg0.remote_hosts[1]
4625 server_tcp_in_port = 22
4626 server_tcp_out_port = 4022
4627 server_udp_in_port = 23
4628 server_udp_out_port = 4023
4629 client_tcp_in_port = 1234
4630 client_udp_in_port = 1235
4631 client_tcp_out_port = 0
4632 client_udp_out_port = 0
4633 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4634 nat_addr_ip6 = ip.src
4636 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4638 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4639 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4641 self.vapi.nat64_add_del_static_bib(server.ip6n,
4644 server_tcp_out_port,
4646 self.vapi.nat64_add_del_static_bib(server.ip6n,
4649 server_udp_out_port,
4654 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4655 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4656 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4658 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4659 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4660 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4662 self.pg0.add_stream(pkts)
4663 self.pg_enable_capture(self.pg_interfaces)
4665 capture = self.pg0.get_capture(len(pkts))
4666 for packet in capture:
4668 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4669 self.assertEqual(packet[IPv6].dst, server.ip6)
4670 if packet.haslayer(TCP):
4671 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4672 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4673 self.check_tcp_checksum(packet)
4674 client_tcp_out_port = packet[TCP].sport
4676 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4677 self.assertEqual(packet[UDP].dport, server_udp_in_port)
4678 self.check_udp_checksum(packet)
4679 client_udp_out_port = packet[UDP].sport
4681 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4686 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4687 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4688 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4690 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4691 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4692 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4694 self.pg0.add_stream(pkts)
4695 self.pg_enable_capture(self.pg_interfaces)
4697 capture = self.pg0.get_capture(len(pkts))
4698 for packet in capture:
4700 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4701 self.assertEqual(packet[IPv6].dst, client.ip6)
4702 if packet.haslayer(TCP):
4703 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
4704 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
4705 self.check_tcp_checksum(packet)
4707 self.assertEqual(packet[UDP].sport, server_udp_out_port)
4708 self.assertEqual(packet[UDP].dport, client_udp_in_port)
4709 self.check_udp_checksum(packet)
4711 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4716 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4717 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4718 ICMPv6DestUnreach(code=1) /
4719 packet[IPv6] for packet in capture]
4720 self.pg0.add_stream(pkts)
4721 self.pg_enable_capture(self.pg_interfaces)
4723 capture = self.pg0.get_capture(len(pkts))
4724 for packet in capture:
4726 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4727 self.assertEqual(packet[IPv6].dst, server.ip6)
4728 icmp = packet[ICMPv6DestUnreach]
4729 self.assertEqual(icmp.code, 1)
4730 inner = icmp[IPerror6]
4731 self.assertEqual(inner.src, server.ip6)
4732 self.assertEqual(inner.dst, nat_addr_ip6)
4733 self.check_icmpv6_checksum(packet)
4734 if inner.haslayer(TCPerror):
4735 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
4736 self.assertEqual(inner[TCPerror].dport,
4737 client_tcp_out_port)
4739 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
4740 self.assertEqual(inner[UDPerror].dport,
4741 client_udp_out_port)
4743 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4746 def test_prefix(self):
4747 """ NAT64 Network-Specific Prefix """
4749 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4751 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4752 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4753 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4754 self.vrf1_nat_addr_n,
4755 vrf_id=self.vrf1_id)
4756 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4759 global_pref64 = "2001:db8::"
4760 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
4761 global_pref64_len = 32
4762 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
4764 prefix = self.vapi.nat64_prefix_dump()
4765 self.assertEqual(len(prefix), 1)
4766 self.assertEqual(prefix[0].prefix, global_pref64_n)
4767 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
4768 self.assertEqual(prefix[0].vrf_id, 0)
4770 # Add tenant specific prefix
4771 vrf1_pref64 = "2001:db8:122:300::"
4772 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
4773 vrf1_pref64_len = 56
4774 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
4776 vrf_id=self.vrf1_id)
4777 prefix = self.vapi.nat64_prefix_dump()
4778 self.assertEqual(len(prefix), 2)
4781 pkts = self.create_stream_in_ip6(self.pg0,
4784 plen=global_pref64_len)
4785 self.pg0.add_stream(pkts)
4786 self.pg_enable_capture(self.pg_interfaces)
4788 capture = self.pg1.get_capture(len(pkts))
4789 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4790 dst_ip=self.pg1.remote_ip4)
4792 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4793 self.pg1.add_stream(pkts)
4794 self.pg_enable_capture(self.pg_interfaces)
4796 capture = self.pg0.get_capture(len(pkts))
4797 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4800 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
4802 # Tenant specific prefix
4803 pkts = self.create_stream_in_ip6(self.pg2,
4806 plen=vrf1_pref64_len)
4807 self.pg2.add_stream(pkts)
4808 self.pg_enable_capture(self.pg_interfaces)
4810 capture = self.pg1.get_capture(len(pkts))
4811 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4812 dst_ip=self.pg1.remote_ip4)
4814 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4815 self.pg1.add_stream(pkts)
4816 self.pg_enable_capture(self.pg_interfaces)
4818 capture = self.pg2.get_capture(len(pkts))
4819 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4822 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
4824 def test_unknown_proto(self):
4825 """ NAT64 translate packet with unknown protocol """
4827 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4829 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4830 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4831 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4834 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4835 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4836 TCP(sport=self.tcp_port_in, dport=20))
4837 self.pg0.add_stream(p)
4838 self.pg_enable_capture(self.pg_interfaces)
4840 p = self.pg1.get_capture(1)
4842 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4843 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4845 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4846 TCP(sport=1234, dport=1234))
4847 self.pg0.add_stream(p)
4848 self.pg_enable_capture(self.pg_interfaces)
4850 p = self.pg1.get_capture(1)
4853 self.assertEqual(packet[IP].src, self.nat_addr)
4854 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4855 self.assertTrue(packet.haslayer(GRE))
4856 self.check_ip_checksum(packet)
4858 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4862 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4863 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4865 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4866 TCP(sport=1234, dport=1234))
4867 self.pg1.add_stream(p)
4868 self.pg_enable_capture(self.pg_interfaces)
4870 p = self.pg0.get_capture(1)
4873 self.assertEqual(packet[IPv6].src, remote_ip6)
4874 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4875 self.assertEqual(packet[IPv6].nh, 47)
4877 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4880 def test_hairpinning_unknown_proto(self):
4881 """ NAT64 translate packet with unknown protocol - hairpinning """
4883 client = self.pg0.remote_hosts[0]
4884 server = self.pg0.remote_hosts[1]
4885 server_tcp_in_port = 22
4886 server_tcp_out_port = 4022
4887 client_tcp_in_port = 1234
4888 client_tcp_out_port = 1235
4889 server_nat_ip = "10.0.0.100"
4890 client_nat_ip = "10.0.0.110"
4891 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
4892 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
4893 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
4894 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
4896 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
4898 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4899 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4901 self.vapi.nat64_add_del_static_bib(server.ip6n,
4904 server_tcp_out_port,
4907 self.vapi.nat64_add_del_static_bib(server.ip6n,
4913 self.vapi.nat64_add_del_static_bib(client.ip6n,
4916 client_tcp_out_port,
4920 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4921 IPv6(src=client.ip6, dst=server_nat_ip6) /
4922 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4923 self.pg0.add_stream(p)
4924 self.pg_enable_capture(self.pg_interfaces)
4926 p = self.pg0.get_capture(1)
4928 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4929 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
4931 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4932 TCP(sport=1234, dport=1234))
4933 self.pg0.add_stream(p)
4934 self.pg_enable_capture(self.pg_interfaces)
4936 p = self.pg0.get_capture(1)
4939 self.assertEqual(packet[IPv6].src, client_nat_ip6)
4940 self.assertEqual(packet[IPv6].dst, server.ip6)
4941 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4943 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4947 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4948 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
4950 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4951 TCP(sport=1234, dport=1234))
4952 self.pg0.add_stream(p)
4953 self.pg_enable_capture(self.pg_interfaces)
4955 p = self.pg0.get_capture(1)
4958 self.assertEqual(packet[IPv6].src, server_nat_ip6)
4959 self.assertEqual(packet[IPv6].dst, client.ip6)
4960 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4962 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4965 def test_one_armed_nat64(self):
4966 """ One armed NAT64 """
4968 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
4972 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4974 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
4975 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
4978 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4979 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
4980 TCP(sport=12345, dport=80))
4981 self.pg3.add_stream(p)
4982 self.pg_enable_capture(self.pg_interfaces)
4984 capture = self.pg3.get_capture(1)
4989 self.assertEqual(ip.src, self.nat_addr)
4990 self.assertEqual(ip.dst, self.pg3.remote_ip4)
4991 self.assertNotEqual(tcp.sport, 12345)
4992 external_port = tcp.sport
4993 self.assertEqual(tcp.dport, 80)
4994 self.check_tcp_checksum(p)
4995 self.check_ip_checksum(p)
4997 self.logger.error(ppp("Unexpected or invalid packet:", p))
5001 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5002 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5003 TCP(sport=80, dport=external_port))
5004 self.pg3.add_stream(p)
5005 self.pg_enable_capture(self.pg_interfaces)
5007 capture = self.pg3.get_capture(1)
5012 self.assertEqual(ip.src, remote_host_ip6)
5013 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5014 self.assertEqual(tcp.sport, 80)
5015 self.assertEqual(tcp.dport, 12345)
5016 self.check_tcp_checksum(p)
5018 self.logger.error(ppp("Unexpected or invalid packet:", p))
5021 def test_frag_in_order(self):
5022 """ NAT64 translate fragments arriving in order """
5023 self.tcp_port_in = random.randint(1025, 65535)
5025 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5027 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5028 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5030 reass = self.vapi.nat_reass_dump()
5031 reass_n_start = len(reass)
5035 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5036 self.tcp_port_in, 20, data)
5037 self.pg0.add_stream(pkts)
5038 self.pg_enable_capture(self.pg_interfaces)
5040 frags = self.pg1.get_capture(len(pkts))
5041 p = self.reass_frags_and_verify(frags,
5043 self.pg1.remote_ip4)
5044 self.assertEqual(p[TCP].dport, 20)
5045 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5046 self.tcp_port_out = p[TCP].sport
5047 self.assertEqual(data, p[Raw].load)
5050 data = "A" * 4 + "b" * 16 + "C" * 3
5051 pkts = self.create_stream_frag(self.pg1,
5056 self.pg1.add_stream(pkts)
5057 self.pg_enable_capture(self.pg_interfaces)
5059 frags = self.pg0.get_capture(len(pkts))
5060 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5061 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5062 self.assertEqual(p[TCP].sport, 20)
5063 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5064 self.assertEqual(data, p[Raw].load)
5066 reass = self.vapi.nat_reass_dump()
5067 reass_n_end = len(reass)
5069 self.assertEqual(reass_n_end - reass_n_start, 2)
5071 def test_reass_hairpinning(self):
5072 """ NAT64 fragments hairpinning """
5074 client = self.pg0.remote_hosts[0]
5075 server = self.pg0.remote_hosts[1]
5076 server_in_port = random.randint(1025, 65535)
5077 server_out_port = random.randint(1025, 65535)
5078 client_in_port = random.randint(1025, 65535)
5079 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5080 nat_addr_ip6 = ip.src
5082 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5084 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5085 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5087 # add static BIB entry for server
5088 self.vapi.nat64_add_del_static_bib(server.ip6n,
5094 # send packet from host to server
5095 pkts = self.create_stream_frag_ip6(self.pg0,
5100 self.pg0.add_stream(pkts)
5101 self.pg_enable_capture(self.pg_interfaces)
5103 frags = self.pg0.get_capture(len(pkts))
5104 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5105 self.assertNotEqual(p[TCP].sport, client_in_port)
5106 self.assertEqual(p[TCP].dport, server_in_port)
5107 self.assertEqual(data, p[Raw].load)
5109 def test_frag_out_of_order(self):
5110 """ NAT64 translate fragments arriving out of order """
5111 self.tcp_port_in = random.randint(1025, 65535)
5113 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5115 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5116 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5120 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5121 self.tcp_port_in, 20, data)
5123 self.pg0.add_stream(pkts)
5124 self.pg_enable_capture(self.pg_interfaces)
5126 frags = self.pg1.get_capture(len(pkts))
5127 p = self.reass_frags_and_verify(frags,
5129 self.pg1.remote_ip4)
5130 self.assertEqual(p[TCP].dport, 20)
5131 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5132 self.tcp_port_out = p[TCP].sport
5133 self.assertEqual(data, p[Raw].load)
5136 data = "A" * 4 + "B" * 16 + "C" * 3
5137 pkts = self.create_stream_frag(self.pg1,
5143 self.pg1.add_stream(pkts)
5144 self.pg_enable_capture(self.pg_interfaces)
5146 frags = self.pg0.get_capture(len(pkts))
5147 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5148 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5149 self.assertEqual(p[TCP].sport, 20)
5150 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5151 self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface """
    # Bind the NAT64 pool to pg4; pg4 has no IPv4 address configured by
    # this test yet.
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (query the NAT64 pool itself; the previous code dumped the NAT44
    # address list, which says nothing about NAT64)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    # (assert on the fresh dump; the previous code re-checked a stale,
    # misspelled variable from the first dump, so removal was never
    # actually verified)
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
5172 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5173 def test_ipfix_max_bibs_sessions(self):
5174 """ IPFIX logging maximum session and BIB entries exceeded """
5177 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5181 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5183 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5184 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5188 for i in range(0, max_bibs):
5189 src = "fd01:aa::%x" % (i)
5190 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5191 IPv6(src=src, dst=remote_host_ip6) /
5192 TCP(sport=12345, dport=80))
5194 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5195 IPv6(src=src, dst=remote_host_ip6) /
5196 TCP(sport=12345, dport=22))
5198 self.pg0.add_stream(pkts)
5199 self.pg_enable_capture(self.pg_interfaces)
5201 self.pg1.get_capture(max_sessions)
5203 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5204 src_address=self.pg3.local_ip4n,
5206 template_interval=10)
5207 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5208 src_port=self.ipfix_src_port)
5210 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5211 IPv6(src=src, dst=remote_host_ip6) /
5212 TCP(sport=12345, dport=25))
5213 self.pg0.add_stream(p)
5214 self.pg_enable_capture(self.pg_interfaces)
5216 self.pg1.get_capture(0)
5217 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5218 capture = self.pg3.get_capture(9)
5219 ipfix = IPFIXDecoder()
5220 # first load template
5222 self.assertTrue(p.haslayer(IPFIX))
5223 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5224 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5225 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5226 self.assertEqual(p[UDP].dport, 4739)
5227 self.assertEqual(p[IPFIX].observationDomainID,
5228 self.ipfix_domain_id)
5229 if p.haslayer(Template):
5230 ipfix.add_template(p.getlayer(Template))
5231 # verify events in data set
5233 if p.haslayer(Data):
5234 data = ipfix.decode_data_set(p.getlayer(Set))
5235 self.verify_ipfix_max_sessions(data, max_sessions)
5237 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5238 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5239 TCP(sport=12345, dport=80))
5240 self.pg0.add_stream(p)
5241 self.pg_enable_capture(self.pg_interfaces)
5243 self.pg1.get_capture(0)
5244 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5245 capture = self.pg3.get_capture(1)
5246 # verify events in data set
5248 self.assertTrue(p.haslayer(IPFIX))
5249 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5250 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5251 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5252 self.assertEqual(p[UDP].dport, 4739)
5253 self.assertEqual(p[IPFIX].observationDomainID,
5254 self.ipfix_domain_id)
5255 if p.haslayer(Data):
5256 data = ipfix.decode_data_set(p.getlayer(Set))
5257 self.verify_ipfix_max_bibs(data, max_bibs)
5259 def test_ipfix_max_frags(self):
5260 """ IPFIX logging maximum fragments pending reassembly exceeded """
5261 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5263 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5264 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5265 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5266 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5267 src_address=self.pg3.local_ip4n,
5269 template_interval=10)
5270 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5271 src_port=self.ipfix_src_port)
5274 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5275 self.tcp_port_in, 20, data)
5276 self.pg0.add_stream(pkts[-1])
5277 self.pg_enable_capture(self.pg_interfaces)
5279 self.pg1.get_capture(0)
5280 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5281 capture = self.pg3.get_capture(9)
5282 ipfix = IPFIXDecoder()
5283 # first load template
5285 self.assertTrue(p.haslayer(IPFIX))
5286 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5287 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5288 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5289 self.assertEqual(p[UDP].dport, 4739)
5290 self.assertEqual(p[IPFIX].observationDomainID,
5291 self.ipfix_domain_id)
5292 if p.haslayer(Template):
5293 ipfix.add_template(p.getlayer(Template))
5294 # verify events in data set
5296 if p.haslayer(Data):
5297 data = ipfix.decode_data_set(p.getlayer(Set))
5298 self.verify_ipfix_max_fragments_ip6(data, 0,
5299 self.pg0.remote_ip6n)
# Test: IPFIX export of NAT64 BIB and session events.
# Phase 1 sends one TCP packet through NAT64 and checks the exported records;
# phase 2 re-issues the pool address call and checks a second, smaller export.
5301 def test_ipfix_bib_ses(self):
5302 """ IPFIX logging NAT64 BIB/session create and delete events """
# Random inside TCP source port for this run.
5303 self.tcp_port_in = random.randint(1025, 65535)
# IPv6 address that embeds pg1's IPv4 remote address.
# NOTE(review): the prefix arguments of compose_ip6 are not visible here.
5304 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
# NAT64 setup: pool address, pg0 added with default arguments, pg1 added with
# is_inside=0 (outside).
5308 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5310 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5311 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPFIX exporter: collector is pg3's remote address, source is pg3's local
# address; NAT logging is then enabled with this test's domain id / src port.
5312 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5313 src_address=self.pg3.local_ip4n,
5315 template_interval=10)
5316 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5317 src_port=self.ipfix_src_port)
# Phase 1: one IPv6 TCP packet from the pg0 host to the composed address,
# destination port 25.
5320 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5321 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5322 TCP(sport=self.tcp_port_in, dport=25))
5323 self.pg0.add_stream(p)
5324 self.pg_enable_capture(self.pg_interfaces)
# Exactly one packet is expected on pg1; its TCP source port is recorded as
# the translated (outside) port.
5326 p = self.pg1.get_capture(1)
5327 self.tcp_port_out = p[0][TCP].sport
# Force the exporter to emit pending records, then expect 10 packets on pg3.
5328 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5329 capture = self.pg3.get_capture(10)
5330 ipfix = IPFIXDecoder()
5331 # first load template
# Export packet checks: IPFIX layer present, pg3 local -> pg3 remote,
# configured UDP source port, destination port 4739, matching observation
# domain. Template sets are registered with the decoder.
# NOTE(review): presumably run once per packet in `capture`; the loop header
# is not visible here - confirm.
5333 self.assertTrue(p.haslayer(IPFIX))
5334 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5335 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5336 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5337 self.assertEqual(p[UDP].dport, 4739)
5338 self.assertEqual(p[IPFIX].observationDomainID,
5339 self.ipfix_domain_id)
5340 if p.haslayer(Template):
5341 ipfix.add_template(p.getlayer(Template))
5342 # verify events in data set
# Dispatch on the first byte of field 230 of the first decoded record:
# 10 -> verify_ipfix_bib(data, 1, ...), 6 -> verify_ipfix_nat64_ses(...).
# NOTE(review): 230 is presumably the IPFIX natEvent element, with 10/6 being
# NAT64 BIB create / session create - confirm against RFC 8158.
5344 if p.haslayer(Data):
5345 data = ipfix.decode_data_set(p.getlayer(Set))
5346 if ord(data[0][230]) == 10:
5347 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5348 elif ord(data[0][230]) == 6:
5349 self.verify_ipfix_nat64_ses(data,
5351 self.pg0.remote_ip6n,
5352 self.pg1.remote_ip4,
# Any other event value is only logged, not asserted.
5355 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Phase 2: re-arm capture, call the pool address API again, flush, and expect
# 2 export packets.
# NOTE(review): per the docstring this is the delete step; the remaining
# arguments of the pool call are not visible here - confirm.
5358 self.pg_enable_capture(self.pg_interfaces)
5359 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5362 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5363 capture = self.pg3.get_capture(2)
5364 # verify events in data set
# Same header checks as phase 1; templates are not reloaded, the decoder
# created in phase 1 is reused.
5366 self.assertTrue(p.haslayer(IPFIX))
5367 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5368 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5369 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5370 self.assertEqual(p[UDP].dport, 4739)
5371 self.assertEqual(p[IPFIX].observationDomainID,
5372 self.ipfix_domain_id)
# Dispatch values here are 11 -> verify_ipfix_bib(data, 0, ...) and
# 7 -> verify_ipfix_nat64_ses(...).
# NOTE(review): presumably the BIB delete / session delete events - confirm.
5373 if p.haslayer(Data):
5374 data = ipfix.decode_data_set(p.getlayer(Set))
5375 if ord(data[0][230]) == 11:
5376 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5377 elif ord(data[0][230]) == 7:
5378 self.verify_ipfix_nat64_ses(data,
5380 self.pg0.remote_ip6n,
5381 self.pg1.remote_ip4,
5384 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Helper: report how many NAT64 sessions are currently active.
5386 def nat64_get_ses_num(self):
5388 Return number of active NAT64 sessions.
# Fetch the NAT64 session table through the API.
# NOTE(review): the return statement is not visible here; presumably the
# result is the number of entries in `st` - confirm.
5390 st = self.vapi.nat64_st_dump()
# Helper: undo NAT64 state - IPFIX logging, timeouts, interfaces, BIB entries,
# pool addresses and prefixes - using the dump APIs to find what to remove.
5393 def clear_nat64(self):
5395 Clear NAT64 configuration.
# Disable NAT IPFIX logging with the currently stored port / domain id, then
# reset both attributes to 4739 and 1.
5397 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5398 domain_id=self.ipfix_domain_id)
5399 self.ipfix_src_port = 4739
5400 self.ipfix_domain_id = 1
# Called with no arguments.
# NOTE(review): presumably restores the default timeouts - confirm.
5402 self.vapi.nat64_set_timeouts()
# Walk every NAT64 interface from the dump. Interfaces reporting
# is_inside > 1 get an additional add_del call before the common one.
# NOTE(review): a value above 1 presumably marks an interface that is both
# inside and outside; the remaining call arguments are not visible - confirm.
5404 interfaces = self.vapi.nat64_interface_dump()
5405 for intf in interfaces:
5406 if intf.is_inside > 1:
5407 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5410 self.vapi.nat64_add_del_interface(intf.sw_if_index,
# Dump BIB entries with argument 255.
# NOTE(review): presumably 255 selects all protocols; the loop and any filter
# around the static-BIB call are not visible here - confirm.
5414 bib = self.vapi.nat64_bib_dump(255)
5417 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
# One pool range call per address returned by the pool dump.
5425 adresses = self.vapi.nat64_pool_addr_dump()
5426 for addr in adresses:
5427 self.vapi.nat64_add_del_pool_addr_range(addr.address,
# One prefix call per dumped prefix, passing the prefix's own VRF id.
5432 prefixes = self.vapi.nat64_prefix_dump()
5433 for prefix in prefixes:
5434 self.vapi.nat64_add_del_prefix(prefix.prefix,
5436 vrf_id=prefix.vrf_id,
# Per-test teardown for TestNAT64: run the parent teardown, then, if VPP is
# still alive, log the NAT64 state tables for post-mortem inspection.
# NOTE(review): the enclosing def line is not visible here; the super() call
# indicates this is the TestNAT64.tearDown override - confirm.
5440 super(TestNAT64, self).tearDown()
# CLI output is only collected while VPP is still running.
5441 if not self.vpp_dead:
5442 self.logger.info(self.vapi.cli("show nat64 pool"))
5443 self.logger.info(self.vapi.cli("show nat64 interfaces"))
5444 self.logger.info(self.vapi.cli("show nat64 prefix"))
5445 self.logger.info(self.vapi.cli("show nat64 bib all"))
5446 self.logger.info(self.vapi.cli("show nat64 session table all"))
5447 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# DS-Lite test suite. It inherits the packet checksum helpers
# (check_ip_checksum, check_tcp_checksum, ...) from MethodHolder.
5451 class TestDSlite(MethodHolder):
5452 """ DS-Lite Test Cases """
# Class fixture: one NAT pool address and two packet-generator interfaces,
# pg0 configured for IPv4 and pg1 for IPv6 with two remote hosts.
5455 def setUpClass(cls):
5456 super(TestDSlite, cls).setUpClass()
# Pool address kept both as text and as packed bytes (for the API).
5459 cls.nat_addr = '10.0.0.3'
5460 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5462 cls.create_pg_interfaces(range(2))
5464 cls.pg0.config_ip4()
5465 cls.pg0.resolve_arp()
5467 cls.pg1.config_ip6()
5468 cls.pg1.generate_remote_hosts(2)
5469 cls.pg1.configure_ipv6_neighbors()
# NOTE(review): the condition guarding this parent tearDownClass call is not
# visible here; presumably it is the failure path of setUpClass - confirm.
5472 super(TestDSlite, cls).tearDownClass()
# End-to-end DS-Lite test: IPv4-in-IPv6 packets enter on pg1, must leave pg0
# as translated plain IPv4, and replies must return to pg1 re-encapsulated.
# Covers UDP, TCP and ICMP echo, then an ICMPv6 ping of the AFTR address.
5475 def test_dslite(self):
5476 """ Test DS-Lite """
5477 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
# AFTR tunnel endpoint addresses (IPv4 and IPv6), packed for the API call.
5479 aftr_ip4 = '192.0.0.1'
5480 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5481 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5482 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5483 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# UDP, tunnel -> IPv4: inner 192.168.1.1:20000 -> pg0 host:10000, carried in
# IPv6 from pg1 remote host 0 to the AFTR address.
5486 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5487 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
5488 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5489 UDP(sport=20000, dport=10000))
5490 self.pg1.add_stream(p)
5491 self.pg_enable_capture(self.pg_interfaces)
# Expect one decapsulated packet on pg0: no IPv6 layer, source rewritten to
# the pool address, source port changed, destination port untouched.
5493 capture = self.pg0.get_capture(1)
5494 capture = capture[0]
5495 self.assertFalse(capture.haslayer(IPv6))
5496 self.assertEqual(capture[IP].src, self.nat_addr)
5497 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5498 self.assertNotEqual(capture[UDP].sport, 20000)
5499 self.assertEqual(capture[UDP].dport, 10000)
5500 self.check_ip_checksum(capture)
# Remember the translated port so the reply can target it.
5501 out_port = capture[UDP].sport
# UDP, IPv4 -> tunnel: reply from the pg0 host to pool address:out_port.
5503 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5504 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5505 UDP(sport=10000, dport=out_port))
5506 self.pg0.add_stream(p)
5507 self.pg_enable_capture(self.pg_interfaces)
# Expect it on pg1 inside IPv6 from the AFTR address to remote host 0, with
# the original inner address and ports restored.
5509 capture = self.pg1.get_capture(1)
5510 capture = capture[0]
5511 self.assertEqual(capture[IPv6].src, aftr_ip6)
5512 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5513 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5514 self.assertEqual(capture[IP].dst, '192.168.1.1')
5515 self.assertEqual(capture[UDP].sport, 10000)
5516 self.assertEqual(capture[UDP].dport, 20000)
5517 self.check_ip_checksum(capture)
# TCP, tunnel -> IPv4: same inner source address, but sent from remote host 1
# with ports 20001 -> 10001.
5520 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5521 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5522 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5523 TCP(sport=20001, dport=10001))
5524 self.pg1.add_stream(p)
5525 self.pg_enable_capture(self.pg_interfaces)
# Same expectations as for UDP, plus a TCP checksum check.
5527 capture = self.pg0.get_capture(1)
5528 capture = capture[0]
5529 self.assertFalse(capture.haslayer(IPv6))
5530 self.assertEqual(capture[IP].src, self.nat_addr)
5531 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5532 self.assertNotEqual(capture[TCP].sport, 20001)
5533 self.assertEqual(capture[TCP].dport, 10001)
5534 self.check_ip_checksum(capture)
5535 self.check_tcp_checksum(capture)
5536 out_port = capture[TCP].sport
# TCP, IPv4 -> tunnel: reply to the translated port.
5538 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5539 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5540 TCP(sport=10001, dport=out_port))
5541 self.pg0.add_stream(p)
5542 self.pg_enable_capture(self.pg_interfaces)
# The reply must be tunnelled to remote host 1 with the original ports.
5544 capture = self.pg1.get_capture(1)
5545 capture = capture[0]
5546 self.assertEqual(capture[IPv6].src, aftr_ip6)
5547 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5548 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5549 self.assertEqual(capture[IP].dst, '192.168.1.1')
5550 self.assertEqual(capture[TCP].sport, 10001)
5551 self.assertEqual(capture[TCP].dport, 20001)
5552 self.check_ip_checksum(capture)
5553 self.check_tcp_checksum(capture)
# ICMP, tunnel -> IPv4: echo request with identifier 4000 from remote host 1.
5556 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5557 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5558 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5559 ICMP(id=4000, type='echo-request'))
5560 self.pg1.add_stream(p)
5561 self.pg_enable_capture(self.pg_interfaces)
# For ICMP the identifier is the translated field: it must differ from 4000.
5563 capture = self.pg0.get_capture(1)
5564 capture = capture[0]
5565 self.assertFalse(capture.haslayer(IPv6))
5566 self.assertEqual(capture[IP].src, self.nat_addr)
5567 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5568 self.assertNotEqual(capture[ICMP].id, 4000)
5569 self.check_ip_checksum(capture)
5570 self.check_icmp_checksum(capture)
5571 out_id = capture[ICMP].id
# ICMP, IPv4 -> tunnel: echo reply carrying the translated identifier.
5573 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5574 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5575 ICMP(id=out_id, type='echo-reply'))
5576 self.pg0.add_stream(p)
5577 self.pg_enable_capture(self.pg_interfaces)
# The tunnelled reply must carry the original identifier 4000 again.
5579 capture = self.pg1.get_capture(1)
5580 capture = capture[0]
5581 self.assertEqual(capture[IPv6].src, aftr_ip6)
5582 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5583 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5584 self.assertEqual(capture[IP].dst, '192.168.1.1')
5585 self.assertEqual(capture[ICMP].id, 4000)
5586 self.check_ip_checksum(capture)
5587 self.check_icmp_checksum(capture)
5589 # ping DS-Lite AFTR tunnel endpoint address
5590 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5591 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
5592 ICMPv6EchoRequest())
5593 self.pg1.add_stream(p)
5594 self.pg_enable_capture(self.pg_interfaces)
# The echo reply must come back on pg1, sourced from the AFTR IPv6 address.
5596 capture = self.pg1.get_capture(1)
5597 self.assertEqual(1, len(capture))
5598 capture = capture[0]
5599 self.assertEqual(capture[IPv6].src, aftr_ip6)
5600 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5601 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Per-test teardown: parent teardown first, then log the DS-Lite state tables
# while VPP is still alive.
5604 super(TestDSlite, self).tearDown()
5605 if not self.vpp_dead:
5606 self.logger.info(self.vapi.cli("show dslite pool"))
5608 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5609 self.logger.info(self.vapi.cli("show dslite sessions"))
# Script entry point: when the module is executed directly, run its tests
# through unittest using the VPP-specific test runner.
5611 if __name__ == '__main__':
5612 unittest.main(testRunner=VppTestRunner)