11 from framework import tag_fixme_debian11, is_distro_debian11
12 from framework import VppTestCase, VppTestRunner, VppLoInterface
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.all import (
27 from scapy.data import IP_PROTOS
28 from scapy.layers.inet import IP, TCP, UDP, ICMP
29 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
30 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
31 from scapy.layers.l2 import Ether, ARP, GRE
32 from scapy.packet import Raw
33 from syslog_rfc5424_parser import SyslogMessage, ParseError
34 from syslog_rfc5424_parser.constants import SyslogSeverity
36 from vpp_ip_route import VppIpRoute, VppRoutePath
37 from vpp_neighbor import VppNeighbor
38 from vpp_papi import VppEnum
41 # NAT HA protocol event data
45 ByteEnumField("event_type", None, {1: "add", 2: "del", 3: "refresh"}),
46 ByteEnumField("protocol", None, {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
47 ShortField("flags", 0),
48 IPField("in_addr", None),
49 IPField("out_addr", None),
50 ShortField("in_port", None),
51 ShortField("out_port", None),
52 IPField("eh_addr", None),
53 IPField("ehn_addr", None),
54 ShortField("eh_port", None),
55 ShortField("ehn_port", None),
56 IntField("fib_index", None),
57 IntField("total_pkts", 0),
58 LongField("total_bytes", 0),
61 def extract_padding(self, s):
65 # NAT HA protocol header
66 class HANATStateSync(Packet):
67 name = "HA NAT state sync"
69 XByteField("version", 1),
70 FlagsField("flags", 0, 8, ["ACK"]),
71 FieldLenField("count", None, count_of="events"),
72 IntField("sequence_number", 1),
73 IntField("thread_index", 0),
74 PacketListField("events", [], Event, count_from=lambda pkt: pkt.count),
78 class MethodHolder(VppTestCase):
79 """NAT create capture and verify method holder"""
82 def config_flags(self):
83 return VppEnum.vl_api_nat44_ei_config_flags_t
86 def SYSLOG_SEVERITY(self):
87 return VppEnum.vl_api_syslog_severity_t
89 def nat44_add_static_mapping(
92 external_ip="0.0.0.0",
97 external_sw_if_index=0xFFFFFFFF,
103 Add/delete NAT44EI static mapping
105 :param local_ip: Local IP address
106 :param external_ip: External IP address
107 :param local_port: Local port number (Optional)
108 :param external_port: External port number (Optional)
109 :param vrf_id: VRF ID (Default 0)
110 :param is_add: 1 if add, 0 if delete (Default add)
111 :param external_sw_if_index: External interface instead of IP address
112 :param proto: IP protocol (Mandatory if port specified)
113 :param tag: Opaque string tag
114 :param flags: NAT configuration flags
117 if not (local_port and external_port):
118 flags |= self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
120 self.vapi.nat44_ei_add_del_static_mapping(
122 local_ip_address=local_ip,
123 external_ip_address=external_ip,
124 external_sw_if_index=external_sw_if_index,
125 local_port=local_port,
126 external_port=external_port,
133 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
135 Add/delete NAT44EI address
137 :param ip: IP address
138 :param is_add: 1 if add, 0 if delete (Default add)
140 self.vapi.nat44_ei_add_del_address_range(
141 first_ip_address=ip, last_ip_address=ip, vrf_id=vrf_id, is_add=is_add
144 def create_routes_and_neigbors(self):
149 [VppRoutePath(self.pg7.remote_ip4, self.pg7.sw_if_index)],
155 [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)],
162 self.pg7.sw_if_index,
169 self.pg8.sw_if_index,
177 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
179 Create packet stream for inside network
181 :param in_if: Inside interface
182 :param out_if: Outside interface
183 :param dst_ip: Destination address
184 :param ttl: TTL of generated packets
187 dst_ip = out_if.remote_ip4
192 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
193 / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
194 / TCP(sport=self.tcp_port_in, dport=20)
200 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
201 / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
202 / UDP(sport=self.udp_port_in, dport=20)
208 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
209 / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
210 / ICMP(id=self.icmp_id_in, type="echo-request")
216 def compose_ip6(self, ip4, pref, plen):
218 Compose IPv4-embedded IPv6 addresses
220 :param ip4: IPv4 address
221 :param pref: IPv6 prefix
222 :param plen: IPv6 prefix length
223 :returns: IPv4-embedded IPv6 addresses
225 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
226 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
241 pref_n[10] = ip4_n[3]
245 pref_n[10] = ip4_n[2]
246 pref_n[11] = ip4_n[3]
249 pref_n[10] = ip4_n[1]
250 pref_n[11] = ip4_n[2]
251 pref_n[12] = ip4_n[3]
253 pref_n[12] = ip4_n[0]
254 pref_n[13] = ip4_n[1]
255 pref_n[14] = ip4_n[2]
256 pref_n[15] = ip4_n[3]
257 packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n])
258 return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
260 def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
262 Create packet stream for outside network
264 :param out_if: Outside interface
265 :param dst_ip: Destination IP address (Default use global NAT address)
266 :param ttl: TTL of generated packets
267 :param use_inside_ports: Use inside NAT ports as destination ports
268 instead of outside ports
271 dst_ip = self.nat_addr
272 if not use_inside_ports:
273 tcp_port = self.tcp_port_out
274 udp_port = self.udp_port_out
275 icmp_id = self.icmp_id_out
277 tcp_port = self.tcp_port_in
278 udp_port = self.udp_port_in
279 icmp_id = self.icmp_id_in
283 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
284 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
285 / TCP(dport=tcp_port, sport=20)
291 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
292 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
293 / UDP(dport=udp_port, sport=20)
299 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
300 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
301 / ICMP(id=icmp_id, type="echo-reply")
307 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
309 Create packet stream for outside network
311 :param out_if: Outside interface
312 :param dst_ip: Destination IP address (Default use global NAT address)
313 :param hl: HL of generated packets
318 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
319 / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
320 / TCP(dport=self.tcp_port_out, sport=20)
326 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
327 / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
328 / UDP(dport=self.udp_port_out, sport=20)
334 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
335 / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
336 / ICMPv6EchoReply(id=self.icmp_id_out)
342 def verify_capture_out(
352 Verify captured packets on outside network
354 :param capture: Captured packets
355 :param nat_ip: Translated IP address (Default use global NAT address)
356 :param same_port: Source port number is not translated (Default False)
357 :param dst_ip: Destination IP address (Default do not verify)
358 :param is_ip6: If L3 protocol is IPv6 (Default False)
362 ICMP46 = ICMPv6EchoRequest
367 nat_ip = self.nat_addr
368 for packet in capture:
371 self.assert_packet_checksums_valid(packet)
372 self.assertEqual(packet[IP46].src, nat_ip)
373 if dst_ip is not None:
374 self.assertEqual(packet[IP46].dst, dst_ip)
375 if packet.haslayer(TCP):
378 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
380 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
381 self.tcp_port_out = packet[TCP].sport
382 self.assert_packet_checksums_valid(packet)
383 elif packet.haslayer(UDP):
386 self.assertEqual(packet[UDP].sport, self.udp_port_in)
388 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
389 self.udp_port_out = packet[UDP].sport
393 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
395 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
396 self.icmp_id_out = packet[ICMP46].id
397 self.assert_packet_checksums_valid(packet)
400 ppp("Unexpected or invalid packet (outside network):", packet)
404 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False, dst_ip=None):
406 Verify captured packets on outside network
408 :param capture: Captured packets
409 :param nat_ip: Translated IP address
410 :param same_port: Source port number is not translated (Default False)
411 :param dst_ip: Destination IP address (Default do not verify)
413 return self.verify_capture_out(capture, nat_ip, same_port, dst_ip, True)
415 def verify_capture_in(self, capture, in_if):
417 Verify captured packets on inside network
419 :param capture: Captured packets
420 :param in_if: Inside interface
422 for packet in capture:
424 self.assert_packet_checksums_valid(packet)
425 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
426 if packet.haslayer(TCP):
427 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
428 elif packet.haslayer(UDP):
429 self.assertEqual(packet[UDP].dport, self.udp_port_in)
431 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
434 ppp("Unexpected or invalid packet (inside network):", packet)
438 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
440 Verify captured packet that don't have to be translated
442 :param capture: Captured packets
443 :param ingress_if: Ingress interface
444 :param egress_if: Egress interface
446 for packet in capture:
448 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
449 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
450 if packet.haslayer(TCP):
451 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
452 elif packet.haslayer(UDP):
453 self.assertEqual(packet[UDP].sport, self.udp_port_in)
455 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
458 ppp("Unexpected or invalid packet (inside network):", packet)
462 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None, icmp_type=11):
464 Verify captured packets with ICMP errors on outside network
466 :param capture: Captured packets
467 :param src_ip: Translated IP address or IP address of VPP
468 (Default use global NAT address)
469 :param icmp_type: Type of error ICMP packet
470 we are expecting (Default 11)
473 src_ip = self.nat_addr
474 for packet in capture:
476 self.assertEqual(packet[IP].src, src_ip)
477 self.assertEqual(packet.haslayer(ICMP), 1)
479 self.assertEqual(icmp.type, icmp_type)
480 self.assertTrue(icmp.haslayer(IPerror))
481 inner_ip = icmp[IPerror]
482 if inner_ip.haslayer(TCPerror):
483 self.assertEqual(inner_ip[TCPerror].dport, self.tcp_port_out)
484 elif inner_ip.haslayer(UDPerror):
485 self.assertEqual(inner_ip[UDPerror].dport, self.udp_port_out)
487 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
490 ppp("Unexpected or invalid packet (outside network):", packet)
494 def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
496 Verify captured packets with ICMP errors on inside network
498 :param capture: Captured packets
499 :param in_if: Inside interface
500 :param icmp_type: Type of error ICMP packet
501 we are expecting (Default 11)
503 for packet in capture:
505 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
506 self.assertEqual(packet.haslayer(ICMP), 1)
508 self.assertEqual(icmp.type, icmp_type)
509 self.assertTrue(icmp.haslayer(IPerror))
510 inner_ip = icmp[IPerror]
511 if inner_ip.haslayer(TCPerror):
512 self.assertEqual(inner_ip[TCPerror].sport, self.tcp_port_in)
513 elif inner_ip.haslayer(UDPerror):
514 self.assertEqual(inner_ip[UDPerror].sport, self.udp_port_in)
516 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
519 ppp("Unexpected or invalid packet (inside network):", packet)
523 def create_stream_frag(
524 self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
527 Create fragmented packet stream
529 :param src_if: Source interface
530 :param dst: Destination IPv4 address
531 :param sport: Source port
532 :param dport: Destination port
533 :param data: Payload data
534 :param proto: protocol (TCP, UDP, ICMP)
535 :param echo_reply: use echo_reply if protocol is ICMP
538 if proto == IP_PROTOS.tcp:
540 IP(src=src_if.remote_ip4, dst=dst)
541 / TCP(sport=sport, dport=dport)
544 p = p.__class__(scapy.compat.raw(p))
545 chksum = p[TCP].chksum
546 proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
547 elif proto == IP_PROTOS.udp:
548 proto_header = UDP(sport=sport, dport=dport)
549 elif proto == IP_PROTOS.icmp:
551 proto_header = ICMP(id=sport, type="echo-request")
553 proto_header = ICMP(id=sport, type="echo-reply")
555 raise Exception("Unsupported protocol")
556 id = random.randint(0, 65535)
558 if proto == IP_PROTOS.tcp:
561 raw = Raw(data[0:16])
563 Ether(src=src_if.remote_mac, dst=src_if.local_mac)
564 / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
569 if proto == IP_PROTOS.tcp:
570 raw = Raw(data[4:20])
572 raw = Raw(data[16:32])
574 Ether(src=src_if.remote_mac, dst=src_if.local_mac)
575 / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
579 if proto == IP_PROTOS.tcp:
584 Ether(src=src_if.remote_mac, dst=src_if.local_mac)
585 / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
591 def reass_frags_and_verify(self, frags, src, dst):
593 Reassemble and verify fragmented packet
595 :param frags: Captured fragments
596 :param src: Source IPv4 address to verify
597 :param dst: Destination IPv4 address to verify
599 :returns: Reassembled IPv4 packet
603 self.assertEqual(p[IP].src, src)
604 self.assertEqual(p[IP].dst, dst)
605 self.assert_ip_checksum_valid(p)
606 buffer.seek(p[IP].frag * 8)
607 buffer.write(bytes(p[IP].payload))
608 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
609 if ip.proto == IP_PROTOS.tcp:
610 p = ip / TCP(buffer.getvalue())
611 self.logger.debug(ppp("Reassembled:", p))
612 self.assert_tcp_checksum_valid(p)
613 elif ip.proto == IP_PROTOS.udp:
614 p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
615 elif ip.proto == IP_PROTOS.icmp:
616 p = ip / ICMP(buffer.getvalue())
619 def verify_ipfix_nat44_ses(self, data):
621 Verify IPFIX NAT44EI session create/delete event
623 :param data: Decoded IPFIX data records
625 nat44_ses_create_num = 0
626 nat44_ses_delete_num = 0
627 self.assertEqual(6, len(data))
630 self.assertIn(scapy.compat.orb(record[230]), [4, 5])
631 if scapy.compat.orb(record[230]) == 4:
632 nat44_ses_create_num += 1
634 nat44_ses_delete_num += 1
636 self.assertEqual(self.pg0.remote_ip4, str(ipaddress.IPv4Address(record[8])))
637 # postNATSourceIPv4Address
639 socket.inet_pton(socket.AF_INET, self.nat_addr), record[225]
642 self.assertEqual(struct.pack("!I", 0), record[234])
643 # protocolIdentifier/sourceTransportPort
644 # /postNAPTSourceTransportPort
645 if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
646 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
647 self.assertEqual(struct.pack("!H", self.icmp_id_out), record[227])
648 elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
649 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
650 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
651 elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
652 self.assertEqual(struct.pack("!H", self.udp_port_in), record[7])
653 self.assertEqual(struct.pack("!H", self.udp_port_out), record[227])
655 self.fail(f"Invalid protocol {scapy.compat.orb(record[4])}")
656 self.assertEqual(3, nat44_ses_create_num)
657 self.assertEqual(3, nat44_ses_delete_num)
659 def verify_ipfix_addr_exhausted(self, data):
660 self.assertEqual(1, len(data))
663 self.assertEqual(scapy.compat.orb(record[230]), 3)
665 self.assertEqual(struct.pack("!I", 0), record[283])
667 def verify_ipfix_max_sessions(self, data, limit):
668 self.assertEqual(1, len(data))
671 self.assertEqual(scapy.compat.orb(record[230]), 13)
672 # natQuotaExceededEvent
673 self.assertEqual(struct.pack("!I", 1), record[466])
675 self.assertEqual(struct.pack("!I", limit), record[471])
677 def verify_no_nat44_user(self):
678 """Verify that there is no NAT44EI user"""
679 users = self.vapi.nat44_ei_user_dump()
680 self.assertEqual(len(users), 0)
681 users = self.statistics["/nat44-ei/total-users"]
682 self.assertEqual(users[0][0], 0)
683 sessions = self.statistics["/nat44-ei/total-sessions"]
684 self.assertEqual(sessions[0][0], 0)
686 def verify_syslog_apmap(self, data, is_add=True):
687 message = data.decode("utf-8")
689 message = SyslogMessage.parse(message)
690 except ParseError as e:
694 self.assertEqual(message.severity, SyslogSeverity.info)
695 self.assertEqual(message.appname, "NAT")
696 self.assertEqual(message.msgid, "APMADD" if is_add else "APMDEL")
697 sd_params = message.sd.get("napmap")
698 self.assertTrue(sd_params is not None)
699 self.assertEqual(sd_params.get("IATYP"), "IPv4")
700 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
701 self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
702 self.assertEqual(sd_params.get("XATYP"), "IPv4")
703 self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
704 self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
705 self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
706 self.assertTrue(sd_params.get("SSUBIX") is not None)
707 self.assertEqual(sd_params.get("SVLAN"), "0")
709 def verify_mss_value(self, pkt, mss):
710 if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
711 raise TypeError("Not a TCP/IP packet")
713 for option in pkt[TCP].options:
714 if option[0] == "MSS":
715 self.assertEqual(option[1], mss)
716 self.assert_tcp_checksum_valid(pkt)
719 def proto2layer(proto):
720 if proto == IP_PROTOS.tcp:
722 elif proto == IP_PROTOS.udp:
724 elif proto == IP_PROTOS.icmp:
727 raise Exception("Unsupported protocol")
730 self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
732 layer = self.proto2layer(proto)
734 if proto == IP_PROTOS.tcp:
735 data = b"A" * 4 + b"B" * 16 + b"C" * 3
737 data = b"A" * 16 + b"B" * 16 + b"C" * 3
738 self.port_in = random.randint(1025, 65535)
741 pkts = self.create_stream_frag(
742 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
744 self.pg0.add_stream(pkts)
745 self.pg_enable_capture(self.pg_interfaces)
747 frags = self.pg1.get_capture(len(pkts))
748 if not dont_translate:
749 p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
751 p = self.reass_frags_and_verify(
752 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
754 if proto != IP_PROTOS.icmp:
755 if not dont_translate:
756 self.assertEqual(p[layer].dport, 20)
758 self.assertNotEqual(p[layer].sport, self.port_in)
760 self.assertEqual(p[layer].sport, self.port_in)
763 if not dont_translate:
764 self.assertNotEqual(p[layer].id, self.port_in)
766 self.assertEqual(p[layer].id, self.port_in)
767 self.assertEqual(data, p[Raw].load)
770 if not dont_translate:
771 dst_addr = self.nat_addr
773 dst_addr = self.pg0.remote_ip4
774 if proto != IP_PROTOS.icmp:
776 dport = p[layer].sport
780 pkts = self.create_stream_frag(
781 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
783 self.pg1.add_stream(pkts)
784 self.pg_enable_capture(self.pg_interfaces)
786 frags = self.pg0.get_capture(len(pkts))
787 p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
788 if proto != IP_PROTOS.icmp:
789 self.assertEqual(p[layer].sport, 20)
790 self.assertEqual(p[layer].dport, self.port_in)
792 self.assertEqual(p[layer].id, self.port_in)
793 self.assertEqual(data, p[Raw].load)
795 def reass_hairpinning(
804 layer = self.proto2layer(proto)
806 if proto == IP_PROTOS.tcp:
807 data = b"A" * 4 + b"B" * 16 + b"C" * 3
809 data = b"A" * 16 + b"B" * 16 + b"C" * 3
811 # send packet from host to server
812 pkts = self.create_stream_frag(
813 self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
815 self.pg0.add_stream(pkts)
816 self.pg_enable_capture(self.pg_interfaces)
818 frags = self.pg0.get_capture(len(pkts))
819 p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
820 if proto != IP_PROTOS.icmp:
822 self.assertNotEqual(p[layer].sport, host_in_port)
823 self.assertEqual(p[layer].dport, server_in_port)
826 self.assertNotEqual(p[layer].id, host_in_port)
827 self.assertEqual(data, p[Raw].load)
829 def frag_out_of_order(
830 self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
832 layer = self.proto2layer(proto)
834 if proto == IP_PROTOS.tcp:
835 data = b"A" * 4 + b"B" * 16 + b"C" * 3
837 data = b"A" * 16 + b"B" * 16 + b"C" * 3
838 self.port_in = random.randint(1025, 65535)
842 pkts = self.create_stream_frag(
843 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
846 self.pg0.add_stream(pkts)
847 self.pg_enable_capture(self.pg_interfaces)
849 frags = self.pg1.get_capture(len(pkts))
850 if not dont_translate:
851 p = self.reass_frags_and_verify(
852 frags, self.nat_addr, self.pg1.remote_ip4
855 p = self.reass_frags_and_verify(
856 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
858 if proto != IP_PROTOS.icmp:
859 if not dont_translate:
860 self.assertEqual(p[layer].dport, 20)
862 self.assertNotEqual(p[layer].sport, self.port_in)
864 self.assertEqual(p[layer].sport, self.port_in)
867 if not dont_translate:
868 self.assertNotEqual(p[layer].id, self.port_in)
870 self.assertEqual(p[layer].id, self.port_in)
871 self.assertEqual(data, p[Raw].load)
874 if not dont_translate:
875 dst_addr = self.nat_addr
877 dst_addr = self.pg0.remote_ip4
878 if proto != IP_PROTOS.icmp:
880 dport = p[layer].sport
884 pkts = self.create_stream_frag(
885 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
888 self.pg1.add_stream(pkts)
889 self.pg_enable_capture(self.pg_interfaces)
891 frags = self.pg0.get_capture(len(pkts))
892 p = self.reass_frags_and_verify(
893 frags, self.pg1.remote_ip4, self.pg0.remote_ip4
895 if proto != IP_PROTOS.icmp:
896 self.assertEqual(p[layer].sport, 20)
897 self.assertEqual(p[layer].dport, self.port_in)
899 self.assertEqual(p[layer].id, self.port_in)
900 self.assertEqual(data, p[Raw].load)
903 def get_nat44_ei_in2out_worker_index(ip, vpp_worker_count):
904 if 0 == vpp_worker_count:
906 numeric = socket.inet_aton(ip)
907 numeric = struct.unpack("!L", numeric)[0]
908 numeric = socket.htonl(numeric)
909 h = numeric + (numeric >> 8) + (numeric >> 16) + (numeric >> 24)
910 return 1 + h % vpp_worker_count
914 class TestNAT44EI(MethodHolder):
915 """NAT44EI Test Cases"""
917 max_translations = 10240
922 super(TestNAT44EI, cls).setUpClass()
923 if is_distro_debian11 == True and not hasattr(cls, "vpp"):
925 cls.vapi.cli("set log class nat44-ei level debug")
927 cls.tcp_port_in = 6303
928 cls.tcp_port_out = 6303
929 cls.udp_port_in = 6304
930 cls.udp_port_out = 6304
931 cls.icmp_id_in = 6305
932 cls.icmp_id_out = 6305
933 cls.nat_addr = "10.0.0.3"
934 cls.ipfix_src_port = 4739
935 cls.ipfix_domain_id = 1
936 cls.tcp_external_port = 80
937 cls.udp_external_port = 69
939 cls.create_pg_interfaces(range(10))
940 cls.interfaces = list(cls.pg_interfaces[0:4])
942 for i in cls.interfaces:
947 cls.pg0.generate_remote_hosts(3)
948 cls.pg0.configure_ipv4_neighbors()
950 cls.pg1.generate_remote_hosts(1)
951 cls.pg1.configure_ipv4_neighbors()
953 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
954 cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
955 cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
957 cls.pg4._local_ip4 = "172.16.255.1"
958 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
959 cls.pg4.set_table_ip4(10)
960 cls.pg5._local_ip4 = "172.17.255.3"
961 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
962 cls.pg5.set_table_ip4(10)
963 cls.pg6._local_ip4 = "172.16.255.1"
964 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
965 cls.pg6.set_table_ip4(20)
966 for i in cls.overlapping_interfaces:
974 cls.pg9.generate_remote_hosts(2)
976 cls.vapi.sw_interface_add_del_address(
977 sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
981 cls.pg9.resolve_arp()
982 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
983 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
984 cls.pg9.resolve_arp()
986 def plugin_enable(self):
987 self.vapi.nat44_ei_plugin_enable_disable(
988 sessions=self.max_translations, users=self.max_users, enable=1
992 super(TestNAT44EI, self).setUp()
996 super(TestNAT44EI, self).tearDown()
997 if not self.vpp_dead:
998 self.vapi.nat44_ei_ipfix_enable_disable(
999 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
1001 self.ipfix_src_port = 4739
1002 self.ipfix_domain_id = 1
1004 self.vapi.nat44_ei_plugin_enable_disable(enable=0)
1005 self.vapi.cli("clear logging")
1007 def test_clear_sessions(self):
1008 """NAT44EI session clearing test"""
1010 self.nat44_add_address(self.nat_addr)
1011 flags = self.config_flags.NAT44_EI_IF_INSIDE
1012 self.vapi.nat44_ei_interface_add_del_feature(
1013 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1015 self.vapi.nat44_ei_interface_add_del_feature(
1016 sw_if_index=self.pg1.sw_if_index, is_add=1
1019 pkts = self.create_stream_in(self.pg0, self.pg1)
1020 self.pg0.add_stream(pkts)
1021 self.pg_enable_capture(self.pg_interfaces)
1023 capture = self.pg1.get_capture(len(pkts))
1024 self.verify_capture_out(capture)
1026 sessions = self.statistics["/nat44-ei/total-sessions"]
1027 self.assertGreater(sessions[:, 0].sum(), 0, "Session count invalid")
1028 self.logger.info("sessions before clearing: %s" % sessions[0][0])
1030 self.vapi.cli("clear nat44 ei sessions")
1032 sessions = self.statistics["/nat44-ei/total-sessions"]
1033 self.assertEqual(sessions[:, 0].sum(), 0, "Session count invalid")
1034 self.logger.info("sessions after clearing: %s" % sessions[0][0])
1036 def test_dynamic(self):
1037 """NAT44EI dynamic translation test"""
1038 self.nat44_add_address(self.nat_addr)
1039 flags = self.config_flags.NAT44_EI_IF_INSIDE
1040 self.vapi.nat44_ei_interface_add_del_feature(
1041 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1043 self.vapi.nat44_ei_interface_add_del_feature(
1044 sw_if_index=self.pg1.sw_if_index, is_add=1
1048 tcpn = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
1049 udpn = self.statistics["/nat44-ei/in2out/slowpath/udp"]
1050 icmpn = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
1051 drops = self.statistics["/nat44-ei/in2out/slowpath/drops"]
1053 pkts = self.create_stream_in(self.pg0, self.pg1)
1054 self.pg0.add_stream(pkts)
1055 self.pg_enable_capture(self.pg_interfaces)
1057 capture = self.pg1.get_capture(len(pkts))
1058 self.verify_capture_out(capture)
1060 if_idx = self.pg0.sw_if_index
1061 cnt = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
1062 self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
1063 cnt = self.statistics["/nat44-ei/in2out/slowpath/udp"]
1064 self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
1065 cnt = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
1066 self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
1067 cnt = self.statistics["/nat44-ei/in2out/slowpath/drops"]
1068 self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
1071 tcpn = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
1072 udpn = self.statistics["/nat44-ei/out2in/slowpath/udp"]
1073 icmpn = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
1074 drops = self.statistics["/nat44-ei/out2in/slowpath/drops"]
1076 pkts = self.create_stream_out(self.pg1)
1077 self.pg1.add_stream(pkts)
1078 self.pg_enable_capture(self.pg_interfaces)
1080 capture = self.pg0.get_capture(len(pkts))
1081 self.verify_capture_in(capture, self.pg0)
1083 if_idx = self.pg1.sw_if_index
1084 cnt = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
1085 self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
1086 cnt = self.statistics["/nat44-ei/out2in/slowpath/udp"]
1087 self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
1088 cnt = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
1089 self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
1090 cnt = self.statistics["/nat44-ei/out2in/slowpath/drops"]
1091 self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
1093 users = self.statistics["/nat44-ei/total-users"]
1094 self.assertEqual(users[:, 0].sum(), 1)
1095 sessions = self.statistics["/nat44-ei/total-sessions"]
1096 self.assertEqual(sessions[:, 0].sum(), 3)
1098 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1099 """NAT44EI handling of client packets with TTL=1"""
1101 self.nat44_add_address(self.nat_addr)
1102 flags = self.config_flags.NAT44_EI_IF_INSIDE
1103 self.vapi.nat44_ei_interface_add_del_feature(
1104 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1106 self.vapi.nat44_ei_interface_add_del_feature(
1107 sw_if_index=self.pg1.sw_if_index, is_add=1
1110 # Client side - generate traffic
1111 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1112 capture = self.send_and_expect_some(self.pg0, pkts, self.pg0)
1114 # Client side - verify ICMP type 11 packets
1115 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1117 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1118 """NAT44EI handling of server packets with TTL=1"""
1120 self.nat44_add_address(self.nat_addr)
1121 flags = self.config_flags.NAT44_EI_IF_INSIDE
1122 self.vapi.nat44_ei_interface_add_del_feature(
1123 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1125 self.vapi.nat44_ei_interface_add_del_feature(
1126 sw_if_index=self.pg1.sw_if_index, is_add=1
1129 # Client side - create sessions
1130 pkts = self.create_stream_in(self.pg0, self.pg1)
1131 self.pg0.add_stream(pkts)
1132 self.pg_enable_capture(self.pg_interfaces)
1135 # Server side - generate traffic
1136 capture = self.pg1.get_capture(len(pkts))
1137 self.verify_capture_out(capture)
1138 pkts = self.create_stream_out(self.pg1, ttl=1)
1139 capture = self.send_and_expect_some(self.pg1, pkts, self.pg1)
1141 # Server side - verify ICMP type 11 packets
1142 self.verify_capture_out_with_icmp_errors(capture, src_ip=self.pg1.local_ip4)
1144 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1145 """NAT44EI handling of error responses to client packets with TTL=2"""
1147 self.nat44_add_address(self.nat_addr)
1148 flags = self.config_flags.NAT44_EI_IF_INSIDE
1149 self.vapi.nat44_ei_interface_add_del_feature(
1150 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1152 self.vapi.nat44_ei_interface_add_del_feature(
1153 sw_if_index=self.pg1.sw_if_index, is_add=1
1156 # Client side - generate traffic
1157 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1158 self.pg0.add_stream(pkts)
1159 self.pg_enable_capture(self.pg_interfaces)
1162 # Server side - simulate ICMP type 11 response
1163 capture = self.pg1.get_capture(len(pkts))
1165 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1166 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1169 for packet in capture
1171 self.pg1.add_stream(pkts)
1172 self.pg_enable_capture(self.pg_interfaces)
1175 # Client side - verify ICMP type 11 packets
1176 capture = self.pg0.get_capture(len(pkts))
1177 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1179 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1180 """NAT44EI handling of error responses to server packets with TTL=2"""
1182 self.nat44_add_address(self.nat_addr)
1183 flags = self.config_flags.NAT44_EI_IF_INSIDE
1184 self.vapi.nat44_ei_interface_add_del_feature(
1185 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1187 self.vapi.nat44_ei_interface_add_del_feature(
1188 sw_if_index=self.pg1.sw_if_index, is_add=1
1191 # Client side - create sessions
1192 pkts = self.create_stream_in(self.pg0, self.pg1)
1193 self.pg0.add_stream(pkts)
1194 self.pg_enable_capture(self.pg_interfaces)
1197 # Server side - generate traffic
1198 capture = self.pg1.get_capture(len(pkts))
1199 self.verify_capture_out(capture)
1200 pkts = self.create_stream_out(self.pg1, ttl=2)
1201 self.pg1.add_stream(pkts)
1202 self.pg_enable_capture(self.pg_interfaces)
1205 # Client side - simulate ICMP type 11 response
1206 capture = self.pg0.get_capture(len(pkts))
1208 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1209 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1212 for packet in capture
1214 self.pg0.add_stream(pkts)
1215 self.pg_enable_capture(self.pg_interfaces)
1218 # Server side - verify ICMP type 11 packets
1219 capture = self.pg1.get_capture(len(pkts))
1220 self.verify_capture_out_with_icmp_errors(capture)
1222 def test_ping_out_interface_from_outside(self):
1223 """NAT44EI ping out interface from outside network"""
1225 self.nat44_add_address(self.nat_addr)
1226 flags = self.config_flags.NAT44_EI_IF_INSIDE
1227 self.vapi.nat44_ei_interface_add_del_feature(
1228 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1230 self.vapi.nat44_ei_interface_add_del_feature(
1231 sw_if_index=self.pg1.sw_if_index, is_add=1
1235 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1236 / IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4)
1237 / ICMP(id=self.icmp_id_out, type="echo-request")
1240 self.pg1.add_stream(pkts)
1241 self.pg_enable_capture(self.pg_interfaces)
1243 capture = self.pg1.get_capture(len(pkts))
1246 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1247 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1248 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1249 self.assertEqual(packet[ICMP].type, 0) # echo reply
1252 ppp("Unexpected or invalid packet (outside network):", packet)
1256 def test_ping_internal_host_from_outside(self):
1257 """NAT44EI ping internal host from outside network"""
1259 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1260 flags = self.config_flags.NAT44_EI_IF_INSIDE
1261 self.vapi.nat44_ei_interface_add_del_feature(
1262 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1264 self.vapi.nat44_ei_interface_add_del_feature(
1265 sw_if_index=self.pg1.sw_if_index, is_add=1
1270 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1271 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64)
1272 / ICMP(id=self.icmp_id_out, type="echo-request")
1274 self.pg1.add_stream(pkt)
1275 self.pg_enable_capture(self.pg_interfaces)
1277 capture = self.pg0.get_capture(1)
1278 self.verify_capture_in(capture, self.pg0)
1279 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1283 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1284 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
1285 / ICMP(id=self.icmp_id_in, type="echo-reply")
1287 self.pg0.add_stream(pkt)
1288 self.pg_enable_capture(self.pg_interfaces)
1290 capture = self.pg1.get_capture(1)
1291 self.verify_capture_out(capture, same_port=True)
1292 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1294 def test_forwarding(self):
1295 """NAT44EI forwarding test"""
1297 flags = self.config_flags.NAT44_EI_IF_INSIDE
1298 self.vapi.nat44_ei_interface_add_del_feature(
1299 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1301 self.vapi.nat44_ei_interface_add_del_feature(
1302 sw_if_index=self.pg1.sw_if_index, is_add=1
1304 self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
1306 real_ip = self.pg0.remote_ip4
1307 alias_ip = self.nat_addr
1308 flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1309 self.vapi.nat44_ei_add_del_static_mapping(
1311 local_ip_address=real_ip,
1312 external_ip_address=alias_ip,
1313 external_sw_if_index=0xFFFFFFFF,
1318 # static mapping match
1320 pkts = self.create_stream_out(self.pg1)
1321 self.pg1.add_stream(pkts)
1322 self.pg_enable_capture(self.pg_interfaces)
1324 capture = self.pg0.get_capture(len(pkts))
1325 self.verify_capture_in(capture, self.pg0)
1327 pkts = self.create_stream_in(self.pg0, self.pg1)
1328 self.pg0.add_stream(pkts)
1329 self.pg_enable_capture(self.pg_interfaces)
1331 capture = self.pg1.get_capture(len(pkts))
1332 self.verify_capture_out(capture, same_port=True)
1334 # no static mapping match
1336 host0 = self.pg0.remote_hosts[0]
1337 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1339 pkts = self.create_stream_out(
1340 self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1342 self.pg1.add_stream(pkts)
1343 self.pg_enable_capture(self.pg_interfaces)
1345 capture = self.pg0.get_capture(len(pkts))
1346 self.verify_capture_in(capture, self.pg0)
1348 pkts = self.create_stream_in(self.pg0, self.pg1)
1349 self.pg0.add_stream(pkts)
1350 self.pg_enable_capture(self.pg_interfaces)
1352 capture = self.pg1.get_capture(len(pkts))
1353 self.verify_capture_out(
1354 capture, nat_ip=self.pg0.remote_ip4, same_port=True
1357 self.pg0.remote_hosts[0] = host0
1360 self.vapi.nat44_ei_forwarding_enable_disable(enable=0)
1361 flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1362 self.vapi.nat44_ei_add_del_static_mapping(
1364 local_ip_address=real_ip,
1365 external_ip_address=alias_ip,
1366 external_sw_if_index=0xFFFFFFFF,
1370 def test_static_in(self):
1371 """NAT44EI 1:1 NAT initialized from inside network"""
1373 nat_ip = "10.0.0.10"
1374 self.tcp_port_out = 6303
1375 self.udp_port_out = 6304
1376 self.icmp_id_out = 6305
1378 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1379 flags = self.config_flags.NAT44_EI_IF_INSIDE
1380 self.vapi.nat44_ei_interface_add_del_feature(
1381 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1383 self.vapi.nat44_ei_interface_add_del_feature(
1384 sw_if_index=self.pg1.sw_if_index, is_add=1
1386 sm = self.vapi.nat44_ei_static_mapping_dump()
1387 self.assertEqual(len(sm), 1)
1388 self.assertEqual(sm[0].tag, "")
1389 self.assertEqual(sm[0].protocol, 0)
1390 self.assertEqual(sm[0].local_port, 0)
1391 self.assertEqual(sm[0].external_port, 0)
1394 pkts = self.create_stream_in(self.pg0, self.pg1)
1395 self.pg0.add_stream(pkts)
1396 self.pg_enable_capture(self.pg_interfaces)
1398 capture = self.pg1.get_capture(len(pkts))
1399 self.verify_capture_out(capture, nat_ip, True)
1402 pkts = self.create_stream_out(self.pg1, nat_ip)
1403 self.pg1.add_stream(pkts)
1404 self.pg_enable_capture(self.pg_interfaces)
1406 capture = self.pg0.get_capture(len(pkts))
1407 self.verify_capture_in(capture, self.pg0)
1409 def test_static_out(self):
1410 """NAT44EI 1:1 NAT initialized from outside network"""
1412 nat_ip = "10.0.0.20"
1413 self.tcp_port_out = 6303
1414 self.udp_port_out = 6304
1415 self.icmp_id_out = 6305
1418 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1419 flags = self.config_flags.NAT44_EI_IF_INSIDE
1420 self.vapi.nat44_ei_interface_add_del_feature(
1421 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1423 self.vapi.nat44_ei_interface_add_del_feature(
1424 sw_if_index=self.pg1.sw_if_index, is_add=1
1426 sm = self.vapi.nat44_ei_static_mapping_dump()
1427 self.assertEqual(len(sm), 1)
1428 self.assertEqual(sm[0].tag, tag)
1431 pkts = self.create_stream_out(self.pg1, nat_ip)
1432 self.pg1.add_stream(pkts)
1433 self.pg_enable_capture(self.pg_interfaces)
1435 capture = self.pg0.get_capture(len(pkts))
1436 self.verify_capture_in(capture, self.pg0)
1439 pkts = self.create_stream_in(self.pg0, self.pg1)
1440 self.pg0.add_stream(pkts)
1441 self.pg_enable_capture(self.pg_interfaces)
1443 capture = self.pg1.get_capture(len(pkts))
1444 self.verify_capture_out(capture, nat_ip, True)
1446 def test_static_with_port_in(self):
1447 """NAT44EI 1:1 NAPT initialized from inside network"""
1449 self.tcp_port_out = 3606
1450 self.udp_port_out = 3607
1451 self.icmp_id_out = 3608
1453 self.nat44_add_address(self.nat_addr)
1454 self.nat44_add_static_mapping(
1455 self.pg0.remote_ip4,
1459 proto=IP_PROTOS.tcp,
1461 self.nat44_add_static_mapping(
1462 self.pg0.remote_ip4,
1466 proto=IP_PROTOS.udp,
1468 self.nat44_add_static_mapping(
1469 self.pg0.remote_ip4,
1473 proto=IP_PROTOS.icmp,
1475 flags = self.config_flags.NAT44_EI_IF_INSIDE
1476 self.vapi.nat44_ei_interface_add_del_feature(
1477 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1479 self.vapi.nat44_ei_interface_add_del_feature(
1480 sw_if_index=self.pg1.sw_if_index, is_add=1
1484 pkts = self.create_stream_in(self.pg0, self.pg1)
1485 self.pg0.add_stream(pkts)
1486 self.pg_enable_capture(self.pg_interfaces)
1488 capture = self.pg1.get_capture(len(pkts))
1489 self.verify_capture_out(capture)
1492 pkts = self.create_stream_out(self.pg1)
1493 self.pg1.add_stream(pkts)
1494 self.pg_enable_capture(self.pg_interfaces)
1496 capture = self.pg0.get_capture(len(pkts))
1497 self.verify_capture_in(capture, self.pg0)
1499 def test_static_with_port_out(self):
1500 """NAT44EI 1:1 NAPT initialized from outside network"""
1502 self.tcp_port_out = 30606
1503 self.udp_port_out = 30607
1504 self.icmp_id_out = 30608
1506 self.nat44_add_address(self.nat_addr)
1507 self.nat44_add_static_mapping(
1508 self.pg0.remote_ip4,
1512 proto=IP_PROTOS.tcp,
1514 self.nat44_add_static_mapping(
1515 self.pg0.remote_ip4,
1519 proto=IP_PROTOS.udp,
1521 self.nat44_add_static_mapping(
1522 self.pg0.remote_ip4,
1526 proto=IP_PROTOS.icmp,
1528 flags = self.config_flags.NAT44_EI_IF_INSIDE
1529 self.vapi.nat44_ei_interface_add_del_feature(
1530 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1532 self.vapi.nat44_ei_interface_add_del_feature(
1533 sw_if_index=self.pg1.sw_if_index, is_add=1
1537 pkts = self.create_stream_out(self.pg1)
1538 self.pg1.add_stream(pkts)
1539 self.pg_enable_capture(self.pg_interfaces)
1541 capture = self.pg0.get_capture(len(pkts))
1542 self.verify_capture_in(capture, self.pg0)
1545 pkts = self.create_stream_in(self.pg0, self.pg1)
1546 self.pg0.add_stream(pkts)
1547 self.pg_enable_capture(self.pg_interfaces)
1549 capture = self.pg1.get_capture(len(pkts))
1550 self.verify_capture_out(capture)
1552 def test_static_vrf_aware(self):
1553 """NAT44EI 1:1 NAT VRF awareness"""
1555 nat_ip1 = "10.0.0.30"
1556 nat_ip2 = "10.0.0.40"
1557 self.tcp_port_out = 6303
1558 self.udp_port_out = 6304
1559 self.icmp_id_out = 6305
1561 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1, vrf_id=10)
1562 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2, vrf_id=10)
1563 flags = self.config_flags.NAT44_EI_IF_INSIDE
1564 self.vapi.nat44_ei_interface_add_del_feature(
1565 sw_if_index=self.pg3.sw_if_index, is_add=1
1567 self.vapi.nat44_ei_interface_add_del_feature(
1568 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1570 self.vapi.nat44_ei_interface_add_del_feature(
1571 sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
1574 # inside interface VRF match NAT44EI static mapping VRF
1575 pkts = self.create_stream_in(self.pg4, self.pg3)
1576 self.pg4.add_stream(pkts)
1577 self.pg_enable_capture(self.pg_interfaces)
1579 capture = self.pg3.get_capture(len(pkts))
1580 self.verify_capture_out(capture, nat_ip1, True)
1582 # inside interface VRF don't match NAT44EI static mapping VRF (packets
1584 pkts = self.create_stream_in(self.pg0, self.pg3)
1585 self.pg0.add_stream(pkts)
1586 self.pg_enable_capture(self.pg_interfaces)
1588 self.pg3.assert_nothing_captured()
1590 def test_dynamic_to_static(self):
1591 """NAT44EI Switch from dynamic translation to 1:1NAT"""
1592 nat_ip = "10.0.0.10"
1593 self.tcp_port_out = 6303
1594 self.udp_port_out = 6304
1595 self.icmp_id_out = 6305
1597 self.nat44_add_address(self.nat_addr)
1598 flags = self.config_flags.NAT44_EI_IF_INSIDE
1599 self.vapi.nat44_ei_interface_add_del_feature(
1600 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1602 self.vapi.nat44_ei_interface_add_del_feature(
1603 sw_if_index=self.pg1.sw_if_index, is_add=1
1607 pkts = self.create_stream_in(self.pg0, self.pg1)
1608 self.pg0.add_stream(pkts)
1609 self.pg_enable_capture(self.pg_interfaces)
1611 capture = self.pg1.get_capture(len(pkts))
1612 self.verify_capture_out(capture)
1615 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1616 sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1617 self.assertEqual(len(sessions), 0)
1618 pkts = self.create_stream_in(self.pg0, self.pg1)
1619 self.pg0.add_stream(pkts)
1620 self.pg_enable_capture(self.pg_interfaces)
1622 capture = self.pg1.get_capture(len(pkts))
1623 self.verify_capture_out(capture, nat_ip, True)
1625 def test_identity_nat(self):
1626 """NAT44EI Identity NAT"""
1627 flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1628 self.vapi.nat44_ei_add_del_identity_mapping(
1629 ip_address=self.pg0.remote_ip4,
1630 sw_if_index=0xFFFFFFFF,
1634 flags = self.config_flags.NAT44_EI_IF_INSIDE
1635 self.vapi.nat44_ei_interface_add_del_feature(
1636 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1638 self.vapi.nat44_ei_interface_add_del_feature(
1639 sw_if_index=self.pg1.sw_if_index, is_add=1
1643 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1644 / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
1645 / TCP(sport=12345, dport=56789)
1647 self.pg1.add_stream(p)
1648 self.pg_enable_capture(self.pg_interfaces)
1650 capture = self.pg0.get_capture(1)
1655 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1656 self.assertEqual(ip.src, self.pg1.remote_ip4)
1657 self.assertEqual(tcp.dport, 56789)
1658 self.assertEqual(tcp.sport, 12345)
1659 self.assert_packet_checksums_valid(p)
1661 self.logger.error(ppp("Unexpected or invalid packet:", p))
1664 sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1665 self.assertEqual(len(sessions), 0)
1666 flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1667 self.vapi.nat44_ei_add_del_identity_mapping(
1668 ip_address=self.pg0.remote_ip4,
1669 sw_if_index=0xFFFFFFFF,
1674 identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
1675 self.assertEqual(len(identity_mappings), 2)
1677 def test_multiple_inside_interfaces(self):
1678 """NAT44EI multiple non-overlapping address space inside interfaces"""
1680 self.nat44_add_address(self.nat_addr)
1681 flags = self.config_flags.NAT44_EI_IF_INSIDE
1682 self.vapi.nat44_ei_interface_add_del_feature(
1683 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1685 self.vapi.nat44_ei_interface_add_del_feature(
1686 sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
1688 self.vapi.nat44_ei_interface_add_del_feature(
1689 sw_if_index=self.pg3.sw_if_index, is_add=1
1692 # between two NAT44EI inside interfaces (no translation)
1693 pkts = self.create_stream_in(self.pg0, self.pg1)
1694 self.pg0.add_stream(pkts)
1695 self.pg_enable_capture(self.pg_interfaces)
1697 capture = self.pg1.get_capture(len(pkts))
1698 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1700 # from inside to interface without translation
1701 pkts = self.create_stream_in(self.pg0, self.pg2)
1702 self.pg0.add_stream(pkts)
1703 self.pg_enable_capture(self.pg_interfaces)
1705 capture = self.pg2.get_capture(len(pkts))
1706 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1708 # in2out 1st interface
1709 pkts = self.create_stream_in(self.pg0, self.pg3)
1710 self.pg0.add_stream(pkts)
1711 self.pg_enable_capture(self.pg_interfaces)
1713 capture = self.pg3.get_capture(len(pkts))
1714 self.verify_capture_out(capture)
1716 # out2in 1st interface
1717 pkts = self.create_stream_out(self.pg3)
1718 self.pg3.add_stream(pkts)
1719 self.pg_enable_capture(self.pg_interfaces)
1721 capture = self.pg0.get_capture(len(pkts))
1722 self.verify_capture_in(capture, self.pg0)
1724 # in2out 2nd interface
1725 pkts = self.create_stream_in(self.pg1, self.pg3)
1726 self.pg1.add_stream(pkts)
1727 self.pg_enable_capture(self.pg_interfaces)
1729 capture = self.pg3.get_capture(len(pkts))
1730 self.verify_capture_out(capture)
1732 # out2in 2nd interface
1733 pkts = self.create_stream_out(self.pg3)
1734 self.pg3.add_stream(pkts)
1735 self.pg_enable_capture(self.pg_interfaces)
1737 capture = self.pg1.get_capture(len(pkts))
1738 self.verify_capture_in(capture, self.pg1)
1740 def test_inside_overlapping_interfaces(self):
1741 """NAT44EI multiple inside interfaces with overlapping address space"""
1743 static_nat_ip = "10.0.0.10"
1744 self.nat44_add_address(self.nat_addr)
1745 flags = self.config_flags.NAT44_EI_IF_INSIDE
1746 self.vapi.nat44_ei_interface_add_del_feature(
1747 sw_if_index=self.pg3.sw_if_index, is_add=1
1749 self.vapi.nat44_ei_interface_add_del_feature(
1750 sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
1752 self.vapi.nat44_ei_interface_add_del_feature(
1753 sw_if_index=self.pg5.sw_if_index, flags=flags, is_add=1
1755 self.vapi.nat44_ei_interface_add_del_feature(
1756 sw_if_index=self.pg6.sw_if_index, flags=flags, is_add=1
1758 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip, vrf_id=20)
1760 # between NAT44EI inside interfaces with same VRF (no translation)
1761 pkts = self.create_stream_in(self.pg4, self.pg5)
1762 self.pg4.add_stream(pkts)
1763 self.pg_enable_capture(self.pg_interfaces)
1765 capture = self.pg5.get_capture(len(pkts))
1766 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1768 # between NAT44EI inside interfaces with different VRF (hairpinning)
1770 Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
1771 / IP(src=self.pg4.remote_ip4, dst=static_nat_ip)
1772 / TCP(sport=1234, dport=5678)
1774 self.pg4.add_stream(p)
1775 self.pg_enable_capture(self.pg_interfaces)
1777 capture = self.pg6.get_capture(1)
1782 self.assertEqual(ip.src, self.nat_addr)
1783 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1784 self.assertNotEqual(tcp.sport, 1234)
1785 self.assertEqual(tcp.dport, 5678)
1787 self.logger.error(ppp("Unexpected or invalid packet:", p))
1790 # in2out 1st interface
1791 pkts = self.create_stream_in(self.pg4, self.pg3)
1792 self.pg4.add_stream(pkts)
1793 self.pg_enable_capture(self.pg_interfaces)
1795 capture = self.pg3.get_capture(len(pkts))
1796 self.verify_capture_out(capture)
1798 # out2in 1st interface
1799 pkts = self.create_stream_out(self.pg3)
1800 self.pg3.add_stream(pkts)
1801 self.pg_enable_capture(self.pg_interfaces)
1803 capture = self.pg4.get_capture(len(pkts))
1804 self.verify_capture_in(capture, self.pg4)
1806 # in2out 2nd interface
1807 pkts = self.create_stream_in(self.pg5, self.pg3)
1808 self.pg5.add_stream(pkts)
1809 self.pg_enable_capture(self.pg_interfaces)
1811 capture = self.pg3.get_capture(len(pkts))
1812 self.verify_capture_out(capture)
1814 # out2in 2nd interface
1815 pkts = self.create_stream_out(self.pg3)
1816 self.pg3.add_stream(pkts)
1817 self.pg_enable_capture(self.pg_interfaces)
1819 capture = self.pg5.get_capture(len(pkts))
1820 self.verify_capture_in(capture, self.pg5)
1823 addresses = self.vapi.nat44_ei_address_dump()
1824 self.assertEqual(len(addresses), 1)
1825 sessions = self.vapi.nat44_ei_user_session_dump(self.pg5.remote_ip4, 10)
1826 self.assertEqual(len(sessions), 3)
1827 for session in sessions:
1828 self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1829 self.assertEqual(str(session.inside_ip_address), self.pg5.remote_ip4)
1830 self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
1831 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1832 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1833 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1834 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1835 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1836 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1837 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1838 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1839 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1841 # in2out 3rd interface
1842 pkts = self.create_stream_in(self.pg6, self.pg3)
1843 self.pg6.add_stream(pkts)
1844 self.pg_enable_capture(self.pg_interfaces)
1846 capture = self.pg3.get_capture(len(pkts))
1847 self.verify_capture_out(capture, static_nat_ip, True)
1849 # out2in 3rd interface
1850 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1851 self.pg3.add_stream(pkts)
1852 self.pg_enable_capture(self.pg_interfaces)
1854 capture = self.pg6.get_capture(len(pkts))
1855 self.verify_capture_in(capture, self.pg6)
1857 # general user and session dump verifications
1858 users = self.vapi.nat44_ei_user_dump()
1859 self.assertGreaterEqual(len(users), 3)
1860 addresses = self.vapi.nat44_ei_address_dump()
1861 self.assertEqual(len(addresses), 1)
1863 sessions = self.vapi.nat44_ei_user_session_dump(
1864 user.ip_address, user.vrf_id
1866 for session in sessions:
1867 self.assertEqual(user.ip_address, session.inside_ip_address)
1868 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1870 session.protocol in [IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp]
1874 sessions = self.vapi.nat44_ei_user_session_dump(self.pg4.remote_ip4, 10)
1875 self.assertGreaterEqual(len(sessions), 4)
1876 for session in sessions:
1877 self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1878 self.assertEqual(str(session.inside_ip_address), self.pg4.remote_ip4)
1879 self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
1882 sessions = self.vapi.nat44_ei_user_session_dump(self.pg6.remote_ip4, 20)
1883 self.assertGreaterEqual(len(sessions), 3)
1884 for session in sessions:
1885 self.assertTrue(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1886 self.assertEqual(str(session.inside_ip_address), self.pg6.remote_ip4)
1887 self.assertEqual(str(session.outside_ip_address), static_nat_ip)
1890 in [self.tcp_port_in, self.udp_port_in, self.icmp_id_in]
1893 def test_hairpinning(self):
1894 """NAT44EI hairpinning - 1:1 NAPT"""
1896 host = self.pg0.remote_hosts[0]
1897 server = self.pg0.remote_hosts[1]
1900 server_in_port = 5678
1901 server_out_port = 8765
1903 self.nat44_add_address(self.nat_addr)
1904 flags = self.config_flags.NAT44_EI_IF_INSIDE
1905 self.vapi.nat44_ei_interface_add_del_feature(
1906 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1908 self.vapi.nat44_ei_interface_add_del_feature(
1909 sw_if_index=self.pg1.sw_if_index, is_add=1
1912 # add static mapping for server
1913 self.nat44_add_static_mapping(
1918 proto=IP_PROTOS.tcp,
1921 cnt = self.statistics["/nat44-ei/hairpinning"]
1922 # send packet from host to server
1924 Ether(src=host.mac, dst=self.pg0.local_mac)
1925 / IP(src=host.ip4, dst=self.nat_addr)
1926 / TCP(sport=host_in_port, dport=server_out_port)
1928 self.pg0.add_stream(p)
1929 self.pg_enable_capture(self.pg_interfaces)
1931 capture = self.pg0.get_capture(1)
1936 self.assertEqual(ip.src, self.nat_addr)
1937 self.assertEqual(ip.dst, server.ip4)
1938 self.assertNotEqual(tcp.sport, host_in_port)
1939 self.assertEqual(tcp.dport, server_in_port)
1940 self.assert_packet_checksums_valid(p)
1941 host_out_port = tcp.sport
1943 self.logger.error(ppp("Unexpected or invalid packet:", p))
1946 after = self.statistics["/nat44-ei/hairpinning"]
1947 if_idx = self.pg0.sw_if_index
1948 self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
1950 # send reply from server to host
1952 Ether(src=server.mac, dst=self.pg0.local_mac)
1953 / IP(src=server.ip4, dst=self.nat_addr)
1954 / TCP(sport=server_in_port, dport=host_out_port)
1956 self.pg0.add_stream(p)
1957 self.pg_enable_capture(self.pg_interfaces)
1959 capture = self.pg0.get_capture(1)
1964 self.assertEqual(ip.src, self.nat_addr)
1965 self.assertEqual(ip.dst, host.ip4)
1966 self.assertEqual(tcp.sport, server_out_port)
1967 self.assertEqual(tcp.dport, host_in_port)
1968 self.assert_packet_checksums_valid(p)
1970 self.logger.error(ppp("Unexpected or invalid packet:", p))
1973 after = self.statistics["/nat44-ei/hairpinning"]
1974 if_idx = self.pg0.sw_if_index
1976 after[:, if_idx].sum() - cnt[:, if_idx].sum(),
1977 2 + (1 if self.vpp_worker_count > 0 else 0),
1980 def test_hairpinning2(self):
1981 """NAT44EI hairpinning - 1:1 NAT"""
1983 server1_nat_ip = "10.0.0.10"
1984 server2_nat_ip = "10.0.0.11"
1985 host = self.pg0.remote_hosts[0]
1986 server1 = self.pg0.remote_hosts[1]
1987 server2 = self.pg0.remote_hosts[2]
1988 server_tcp_port = 22
1989 server_udp_port = 20
1991 self.nat44_add_address(self.nat_addr)
1992 flags = self.config_flags.NAT44_EI_IF_INSIDE
1993 self.vapi.nat44_ei_interface_add_del_feature(
1994 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1996 self.vapi.nat44_ei_interface_add_del_feature(
1997 sw_if_index=self.pg1.sw_if_index, is_add=1
2000 # add static mapping for servers
2001 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2002 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2007 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2008 / IP(src=host.ip4, dst=server1_nat_ip)
2009 / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
2013 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2014 / IP(src=host.ip4, dst=server1_nat_ip)
2015 / UDP(sport=self.udp_port_in, dport=server_udp_port)
2019 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2020 / IP(src=host.ip4, dst=server1_nat_ip)
2021 / ICMP(id=self.icmp_id_in, type="echo-request")
2024 self.pg0.add_stream(pkts)
2025 self.pg_enable_capture(self.pg_interfaces)
2027 capture = self.pg0.get_capture(len(pkts))
2028 for packet in capture:
2030 self.assertEqual(packet[IP].src, self.nat_addr)
2031 self.assertEqual(packet[IP].dst, server1.ip4)
2032 if packet.haslayer(TCP):
2033 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2034 self.assertEqual(packet[TCP].dport, server_tcp_port)
2035 self.tcp_port_out = packet[TCP].sport
2036 self.assert_packet_checksums_valid(packet)
2037 elif packet.haslayer(UDP):
2038 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2039 self.assertEqual(packet[UDP].dport, server_udp_port)
2040 self.udp_port_out = packet[UDP].sport
2042 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2043 self.icmp_id_out = packet[ICMP].id
2045 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2051 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2052 / IP(src=server1.ip4, dst=self.nat_addr)
2053 / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
2057 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2058 / IP(src=server1.ip4, dst=self.nat_addr)
2059 / UDP(sport=server_udp_port, dport=self.udp_port_out)
2063 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2064 / IP(src=server1.ip4, dst=self.nat_addr)
2065 / ICMP(id=self.icmp_id_out, type="echo-reply")
2068 self.pg0.add_stream(pkts)
2069 self.pg_enable_capture(self.pg_interfaces)
2071 capture = self.pg0.get_capture(len(pkts))
2072 for packet in capture:
2074 self.assertEqual(packet[IP].src, server1_nat_ip)
2075 self.assertEqual(packet[IP].dst, host.ip4)
2076 if packet.haslayer(TCP):
2077 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2078 self.assertEqual(packet[TCP].sport, server_tcp_port)
2079 self.assert_packet_checksums_valid(packet)
2080 elif packet.haslayer(UDP):
2081 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2082 self.assertEqual(packet[UDP].sport, server_udp_port)
2084 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2086 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2089 # server2 to server1
2092 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2093 / IP(src=server2.ip4, dst=server1_nat_ip)
2094 / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
2098 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2099 / IP(src=server2.ip4, dst=server1_nat_ip)
2100 / UDP(sport=self.udp_port_in, dport=server_udp_port)
2104 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2105 / IP(src=server2.ip4, dst=server1_nat_ip)
2106 / ICMP(id=self.icmp_id_in, type="echo-request")
2109 self.pg0.add_stream(pkts)
2110 self.pg_enable_capture(self.pg_interfaces)
2112 capture = self.pg0.get_capture(len(pkts))
2113 for packet in capture:
2115 self.assertEqual(packet[IP].src, server2_nat_ip)
2116 self.assertEqual(packet[IP].dst, server1.ip4)
2117 if packet.haslayer(TCP):
2118 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2119 self.assertEqual(packet[TCP].dport, server_tcp_port)
2120 self.tcp_port_out = packet[TCP].sport
2121 self.assert_packet_checksums_valid(packet)
2122 elif packet.haslayer(UDP):
2123 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2124 self.assertEqual(packet[UDP].dport, server_udp_port)
2125 self.udp_port_out = packet[UDP].sport
2127 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2128 self.icmp_id_out = packet[ICMP].id
2130 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2133 # server1 to server2
2136 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2137 / IP(src=server1.ip4, dst=server2_nat_ip)
2138 / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
2142 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2143 / IP(src=server1.ip4, dst=server2_nat_ip)
2144 / UDP(sport=server_udp_port, dport=self.udp_port_out)
2148 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2149 / IP(src=server1.ip4, dst=server2_nat_ip)
2150 / ICMP(id=self.icmp_id_out, type="echo-reply")
2153 self.pg0.add_stream(pkts)
2154 self.pg_enable_capture(self.pg_interfaces)
2156 capture = self.pg0.get_capture(len(pkts))
2157 for packet in capture:
2159 self.assertEqual(packet[IP].src, server1_nat_ip)
2160 self.assertEqual(packet[IP].dst, server2.ip4)
2161 if packet.haslayer(TCP):
2162 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2163 self.assertEqual(packet[TCP].sport, server_tcp_port)
2164 self.assert_packet_checksums_valid(packet)
2165 elif packet.haslayer(UDP):
2166 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2167 self.assertEqual(packet[UDP].sport, server_udp_port)
2169 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2171 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2174 def test_hairpinning_avoid_inf_loop(self):
2175 """NAT44EI hairpinning - 1:1 NAPT avoid infinite loop"""
2177 host = self.pg0.remote_hosts[0]
2178 server = self.pg0.remote_hosts[1]
2181 server_in_port = 5678
2182 server_out_port = 8765
2184 self.nat44_add_address(self.nat_addr)
2185 flags = self.config_flags.NAT44_EI_IF_INSIDE
2186 self.vapi.nat44_ei_interface_add_del_feature(
2187 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2189 self.vapi.nat44_ei_interface_add_del_feature(
2190 sw_if_index=self.pg1.sw_if_index, is_add=1
2193 # add static mapping for server
2194 self.nat44_add_static_mapping(
2199 proto=IP_PROTOS.tcp,
2202 # add another static mapping that maps pg0.local_ip4 address to itself
2203 self.nat44_add_static_mapping(self.pg0.local_ip4, self.pg0.local_ip4)
2205 # send packet from host to VPP (the packet should get dropped)
2207 Ether(src=host.mac, dst=self.pg0.local_mac)
2208 / IP(src=host.ip4, dst=self.pg0.local_ip4)
2209 / TCP(sport=host_in_port, dport=server_out_port)
2211 self.pg0.add_stream(p)
2212 self.pg_enable_capture(self.pg_interfaces)
2214 # Here VPP used to crash due to an infinite loop
2216 cnt = self.statistics["/nat44-ei/hairpinning"]
2217 # send packet from host to server
2219 Ether(src=host.mac, dst=self.pg0.local_mac)
2220 / IP(src=host.ip4, dst=self.nat_addr)
2221 / TCP(sport=host_in_port, dport=server_out_port)
2223 self.pg0.add_stream(p)
2224 self.pg_enable_capture(self.pg_interfaces)
2226 capture = self.pg0.get_capture(1)
2231 self.assertEqual(ip.src, self.nat_addr)
2232 self.assertEqual(ip.dst, server.ip4)
2233 self.assertNotEqual(tcp.sport, host_in_port)
2234 self.assertEqual(tcp.dport, server_in_port)
2235 self.assert_packet_checksums_valid(p)
2236 host_out_port = tcp.sport
2238 self.logger.error(ppp("Unexpected or invalid packet:", p))
2241 after = self.statistics["/nat44-ei/hairpinning"]
2242 if_idx = self.pg0.sw_if_index
2243 self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
2245 # send reply from server to host
2247 Ether(src=server.mac, dst=self.pg0.local_mac)
2248 / IP(src=server.ip4, dst=self.nat_addr)
2249 / TCP(sport=server_in_port, dport=host_out_port)
2251 self.pg0.add_stream(p)
2252 self.pg_enable_capture(self.pg_interfaces)
2254 capture = self.pg0.get_capture(1)
2259 self.assertEqual(ip.src, self.nat_addr)
2260 self.assertEqual(ip.dst, host.ip4)
2261 self.assertEqual(tcp.sport, server_out_port)
2262 self.assertEqual(tcp.dport, host_in_port)
2263 self.assert_packet_checksums_valid(p)
2265 self.logger.error(ppp("Unexpected or invalid packet:", p))
2268 after = self.statistics["/nat44-ei/hairpinning"]
2269 if_idx = self.pg0.sw_if_index
2271 after[:, if_idx].sum() - cnt[:, if_idx].sum(),
2272 2 + (1 if self.vpp_worker_count > 0 else 0),
2275 def test_interface_addr(self):
2276 """NAT44EI acquire addresses from interface"""
2277 self.vapi.nat44_ei_add_del_interface_addr(
2278 is_add=1, sw_if_index=self.pg7.sw_if_index
2281 # no address in NAT pool
2282 addresses = self.vapi.nat44_ei_address_dump()
2283 self.assertEqual(0, len(addresses))
2285 # configure interface address and check NAT address pool
2286 self.pg7.config_ip4()
2287 addresses = self.vapi.nat44_ei_address_dump()
2288 self.assertEqual(1, len(addresses))
2289 self.assertEqual(str(addresses[0].ip_address), self.pg7.local_ip4)
2291 # remove interface address and check NAT address pool
2292 self.pg7.unconfig_ip4()
2293 addresses = self.vapi.nat44_ei_address_dump()
2294 self.assertEqual(0, len(addresses))
2296 def test_interface_addr_static_mapping(self):
2297 """NAT44EI Static mapping with addresses from interface"""
2300 self.vapi.nat44_ei_add_del_interface_addr(
2301 is_add=1, sw_if_index=self.pg7.sw_if_index
2303 self.nat44_add_static_mapping(
2304 "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag
2307 # static mappings with external interface
2308 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2309 self.assertEqual(1, len(static_mappings))
2310 self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
2311 self.assertEqual(static_mappings[0].tag, tag)
2313 # configure interface address and check static mappings
2314 self.pg7.config_ip4()
2315 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2316 self.assertEqual(2, len(static_mappings))
2318 for sm in static_mappings:
2319 if sm.external_sw_if_index == 0xFFFFFFFF:
2320 self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
2321 self.assertEqual(sm.tag, tag)
2323 self.assertTrue(resolved)
2325 # remove interface address and check static mappings
2326 self.pg7.unconfig_ip4()
2327 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2328 self.assertEqual(1, len(static_mappings))
2329 self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
2330 self.assertEqual(static_mappings[0].tag, tag)
2332 # configure interface address again and check static mappings
2333 self.pg7.config_ip4()
2334 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2335 self.assertEqual(2, len(static_mappings))
2337 for sm in static_mappings:
2338 if sm.external_sw_if_index == 0xFFFFFFFF:
2339 self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
2340 self.assertEqual(sm.tag, tag)
2342 self.assertTrue(resolved)
2344 # remove static mapping
2345 self.nat44_add_static_mapping(
2346 "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag, is_add=0
2348 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2349 self.assertEqual(0, len(static_mappings))
2351 def test_interface_addr_identity_nat(self):
2352 """NAT44EI Identity NAT with addresses from interface"""
2355 self.vapi.nat44_ei_add_del_interface_addr(
2356 is_add=1, sw_if_index=self.pg7.sw_if_index
2358 self.vapi.nat44_ei_add_del_identity_mapping(
2360 sw_if_index=self.pg7.sw_if_index,
2362 protocol=IP_PROTOS.tcp,
2366 # identity mappings with external interface
2367 identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2368 self.assertEqual(1, len(identity_mappings))
2369 self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
2371 # configure interface address and check identity mappings
2372 self.pg7.config_ip4()
2373 identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2375 self.assertEqual(2, len(identity_mappings))
2376 for sm in identity_mappings:
2377 if sm.sw_if_index == 0xFFFFFFFF:
2379 str(identity_mappings[0].ip_address), self.pg7.local_ip4
2381 self.assertEqual(port, identity_mappings[0].port)
2382 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2384 self.assertTrue(resolved)
2386 # remove interface address and check identity mappings
2387 self.pg7.unconfig_ip4()
2388 identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2389 self.assertEqual(1, len(identity_mappings))
2390 self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
2392 def test_ipfix_nat44_sess(self):
2393 """NAT44EI IPFIX logging NAT44EI session created/deleted"""
2394 self.ipfix_domain_id = 10
2395 self.ipfix_src_port = 20202
2396 collector_port = 30303
2397 bind_layers(UDP, IPFIX, dport=30303)
2398 self.nat44_add_address(self.nat_addr)
2399 flags = self.config_flags.NAT44_EI_IF_INSIDE
2400 self.vapi.nat44_ei_interface_add_del_feature(
2401 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2403 self.vapi.nat44_ei_interface_add_del_feature(
2404 sw_if_index=self.pg1.sw_if_index, is_add=1
2406 self.vapi.set_ipfix_exporter(
2407 collector_address=self.pg3.remote_ip4,
2408 src_address=self.pg3.local_ip4,
2410 template_interval=10,
2411 collector_port=collector_port,
2413 self.vapi.nat44_ei_ipfix_enable_disable(
2414 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2417 pkts = self.create_stream_in(self.pg0, self.pg1)
2418 self.pg0.add_stream(pkts)
2419 self.pg_enable_capture(self.pg_interfaces)
2421 capture = self.pg1.get_capture(len(pkts))
2422 self.verify_capture_out(capture)
2423 self.nat44_add_address(self.nat_addr, is_add=0)
2424 self.vapi.ipfix_flush()
2425 capture = self.pg3.get_capture(7)
2426 ipfix = IPFIXDecoder()
2427 # first load template
2429 self.assertTrue(p.haslayer(IPFIX))
2430 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2431 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2432 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2433 self.assertEqual(p[UDP].dport, collector_port)
2434 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2435 if p.haslayer(Template):
2436 ipfix.add_template(p.getlayer(Template))
2437 # verify events in data set
2439 if p.haslayer(Data):
2440 data = ipfix.decode_data_set(p.getlayer(Set))
2441 self.verify_ipfix_nat44_ses(data)
2443 def test_ipfix_addr_exhausted(self):
2444 """NAT44EI IPFIX logging NAT addresses exhausted"""
2445 flags = self.config_flags.NAT44_EI_IF_INSIDE
2446 self.vapi.nat44_ei_interface_add_del_feature(
2447 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2449 self.vapi.nat44_ei_interface_add_del_feature(
2450 sw_if_index=self.pg1.sw_if_index, is_add=1
2452 self.vapi.set_ipfix_exporter(
2453 collector_address=self.pg3.remote_ip4,
2454 src_address=self.pg3.local_ip4,
2456 template_interval=10,
2458 self.vapi.nat44_ei_ipfix_enable_disable(
2459 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2463 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2464 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2467 self.pg0.add_stream(p)
2468 self.pg_enable_capture(self.pg_interfaces)
2470 self.pg1.assert_nothing_captured()
2471 self.vapi.ipfix_flush()
2472 capture = self.pg3.get_capture(7)
2473 ipfix = IPFIXDecoder()
2474 # first load template
2476 self.assertTrue(p.haslayer(IPFIX))
2477 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2478 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2479 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2480 self.assertEqual(p[UDP].dport, 4739)
2481 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2482 if p.haslayer(Template):
2483 ipfix.add_template(p.getlayer(Template))
2484 # verify events in data set
2486 if p.haslayer(Data):
2487 data = ipfix.decode_data_set(p.getlayer(Set))
2488 self.verify_ipfix_addr_exhausted(data)
2490 def test_ipfix_max_sessions(self):
2491 """NAT44EI IPFIX logging maximum session entries exceeded"""
2492 self.nat44_add_address(self.nat_addr)
2493 flags = self.config_flags.NAT44_EI_IF_INSIDE
2494 self.vapi.nat44_ei_interface_add_del_feature(
2495 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2497 self.vapi.nat44_ei_interface_add_del_feature(
2498 sw_if_index=self.pg1.sw_if_index, is_add=1
2501 max_sessions_per_thread = self.max_translations
2502 max_sessions = max(1, self.vpp_worker_count) * max_sessions_per_thread
2505 for i in range(0, max_sessions):
2506 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2508 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2509 / IP(src=src, dst=self.pg1.remote_ip4)
2513 self.pg0.add_stream(pkts)
2514 self.pg_enable_capture(self.pg_interfaces)
2517 self.pg1.get_capture(max_sessions)
2518 self.vapi.set_ipfix_exporter(
2519 collector_address=self.pg3.remote_ip4,
2520 src_address=self.pg3.local_ip4,
2522 template_interval=10,
2524 self.vapi.nat44_ei_ipfix_enable_disable(
2525 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2529 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2530 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2533 self.pg0.add_stream(p)
2534 self.pg_enable_capture(self.pg_interfaces)
2536 self.pg1.assert_nothing_captured()
2537 self.vapi.ipfix_flush()
2538 capture = self.pg3.get_capture(7)
2539 ipfix = IPFIXDecoder()
2540 # first load template
2542 self.assertTrue(p.haslayer(IPFIX))
2543 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2544 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2545 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2546 self.assertEqual(p[UDP].dport, 4739)
2547 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2548 if p.haslayer(Template):
2549 ipfix.add_template(p.getlayer(Template))
2550 # verify events in data set
2552 if p.haslayer(Data):
2553 data = ipfix.decode_data_set(p.getlayer(Set))
2554 self.verify_ipfix_max_sessions(data, max_sessions_per_thread)
2556 def test_syslog_apmap(self):
2557 """NAT44EI syslog address and port mapping creation and deletion"""
2558 self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2559 self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2560 self.nat44_add_address(self.nat_addr)
2561 flags = self.config_flags.NAT44_EI_IF_INSIDE
2562 self.vapi.nat44_ei_interface_add_del_feature(
2563 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2565 self.vapi.nat44_ei_interface_add_del_feature(
2566 sw_if_index=self.pg1.sw_if_index, is_add=1
2570 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2571 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2572 / TCP(sport=self.tcp_port_in, dport=20)
2574 self.pg0.add_stream(p)
2575 self.pg_enable_capture(self.pg_interfaces)
2577 capture = self.pg1.get_capture(1)
2578 self.tcp_port_out = capture[0][TCP].sport
2579 capture = self.pg3.get_capture(1)
2580 self.verify_syslog_apmap(capture[0][Raw].load)
2582 self.pg_enable_capture(self.pg_interfaces)
2584 self.nat44_add_address(self.nat_addr, is_add=0)
2585 capture = self.pg3.get_capture(1)
2586 self.verify_syslog_apmap(capture[0][Raw].load, False)
2588 def test_pool_addr_fib(self):
2589 """NAT44EI add pool addresses to FIB"""
2590 static_addr = "10.0.0.10"
2591 self.nat44_add_address(self.nat_addr)
2592 flags = self.config_flags.NAT44_EI_IF_INSIDE
2593 self.vapi.nat44_ei_interface_add_del_feature(
2594 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2596 self.vapi.nat44_ei_interface_add_del_feature(
2597 sw_if_index=self.pg1.sw_if_index, is_add=1
2599 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2602 p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2605 psrc=self.pg1.remote_ip4,
2606 hwsrc=self.pg1.remote_mac,
2608 self.pg1.add_stream(p)
2609 self.pg_enable_capture(self.pg_interfaces)
2611 capture = self.pg1.get_capture(1)
2612 self.assertTrue(capture[0].haslayer(ARP))
2613 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2616 p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2619 psrc=self.pg1.remote_ip4,
2620 hwsrc=self.pg1.remote_mac,
2622 self.pg1.add_stream(p)
2623 self.pg_enable_capture(self.pg_interfaces)
2625 capture = self.pg1.get_capture(1)
2626 self.assertTrue(capture[0].haslayer(ARP))
2627 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2629 # send ARP to non-NAT44EI interface
2630 p = Ether(src=self.pg2.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2633 psrc=self.pg2.remote_ip4,
2634 hwsrc=self.pg2.remote_mac,
2636 self.pg2.add_stream(p)
2637 self.pg_enable_capture(self.pg_interfaces)
2639 self.pg1.assert_nothing_captured()
2641 # remove addresses and verify
2642 self.nat44_add_address(self.nat_addr, is_add=0)
2643 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr, is_add=0)
2645 p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2648 psrc=self.pg1.remote_ip4,
2649 hwsrc=self.pg1.remote_mac,
2651 self.pg1.add_stream(p)
2652 self.pg_enable_capture(self.pg_interfaces)
2654 self.pg1.assert_nothing_captured()
2656 p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2659 psrc=self.pg1.remote_ip4,
2660 hwsrc=self.pg1.remote_mac,
2662 self.pg1.add_stream(p)
2663 self.pg_enable_capture(self.pg_interfaces)
2665 self.pg1.assert_nothing_captured()
2667 def test_vrf_mode(self):
2668 """NAT44EI tenant VRF aware address pool mode"""
2672 nat_ip1 = "10.0.0.10"
2673 nat_ip2 = "10.0.0.11"
2675 self.pg0.unconfig_ip4()
2676 self.pg1.unconfig_ip4()
2677 self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
2678 self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
2679 self.pg0.set_table_ip4(vrf_id1)
2680 self.pg1.set_table_ip4(vrf_id2)
2681 self.pg0.config_ip4()
2682 self.pg1.config_ip4()
2683 self.pg0.resolve_arp()
2684 self.pg1.resolve_arp()
2686 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2687 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2688 flags = self.config_flags.NAT44_EI_IF_INSIDE
2689 self.vapi.nat44_ei_interface_add_del_feature(
2690 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2692 self.vapi.nat44_ei_interface_add_del_feature(
2693 sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
2695 self.vapi.nat44_ei_interface_add_del_feature(
2696 sw_if_index=self.pg2.sw_if_index, is_add=1
2701 pkts = self.create_stream_in(self.pg0, self.pg2)
2702 self.pg0.add_stream(pkts)
2703 self.pg_enable_capture(self.pg_interfaces)
2705 capture = self.pg2.get_capture(len(pkts))
2706 self.verify_capture_out(capture, nat_ip1)
2709 pkts = self.create_stream_in(self.pg1, self.pg2)
2710 self.pg1.add_stream(pkts)
2711 self.pg_enable_capture(self.pg_interfaces)
2713 capture = self.pg2.get_capture(len(pkts))
2714 self.verify_capture_out(capture, nat_ip2)
2717 self.pg0.unconfig_ip4()
2718 self.pg1.unconfig_ip4()
2719 self.pg0.set_table_ip4(0)
2720 self.pg1.set_table_ip4(0)
2721 self.pg0.config_ip4()
2722 self.pg1.config_ip4()
2723 self.pg0.resolve_arp()
2724 self.pg1.resolve_arp()
2725 self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id1})
2726 self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id2})
2728 def test_vrf_feature_independent(self):
2729 """NAT44EI tenant VRF independent address pool mode"""
2731 nat_ip1 = "10.0.0.10"
2732 nat_ip2 = "10.0.0.11"
2734 self.nat44_add_address(nat_ip1)
2735 self.nat44_add_address(nat_ip2, vrf_id=99)
2736 flags = self.config_flags.NAT44_EI_IF_INSIDE
2737 self.vapi.nat44_ei_interface_add_del_feature(
2738 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2740 self.vapi.nat44_ei_interface_add_del_feature(
2741 sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
2743 self.vapi.nat44_ei_interface_add_del_feature(
2744 sw_if_index=self.pg2.sw_if_index, is_add=1
2748 pkts = self.create_stream_in(self.pg0, self.pg2)
2749 self.pg0.add_stream(pkts)
2750 self.pg_enable_capture(self.pg_interfaces)
2752 capture = self.pg2.get_capture(len(pkts))
2753 self.verify_capture_out(capture, nat_ip1)
2756 pkts = self.create_stream_in(self.pg1, self.pg2)
2757 self.pg1.add_stream(pkts)
2758 self.pg_enable_capture(self.pg_interfaces)
2760 capture = self.pg2.get_capture(len(pkts))
2761 self.verify_capture_out(capture, nat_ip1)
2763 def test_dynamic_ipless_interfaces(self):
2764 """NAT44EI interfaces without configured IP address"""
2765 self.create_routes_and_neigbors()
2766 self.nat44_add_address(self.nat_addr)
2767 flags = self.config_flags.NAT44_EI_IF_INSIDE
2768 self.vapi.nat44_ei_interface_add_del_feature(
2769 sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2771 self.vapi.nat44_ei_interface_add_del_feature(
2772 sw_if_index=self.pg8.sw_if_index, is_add=1
2776 pkts = self.create_stream_in(self.pg7, self.pg8)
2777 self.pg7.add_stream(pkts)
2778 self.pg_enable_capture(self.pg_interfaces)
2780 capture = self.pg8.get_capture(len(pkts))
2781 self.verify_capture_out(capture)
2784 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2785 self.pg8.add_stream(pkts)
2786 self.pg_enable_capture(self.pg_interfaces)
2788 capture = self.pg7.get_capture(len(pkts))
2789 self.verify_capture_in(capture, self.pg7)
2791 def test_static_ipless_interfaces(self):
2792 """NAT44EI interfaces without configured IP address - 1:1 NAT"""
2794 self.create_routes_and_neigbors()
2795 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2796 flags = self.config_flags.NAT44_EI_IF_INSIDE
2797 self.vapi.nat44_ei_interface_add_del_feature(
2798 sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2800 self.vapi.nat44_ei_interface_add_del_feature(
2801 sw_if_index=self.pg8.sw_if_index, is_add=1
2805 pkts = self.create_stream_out(self.pg8)
2806 self.pg8.add_stream(pkts)
2807 self.pg_enable_capture(self.pg_interfaces)
2809 capture = self.pg7.get_capture(len(pkts))
2810 self.verify_capture_in(capture, self.pg7)
2813 pkts = self.create_stream_in(self.pg7, self.pg8)
2814 self.pg7.add_stream(pkts)
2815 self.pg_enable_capture(self.pg_interfaces)
2817 capture = self.pg8.get_capture(len(pkts))
2818 self.verify_capture_out(capture, self.nat_addr, True)
2820 def test_static_with_port_ipless_interfaces(self):
2821 """NAT44EI interfaces without configured IP address - 1:1 NAPT"""
2823 self.tcp_port_out = 30606
2824 self.udp_port_out = 30607
2825 self.icmp_id_out = 30608
2827 self.create_routes_and_neigbors()
2828 self.nat44_add_address(self.nat_addr)
2829 self.nat44_add_static_mapping(
2830 self.pg7.remote_ip4,
2834 proto=IP_PROTOS.tcp,
2836 self.nat44_add_static_mapping(
2837 self.pg7.remote_ip4,
2841 proto=IP_PROTOS.udp,
2843 self.nat44_add_static_mapping(
2844 self.pg7.remote_ip4,
2848 proto=IP_PROTOS.icmp,
2850 flags = self.config_flags.NAT44_EI_IF_INSIDE
2851 self.vapi.nat44_ei_interface_add_del_feature(
2852 sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2854 self.vapi.nat44_ei_interface_add_del_feature(
2855 sw_if_index=self.pg8.sw_if_index, is_add=1
2859 pkts = self.create_stream_out(self.pg8)
2860 self.pg8.add_stream(pkts)
2861 self.pg_enable_capture(self.pg_interfaces)
2863 capture = self.pg7.get_capture(len(pkts))
2864 self.verify_capture_in(capture, self.pg7)
2867 pkts = self.create_stream_in(self.pg7, self.pg8)
2868 self.pg7.add_stream(pkts)
2869 self.pg_enable_capture(self.pg_interfaces)
2871 capture = self.pg8.get_capture(len(pkts))
2872 self.verify_capture_out(capture)
2874 def test_static_unknown_proto(self):
2875 """NAT44EI 1:1 translate packet with unknown protocol"""
2876 nat_ip = "10.0.0.10"
2877 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2878 flags = self.config_flags.NAT44_EI_IF_INSIDE
2879 self.vapi.nat44_ei_interface_add_del_feature(
2880 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2882 self.vapi.nat44_ei_interface_add_del_feature(
2883 sw_if_index=self.pg1.sw_if_index, is_add=1
2888 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2889 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2891 / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
2892 / TCP(sport=1234, dport=1234)
2894 self.pg0.add_stream(p)
2895 self.pg_enable_capture(self.pg_interfaces)
2897 p = self.pg1.get_capture(1)
2900 self.assertEqual(packet[IP].src, nat_ip)
2901 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2902 self.assertEqual(packet.haslayer(GRE), 1)
2903 self.assert_packet_checksums_valid(packet)
2905 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2910 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2911 / IP(src=self.pg1.remote_ip4, dst=nat_ip)
2913 / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
2914 / TCP(sport=1234, dport=1234)
2916 self.pg1.add_stream(p)
2917 self.pg_enable_capture(self.pg_interfaces)
2919 p = self.pg0.get_capture(1)
2922 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2923 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2924 self.assertEqual(packet.haslayer(GRE), 1)
2925 self.assert_packet_checksums_valid(packet)
2927 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2930 def test_hairpinning_static_unknown_proto(self):
2931 """NAT44EI 1:1 translate packet with unknown protocol - hairpinning"""
2933 host = self.pg0.remote_hosts[0]
2934 server = self.pg0.remote_hosts[1]
2936 host_nat_ip = "10.0.0.10"
2937 server_nat_ip = "10.0.0.11"
2939 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2940 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2941 flags = self.config_flags.NAT44_EI_IF_INSIDE
2942 self.vapi.nat44_ei_interface_add_del_feature(
2943 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2945 self.vapi.nat44_ei_interface_add_del_feature(
2946 sw_if_index=self.pg1.sw_if_index, is_add=1
2951 Ether(dst=self.pg0.local_mac, src=host.mac)
2952 / IP(src=host.ip4, dst=server_nat_ip)
2954 / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
2955 / TCP(sport=1234, dport=1234)
2957 self.pg0.add_stream(p)
2958 self.pg_enable_capture(self.pg_interfaces)
2960 p = self.pg0.get_capture(1)
2963 self.assertEqual(packet[IP].src, host_nat_ip)
2964 self.assertEqual(packet[IP].dst, server.ip4)
2965 self.assertEqual(packet.haslayer(GRE), 1)
2966 self.assert_packet_checksums_valid(packet)
2968 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2973 Ether(dst=self.pg0.local_mac, src=server.mac)
2974 / IP(src=server.ip4, dst=host_nat_ip)
2976 / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
2977 / TCP(sport=1234, dport=1234)
2979 self.pg0.add_stream(p)
2980 self.pg_enable_capture(self.pg_interfaces)
2982 p = self.pg0.get_capture(1)
2985 self.assertEqual(packet[IP].src, server_nat_ip)
2986 self.assertEqual(packet[IP].dst, host.ip4)
2987 self.assertEqual(packet.haslayer(GRE), 1)
2988 self.assert_packet_checksums_valid(packet)
2990 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2993 def test_output_feature(self):
2994 """NAT44EI output feature (in2out postrouting)"""
2995 self.nat44_add_address(self.nat_addr)
2996 self.vapi.nat44_ei_add_del_output_interface(
2997 sw_if_index=self.pg3.sw_if_index, is_add=1
3001 pkts = self.create_stream_in(self.pg0, self.pg3)
3002 self.pg0.add_stream(pkts)
3003 self.pg_enable_capture(self.pg_interfaces)
3005 capture = self.pg3.get_capture(len(pkts))
3006 self.verify_capture_out(capture)
3009 pkts = self.create_stream_out(self.pg3)
3010 self.pg3.add_stream(pkts)
3011 self.pg_enable_capture(self.pg_interfaces)
3013 capture = self.pg0.get_capture(len(pkts))
3014 self.verify_capture_in(capture, self.pg0)
3016 # from non-NAT interface to NAT inside interface
3017 pkts = self.create_stream_in(self.pg2, self.pg0)
3018 self.pg2.add_stream(pkts)
3019 self.pg_enable_capture(self.pg_interfaces)
3021 capture = self.pg0.get_capture(len(pkts))
3022 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3024 def test_output_feature_vrf_aware(self):
3025 """NAT44EI output feature VRF aware (in2out postrouting)"""
3026 nat_ip_vrf10 = "10.0.0.10"
3027 nat_ip_vrf20 = "10.0.0.20"
3031 self.pg3.remote_ip4,
3033 [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
3038 self.pg3.remote_ip4,
3040 [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
3046 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3047 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3048 self.vapi.nat44_ei_add_del_output_interface(
3049 sw_if_index=self.pg3.sw_if_index, is_add=1
3053 pkts = self.create_stream_in(self.pg4, self.pg3)
3054 self.pg4.add_stream(pkts)
3055 self.pg_enable_capture(self.pg_interfaces)
3057 capture = self.pg3.get_capture(len(pkts))
3058 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3061 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3062 self.pg3.add_stream(pkts)
3063 self.pg_enable_capture(self.pg_interfaces)
3065 capture = self.pg4.get_capture(len(pkts))
3066 self.verify_capture_in(capture, self.pg4)
3069 pkts = self.create_stream_in(self.pg6, self.pg3)
3070 self.pg6.add_stream(pkts)
3071 self.pg_enable_capture(self.pg_interfaces)
3073 capture = self.pg3.get_capture(len(pkts))
3074 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3077 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3078 self.pg3.add_stream(pkts)
3079 self.pg_enable_capture(self.pg_interfaces)
3081 capture = self.pg6.get_capture(len(pkts))
3082 self.verify_capture_in(capture, self.pg6)
3084 def test_output_feature_hairpinning(self):
3085 """NAT44EI output feature hairpinning (in2out postrouting)"""
3086 host = self.pg0.remote_hosts[0]
3087 server = self.pg0.remote_hosts[1]
3090 server_in_port = 5678
3091 server_out_port = 8765
3093 self.nat44_add_address(self.nat_addr)
3094 self.vapi.nat44_ei_add_del_output_interface(
3095 sw_if_index=self.pg0.sw_if_index, is_add=1
3097 self.vapi.nat44_ei_add_del_output_interface(
3098 sw_if_index=self.pg1.sw_if_index, is_add=1
3101 # add static mapping for server
3102 self.nat44_add_static_mapping(
3107 proto=IP_PROTOS.tcp,
3110 # send packet from host to server
3112 Ether(src=host.mac, dst=self.pg0.local_mac)
3113 / IP(src=host.ip4, dst=self.nat_addr)
3114 / TCP(sport=host_in_port, dport=server_out_port)
3116 self.pg0.add_stream(p)
3117 self.pg_enable_capture(self.pg_interfaces)
3119 capture = self.pg0.get_capture(1)
3124 self.assertEqual(ip.src, self.nat_addr)
3125 self.assertEqual(ip.dst, server.ip4)
3126 self.assertNotEqual(tcp.sport, host_in_port)
3127 self.assertEqual(tcp.dport, server_in_port)
3128 self.assert_packet_checksums_valid(p)
3129 host_out_port = tcp.sport
3131 self.logger.error(ppp("Unexpected or invalid packet:", p))
3134 # send reply from server to host
3136 Ether(src=server.mac, dst=self.pg0.local_mac)
3137 / IP(src=server.ip4, dst=self.nat_addr)
3138 / TCP(sport=server_in_port, dport=host_out_port)
3140 self.pg0.add_stream(p)
3141 self.pg_enable_capture(self.pg_interfaces)
3143 capture = self.pg0.get_capture(1)
3148 self.assertEqual(ip.src, self.nat_addr)
3149 self.assertEqual(ip.dst, host.ip4)
3150 self.assertEqual(tcp.sport, server_out_port)
3151 self.assertEqual(tcp.dport, host_in_port)
3152 self.assert_packet_checksums_valid(p)
3154 self.logger.error(ppp("Unexpected or invalid packet:", p))
3157 def test_one_armed_nat44(self):
3158 """NAT44EI One armed NAT"""
3159 remote_host = self.pg9.remote_hosts[0]
3160 local_host = self.pg9.remote_hosts[1]
3163 self.nat44_add_address(self.nat_addr)
3164 flags = self.config_flags.NAT44_EI_IF_INSIDE
3165 self.vapi.nat44_ei_interface_add_del_feature(
3166 sw_if_index=self.pg9.sw_if_index, is_add=1
3168 self.vapi.nat44_ei_interface_add_del_feature(
3169 sw_if_index=self.pg9.sw_if_index, flags=flags, is_add=1
3174 Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
3175 / IP(src=local_host.ip4, dst=remote_host.ip4)
3176 / TCP(sport=12345, dport=80)
3178 self.pg9.add_stream(p)
3179 self.pg_enable_capture(self.pg_interfaces)
3181 capture = self.pg9.get_capture(1)
3186 self.assertEqual(ip.src, self.nat_addr)
3187 self.assertEqual(ip.dst, remote_host.ip4)
3188 self.assertNotEqual(tcp.sport, 12345)
3189 external_port = tcp.sport
3190 self.assertEqual(tcp.dport, 80)
3191 self.assert_packet_checksums_valid(p)
3193 self.logger.error(ppp("Unexpected or invalid packet:", p))
3198 Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
3199 / IP(src=remote_host.ip4, dst=self.nat_addr)
3200 / TCP(sport=80, dport=external_port)
3202 self.pg9.add_stream(p)
3203 self.pg_enable_capture(self.pg_interfaces)
3205 capture = self.pg9.get_capture(1)
3210 self.assertEqual(ip.src, remote_host.ip4)
3211 self.assertEqual(ip.dst, local_host.ip4)
3212 self.assertEqual(tcp.sport, 80)
3213 self.assertEqual(tcp.dport, 12345)
3214 self.assert_packet_checksums_valid(p)
3216 self.logger.error(ppp("Unexpected or invalid packet:", p))
3219 if self.vpp_worker_count > 1:
3220 node = "nat44-ei-handoff-classify"
3222 node = "nat44-ei-classify"
3224 err = self.statistics.get_err_counter("/err/%s/next in2out" % node)
3225 self.assertEqual(err, 1)
3226 err = self.statistics.get_err_counter("/err/%s/next out2in" % node)
3227 self.assertEqual(err, 1)
3229 def test_del_session(self):
3230 """NAT44EI delete session"""
3231 self.nat44_add_address(self.nat_addr)
3232 flags = self.config_flags.NAT44_EI_IF_INSIDE
3233 self.vapi.nat44_ei_interface_add_del_feature(
3234 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3236 self.vapi.nat44_ei_interface_add_del_feature(
3237 sw_if_index=self.pg1.sw_if_index, is_add=1
3240 pkts = self.create_stream_in(self.pg0, self.pg1)
3241 self.pg0.add_stream(pkts)
3242 self.pg_enable_capture(self.pg_interfaces)
3244 self.pg1.get_capture(len(pkts))
3246 sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3247 nsessions = len(sessions)
3249 self.vapi.nat44_ei_del_session(
3250 address=sessions[0].inside_ip_address,
3251 port=sessions[0].inside_port,
3252 protocol=sessions[0].protocol,
3253 flags=self.config_flags.NAT44_EI_IF_INSIDE,
3256 self.vapi.nat44_ei_del_session(
3257 address=sessions[1].outside_ip_address,
3258 port=sessions[1].outside_port,
3259 protocol=sessions[1].protocol,
3262 sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3263 self.assertEqual(nsessions - len(sessions), 2)
3265 self.vapi.nat44_ei_del_session(
3266 address=sessions[0].inside_ip_address,
3267 port=sessions[0].inside_port,
3268 protocol=sessions[0].protocol,
3269 flags=self.config_flags.NAT44_EI_IF_INSIDE,
3272 self.verify_no_nat44_user()
3274 def test_frag_in_order(self):
3275 """NAT44EI translate fragments arriving in order"""
3277 self.nat44_add_address(self.nat_addr)
3278 flags = self.config_flags.NAT44_EI_IF_INSIDE
3279 self.vapi.nat44_ei_interface_add_del_feature(
3280 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3282 self.vapi.nat44_ei_interface_add_del_feature(
3283 sw_if_index=self.pg1.sw_if_index, is_add=1
3286 self.frag_in_order(proto=IP_PROTOS.tcp)
3287 self.frag_in_order(proto=IP_PROTOS.udp)
3288 self.frag_in_order(proto=IP_PROTOS.icmp)
3290 def test_frag_forwarding(self):
3291 """NAT44EI forwarding fragment test"""
3292 self.vapi.nat44_ei_add_del_interface_addr(
3293 is_add=1, sw_if_index=self.pg1.sw_if_index
3295 flags = self.config_flags.NAT44_EI_IF_INSIDE
3296 self.vapi.nat44_ei_interface_add_del_feature(
3297 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3299 self.vapi.nat44_ei_interface_add_del_feature(
3300 sw_if_index=self.pg1.sw_if_index, is_add=1
3302 self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
3304 data = b"A" * 16 + b"B" * 16 + b"C" * 3
3305 pkts = self.create_stream_frag(
3306 self.pg1, self.pg0.remote_ip4, 4789, 4789, data, proto=IP_PROTOS.udp
3308 self.pg1.add_stream(pkts)
3309 self.pg_enable_capture(self.pg_interfaces)
3311 frags = self.pg0.get_capture(len(pkts))
3312 p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
3313 self.assertEqual(p[UDP].sport, 4789)
3314 self.assertEqual(p[UDP].dport, 4789)
3315 self.assertEqual(data, p[Raw].load)
3317 def test_reass_hairpinning(self):
3318 """NAT44EI fragments hairpinning"""
3320 server_addr = self.pg0.remote_hosts[1].ip4
3321 host_in_port = random.randint(1025, 65535)
3322 server_in_port = random.randint(1025, 65535)
3323 server_out_port = random.randint(1025, 65535)
3325 self.nat44_add_address(self.nat_addr)
3326 flags = self.config_flags.NAT44_EI_IF_INSIDE
3327 self.vapi.nat44_ei_interface_add_del_feature(
3328 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3330 self.vapi.nat44_ei_interface_add_del_feature(
3331 sw_if_index=self.pg1.sw_if_index, is_add=1
3333 # add static mapping for server
3334 self.nat44_add_static_mapping(
3339 proto=IP_PROTOS.tcp,
3341 self.nat44_add_static_mapping(
3346 proto=IP_PROTOS.udp,
3348 self.nat44_add_static_mapping(server_addr, self.nat_addr)
3350 self.reass_hairpinning(
3355 proto=IP_PROTOS.tcp,
3357 self.reass_hairpinning(
3362 proto=IP_PROTOS.udp,
3364 self.reass_hairpinning(
3369 proto=IP_PROTOS.icmp,
3372 def test_frag_out_of_order(self):
3373 """NAT44EI translate fragments arriving out of order"""
3375 self.nat44_add_address(self.nat_addr)
3376 flags = self.config_flags.NAT44_EI_IF_INSIDE
3377 self.vapi.nat44_ei_interface_add_del_feature(
3378 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3380 self.vapi.nat44_ei_interface_add_del_feature(
3381 sw_if_index=self.pg1.sw_if_index, is_add=1
3384 self.frag_out_of_order(proto=IP_PROTOS.tcp)
3385 self.frag_out_of_order(proto=IP_PROTOS.udp)
3386 self.frag_out_of_order(proto=IP_PROTOS.icmp)
3388 def test_port_restricted(self):
3389 """NAT44EI Port restricted NAT44EI (MAP-E CE)"""
3390 self.nat44_add_address(self.nat_addr)
3391 flags = self.config_flags.NAT44_EI_IF_INSIDE
3392 self.vapi.nat44_ei_interface_add_del_feature(
3393 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3395 self.vapi.nat44_ei_interface_add_del_feature(
3396 sw_if_index=self.pg1.sw_if_index, is_add=1
3398 self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
3399 alg=1, psid_offset=6, psid_length=6, psid=10
3403 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3404 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3405 / TCP(sport=4567, dport=22)
3407 self.pg0.add_stream(p)
3408 self.pg_enable_capture(self.pg_interfaces)
3410 capture = self.pg1.get_capture(1)
3415 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3416 self.assertEqual(ip.src, self.nat_addr)
3417 self.assertEqual(tcp.dport, 22)
3418 self.assertNotEqual(tcp.sport, 4567)
3419 self.assertEqual((tcp.sport >> 6) & 63, 10)
3420 self.assert_packet_checksums_valid(p)
3422 self.logger.error(ppp("Unexpected or invalid packet:", p))
3425 def test_port_range(self):
3426 """NAT44EI External address port range"""
3427 self.nat44_add_address(self.nat_addr)
3428 flags = self.config_flags.NAT44_EI_IF_INSIDE
3429 self.vapi.nat44_ei_interface_add_del_feature(
3430 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3432 self.vapi.nat44_ei_interface_add_del_feature(
3433 sw_if_index=self.pg1.sw_if_index, is_add=1
3435 self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
3436 alg=2, start_port=1025, end_port=1027
3440 for port in range(0, 5):
3442 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3443 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3444 / TCP(sport=1125 + port)
3447 self.pg0.add_stream(pkts)
3448 self.pg_enable_capture(self.pg_interfaces)
3450 capture = self.pg1.get_capture(3)
3453 self.assertGreaterEqual(tcp.sport, 1025)
3454 self.assertLessEqual(tcp.sport, 1027)
3456 def test_multiple_outside_vrf(self):
3457 """NAT44EI Multiple outside VRF"""
3461 self.pg1.unconfig_ip4()
3462 self.pg2.unconfig_ip4()
3463 self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
3464 self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
3465 self.pg1.set_table_ip4(vrf_id1)
3466 self.pg2.set_table_ip4(vrf_id2)
3467 self.pg1.config_ip4()
3468 self.pg2.config_ip4()
3469 self.pg1.resolve_arp()
3470 self.pg2.resolve_arp()
3472 self.nat44_add_address(self.nat_addr)
3473 flags = self.config_flags.NAT44_EI_IF_INSIDE
3474 self.vapi.nat44_ei_interface_add_del_feature(
3475 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3477 self.vapi.nat44_ei_interface_add_del_feature(
3478 sw_if_index=self.pg1.sw_if_index, is_add=1
3480 self.vapi.nat44_ei_interface_add_del_feature(
3481 sw_if_index=self.pg2.sw_if_index, is_add=1
3486 pkts = self.create_stream_in(self.pg0, self.pg1)
3487 self.pg0.add_stream(pkts)
3488 self.pg_enable_capture(self.pg_interfaces)
3490 capture = self.pg1.get_capture(len(pkts))
3491 self.verify_capture_out(capture, self.nat_addr)
3493 pkts = self.create_stream_out(self.pg1, self.nat_addr)
3494 self.pg1.add_stream(pkts)
3495 self.pg_enable_capture(self.pg_interfaces)
3497 capture = self.pg0.get_capture(len(pkts))
3498 self.verify_capture_in(capture, self.pg0)
3500 self.tcp_port_in = 60303
3501 self.udp_port_in = 60304
3502 self.icmp_id_in = 60305
3505 pkts = self.create_stream_in(self.pg0, self.pg2)
3506 self.pg0.add_stream(pkts)
3507 self.pg_enable_capture(self.pg_interfaces)
3509 capture = self.pg2.get_capture(len(pkts))
3510 self.verify_capture_out(capture, self.nat_addr)
3512 pkts = self.create_stream_out(self.pg2, self.nat_addr)
3513 self.pg2.add_stream(pkts)
3514 self.pg_enable_capture(self.pg_interfaces)
3516 capture = self.pg0.get_capture(len(pkts))
3517 self.verify_capture_in(capture, self.pg0)
3520 self.nat44_add_address(self.nat_addr, is_add=0)
3521 self.pg1.unconfig_ip4()
3522 self.pg2.unconfig_ip4()
3523 self.pg1.set_table_ip4(0)
3524 self.pg2.set_table_ip4(0)
3525 self.pg1.config_ip4()
3526 self.pg2.config_ip4()
3527 self.pg1.resolve_arp()
3528 self.pg2.resolve_arp()
3530 def test_mss_clamping(self):
3531 """NAT44EI TCP MSS clamping"""
3532 self.nat44_add_address(self.nat_addr)
3533 flags = self.config_flags.NAT44_EI_IF_INSIDE
3534 self.vapi.nat44_ei_interface_add_del_feature(
3535 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3537 self.vapi.nat44_ei_interface_add_del_feature(
3538 sw_if_index=self.pg1.sw_if_index, is_add=1
3542 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3543 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3545 sport=self.tcp_port_in,
3546 dport=self.tcp_external_port,
3548 options=[("MSS", 1400)],
3552 self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1000)
3553 self.pg0.add_stream(p)
3554 self.pg_enable_capture(self.pg_interfaces)
3556 capture = self.pg1.get_capture(1)
3557 # Negotiated MSS value greater than configured - changed
3558 self.verify_mss_value(capture[0], 1000)
3560 self.vapi.nat44_ei_set_mss_clamping(enable=0, mss_value=1500)
3561 self.pg0.add_stream(p)
3562 self.pg_enable_capture(self.pg_interfaces)
3564 capture = self.pg1.get_capture(1)
3565 # MSS clamping disabled - negotiated MSS unchanged
3566 self.verify_mss_value(capture[0], 1400)
3568 self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1500)
3569 self.pg0.add_stream(p)
3570 self.pg_enable_capture(self.pg_interfaces)
3572 capture = self.pg1.get_capture(1)
3573 # Negotiated MSS value smaller than configured - unchanged
3574 self.verify_mss_value(capture[0], 1400)
3576 def test_ha_send(self):
3577 """NAT44EI Send HA session synchronization events (active)"""
3578 flags = self.config_flags.NAT44_EI_IF_INSIDE
3579 self.vapi.nat44_ei_interface_add_del_feature(
3580 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3582 self.vapi.nat44_ei_interface_add_del_feature(
3583 sw_if_index=self.pg1.sw_if_index, is_add=1
3585 self.nat44_add_address(self.nat_addr)
3587 self.vapi.nat44_ei_ha_set_listener(
3588 ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
3590 self.vapi.nat44_ei_ha_set_failover(
3591 ip_address=self.pg3.remote_ip4, port=12346, session_refresh_interval=10
3593 bind_layers(UDP, HANATStateSync, sport=12345)
3596 pkts = self.create_stream_in(self.pg0, self.pg1)
3597 self.pg0.add_stream(pkts)
3598 self.pg_enable_capture(self.pg_interfaces)
3600 capture = self.pg1.get_capture(len(pkts))
3601 self.verify_capture_out(capture)
3602 # active send HA events
3603 self.vapi.nat44_ei_ha_flush()
3604 stats = self.statistics["/nat44-ei/ha/add-event-send"]
3605 self.assertEqual(stats[:, 0].sum(), 3)
3606 capture = self.pg3.get_capture(1)
3608 self.assert_packet_checksums_valid(p)
3612 hanat = p[HANATStateSync]
3614 self.logger.error(ppp("Invalid packet:", p))
3617 self.assertEqual(ip.src, self.pg3.local_ip4)
3618 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3619 self.assertEqual(udp.sport, 12345)
3620 self.assertEqual(udp.dport, 12346)
3621 self.assertEqual(hanat.version, 1)
3622 # self.assertEqual(hanat.thread_index, 0)
3623 self.assertEqual(hanat.count, 3)
3624 seq = hanat.sequence_number
3625 for event in hanat.events:
3626 self.assertEqual(event.event_type, 1)
3627 self.assertEqual(event.in_addr, self.pg0.remote_ip4)
3628 self.assertEqual(event.out_addr, self.nat_addr)
3629 self.assertEqual(event.fib_index, 0)
3631 # ACK received events
3633 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3634 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3635 / UDP(sport=12346, dport=12345)
3637 sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
3640 self.pg3.add_stream(ack)
3642 stats = self.statistics["/nat44-ei/ha/ack-recv"]
3643 self.assertEqual(stats[:, 0].sum(), 1)
3645 # delete one session
3646 self.pg_enable_capture(self.pg_interfaces)
3647 self.vapi.nat44_ei_del_session(
3648 address=self.pg0.remote_ip4,
3649 port=self.tcp_port_in,
3650 protocol=IP_PROTOS.tcp,
3651 flags=self.config_flags.NAT44_EI_IF_INSIDE,
3653 self.vapi.nat44_ei_ha_flush()
3654 stats = self.statistics["/nat44-ei/ha/del-event-send"]
3655 self.assertEqual(stats[:, 0].sum(), 1)
3656 capture = self.pg3.get_capture(1)
3659 hanat = p[HANATStateSync]
3661 self.logger.error(ppp("Invalid packet:", p))
3664 self.assertGreater(hanat.sequence_number, seq)
3666 # do not send ACK, active retry send HA event again
3667 self.pg_enable_capture(self.pg_interfaces)
3668 self.virtual_sleep(12)
3669 stats = self.statistics["/nat44-ei/ha/retry-count"]
3670 self.assertEqual(stats[:, 0].sum(), 3)
3671 stats = self.statistics["/nat44-ei/ha/missed-count"]
3672 self.assertEqual(stats[:, 0].sum(), 1)
3673 capture = self.pg3.get_capture(3)
3674 for packet in capture:
3675 self.assertEqual(packet, p)
3677 # session counters refresh
3678 pkts = self.create_stream_out(self.pg1)
3679 self.pg1.add_stream(pkts)
3680 self.pg_enable_capture(self.pg_interfaces)
3682 self.pg0.get_capture(2)
3683 self.vapi.nat44_ei_ha_flush()
3684 stats = self.statistics["/nat44-ei/ha/refresh-event-send"]
3685 self.assertEqual(stats[:, 0].sum(), 2)
3686 capture = self.pg3.get_capture(1)
3688 self.assert_packet_checksums_valid(p)
3692 hanat = p[HANATStateSync]
3694 self.logger.error(ppp("Invalid packet:", p))
3697 self.assertEqual(ip.src, self.pg3.local_ip4)
3698 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3699 self.assertEqual(udp.sport, 12345)
3700 self.assertEqual(udp.dport, 12346)
3701 self.assertEqual(hanat.version, 1)
3702 self.assertEqual(hanat.count, 2)
3703 seq = hanat.sequence_number
3704 for event in hanat.events:
3705 self.assertEqual(event.event_type, 3)
3706 self.assertEqual(event.out_addr, self.nat_addr)
3707 self.assertEqual(event.fib_index, 0)
3708 self.assertEqual(event.total_pkts, 2)
3709 self.assertGreater(event.total_bytes, 0)
3711 stats = self.statistics["/nat44-ei/ha/ack-recv"]
3713 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3714 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3715 / UDP(sport=12346, dport=12345)
3717 sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
3720 self.pg3.add_stream(ack)
3722 stats = self.statistics["/nat44-ei/ha/ack-recv"]
3723 self.assertEqual(stats[:, 0].sum(), 2)
3725 def test_ha_recv(self):
3726 """NAT44EI Receive HA session synchronization events (passive)"""
3727 self.nat44_add_address(self.nat_addr)
3728 flags = self.config_flags.NAT44_EI_IF_INSIDE
3729 self.vapi.nat44_ei_interface_add_del_feature(
3730 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3732 self.vapi.nat44_ei_interface_add_del_feature(
3733 sw_if_index=self.pg1.sw_if_index, is_add=1
3735 self.vapi.nat44_ei_ha_set_listener(
3736 ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
3738 bind_layers(UDP, HANATStateSync, sport=12345)
3740 # this is a bit tricky - HA dictates thread index due to how it's
3741 # designed, but once we use HA to create a session, we also want
3742 # to pass a packet through said session. so the session must end
3743 # up on the correct thread from both directions - in2out (based on
3744 # IP address) and out2in (based on outside port)
3746 # first choose a thread index which is correct for IP
3747 thread_index = get_nat44_ei_in2out_worker_index(
3748 self.pg0.remote_ip4, self.vpp_worker_count
3751 # now pick a port which is correct for given thread
3752 port_per_thread = int((0xFFFF - 1024) / max(1, self.vpp_worker_count))
3753 self.tcp_port_out = 1024 + random.randint(1, port_per_thread)
3754 self.udp_port_out = 1024 + random.randint(1, port_per_thread)
3755 if self.vpp_worker_count > 0:
3756 self.tcp_port_out += port_per_thread * (thread_index - 1)
3757 self.udp_port_out += port_per_thread * (thread_index - 1)
3759 # send HA session add events to failover/passive
3761 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3762 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3763 / UDP(sport=12346, dport=12345)
3770 in_addr=self.pg0.remote_ip4,
3771 out_addr=self.nat_addr,
3772 in_port=self.tcp_port_in,
3773 out_port=self.tcp_port_out,
3774 eh_addr=self.pg1.remote_ip4,
3775 ehn_addr=self.pg1.remote_ip4,
3776 eh_port=self.tcp_external_port,
3777 ehn_port=self.tcp_external_port,
3783 in_addr=self.pg0.remote_ip4,
3784 out_addr=self.nat_addr,
3785 in_port=self.udp_port_in,
3786 out_port=self.udp_port_out,
3787 eh_addr=self.pg1.remote_ip4,
3788 ehn_addr=self.pg1.remote_ip4,
3789 eh_port=self.udp_external_port,
3790 ehn_port=self.udp_external_port,
3794 thread_index=thread_index,
3798 self.pg3.add_stream(p)
3799 self.pg_enable_capture(self.pg_interfaces)
3802 capture = self.pg3.get_capture(1)
3805 hanat = p[HANATStateSync]
3807 self.logger.error(ppp("Invalid packet:", p))
3810 self.assertEqual(hanat.sequence_number, 1)
3811 self.assertEqual(hanat.flags, "ACK")
3812 self.assertEqual(hanat.version, 1)
3813 self.assertEqual(hanat.thread_index, thread_index)
3814 stats = self.statistics["/nat44-ei/ha/ack-send"]
3815 self.assertEqual(stats[:, 0].sum(), 1)
3816 stats = self.statistics["/nat44-ei/ha/add-event-recv"]
3817 self.assertEqual(stats[:, 0].sum(), 2)
3818 users = self.statistics["/nat44-ei/total-users"]
3819 self.assertEqual(users[:, 0].sum(), 1)
3820 sessions = self.statistics["/nat44-ei/total-sessions"]
3821 self.assertEqual(sessions[:, 0].sum(), 2)
3822 users = self.vapi.nat44_ei_user_dump()
3823 self.assertEqual(len(users), 1)
3824 self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3825 # there should be 2 sessions created by HA
3826 sessions = self.vapi.nat44_ei_user_session_dump(
3827 users[0].ip_address, users[0].vrf_id
3829 self.assertEqual(len(sessions), 2)
3830 for session in sessions:
3831 self.assertEqual(str(session.inside_ip_address), self.pg0.remote_ip4)
3832 self.assertEqual(str(session.outside_ip_address), self.nat_addr)
3833 self.assertIn(session.inside_port, [self.tcp_port_in, self.udp_port_in])
3834 self.assertIn(session.outside_port, [self.tcp_port_out, self.udp_port_out])
3835 self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
3837 # send HA session delete event to failover/passive
3839 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3840 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3841 / UDP(sport=12346, dport=12345)
3848 in_addr=self.pg0.remote_ip4,
3849 out_addr=self.nat_addr,
3850 in_port=self.udp_port_in,
3851 out_port=self.udp_port_out,
3852 eh_addr=self.pg1.remote_ip4,
3853 ehn_addr=self.pg1.remote_ip4,
3854 eh_port=self.udp_external_port,
3855 ehn_port=self.udp_external_port,
3859 thread_index=thread_index,
3863 self.pg3.add_stream(p)
3864 self.pg_enable_capture(self.pg_interfaces)
3867 capture = self.pg3.get_capture(1)
3870 hanat = p[HANATStateSync]
3872 self.logger.error(ppp("Invalid packet:", p))
3875 self.assertEqual(hanat.sequence_number, 2)
3876 self.assertEqual(hanat.flags, "ACK")
3877 self.assertEqual(hanat.version, 1)
3878 users = self.vapi.nat44_ei_user_dump()
3879 self.assertEqual(len(users), 1)
3880 self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3881 # now we should have only 1 session, 1 deleted by HA
3882 sessions = self.vapi.nat44_ei_user_session_dump(
3883 users[0].ip_address, users[0].vrf_id
3885 self.assertEqual(len(sessions), 1)
3886 stats = self.statistics["/nat44-ei/ha/del-event-recv"]
3887 self.assertEqual(stats[:, 0].sum(), 1)
3889 stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
3890 self.assertEqual(stats, 2)
3892 # send HA session refresh event to failover/passive
3894 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3895 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3896 / UDP(sport=12346, dport=12345)
3901 event_type="refresh",
3903 in_addr=self.pg0.remote_ip4,
3904 out_addr=self.nat_addr,
3905 in_port=self.tcp_port_in,
3906 out_port=self.tcp_port_out,
3907 eh_addr=self.pg1.remote_ip4,
3908 ehn_addr=self.pg1.remote_ip4,
3909 eh_port=self.tcp_external_port,
3910 ehn_port=self.tcp_external_port,
3916 thread_index=thread_index,
3919 self.pg3.add_stream(p)
3920 self.pg_enable_capture(self.pg_interfaces)
3923 capture = self.pg3.get_capture(1)
3926 hanat = p[HANATStateSync]
3928 self.logger.error(ppp("Invalid packet:", p))
3931 self.assertEqual(hanat.sequence_number, 3)
3932 self.assertEqual(hanat.flags, "ACK")
3933 self.assertEqual(hanat.version, 1)
3934 users = self.vapi.nat44_ei_user_dump()
3935 self.assertEqual(len(users), 1)
3936 self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3937 sessions = self.vapi.nat44_ei_user_session_dump(
3938 users[0].ip_address, users[0].vrf_id
3940 self.assertEqual(len(sessions), 1)
3941 session = sessions[0]
3942 self.assertEqual(session.total_bytes, 1024)
3943 self.assertEqual(session.total_pkts, 2)
3944 stats = self.statistics["/nat44-ei/ha/refresh-event-recv"]
3945 self.assertEqual(stats[:, 0].sum(), 1)
3947 stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
3948 self.assertEqual(stats, 3)
3950 # send packet to test session created by HA
3952 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3953 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3954 / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out)
3956 self.pg1.add_stream(p)
3957 self.pg_enable_capture(self.pg_interfaces)
3959 capture = self.pg0.get_capture(1)
3965 self.logger.error(ppp("Invalid packet:", p))
3968 self.assertEqual(ip.src, self.pg1.remote_ip4)
3969 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3970 self.assertEqual(tcp.sport, self.tcp_external_port)
3971 self.assertEqual(tcp.dport, self.tcp_port_in)
3973 def reconfigure_frame_queue_nelts(self, frame_queue_nelts):
3974 self.vapi.nat44_ei_plugin_enable_disable(enable=0)
3975 self.vapi.nat44_ei_set_fq_options(frame_queue_nelts=frame_queue_nelts)
3976 # keep plugin configuration persistent
3977 self.plugin_enable()
3978 return self.vapi.nat44_ei_show_fq_options().frame_queue_nelts
3980 def test_set_frame_queue_nelts(self):
3981 """NAT44EI API test - worker handoff frame queue elements"""
3982 self.assertEqual(self.reconfigure_frame_queue_nelts(512), 512)
3984 def show_commands_at_teardown(self):
3985 self.logger.info(self.vapi.cli("show nat44 ei timeouts"))
3986 self.logger.info(self.vapi.cli("show nat44 ei addresses"))
3987 self.logger.info(self.vapi.cli("show nat44 ei interfaces"))
3988 self.logger.info(self.vapi.cli("show nat44 ei static mappings"))
3989 self.logger.info(self.vapi.cli("show nat44 ei interface address"))
3990 self.logger.info(self.vapi.cli("show nat44 ei sessions detail"))
3991 self.logger.info(self.vapi.cli("show nat44 ei hash tables detail"))
3992 self.logger.info(self.vapi.cli("show nat44 ei ha"))
3993 self.logger.info(self.vapi.cli("show nat44 ei addr-port-assignment-alg"))
3995 def test_outside_address_distribution(self):
3996 """Outside address distribution based on source address"""
4001 for i in range(1, x):
4003 nat_addresses.append(a)
4005 flags = self.config_flags.NAT44_EI_IF_INSIDE
4006 self.vapi.nat44_ei_interface_add_del_feature(
4007 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4009 self.vapi.nat44_ei_interface_add_del_feature(
4010 sw_if_index=self.pg1.sw_if_index, is_add=1
4013 self.vapi.nat44_ei_add_del_address_range(
4014 first_ip_address=nat_addresses[0],
4015 last_ip_address=nat_addresses[-1],
4020 self.pg0.generate_remote_hosts(x)
4024 info = self.create_packet_info(self.pg0, self.pg1)
4025 payload = self.info_to_payload(info)
4027 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4028 / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
4029 / UDP(sport=7000 + i, dport=8000 + i)
4035 self.pg0.add_stream(pkts)
4036 self.pg_enable_capture(self.pg_interfaces)
4038 recvd = self.pg1.get_capture(len(pkts))
4039 for p_recvd in recvd:
4040 payload_info = self.payload_to_info(p_recvd[Raw])
4041 packet_index = payload_info.index
4042 info = self._packet_infos[packet_index]
4043 self.assertTrue(info is not None)
4044 self.assertEqual(packet_index, info.index)
4046 packed = socket.inet_aton(p_sent[IP].src)
4047 numeric = struct.unpack("!L", packed)[0]
4048 numeric = socket.htonl(numeric)
4049 a = nat_addresses[(numeric - 1) % len(nat_addresses)]
4053 "Invalid packet (src IP %s translated to %s, but expected %s)"
4054 % (p_sent[IP].src, p_recvd[IP].src, a),
4057 def test_default_user_sessions(self):
4058 """NAT44EI default per-user session limit is used and reported"""
4059 nat44_ei_config = self.vapi.nat44_ei_show_running_config()
4060 # a nonzero default should be reported for user_sessions
4061 self.assertNotEqual(nat44_ei_config.user_sessions, 0)
4063 def test_delete_interface(self):
4064 """NAT44EI delete nat interface"""
4066 self.nat44_add_address(self.nat_addr)
4068 interfaces = self.create_loopback_interfaces(4)
4070 self.vapi.nat44_ei_interface_add_del_feature(
4071 sw_if_index=interfaces[0].sw_if_index, is_add=1
4073 flags = self.config_flags.NAT44_EI_IF_INSIDE
4074 self.vapi.nat44_ei_interface_add_del_feature(
4075 sw_if_index=interfaces[1].sw_if_index, flags=flags, is_add=1
4077 flags |= self.config_flags.NAT44_EI_IF_OUTSIDE
4078 self.vapi.nat44_ei_interface_add_del_feature(
4079 sw_if_index=interfaces[2].sw_if_index, flags=flags, is_add=1
4081 self.vapi.nat44_ei_add_del_output_interface(
4082 sw_if_index=interfaces[3].sw_if_index, is_add=1
4085 nat_sw_if_indices = [
4087 for i in self.vapi.nat44_ei_interface_dump()
4088 + list(self.vapi.vpp.details_iter(self.vapi.nat44_ei_output_interface_get))
4090 self.assertEqual(len(nat_sw_if_indices), len(interfaces))
4093 for i in interfaces:
4094 # delete nat-enabled interface
4095 self.assertIn(i.sw_if_index, nat_sw_if_indices)
4096 i.remove_vpp_config()
4098 # create interface with the same index
4099 lo = VppLoInterface(self)
4100 loopbacks.append(lo)
4101 self.assertEqual(lo.sw_if_index, i.sw_if_index)
4103 # check interface is not nat-enabled
4104 nat_sw_if_indices = [
4106 for i in self.vapi.nat44_ei_interface_dump()
4108 self.vapi.vpp.details_iter(self.vapi.nat44_ei_output_interface_get)
4111 self.assertNotIn(lo.sw_if_index, nat_sw_if_indices)
4114 i.remove_vpp_config()
4117 class TestNAT44Out2InDPO(MethodHolder):
4118 """NAT44EI Test Cases using out2in DPO"""
4121 def setUpClass(cls):
4122 super(TestNAT44Out2InDPO, cls).setUpClass()
4123 cls.vapi.cli("set log class nat44-ei level debug")
4125 cls.tcp_port_in = 6303
4126 cls.tcp_port_out = 6303
4127 cls.udp_port_in = 6304
4128 cls.udp_port_out = 6304
4129 cls.icmp_id_in = 6305
4130 cls.icmp_id_out = 6305
4131 cls.nat_addr = "10.0.0.3"
4132 cls.dst_ip4 = "192.168.70.1"
4134 cls.create_pg_interfaces(range(2))
4137 cls.pg0.config_ip4()
4138 cls.pg0.resolve_arp()
4141 cls.pg1.config_ip6()
4142 cls.pg1.resolve_ndp()
4148 [VppRoutePath(cls.pg1.remote_ip6, cls.pg1.sw_if_index)],
4154 super(TestNAT44Out2InDPO, self).setUp()
4155 flags = self.config_flags.NAT44_EI_OUT2IN_DPO
4156 self.vapi.nat44_ei_plugin_enable_disable(enable=1, flags=flags)
4159 super(TestNAT44Out2InDPO, self).tearDown()
4160 if not self.vpp_dead:
4161 self.vapi.nat44_ei_plugin_enable_disable(enable=0)
4162 self.vapi.cli("clear logging")
4164 def configure_xlat(self):
4165 self.dst_ip6_pfx = "1:2:3::"
4166 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.dst_ip6_pfx)
4167 self.dst_ip6_pfx_len = 96
4168 self.src_ip6_pfx = "4:5:6::"
4169 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.src_ip6_pfx)
4170 self.src_ip6_pfx_len = 96
4171 self.vapi.map_add_domain(
4173 self.dst_ip6_pfx_len,
4175 self.src_ip6_pfx_len,
4180 @unittest.skip("Temporary disabled")
4181 def test_464xlat_ce(self):
4182 """Test 464XLAT CE with NAT44EI"""
4184 self.configure_xlat()
4186 flags = self.config_flags.NAT44_EI_IF_INSIDE
4187 self.vapi.nat44_ei_interface_add_del_feature(
4188 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4190 self.vapi.nat44_ei_add_del_address_range(
4191 first_ip_address=self.nat_addr_n,
4192 last_ip_address=self.nat_addr_n,
4197 out_src_ip6 = self.compose_ip6(
4198 self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
4200 out_dst_ip6 = self.compose_ip6(
4201 self.nat_addr, self.src_ip6_pfx, self.src_ip6_pfx_len
4205 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4206 self.pg0.add_stream(pkts)
4207 self.pg_enable_capture(self.pg_interfaces)
4209 capture = self.pg1.get_capture(len(pkts))
4210 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6, dst_ip=out_src_ip6)
4212 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4213 self.pg1.add_stream(pkts)
4214 self.pg_enable_capture(self.pg_interfaces)
4216 capture = self.pg0.get_capture(len(pkts))
4217 self.verify_capture_in(capture, self.pg0)
4219 self.vapi.nat44_ei_interface_add_del_feature(
4220 sw_if_index=self.pg0.sw_if_index, flags=flags
4222 self.vapi.nat44_ei_add_del_address_range(
4223 first_ip_address=self.nat_addr_n,
4224 last_ip_address=self.nat_addr_n,
4228 @unittest.skip("Temporary disabled")
4229 def test_464xlat_ce_no_nat(self):
4230 """Test 464XLAT CE without NAT44EI"""
4232 self.configure_xlat()
4234 out_src_ip6 = self.compose_ip6(
4235 self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
4237 out_dst_ip6 = self.compose_ip6(
4238 self.pg0.remote_ip4, self.src_ip6_pfx, self.src_ip6_pfx_len
4241 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4242 self.pg0.add_stream(pkts)
4243 self.pg_enable_capture(self.pg_interfaces)
4245 capture = self.pg1.get_capture(len(pkts))
4246 self.verify_capture_out_ip6(
4247 capture, dst_ip=out_src_ip6, nat_ip=out_dst_ip6, same_port=True
4250 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4251 self.pg1.add_stream(pkts)
4252 self.pg_enable_capture(self.pg_interfaces)
4254 capture = self.pg0.get_capture(len(pkts))
4255 self.verify_capture_in(capture, self.pg0)
4258 class TestNAT44EIMW(MethodHolder):
4259 """NAT44EI Test Cases (multiple workers)"""
4261 vpp_worker_count = 2
4262 max_translations = 10240
4266 def setUpClass(cls):
4267 super(TestNAT44EIMW, cls).setUpClass()
4268 cls.vapi.cli("set log class nat level debug")
4270 cls.tcp_port_in = 6303
4271 cls.tcp_port_out = 6303
4272 cls.udp_port_in = 6304
4273 cls.udp_port_out = 6304
4274 cls.icmp_id_in = 6305
4275 cls.icmp_id_out = 6305
4276 cls.nat_addr = "10.0.0.3"
4277 cls.ipfix_src_port = 4739
4278 cls.ipfix_domain_id = 1
4279 cls.tcp_external_port = 80
4280 cls.udp_external_port = 69
4282 cls.create_pg_interfaces(range(10))
4283 cls.interfaces = list(cls.pg_interfaces[0:4])
4285 for i in cls.interfaces:
4290 cls.pg0.generate_remote_hosts(3)
4291 cls.pg0.configure_ipv4_neighbors()
4293 cls.pg1.generate_remote_hosts(1)
4294 cls.pg1.configure_ipv4_neighbors()
4296 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
4297 cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
4298 cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
4300 cls.pg4._local_ip4 = "172.16.255.1"
4301 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
4302 cls.pg4.set_table_ip4(10)
4303 cls.pg5._local_ip4 = "172.17.255.3"
4304 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
4305 cls.pg5.set_table_ip4(10)
4306 cls.pg6._local_ip4 = "172.16.255.1"
4307 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
4308 cls.pg6.set_table_ip4(20)
4309 for i in cls.overlapping_interfaces:
4317 cls.pg9.generate_remote_hosts(2)
4318 cls.pg9.config_ip4()
4319 cls.vapi.sw_interface_add_del_address(
4320 sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
4324 cls.pg9.resolve_arp()
4325 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
4326 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
4327 cls.pg9.resolve_arp()
4330 super(TestNAT44EIMW, self).setUp()
4331 self.vapi.nat44_ei_plugin_enable_disable(
4332 sessions=self.max_translations, users=self.max_users, enable=1
4336 super(TestNAT44EIMW, self).tearDown()
4337 if not self.vpp_dead:
4338 self.vapi.nat44_ei_ipfix_enable_disable(
4339 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
4341 self.ipfix_src_port = 4739
4342 self.ipfix_domain_id = 1
4344 self.vapi.nat44_ei_plugin_enable_disable(enable=0)
4345 self.vapi.cli("clear logging")
4347 def test_hairpinning(self):
4348 """NAT44EI hairpinning - 1:1 NAPT"""
4350 host = self.pg0.remote_hosts[0]
4351 server = self.pg0.remote_hosts[1]
4354 server_in_port = 5678
4355 server_out_port = 8765
4359 self.nat44_add_address(self.nat_addr)
4360 flags = self.config_flags.NAT44_EI_IF_INSIDE
4361 self.vapi.nat44_ei_interface_add_del_feature(
4362 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4364 self.vapi.nat44_ei_interface_add_del_feature(
4365 sw_if_index=self.pg1.sw_if_index, is_add=1
4368 # add static mapping for server
4369 self.nat44_add_static_mapping(
4374 proto=IP_PROTOS.tcp,
4377 cnt = self.statistics["/nat44-ei/hairpinning"]
4378 # send packet from host to server
4380 Ether(src=host.mac, dst=self.pg0.local_mac)
4381 / IP(src=host.ip4, dst=self.nat_addr)
4382 / TCP(sport=host_in_port, dport=server_out_port)
4384 self.pg0.add_stream(p)
4385 self.pg_enable_capture(self.pg_interfaces)
4387 capture = self.pg0.get_capture(1)
4392 self.assertEqual(ip.src, self.nat_addr)
4393 self.assertEqual(ip.dst, server.ip4)
4394 self.assertNotEqual(tcp.sport, host_in_port)
4395 self.assertEqual(tcp.dport, server_in_port)
4396 self.assert_packet_checksums_valid(p)
4397 host_out_port = tcp.sport
4399 self.logger.error(ppp("Unexpected or invalid packet:", p))
4402 after = self.statistics["/nat44-ei/hairpinning"]
4404 if_idx = self.pg0.sw_if_index
4405 self.assertEqual(after[worker_2][if_idx] - cnt[worker_1][if_idx], 1)
4407 # send reply from server to host
4409 Ether(src=server.mac, dst=self.pg0.local_mac)
4410 / IP(src=server.ip4, dst=self.nat_addr)
4411 / TCP(sport=server_in_port, dport=host_out_port)
4413 self.pg0.add_stream(p)
4414 self.pg_enable_capture(self.pg_interfaces)
4416 capture = self.pg0.get_capture(1)
4421 self.assertEqual(ip.src, self.nat_addr)
4422 self.assertEqual(ip.dst, host.ip4)
4423 self.assertEqual(tcp.sport, server_out_port)
4424 self.assertEqual(tcp.dport, host_in_port)
4425 self.assert_packet_checksums_valid(p)
4427 self.logger.error(ppp("Unexpected or invalid packet:", p))
4430 after = self.statistics["/nat44-ei/hairpinning"]
4431 if_idx = self.pg0.sw_if_index
4432 self.assertEqual(after[worker_1][if_idx] - cnt[worker_1][if_idx], 1)
4433 self.assertEqual(after[worker_2][if_idx] - cnt[worker_2][if_idx], 2)
4435 def test_hairpinning2(self):
4436 """NAT44EI hairpinning - 1:1 NAT"""
4438 server1_nat_ip = "10.0.0.10"
4439 server2_nat_ip = "10.0.0.11"
4440 host = self.pg0.remote_hosts[0]
4441 server1 = self.pg0.remote_hosts[1]
4442 server2 = self.pg0.remote_hosts[2]
4443 server_tcp_port = 22
4444 server_udp_port = 20
4446 self.nat44_add_address(self.nat_addr)
4447 flags = self.config_flags.NAT44_EI_IF_INSIDE
4448 self.vapi.nat44_ei_interface_add_del_feature(
4449 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4451 self.vapi.nat44_ei_interface_add_del_feature(
4452 sw_if_index=self.pg1.sw_if_index, is_add=1
4455 # add static mapping for servers
4456 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
4457 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
4462 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4463 / IP(src=host.ip4, dst=server1_nat_ip)
4464 / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
4468 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4469 / IP(src=host.ip4, dst=server1_nat_ip)
4470 / UDP(sport=self.udp_port_in, dport=server_udp_port)
4474 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4475 / IP(src=host.ip4, dst=server1_nat_ip)
4476 / ICMP(id=self.icmp_id_in, type="echo-request")
4479 self.pg0.add_stream(pkts)
4480 self.pg_enable_capture(self.pg_interfaces)
4482 capture = self.pg0.get_capture(len(pkts))
4483 for packet in capture:
4485 self.assertEqual(packet[IP].src, self.nat_addr)
4486 self.assertEqual(packet[IP].dst, server1.ip4)
4487 if packet.haslayer(TCP):
4488 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
4489 self.assertEqual(packet[TCP].dport, server_tcp_port)
4490 self.tcp_port_out = packet[TCP].sport
4491 self.assert_packet_checksums_valid(packet)
4492 elif packet.haslayer(UDP):
4493 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
4494 self.assertEqual(packet[UDP].dport, server_udp_port)
4495 self.udp_port_out = packet[UDP].sport
4497 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
4498 self.icmp_id_out = packet[ICMP].id
4500 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4506 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4507 / IP(src=server1.ip4, dst=self.nat_addr)
4508 / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
4512 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4513 / IP(src=server1.ip4, dst=self.nat_addr)
4514 / UDP(sport=server_udp_port, dport=self.udp_port_out)
4518 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4519 / IP(src=server1.ip4, dst=self.nat_addr)
4520 / ICMP(id=self.icmp_id_out, type="echo-reply")
4523 self.pg0.add_stream(pkts)
4524 self.pg_enable_capture(self.pg_interfaces)
4526 capture = self.pg0.get_capture(len(pkts))
4527 for packet in capture:
4529 self.assertEqual(packet[IP].src, server1_nat_ip)
4530 self.assertEqual(packet[IP].dst, host.ip4)
4531 if packet.haslayer(TCP):
4532 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
4533 self.assertEqual(packet[TCP].sport, server_tcp_port)
4534 self.assert_packet_checksums_valid(packet)
4535 elif packet.haslayer(UDP):
4536 self.assertEqual(packet[UDP].dport, self.udp_port_in)
4537 self.assertEqual(packet[UDP].sport, server_udp_port)
4539 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4541 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4544 # server2 to server1
4547 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4548 / IP(src=server2.ip4, dst=server1_nat_ip)
4549 / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
4553 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4554 / IP(src=server2.ip4, dst=server1_nat_ip)
4555 / UDP(sport=self.udp_port_in, dport=server_udp_port)
4559 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4560 / IP(src=server2.ip4, dst=server1_nat_ip)
4561 / ICMP(id=self.icmp_id_in, type="echo-request")
4564 self.pg0.add_stream(pkts)
4565 self.pg_enable_capture(self.pg_interfaces)
4567 capture = self.pg0.get_capture(len(pkts))
4568 for packet in capture:
4570 self.assertEqual(packet[IP].src, server2_nat_ip)
4571 self.assertEqual(packet[IP].dst, server1.ip4)
4572 if packet.haslayer(TCP):
4573 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
4574 self.assertEqual(packet[TCP].dport, server_tcp_port)
4575 self.tcp_port_out = packet[TCP].sport
4576 self.assert_packet_checksums_valid(packet)
4577 elif packet.haslayer(UDP):
4578 self.assertEqual(packet[UDP].sport, self.udp_port_in)
4579 self.assertEqual(packet[UDP].dport, server_udp_port)
4580 self.udp_port_out = packet[UDP].sport
4582 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4583 self.icmp_id_out = packet[ICMP].id
4585 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4588 # server1 to server2
4591 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4592 / IP(src=server1.ip4, dst=server2_nat_ip)
4593 / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
4597 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4598 / IP(src=server1.ip4, dst=server2_nat_ip)
4599 / UDP(sport=server_udp_port, dport=self.udp_port_out)
4603 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4604 / IP(src=server1.ip4, dst=server2_nat_ip)
4605 / ICMP(id=self.icmp_id_out, type="echo-reply")
4608 self.pg0.add_stream(pkts)
4609 self.pg_enable_capture(self.pg_interfaces)
4611 capture = self.pg0.get_capture(len(pkts))
4612 for packet in capture:
4614 self.assertEqual(packet[IP].src, server1_nat_ip)
4615 self.assertEqual(packet[IP].dst, server2.ip4)
4616 if packet.haslayer(TCP):
4617 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
4618 self.assertEqual(packet[TCP].sport, server_tcp_port)
4619 self.assert_packet_checksums_valid(packet)
4620 elif packet.haslayer(UDP):
4621 self.assertEqual(packet[UDP].dport, self.udp_port_in)
4622 self.assertEqual(packet[UDP].sport, server_udp_port)
4624 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4626 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4630 if __name__ == "__main__":
4631 unittest.main(testRunner=VppTestRunner)