11 from framework import tag_fixme_debian11, is_distro_debian11
12 from framework import VppTestCase, VppTestRunner, VppLoInterface
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.all import (
27 from scapy.data import IP_PROTOS
28 from scapy.layers.inet import IP, TCP, UDP, ICMP
29 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
30 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
31 from scapy.layers.l2 import Ether, ARP, GRE
32 from scapy.packet import Raw
33 from syslog_rfc5424_parser import SyslogMessage, ParseError
34 from syslog_rfc5424_parser.constants import SyslogSeverity
36 from vpp_ip_route import VppIpRoute, VppRoutePath
37 from vpp_neighbor import VppNeighbor
38 from vpp_papi import VppEnum
41 # NAT HA protocol event data
45 ByteEnumField("event_type", None, {1: "add", 2: "del", 3: "refresh"}),
46 ByteEnumField("protocol", None, {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
47 ShortField("flags", 0),
48 IPField("in_addr", None),
49 IPField("out_addr", None),
50 ShortField("in_port", None),
51 ShortField("out_port", None),
52 IPField("eh_addr", None),
53 IPField("ehn_addr", None),
54 ShortField("eh_port", None),
55 ShortField("ehn_port", None),
56 IntField("fib_index", None),
57 IntField("total_pkts", 0),
58 LongField("total_bytes", 0),
61 def extract_padding(self, s):
65 # NAT HA protocol header
66 class HANATStateSync(Packet):
67 name = "HA NAT state sync"
69 XByteField("version", 1),
70 FlagsField("flags", 0, 8, ["ACK"]),
71 FieldLenField("count", None, count_of="events"),
72 IntField("sequence_number", 1),
73 IntField("thread_index", 0),
74 PacketListField("events", [], Event, count_from=lambda pkt: pkt.count),
78 class MethodHolder(VppTestCase):
79 """NAT create capture and verify method holder"""
def config_flags(self):
    """Return the NAT44EI configuration flags enum taken from VppEnum"""
    nat44_ei_flags = VppEnum.vl_api_nat44_ei_config_flags_t
    return nat44_ei_flags
def SYSLOG_SEVERITY(self):
    """Return the syslog severity enum taken from VppEnum"""
    severity_enum = VppEnum.vl_api_syslog_severity_t
    return severity_enum
89 def nat44_add_static_mapping(
92 external_ip="0.0.0.0",
97 external_sw_if_index=0xFFFFFFFF,
103 Add/delete NAT44EI static mapping
105 :param local_ip: Local IP address
106 :param external_ip: External IP address
107 :param local_port: Local port number (Optional)
108 :param external_port: External port number (Optional)
109 :param vrf_id: VRF ID (Default 0)
110 :param is_add: 1 if add, 0 if delete (Default add)
111 :param external_sw_if_index: External interface instead of IP address
112 :param proto: IP protocol (Mandatory if port specified)
113 :param tag: Opaque string tag
114 :param flags: NAT configuration flags
117 if not (local_port and external_port):
118 flags |= self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
120 self.vapi.nat44_ei_add_del_static_mapping(
122 local_ip_address=local_ip,
123 external_ip_address=external_ip,
124 external_sw_if_index=external_sw_if_index,
125 local_port=local_port,
126 external_port=external_port,
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
    """
    Add/delete a single NAT44EI address

    The address is passed to the API as a one-element range
    (first and last address are both ``ip``).

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID forwarded unchanged to the API (Default 0xFFFFFFFF)
    """
    add_del_range = self.vapi.nat44_ei_add_del_address_range
    range_args = {
        "first_ip_address": ip,
        "last_ip_address": ip,
        "vrf_id": vrf_id,
        "is_add": is_add,
    }
    add_del_range(**range_args)
144 def create_routes_and_neigbors(self):
149 [VppRoutePath(self.pg7.remote_ip4, self.pg7.sw_if_index)],
155 [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)],
162 self.pg7.sw_if_index,
169 self.pg8.sw_if_index,
177 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
179 Create packet stream for inside network
181 :param in_if: Inside interface
182 :param out_if: Outside interface
183 :param dst_ip: Destination address
184 :param ttl: TTL of generated packets
187 dst_ip = out_if.remote_ip4
192 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
193 / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
194 / TCP(sport=self.tcp_port_in, dport=20)
200 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
201 / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
202 / UDP(sport=self.udp_port_in, dport=20)
208 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
209 / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
210 / ICMP(id=self.icmp_id_in, type="echo-request")
216 def compose_ip6(self, ip4, pref, plen):
218 Compose IPv4-embedded IPv6 addresses
220 :param ip4: IPv4 address
221 :param pref: IPv6 prefix
222 :param plen: IPv6 prefix length
223 :returns: IPv4-embedded IPv6 addresses
225 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
226 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
241 pref_n[10] = ip4_n[3]
245 pref_n[10] = ip4_n[2]
246 pref_n[11] = ip4_n[3]
249 pref_n[10] = ip4_n[1]
250 pref_n[11] = ip4_n[2]
251 pref_n[12] = ip4_n[3]
253 pref_n[12] = ip4_n[0]
254 pref_n[13] = ip4_n[1]
255 pref_n[14] = ip4_n[2]
256 pref_n[15] = ip4_n[3]
257 packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n])
258 return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
260 def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
262 Create packet stream for outside network
264 :param out_if: Outside interface
265 :param dst_ip: Destination IP address (Default use global NAT address)
266 :param ttl: TTL of generated packets
267 :param use_inside_ports: Use inside NAT ports as destination ports
268 instead of outside ports
271 dst_ip = self.nat_addr
272 if not use_inside_ports:
273 tcp_port = self.tcp_port_out
274 udp_port = self.udp_port_out
275 icmp_id = self.icmp_id_out
277 tcp_port = self.tcp_port_in
278 udp_port = self.udp_port_in
279 icmp_id = self.icmp_id_in
283 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
284 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
285 / TCP(dport=tcp_port, sport=20)
291 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
292 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
293 / UDP(dport=udp_port, sport=20)
299 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
300 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
301 / ICMP(id=icmp_id, type="echo-reply")
307 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
309 Create packet stream for outside network
311 :param out_if: Outside interface
312 :param dst_ip: Destination IP address (Default use global NAT address)
313 :param hl: HL of generated packets
318 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
319 / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
320 / TCP(dport=self.tcp_port_out, sport=20)
326 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
327 / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
328 / UDP(dport=self.udp_port_out, sport=20)
334 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
335 / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
336 / ICMPv6EchoReply(id=self.icmp_id_out)
342 def verify_capture_out(
352 Verify captured packets on outside network
354 :param capture: Captured packets
355 :param nat_ip: Translated IP address (Default use global NAT address)
356 :param same_port: Source port number is not translated (Default False)
357 :param dst_ip: Destination IP address (Default do not verify)
358 :param is_ip6: If L3 protocol is IPv6 (Default False)
362 ICMP46 = ICMPv6EchoRequest
367 nat_ip = self.nat_addr
368 for packet in capture:
371 self.assert_packet_checksums_valid(packet)
372 self.assertEqual(packet[IP46].src, nat_ip)
373 if dst_ip is not None:
374 self.assertEqual(packet[IP46].dst, dst_ip)
375 if packet.haslayer(TCP):
378 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
380 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
381 self.tcp_port_out = packet[TCP].sport
382 self.assert_packet_checksums_valid(packet)
383 elif packet.haslayer(UDP):
386 self.assertEqual(packet[UDP].sport, self.udp_port_in)
388 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
389 self.udp_port_out = packet[UDP].sport
393 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
395 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
396 self.icmp_id_out = packet[ICMP46].id
397 self.assert_packet_checksums_valid(packet)
400 ppp("Unexpected or invalid packet (outside network):", packet)
def verify_capture_out_ip6(self, capture, nat_ip, same_port=False, dst_ip=None):
    """
    Verify captured IPv6 packets on outside network

    Thin wrapper that forwards to verify_capture_out with the fifth
    positional argument set to True.

    :param capture: Captured packets
    :param nat_ip: Translated IP address
    :param same_port: Source port number is not translated (Default False)
    :param dst_ip: Destination IP address (Default do not verify)
    :returns: Whatever verify_capture_out returns
    """
    is_ip6 = True
    outcome = self.verify_capture_out(capture, nat_ip, same_port, dst_ip, is_ip6)
    return outcome
415 def verify_capture_in(self, capture, in_if):
417 Verify captured packets on inside network
419 :param capture: Captured packets
420 :param in_if: Inside interface
422 for packet in capture:
424 self.assert_packet_checksums_valid(packet)
425 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
426 if packet.haslayer(TCP):
427 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
428 elif packet.haslayer(UDP):
429 self.assertEqual(packet[UDP].dport, self.udp_port_in)
431 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
434 ppp("Unexpected or invalid packet (inside network):", packet)
438 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
440 Verify captured packet that don't have to be translated
442 :param capture: Captured packets
443 :param ingress_if: Ingress interface
444 :param egress_if: Egress interface
446 for packet in capture:
448 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
449 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
450 if packet.haslayer(TCP):
451 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
452 elif packet.haslayer(UDP):
453 self.assertEqual(packet[UDP].sport, self.udp_port_in)
455 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
458 ppp("Unexpected or invalid packet (inside network):", packet)
462 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None, icmp_type=11):
464 Verify captured packets with ICMP errors on outside network
466 :param capture: Captured packets
467 :param src_ip: Translated IP address or IP address of VPP
468 (Default use global NAT address)
469 :param icmp_type: Type of error ICMP packet
470 we are expecting (Default 11)
473 src_ip = self.nat_addr
474 for packet in capture:
476 self.assertEqual(packet[IP].src, src_ip)
477 self.assertEqual(packet.haslayer(ICMP), 1)
479 self.assertEqual(icmp.type, icmp_type)
480 self.assertTrue(icmp.haslayer(IPerror))
481 inner_ip = icmp[IPerror]
482 if inner_ip.haslayer(TCPerror):
483 self.assertEqual(inner_ip[TCPerror].dport, self.tcp_port_out)
484 elif inner_ip.haslayer(UDPerror):
485 self.assertEqual(inner_ip[UDPerror].dport, self.udp_port_out)
487 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
490 ppp("Unexpected or invalid packet (outside network):", packet)
494 def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
496 Verify captured packets with ICMP errors on inside network
498 :param capture: Captured packets
499 :param in_if: Inside interface
500 :param icmp_type: Type of error ICMP packet
501 we are expecting (Default 11)
503 for packet in capture:
505 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
506 self.assertEqual(packet.haslayer(ICMP), 1)
508 self.assertEqual(icmp.type, icmp_type)
509 self.assertTrue(icmp.haslayer(IPerror))
510 inner_ip = icmp[IPerror]
511 if inner_ip.haslayer(TCPerror):
512 self.assertEqual(inner_ip[TCPerror].sport, self.tcp_port_in)
513 elif inner_ip.haslayer(UDPerror):
514 self.assertEqual(inner_ip[UDPerror].sport, self.udp_port_in)
516 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
519 ppp("Unexpected or invalid packet (inside network):", packet)
523 def create_stream_frag(
524 self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
527 Create fragmented packet stream
529 :param src_if: Source interface
530 :param dst: Destination IPv4 address
531 :param sport: Source port
532 :param dport: Destination port
533 :param data: Payload data
534 :param proto: protocol (TCP, UDP, ICMP)
535 :param echo_reply: use echo_reply if protocol is ICMP
538 if proto == IP_PROTOS.tcp:
540 IP(src=src_if.remote_ip4, dst=dst)
541 / TCP(sport=sport, dport=dport)
544 p = p.__class__(scapy.compat.raw(p))
545 chksum = p[TCP].chksum
546 proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
547 elif proto == IP_PROTOS.udp:
548 proto_header = UDP(sport=sport, dport=dport)
549 elif proto == IP_PROTOS.icmp:
551 proto_header = ICMP(id=sport, type="echo-request")
553 proto_header = ICMP(id=sport, type="echo-reply")
555 raise Exception("Unsupported protocol")
556 id = random.randint(0, 65535)
558 if proto == IP_PROTOS.tcp:
561 raw = Raw(data[0:16])
563 Ether(src=src_if.remote_mac, dst=src_if.local_mac)
564 / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
569 if proto == IP_PROTOS.tcp:
570 raw = Raw(data[4:20])
572 raw = Raw(data[16:32])
574 Ether(src=src_if.remote_mac, dst=src_if.local_mac)
575 / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
579 if proto == IP_PROTOS.tcp:
584 Ether(src=src_if.remote_mac, dst=src_if.local_mac)
585 / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
591 def reass_frags_and_verify(self, frags, src, dst):
593 Reassemble and verify fragmented packet
595 :param frags: Captured fragments
596 :param src: Source IPv4 address to verify
597 :param dst: Destination IPv4 address to verify
599 :returns: Reassembled IPv4 packet
603 self.assertEqual(p[IP].src, src)
604 self.assertEqual(p[IP].dst, dst)
605 self.assert_ip_checksum_valid(p)
606 buffer.seek(p[IP].frag * 8)
607 buffer.write(bytes(p[IP].payload))
608 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
609 if ip.proto == IP_PROTOS.tcp:
610 p = ip / TCP(buffer.getvalue())
611 self.logger.debug(ppp("Reassembled:", p))
612 self.assert_tcp_checksum_valid(p)
613 elif ip.proto == IP_PROTOS.udp:
614 p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
615 elif ip.proto == IP_PROTOS.icmp:
616 p = ip / ICMP(buffer.getvalue())
619 def verify_ipfix_nat44_ses(self, data):
621 Verify IPFIX NAT44EI session create/delete event
623 :param data: Decoded IPFIX data records
625 nat44_ses_create_num = 0
626 nat44_ses_delete_num = 0
627 self.assertEqual(6, len(data))
630 self.assertIn(scapy.compat.orb(record[230]), [4, 5])
631 if scapy.compat.orb(record[230]) == 4:
632 nat44_ses_create_num += 1
634 nat44_ses_delete_num += 1
636 self.assertEqual(self.pg0.remote_ip4, str(ipaddress.IPv4Address(record[8])))
637 # postNATSourceIPv4Address
639 socket.inet_pton(socket.AF_INET, self.nat_addr), record[225]
642 self.assertEqual(struct.pack("!I", 0), record[234])
643 # protocolIdentifier/sourceTransportPort
644 # /postNAPTSourceTransportPort
645 if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
646 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
647 self.assertEqual(struct.pack("!H", self.icmp_id_out), record[227])
648 elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
649 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
650 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
651 elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
652 self.assertEqual(struct.pack("!H", self.udp_port_in), record[7])
653 self.assertEqual(struct.pack("!H", self.udp_port_out), record[227])
655 self.fail(f"Invalid protocol {scapy.compat.orb(record[4])}")
656 self.assertEqual(3, nat44_ses_create_num)
657 self.assertEqual(3, nat44_ses_delete_num)
659 def verify_ipfix_addr_exhausted(self, data):
660 self.assertEqual(1, len(data))
663 self.assertEqual(scapy.compat.orb(record[230]), 3)
665 self.assertEqual(struct.pack("!I", 0), record[283])
667 def verify_ipfix_max_sessions(self, data, limit):
668 self.assertEqual(1, len(data))
671 self.assertEqual(scapy.compat.orb(record[230]), 13)
672 # natQuotaExceededEvent
673 self.assertEqual(struct.pack("!I", 1), record[466])
675 self.assertEqual(struct.pack("!I", limit), record[471])
def verify_no_nat44_user(self):
    """Verify that there is no NAT44EI user

    Checks three things: the API user dump is empty, and the
    "/nat44-ei/total-users" and "/nat44-ei/total-sessions" statistics
    are zero.

    The statistics are summed over every row of column 0 instead of
    reading only element [0][0], matching how the same counters are
    checked by the tests in this file, so a non-zero value in any row
    fails the check (presumably one row per thread -- confirm).
    """
    users = self.vapi.nat44_ei_user_dump()
    self.assertEqual(len(users), 0)
    users = self.statistics["/nat44-ei/total-users"]
    self.assertEqual(users[:, 0].sum(), 0)
    sessions = self.statistics["/nat44-ei/total-sessions"]
    self.assertEqual(sessions[:, 0].sum(), 0)
686 def verify_syslog_apmap(self, data, is_add=True):
687 message = data.decode("utf-8")
689 message = SyslogMessage.parse(message)
690 except ParseError as e:
694 self.assertEqual(message.severity, SyslogSeverity.info)
695 self.assertEqual(message.appname, "NAT")
696 self.assertEqual(message.msgid, "APMADD" if is_add else "APMDEL")
697 sd_params = message.sd.get("napmap")
698 self.assertTrue(sd_params is not None)
699 self.assertEqual(sd_params.get("IATYP"), "IPv4")
700 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
701 self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
702 self.assertEqual(sd_params.get("XATYP"), "IPv4")
703 self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
704 self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
705 self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
706 self.assertTrue(sd_params.get("SSUBIX") is not None)
707 self.assertEqual(sd_params.get("SVLAN"), "0")
709 def verify_mss_value(self, pkt, mss):
710 if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
711 raise TypeError("Not a TCP/IP packet")
713 for option in pkt[TCP].options:
714 if option[0] == "MSS":
715 self.assertEqual(option[1], mss)
716 self.assert_tcp_checksum_valid(pkt)
719 def proto2layer(proto):
720 if proto == IP_PROTOS.tcp:
722 elif proto == IP_PROTOS.udp:
724 elif proto == IP_PROTOS.icmp:
727 raise Exception("Unsupported protocol")
730 self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
732 layer = self.proto2layer(proto)
734 if proto == IP_PROTOS.tcp:
735 data = b"A" * 4 + b"B" * 16 + b"C" * 3
737 data = b"A" * 16 + b"B" * 16 + b"C" * 3
738 self.port_in = random.randint(1025, 65535)
741 pkts = self.create_stream_frag(
742 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
744 self.pg0.add_stream(pkts)
745 self.pg_enable_capture(self.pg_interfaces)
747 frags = self.pg1.get_capture(len(pkts))
748 if not dont_translate:
749 p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
751 p = self.reass_frags_and_verify(
752 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
754 if proto != IP_PROTOS.icmp:
755 if not dont_translate:
756 self.assertEqual(p[layer].dport, 20)
758 self.assertNotEqual(p[layer].sport, self.port_in)
760 self.assertEqual(p[layer].sport, self.port_in)
763 if not dont_translate:
764 self.assertNotEqual(p[layer].id, self.port_in)
766 self.assertEqual(p[layer].id, self.port_in)
767 self.assertEqual(data, p[Raw].load)
770 if not dont_translate:
771 dst_addr = self.nat_addr
773 dst_addr = self.pg0.remote_ip4
774 if proto != IP_PROTOS.icmp:
776 dport = p[layer].sport
780 pkts = self.create_stream_frag(
781 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
783 self.pg1.add_stream(pkts)
784 self.pg_enable_capture(self.pg_interfaces)
786 frags = self.pg0.get_capture(len(pkts))
787 p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
788 if proto != IP_PROTOS.icmp:
789 self.assertEqual(p[layer].sport, 20)
790 self.assertEqual(p[layer].dport, self.port_in)
792 self.assertEqual(p[layer].id, self.port_in)
793 self.assertEqual(data, p[Raw].load)
795 def reass_hairpinning(
805 layer = self.proto2layer(proto)
807 if proto == IP_PROTOS.tcp:
808 data = b"A" * 4 + b"B" * 16 + b"C" * 3
810 data = b"A" * 16 + b"B" * 16 + b"C" * 3
812 # send packet from host to server
813 pkts = self.create_stream_frag(
814 self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
816 self.pg0.add_stream(pkts)
817 self.pg_enable_capture(self.pg_interfaces)
819 frags = self.pg0.get_capture(len(pkts))
820 p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
821 if proto != IP_PROTOS.icmp:
823 self.assertNotEqual(p[layer].sport, host_in_port)
824 self.assertEqual(p[layer].dport, server_in_port)
827 self.assertNotEqual(p[layer].id, host_in_port)
828 self.assertEqual(data, p[Raw].load)
830 def frag_out_of_order(
831 self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
833 layer = self.proto2layer(proto)
835 if proto == IP_PROTOS.tcp:
836 data = b"A" * 4 + b"B" * 16 + b"C" * 3
838 data = b"A" * 16 + b"B" * 16 + b"C" * 3
839 self.port_in = random.randint(1025, 65535)
843 pkts = self.create_stream_frag(
844 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
847 self.pg0.add_stream(pkts)
848 self.pg_enable_capture(self.pg_interfaces)
850 frags = self.pg1.get_capture(len(pkts))
851 if not dont_translate:
852 p = self.reass_frags_and_verify(
853 frags, self.nat_addr, self.pg1.remote_ip4
856 p = self.reass_frags_and_verify(
857 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
859 if proto != IP_PROTOS.icmp:
860 if not dont_translate:
861 self.assertEqual(p[layer].dport, 20)
863 self.assertNotEqual(p[layer].sport, self.port_in)
865 self.assertEqual(p[layer].sport, self.port_in)
868 if not dont_translate:
869 self.assertNotEqual(p[layer].id, self.port_in)
871 self.assertEqual(p[layer].id, self.port_in)
872 self.assertEqual(data, p[Raw].load)
875 if not dont_translate:
876 dst_addr = self.nat_addr
878 dst_addr = self.pg0.remote_ip4
879 if proto != IP_PROTOS.icmp:
881 dport = p[layer].sport
885 pkts = self.create_stream_frag(
886 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
889 self.pg1.add_stream(pkts)
890 self.pg_enable_capture(self.pg_interfaces)
892 frags = self.pg0.get_capture(len(pkts))
893 p = self.reass_frags_and_verify(
894 frags, self.pg1.remote_ip4, self.pg0.remote_ip4
896 if proto != IP_PROTOS.icmp:
897 self.assertEqual(p[layer].sport, 20)
898 self.assertEqual(p[layer].dport, self.port_in)
900 self.assertEqual(p[layer].id, self.port_in)
901 self.assertEqual(data, p[Raw].load)
def get_nat44_ei_in2out_worker_index(ip, vpp_worker_count):
    """
    Compute the in2out worker index for an IPv4 address

    The dotted-quad address is unpacked as a 32-bit integer, passed
    through socket.htonl, added to itself shifted right by 8, 16 and 24
    bits, and reduced modulo the worker count.

    :param ip: IPv4 address in dotted-quad notation
    :param vpp_worker_count: Number of VPP workers
    :returns: 1-based worker index when vpp_worker_count is non-zero
    """
    if vpp_worker_count == 0:
        # NOTE(review): the body of this branch was not visible in the
        # reviewed extract; reconstructed as "return 0" -- confirm.
        return 0
    (addr,) = struct.unpack("!L", socket.inet_aton(ip))
    swapped = socket.htonl(addr)
    folded = sum(swapped >> shift for shift in (0, 8, 16, 24))
    return 1 + folded % vpp_worker_count
915 class TestNAT44EI(MethodHolder):
916 """NAT44EI Test Cases"""
918 max_translations = 10240
923 super(TestNAT44EI, cls).setUpClass()
924 if is_distro_debian11 == True and not hasattr(cls, "vpp"):
926 cls.vapi.cli("set log class nat44-ei level debug")
928 cls.tcp_port_in = 6303
929 cls.tcp_port_out = 6303
930 cls.udp_port_in = 6304
931 cls.udp_port_out = 6304
932 cls.icmp_id_in = 6305
933 cls.icmp_id_out = 6305
934 cls.nat_addr = "10.0.0.3"
935 cls.ipfix_src_port = 4739
936 cls.ipfix_domain_id = 1
937 cls.tcp_external_port = 80
938 cls.udp_external_port = 69
940 cls.create_pg_interfaces(range(10))
941 cls.interfaces = list(cls.pg_interfaces[0:4])
943 for i in cls.interfaces:
948 cls.pg0.generate_remote_hosts(3)
949 cls.pg0.configure_ipv4_neighbors()
951 cls.pg1.generate_remote_hosts(1)
952 cls.pg1.configure_ipv4_neighbors()
954 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
955 cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
956 cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
958 cls.pg4._local_ip4 = "172.16.255.1"
959 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
960 cls.pg4.set_table_ip4(10)
961 cls.pg5._local_ip4 = "172.17.255.3"
962 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
963 cls.pg5.set_table_ip4(10)
964 cls.pg6._local_ip4 = "172.16.255.1"
965 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
966 cls.pg6.set_table_ip4(20)
967 for i in cls.overlapping_interfaces:
975 cls.pg9.generate_remote_hosts(2)
977 cls.vapi.sw_interface_add_del_address(
978 sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
982 cls.pg9.resolve_arp()
983 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
984 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
985 cls.pg9.resolve_arp()
def plugin_enable(self):
    """Enable the NAT44EI plugin

    Session and user limits are taken from self.max_translations and
    self.max_users.
    """
    enable_disable = self.vapi.nat44_ei_plugin_enable_disable
    limits = {"sessions": self.max_translations, "users": self.max_users}
    enable_disable(enable=1, **limits)
993 super(TestNAT44EI, self).setUp()
997 super(TestNAT44EI, self).tearDown()
998 if not self.vpp_dead:
999 self.vapi.nat44_ei_ipfix_enable_disable(
1000 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
1002 self.ipfix_src_port = 4739
1003 self.ipfix_domain_id = 1
1005 self.vapi.nat44_ei_plugin_enable_disable(enable=0)
1006 self.vapi.cli("clear logging")
1008 def test_clear_sessions(self):
1009 """NAT44EI session clearing test"""
1011 self.nat44_add_address(self.nat_addr)
1012 flags = self.config_flags.NAT44_EI_IF_INSIDE
1013 self.vapi.nat44_ei_interface_add_del_feature(
1014 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1016 self.vapi.nat44_ei_interface_add_del_feature(
1017 sw_if_index=self.pg1.sw_if_index, is_add=1
1020 pkts = self.create_stream_in(self.pg0, self.pg1)
1021 self.pg0.add_stream(pkts)
1022 self.pg_enable_capture(self.pg_interfaces)
1024 capture = self.pg1.get_capture(len(pkts))
1025 self.verify_capture_out(capture)
1027 sessions = self.statistics["/nat44-ei/total-sessions"]
1028 self.assertGreater(sessions[:, 0].sum(), 0, "Session count invalid")
1029 self.logger.info("sessions before clearing: %s" % sessions[0][0])
1031 self.vapi.cli("clear nat44 ei sessions")
1033 sessions = self.statistics["/nat44-ei/total-sessions"]
1034 self.assertEqual(sessions[:, 0].sum(), 0, "Session count invalid")
1035 self.logger.info("sessions after clearing: %s" % sessions[0][0])
1037 def test_dynamic(self):
1038 """NAT44EI dynamic translation test"""
1039 self.nat44_add_address(self.nat_addr)
1040 flags = self.config_flags.NAT44_EI_IF_INSIDE
1041 self.vapi.nat44_ei_interface_add_del_feature(
1042 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1044 self.vapi.nat44_ei_interface_add_del_feature(
1045 sw_if_index=self.pg1.sw_if_index, is_add=1
1049 tcpn = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
1050 udpn = self.statistics["/nat44-ei/in2out/slowpath/udp"]
1051 icmpn = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
1052 drops = self.statistics["/nat44-ei/in2out/slowpath/drops"]
1054 pkts = self.create_stream_in(self.pg0, self.pg1)
1055 self.pg0.add_stream(pkts)
1056 self.pg_enable_capture(self.pg_interfaces)
1058 capture = self.pg1.get_capture(len(pkts))
1059 self.verify_capture_out(capture)
1061 if_idx = self.pg0.sw_if_index
1062 cnt = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
1063 self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
1064 cnt = self.statistics["/nat44-ei/in2out/slowpath/udp"]
1065 self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
1066 cnt = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
1067 self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
1068 cnt = self.statistics["/nat44-ei/in2out/slowpath/drops"]
1069 self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
1072 tcpn = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
1073 udpn = self.statistics["/nat44-ei/out2in/slowpath/udp"]
1074 icmpn = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
1075 drops = self.statistics["/nat44-ei/out2in/slowpath/drops"]
1077 pkts = self.create_stream_out(self.pg1)
1078 self.pg1.add_stream(pkts)
1079 self.pg_enable_capture(self.pg_interfaces)
1081 capture = self.pg0.get_capture(len(pkts))
1082 self.verify_capture_in(capture, self.pg0)
1084 if_idx = self.pg1.sw_if_index
1085 cnt = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
1086 self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
1087 cnt = self.statistics["/nat44-ei/out2in/slowpath/udp"]
1088 self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
1089 cnt = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
1090 self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
1091 cnt = self.statistics["/nat44-ei/out2in/slowpath/drops"]
1092 self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
1094 users = self.statistics["/nat44-ei/total-users"]
1095 self.assertEqual(users[:, 0].sum(), 1)
1096 sessions = self.statistics["/nat44-ei/total-sessions"]
1097 self.assertEqual(sessions[:, 0].sum(), 3)
def test_dynamic_icmp_errors_in2out_ttl_1(self):
    """NAT44EI handling of client packets with TTL=1"""

    # NAT pool address plus pg0 with the inside flag and pg1 without flags
    self.nat44_add_address(self.nat_addr)
    inside_flag = self.config_flags.NAT44_EI_IF_INSIDE
    add_del_feature = self.vapi.nat44_ei_interface_add_del_feature
    add_del_feature(sw_if_index=self.pg0.sw_if_index, flags=inside_flag, is_add=1)
    add_del_feature(sw_if_index=self.pg1.sw_if_index, is_add=1)

    # Client side - generate traffic
    ttl1_pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
    icmp_errors = self.send_and_expect_some(self.pg0, ttl1_pkts, self.pg0)

    # Client side - verify ICMP type 11 packets
    self.verify_capture_in_with_icmp_errors(icmp_errors, self.pg0)
1118 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1119 """NAT44EI handling of server packets with TTL=1"""
1121 self.nat44_add_address(self.nat_addr)
1122 flags = self.config_flags.NAT44_EI_IF_INSIDE
1123 self.vapi.nat44_ei_interface_add_del_feature(
1124 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1126 self.vapi.nat44_ei_interface_add_del_feature(
1127 sw_if_index=self.pg1.sw_if_index, is_add=1
1130 # Client side - create sessions
1131 pkts = self.create_stream_in(self.pg0, self.pg1)
1132 self.pg0.add_stream(pkts)
1133 self.pg_enable_capture(self.pg_interfaces)
1136 # Server side - generate traffic
1137 capture = self.pg1.get_capture(len(pkts))
1138 self.verify_capture_out(capture)
1139 pkts = self.create_stream_out(self.pg1, ttl=1)
1140 capture = self.send_and_expect_some(self.pg1, pkts, self.pg1)
1142 # Server side - verify ICMP type 11 packets
1143 self.verify_capture_out_with_icmp_errors(capture, src_ip=self.pg1.local_ip4)
1145 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1146 """NAT44EI handling of error responses to client packets with TTL=2"""
1148 self.nat44_add_address(self.nat_addr)
1149 flags = self.config_flags.NAT44_EI_IF_INSIDE
1150 self.vapi.nat44_ei_interface_add_del_feature(
1151 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1153 self.vapi.nat44_ei_interface_add_del_feature(
1154 sw_if_index=self.pg1.sw_if_index, is_add=1
1157 # Client side - generate traffic
1158 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1159 self.pg0.add_stream(pkts)
1160 self.pg_enable_capture(self.pg_interfaces)
1163 # Server side - simulate ICMP type 11 response
1164 capture = self.pg1.get_capture(len(pkts))
1166 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1167 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1170 for packet in capture
1172 self.pg1.add_stream(pkts)
1173 self.pg_enable_capture(self.pg_interfaces)
1176 # Client side - verify ICMP type 11 packets
1177 capture = self.pg0.get_capture(len(pkts))
1178 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1180 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1181 """NAT44EI handling of error responses to server packets with TTL=2"""
# Setup: self.nat_addr is passed to nat44_add_address, then the NAT44 EI
# feature is enabled on pg0 with the NAT44_EI_IF_INSIDE flag and on pg1
# with no flags.
1183 self.nat44_add_address(self.nat_addr)
1184 flags = self.config_flags.NAT44_EI_IF_INSIDE
1185 self.vapi.nat44_ei_interface_add_del_feature(
1186 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1188 self.vapi.nat44_ei_interface_add_del_feature(
1189 sw_if_index=self.pg1.sw_if_index, is_add=1
1192 # Client side - create sessions
1193 pkts = self.create_stream_in(self.pg0, self.pg1)
1194 self.pg0.add_stream(pkts)
1195 self.pg_enable_capture(self.pg_interfaces)
1198 # Server side - generate traffic
1199 capture = self.pg1.get_capture(len(pkts))
1200 self.verify_capture_out(capture)
# The return-direction stream from pg1 is created with ttl=2, the case
# named in the docstring.
1201 pkts = self.create_stream_out(self.pg1, ttl=2)
1202 self.pg1.add_stream(pkts)
1203 self.pg_enable_capture(self.pg_interfaces)
1206 # Client side - simulate ICMP type 11 response
1207 capture = self.pg0.get_capture(len(pkts))
# One frame is built per packet captured on pg0 (see the
# "for packet in capture" clause), sent from pg0's remote host towards
# pg1's remote host.
1209 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1210 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1213 for packet in capture
1215 self.pg0.add_stream(pkts)
1216 self.pg_enable_capture(self.pg_interfaces)
1219 # Server side - verify ICMP type 11 packets
1220 capture = self.pg1.get_capture(len(pkts))
1221 self.verify_capture_out_with_icmp_errors(capture)
1223 def test_ping_out_interface_from_outside(self):
1224 """NAT44EI ping out interface from outside network"""
# Setup: self.nat_addr is passed to nat44_add_address; pg0 is enabled
# with NAT44_EI_IF_INSIDE and pg1 is enabled without flags.
1226 self.nat44_add_address(self.nat_addr)
1227 flags = self.config_flags.NAT44_EI_IF_INSIDE
1228 self.vapi.nat44_ei_interface_add_del_feature(
1229 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1231 self.vapi.nat44_ei_interface_add_del_feature(
1232 sw_if_index=self.pg1.sw_if_index, is_add=1
# The echo request is addressed to pg1.local_ip4, i.e. the interface's
# own address rather than self.nat_addr.
1236 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1237 / IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4)
1238 / ICMP(id=self.icmp_id_out, type="echo-request")
1241 self.pg1.add_stream(pkts)
1242 self.pg_enable_capture(self.pg_interfaces)
# The reply is captured on pg1 itself and must have source/destination
# swapped relative to the request.
1244 capture = self.pg1.get_capture(len(pkts))
1247 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1248 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
# NOTE(review): the request is sent with icmp_id_out but the reply is
# compared with icmp_id_in - confirm this is the intended identifier.
1249 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1250 self.assertEqual(packet[ICMP].type, 0) # echo reply
1253 ppp("Unexpected or invalid packet (outside network):", packet)
1257 def test_ping_internal_host_from_outside(self):
1258 """NAT44EI ping internal host from outside network"""
# A static mapping is created from pg0.remote_ip4 and self.nat_addr
# (presumably local address first, external address second - confirm
# against nat44_add_static_mapping), then pg0 is enabled with
# NAT44_EI_IF_INSIDE and pg1 without flags.
1260 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1261 flags = self.config_flags.NAT44_EI_IF_INSIDE
1262 self.vapi.nat44_ei_interface_add_del_feature(
1263 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1265 self.vapi.nat44_ei_interface_add_del_feature(
1266 sw_if_index=self.pg1.sw_if_index, is_add=1
# Echo request sent on pg1 towards self.nat_addr; one packet is then
# expected on pg0.
1271 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1272 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64)
1273 / ICMP(id=self.icmp_id_out, type="echo-request")
1275 self.pg1.add_stream(pkt)
1276 self.pg_enable_capture(self.pg_interfaces)
1278 capture = self.pg0.get_capture(1)
1279 self.verify_capture_in(capture, self.pg0)
1280 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Echo reply sent on pg0 towards pg1's remote host; one packet is then
# expected on pg1.
1284 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1285 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
1286 / ICMP(id=self.icmp_id_in, type="echo-reply")
1288 self.pg0.add_stream(pkt)
1289 self.pg_enable_capture(self.pg_interfaces)
1291 capture = self.pg1.get_capture(1)
# same_port=True is passed to the verifier - presumably because the 1:1
# mapping is expected to leave the ICMP id untouched; confirm against
# verify_capture_out.
1292 self.verify_capture_out(capture, same_port=True)
1293 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1295 def test_forwarding(self):
1296 """NAT44EI forwarding test"""
# pg0 is enabled with NAT44_EI_IF_INSIDE and pg1 without flags. No call to
# nat44_add_address is made in this test; forwarding is switched on
# instead via nat44_ei_forwarding_enable_disable(enable=1).
1298 flags = self.config_flags.NAT44_EI_IF_INSIDE
1299 self.vapi.nat44_ei_interface_add_del_feature(
1300 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1302 self.vapi.nat44_ei_interface_add_del_feature(
1303 sw_if_index=self.pg1.sw_if_index, is_add=1
1305 self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
# real_ip / alias_ip are used for a static mapping configured with the
# NAT44_EI_ADDR_ONLY_MAPPING flag.
1307 real_ip = self.pg0.remote_ip4
1308 alias_ip = self.nat_addr
1309 flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1310 self.vapi.nat44_ei_add_del_static_mapping(
1312 local_ip_address=real_ip,
1313 external_ip_address=alias_ip,
1314 external_sw_if_index=0xFFFFFFFF,
1319 # static mapping match
1321 pkts = self.create_stream_out(self.pg1)
1322 self.pg1.add_stream(pkts)
1323 self.pg_enable_capture(self.pg_interfaces)
1325 capture = self.pg0.get_capture(len(pkts))
1326 self.verify_capture_in(capture, self.pg0)
1328 pkts = self.create_stream_in(self.pg0, self.pg1)
1329 self.pg0.add_stream(pkts)
1330 self.pg_enable_capture(self.pg_interfaces)
1332 capture = self.pg1.get_capture(len(pkts))
1333 self.verify_capture_out(capture, same_port=True)
1335 # no static mapping match
# pg0.remote_hosts[0] is temporarily replaced by remote_hosts[1]; the
# saved host0 is put back further down. Presumably this makes
# pg0.remote_ip4 resolve to an address without a mapping - confirm.
1337 host0 = self.pg0.remote_hosts[0]
1338 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1340 pkts = self.create_stream_out(
1341 self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1343 self.pg1.add_stream(pkts)
1344 self.pg_enable_capture(self.pg_interfaces)
1346 capture = self.pg0.get_capture(len(pkts))
1347 self.verify_capture_in(capture, self.pg0)
1349 pkts = self.create_stream_in(self.pg0, self.pg1)
1350 self.pg0.add_stream(pkts)
1351 self.pg_enable_capture(self.pg_interfaces)
1353 capture = self.pg1.get_capture(len(pkts))
# The expected outside address here is the host's own address
# (nat_ip=self.pg0.remote_ip4) with unchanged ports (same_port=True).
1354 self.verify_capture_out(
1355 capture, nat_ip=self.pg0.remote_ip4, same_port=True
1358 self.pg0.remote_hosts[0] = host0
# Cleanup: forwarding is disabled again and
# nat44_ei_add_del_static_mapping is called once more for the same
# real_ip / alias_ip pair.
1361 self.vapi.nat44_ei_forwarding_enable_disable(enable=0)
1362 flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1363 self.vapi.nat44_ei_add_del_static_mapping(
1365 local_ip_address=real_ip,
1366 external_ip_address=alias_ip,
1367 external_sw_if_index=0xFFFFFFFF,
1371 def test_static_in(self):
1372 """NAT44EI 1:1 NAT initialized from inside network"""
# The outside TCP/UDP ports and ICMP id are stored on self before any
# traffic is generated.
1374 nat_ip = "10.0.0.10"
1375 self.tcp_port_out = 6303
1376 self.udp_port_out = 6304
1377 self.icmp_id_out = 6305
# Static mapping between pg0.remote_ip4 and nat_ip; pg0 is enabled with
# NAT44_EI_IF_INSIDE and pg1 without flags.
1379 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1380 flags = self.config_flags.NAT44_EI_IF_INSIDE
1381 self.vapi.nat44_ei_interface_add_del_feature(
1382 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1384 self.vapi.nat44_ei_interface_add_del_feature(
1385 sw_if_index=self.pg1.sw_if_index, is_add=1
# The dump must contain exactly one mapping, with an empty tag and zero
# protocol / local port / external port.
1387 sm = self.vapi.nat44_ei_static_mapping_dump()
1388 self.assertEqual(len(sm), 1)
1389 self.assertEqual(sm[0].tag, "")
1390 self.assertEqual(sm[0].protocol, 0)
1391 self.assertEqual(sm[0].local_port, 0)
1392 self.assertEqual(sm[0].external_port, 0)
# Inside -> outside direction first.
1395 pkts = self.create_stream_in(self.pg0, self.pg1)
1396 self.pg0.add_stream(pkts)
1397 self.pg_enable_capture(self.pg_interfaces)
1399 capture = self.pg1.get_capture(len(pkts))
# nat_ip and True are passed positionally; presumably they correspond to
# the nat_ip= and same_port= keywords used by other callers - confirm.
1400 self.verify_capture_out(capture, nat_ip, True)
# Then outside -> inside, addressed to nat_ip.
1403 pkts = self.create_stream_out(self.pg1, nat_ip)
1404 self.pg1.add_stream(pkts)
1405 self.pg_enable_capture(self.pg_interfaces)
1407 capture = self.pg0.get_capture(len(pkts))
1408 self.verify_capture_in(capture, self.pg0)
1410 def test_static_out(self):
1411 """NAT44EI 1:1 NAT initialized from outside network"""
# The outside TCP/UDP ports and ICMP id are stored on self before any
# traffic is generated.
1413 nat_ip = "10.0.0.20"
1414 self.tcp_port_out = 6303
1415 self.udp_port_out = 6304
1416 self.icmp_id_out = 6305
# The static mapping is created with a tag; the dump below must return
# exactly one mapping carrying that same tag.
1419 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1420 flags = self.config_flags.NAT44_EI_IF_INSIDE
1421 self.vapi.nat44_ei_interface_add_del_feature(
1422 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1424 self.vapi.nat44_ei_interface_add_del_feature(
1425 sw_if_index=self.pg1.sw_if_index, is_add=1
1427 sm = self.vapi.nat44_ei_static_mapping_dump()
1428 self.assertEqual(len(sm), 1)
1429 self.assertEqual(sm[0].tag, tag)
# Outside -> inside direction first (stream from pg1 addressed to nat_ip,
# captured on pg0).
1432 pkts = self.create_stream_out(self.pg1, nat_ip)
1433 self.pg1.add_stream(pkts)
1434 self.pg_enable_capture(self.pg_interfaces)
1436 capture = self.pg0.get_capture(len(pkts))
1437 self.verify_capture_in(capture, self.pg0)
# Then inside -> outside (stream from pg0, captured on pg1).
1440 pkts = self.create_stream_in(self.pg0, self.pg1)
1441 self.pg0.add_stream(pkts)
1442 self.pg_enable_capture(self.pg_interfaces)
1444 capture = self.pg1.get_capture(len(pkts))
1445 self.verify_capture_out(capture, nat_ip, True)
1447 def test_static_with_port_in(self):
1448 """NAT44EI 1:1 NAPT initialized from inside network"""
# The outside TCP/UDP ports and ICMP id are stored on self before the
# mappings are created.
1450 self.tcp_port_out = 3606
1451 self.udp_port_out = 3607
1452 self.icmp_id_out = 3608
# Three static mappings are created for pg0.remote_ip4, one per protocol
# (TCP, UDP, ICMP).
1454 self.nat44_add_address(self.nat_addr)
1455 self.nat44_add_static_mapping(
1456 self.pg0.remote_ip4,
1460 proto=IP_PROTOS.tcp,
1462 self.nat44_add_static_mapping(
1463 self.pg0.remote_ip4,
1467 proto=IP_PROTOS.udp,
1469 self.nat44_add_static_mapping(
1470 self.pg0.remote_ip4,
1474 proto=IP_PROTOS.icmp,
# pg0 is enabled with NAT44_EI_IF_INSIDE, pg1 without flags.
1476 flags = self.config_flags.NAT44_EI_IF_INSIDE
1477 self.vapi.nat44_ei_interface_add_del_feature(
1478 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1480 self.vapi.nat44_ei_interface_add_del_feature(
1481 sw_if_index=self.pg1.sw_if_index, is_add=1
# Inside -> outside direction first.
1485 pkts = self.create_stream_in(self.pg0, self.pg1)
1486 self.pg0.add_stream(pkts)
1487 self.pg_enable_capture(self.pg_interfaces)
1489 capture = self.pg1.get_capture(len(pkts))
1490 self.verify_capture_out(capture)
# Then outside -> inside.
1493 pkts = self.create_stream_out(self.pg1)
1494 self.pg1.add_stream(pkts)
1495 self.pg_enable_capture(self.pg_interfaces)
1497 capture = self.pg0.get_capture(len(pkts))
1498 self.verify_capture_in(capture, self.pg0)
1500 def test_static_with_port_out(self):
1501 """NAT44EI 1:1 NAPT initialized from outside network"""
# The outside TCP/UDP ports and ICMP id are stored on self before the
# mappings are created.
1503 self.tcp_port_out = 30606
1504 self.udp_port_out = 30607
1505 self.icmp_id_out = 30608
# Three static mappings are created for pg0.remote_ip4, one per protocol
# (TCP, UDP, ICMP).
1507 self.nat44_add_address(self.nat_addr)
1508 self.nat44_add_static_mapping(
1509 self.pg0.remote_ip4,
1513 proto=IP_PROTOS.tcp,
1515 self.nat44_add_static_mapping(
1516 self.pg0.remote_ip4,
1520 proto=IP_PROTOS.udp,
1522 self.nat44_add_static_mapping(
1523 self.pg0.remote_ip4,
1527 proto=IP_PROTOS.icmp,
# pg0 is enabled with NAT44_EI_IF_INSIDE, pg1 without flags.
1529 flags = self.config_flags.NAT44_EI_IF_INSIDE
1530 self.vapi.nat44_ei_interface_add_del_feature(
1531 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1533 self.vapi.nat44_ei_interface_add_del_feature(
1534 sw_if_index=self.pg1.sw_if_index, is_add=1
# Outside -> inside direction first.
1538 pkts = self.create_stream_out(self.pg1)
1539 self.pg1.add_stream(pkts)
1540 self.pg_enable_capture(self.pg_interfaces)
1542 capture = self.pg0.get_capture(len(pkts))
1543 self.verify_capture_in(capture, self.pg0)
# Then inside -> outside.
1546 pkts = self.create_stream_in(self.pg0, self.pg1)
1547 self.pg0.add_stream(pkts)
1548 self.pg_enable_capture(self.pg_interfaces)
1550 capture = self.pg1.get_capture(len(pkts))
1551 self.verify_capture_out(capture)
1553 def test_static_vrf_aware(self):
1554 """NAT44EI 1:1 NAT VRF awareness"""
1556 nat_ip1 = "10.0.0.30"
1557 nat_ip2 = "10.0.0.40"
1558 self.tcp_port_out = 6303
1559 self.udp_port_out = 6304
1560 self.icmp_id_out = 6305
# Both static mappings (for pg4 and pg0 remote addresses) are created
# with vrf_id=10.
1562 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1, vrf_id=10)
1563 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2, vrf_id=10)
# pg3 is enabled without flags; pg0 and pg4 are enabled with
# NAT44_EI_IF_INSIDE.
1564 flags = self.config_flags.NAT44_EI_IF_INSIDE
1565 self.vapi.nat44_ei_interface_add_del_feature(
1566 sw_if_index=self.pg3.sw_if_index, is_add=1
1568 self.vapi.nat44_ei_interface_add_del_feature(
1569 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1571 self.vapi.nat44_ei_interface_add_del_feature(
1572 sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
1575 # inside interface VRF match NAT44EI static mapping VRF
1576 pkts = self.create_stream_in(self.pg4, self.pg3)
1577 self.pg4.add_stream(pkts)
1578 self.pg_enable_capture(self.pg_interfaces)
1580 capture = self.pg3.get_capture(len(pkts))
1581 self.verify_capture_out(capture, nat_ip1, True)
1583 # inside interface VRF don't match NAT44EI static mapping VRF (packets
1585 pkts = self.create_stream_in(self.pg0, self.pg3)
1586 self.pg0.add_stream(pkts)
1587 self.pg_enable_capture(self.pg_interfaces)
# For traffic entering on pg0 the test requires that pg3 receives nothing.
1589 self.pg3.assert_nothing_captured()
1591 def test_dynamic_to_static(self):
1592 """NAT44EI Switch from dynamic translation to 1:1NAT"""
1593 nat_ip = "10.0.0.10"
1594 self.tcp_port_out = 6303
1595 self.udp_port_out = 6304
1596 self.icmp_id_out = 6305
# Phase 1 setup: self.nat_addr is passed to nat44_add_address, pg0 is
# enabled with NAT44_EI_IF_INSIDE and pg1 without flags.
1598 self.nat44_add_address(self.nat_addr)
1599 flags = self.config_flags.NAT44_EI_IF_INSIDE
1600 self.vapi.nat44_ei_interface_add_del_feature(
1601 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1603 self.vapi.nat44_ei_interface_add_del_feature(
1604 sw_if_index=self.pg1.sw_if_index, is_add=1
# Phase 1 traffic: verified with verify_capture_out's default arguments.
1608 pkts = self.create_stream_in(self.pg0, self.pg1)
1609 self.pg0.add_stream(pkts)
1610 self.pg_enable_capture(self.pg_interfaces)
1612 capture = self.pg1.get_capture(len(pkts))
1613 self.verify_capture_out(capture)
# Phase 2: once the static mapping is added, the session dump for
# pg0.remote_ip4 (second argument 0) must be empty, and fresh traffic is
# verified against nat_ip.
1616 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1617 sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1618 self.assertEqual(len(sessions), 0)
1619 pkts = self.create_stream_in(self.pg0, self.pg1)
1620 self.pg0.add_stream(pkts)
1621 self.pg_enable_capture(self.pg_interfaces)
1623 capture = self.pg1.get_capture(len(pkts))
1624 self.verify_capture_out(capture, nat_ip, True)
1626 def test_identity_nat(self):
1627 """NAT44EI Identity NAT"""
# An identity mapping is requested for pg0.remote_ip4 with the
# NAT44_EI_ADDR_ONLY_MAPPING flag value; sw_if_index=0xFFFFFFFF is passed
# (presumably meaning "no interface" - confirm against the API).
1628 flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1629 self.vapi.nat44_ei_add_del_identity_mapping(
1630 ip_address=self.pg0.remote_ip4,
1631 sw_if_index=0xFFFFFFFF,
# pg0 is enabled with NAT44_EI_IF_INSIDE, pg1 without flags.
1635 flags = self.config_flags.NAT44_EI_IF_INSIDE
1636 self.vapi.nat44_ei_interface_add_del_feature(
1637 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1639 self.vapi.nat44_ei_interface_add_del_feature(
1640 sw_if_index=self.pg1.sw_if_index, is_add=1
# The TCP packet is addressed directly to pg0.remote_ip4 and must arrive
# on pg0 with the same addresses and ports it was sent with.
1644 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1645 / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
1646 / TCP(sport=12345, dport=56789)
1648 self.pg1.add_stream(p)
1649 self.pg_enable_capture(self.pg_interfaces)
1651 capture = self.pg0.get_capture(1)
1656 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1657 self.assertEqual(ip.src, self.pg1.remote_ip4)
1658 self.assertEqual(tcp.dport, 56789)
1659 self.assertEqual(tcp.sport, 12345)
1660 self.assert_packet_checksums_valid(p)
1662 self.logger.error(ppp("Unexpected or invalid packet:", p))
# No session may exist for pg0.remote_ip4 after the packet was forwarded.
1665 sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1666 self.assertEqual(len(sessions), 0)
# A second identity-mapping call for the same address follows; the dump is
# then expected to hold two entries.
1667 flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1668 self.vapi.nat44_ei_add_del_identity_mapping(
1669 ip_address=self.pg0.remote_ip4,
1670 sw_if_index=0xFFFFFFFF,
1675 identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
1676 self.assertEqual(len(identity_mappings), 2)
1678 def test_multiple_inside_interfaces(self):
1679 """NAT44EI multiple non-overlapping address space inside interfaces"""
# pg0 and pg1 are both enabled with NAT44_EI_IF_INSIDE; pg3 is enabled
# without flags. pg2 is not passed to nat44_ei_interface_add_del_feature
# anywhere in this test.
1681 self.nat44_add_address(self.nat_addr)
1682 flags = self.config_flags.NAT44_EI_IF_INSIDE
1683 self.vapi.nat44_ei_interface_add_del_feature(
1684 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1686 self.vapi.nat44_ei_interface_add_del_feature(
1687 sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
1689 self.vapi.nat44_ei_interface_add_del_feature(
1690 sw_if_index=self.pg3.sw_if_index, is_add=1
1693 # between two NAT44EI inside interfaces (no translation)
1694 pkts = self.create_stream_in(self.pg0, self.pg1)
1695 self.pg0.add_stream(pkts)
1696 self.pg_enable_capture(self.pg_interfaces)
1698 capture = self.pg1.get_capture(len(pkts))
1699 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1701 # from inside to interface without translation
1702 pkts = self.create_stream_in(self.pg0, self.pg2)
1703 self.pg0.add_stream(pkts)
1704 self.pg_enable_capture(self.pg_interfaces)
1706 capture = self.pg2.get_capture(len(pkts))
1707 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
# The remaining four sections exercise pg3 against each inside interface
# in turn: in2out then out2in for pg0, followed by the same pair for pg1.
1709 # in2out 1st interface
1710 pkts = self.create_stream_in(self.pg0, self.pg3)
1711 self.pg0.add_stream(pkts)
1712 self.pg_enable_capture(self.pg_interfaces)
1714 capture = self.pg3.get_capture(len(pkts))
1715 self.verify_capture_out(capture)
1717 # out2in 1st interface
1718 pkts = self.create_stream_out(self.pg3)
1719 self.pg3.add_stream(pkts)
1720 self.pg_enable_capture(self.pg_interfaces)
1722 capture = self.pg0.get_capture(len(pkts))
1723 self.verify_capture_in(capture, self.pg0)
1725 # in2out 2nd interface
1726 pkts = self.create_stream_in(self.pg1, self.pg3)
1727 self.pg1.add_stream(pkts)
1728 self.pg_enable_capture(self.pg_interfaces)
1730 capture = self.pg3.get_capture(len(pkts))
1731 self.verify_capture_out(capture)
1733 # out2in 2nd interface
1734 pkts = self.create_stream_out(self.pg3)
1735 self.pg3.add_stream(pkts)
1736 self.pg_enable_capture(self.pg_interfaces)
1738 capture = self.pg1.get_capture(len(pkts))
1739 self.verify_capture_in(capture, self.pg1)
1741 def test_inside_overlapping_interfaces(self):
1742 """NAT44EI multiple inside interfaces with overlapping address space"""
# pg3 is enabled without flags; pg4, pg5 and pg6 are enabled with
# NAT44_EI_IF_INSIDE. The static mapping for pg6.remote_ip4 is created
# with vrf_id=20.
1744 static_nat_ip = "10.0.0.10"
1745 self.nat44_add_address(self.nat_addr)
1746 flags = self.config_flags.NAT44_EI_IF_INSIDE
1747 self.vapi.nat44_ei_interface_add_del_feature(
1748 sw_if_index=self.pg3.sw_if_index, is_add=1
1750 self.vapi.nat44_ei_interface_add_del_feature(
1751 sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
1753 self.vapi.nat44_ei_interface_add_del_feature(
1754 sw_if_index=self.pg5.sw_if_index, flags=flags, is_add=1
1756 self.vapi.nat44_ei_interface_add_del_feature(
1757 sw_if_index=self.pg6.sw_if_index, flags=flags, is_add=1
1759 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip, vrf_id=20)
1761 # between NAT44EI inside interfaces with same VRF (no translation)
1762 pkts = self.create_stream_in(self.pg4, self.pg5)
1763 self.pg4.add_stream(pkts)
1764 self.pg_enable_capture(self.pg_interfaces)
1766 capture = self.pg5.get_capture(len(pkts))
1767 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1769 # between NAT44EI inside interfaces with different VRF (hairpinning)
# The packet from pg4 targets static_nat_ip and must show up on pg6 with
# source nat_addr, destination pg6.remote_ip4, a source port different
# from 1234 and an unchanged destination port.
1771 Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
1772 / IP(src=self.pg4.remote_ip4, dst=static_nat_ip)
1773 / TCP(sport=1234, dport=5678)
1775 self.pg4.add_stream(p)
1776 self.pg_enable_capture(self.pg_interfaces)
1778 capture = self.pg6.get_capture(1)
1783 self.assertEqual(ip.src, self.nat_addr)
1784 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1785 self.assertNotEqual(tcp.sport, 1234)
1786 self.assertEqual(tcp.dport, 5678)
1788 self.logger.error(ppp("Unexpected or invalid packet:", p))
1791 # in2out 1st interface
1792 pkts = self.create_stream_in(self.pg4, self.pg3)
1793 self.pg4.add_stream(pkts)
1794 self.pg_enable_capture(self.pg_interfaces)
1796 capture = self.pg3.get_capture(len(pkts))
1797 self.verify_capture_out(capture)
1799 # out2in 1st interface
1800 pkts = self.create_stream_out(self.pg3)
1801 self.pg3.add_stream(pkts)
1802 self.pg_enable_capture(self.pg_interfaces)
1804 capture = self.pg4.get_capture(len(pkts))
1805 self.verify_capture_in(capture, self.pg4)
1807 # in2out 2nd interface
1808 pkts = self.create_stream_in(self.pg5, self.pg3)
1809 self.pg5.add_stream(pkts)
1810 self.pg_enable_capture(self.pg_interfaces)
1812 capture = self.pg3.get_capture(len(pkts))
1813 self.verify_capture_out(capture)
1815 # out2in 2nd interface
1816 pkts = self.create_stream_out(self.pg3)
1817 self.pg3.add_stream(pkts)
1818 self.pg_enable_capture(self.pg_interfaces)
1820 capture = self.pg5.get_capture(len(pkts))
1821 self.verify_capture_in(capture, self.pg5)
# Session checks for pg5.remote_ip4 (second dump argument 10): exactly
# three sessions, none flagged NAT44_EI_STATIC_MAPPING, and the indexed
# assertions require the order tcp, udp, icmp.
1824 addresses = self.vapi.nat44_ei_address_dump()
1825 self.assertEqual(len(addresses), 1)
1826 sessions = self.vapi.nat44_ei_user_session_dump(self.pg5.remote_ip4, 10)
1827 self.assertEqual(len(sessions), 3)
1828 for session in sessions:
1829 self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1830 self.assertEqual(str(session.inside_ip_address), self.pg5.remote_ip4)
1831 self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
1832 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1833 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1834 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1835 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1836 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1837 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1838 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1839 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1840 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1842 # in2out 3rd interface
1843 pkts = self.create_stream_in(self.pg6, self.pg3)
1844 self.pg6.add_stream(pkts)
1845 self.pg_enable_capture(self.pg_interfaces)
1847 capture = self.pg3.get_capture(len(pkts))
1848 self.verify_capture_out(capture, static_nat_ip, True)
1850 # out2in 3rd interface
1851 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1852 self.pg3.add_stream(pkts)
1853 self.pg_enable_capture(self.pg_interfaces)
1855 capture = self.pg6.get_capture(len(pkts))
1856 self.verify_capture_in(capture, self.pg6)
1858 # general user and session dump verifications
1859 users = self.vapi.nat44_ei_user_dump()
1860 self.assertGreaterEqual(len(users), 3)
1861 addresses = self.vapi.nat44_ei_address_dump()
1862 self.assertEqual(len(addresses), 1)
1864 sessions = self.vapi.nat44_ei_user_session_dump(
1865 user.ip_address, user.vrf_id
1867 for session in sessions:
1868 self.assertEqual(user.ip_address, session.inside_ip_address)
# Chained comparison: requires total_pkts > 0 and total_bytes > total_pkts.
1869 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1871 session.protocol in [IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp]
# pg4 sessions (dump argument 10): at least four, none flagged as static
# mapping, all translated to the single pool address.
1875 sessions = self.vapi.nat44_ei_user_session_dump(self.pg4.remote_ip4, 10)
1876 self.assertGreaterEqual(len(sessions), 4)
1877 for session in sessions:
1878 self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1879 self.assertEqual(str(session.inside_ip_address), self.pg4.remote_ip4)
1880 self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
# pg6 sessions (dump argument 20): at least three, every one flagged
# NAT44_EI_STATIC_MAPPING and using static_nat_ip as outside address.
1883 sessions = self.vapi.nat44_ei_user_session_dump(self.pg6.remote_ip4, 20)
1884 self.assertGreaterEqual(len(sessions), 3)
1885 for session in sessions:
1886 self.assertTrue(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1887 self.assertEqual(str(session.inside_ip_address), self.pg6.remote_ip4)
1888 self.assertEqual(str(session.outside_ip_address), static_nat_ip)
1891 in [self.tcp_port_in, self.udp_port_in, self.icmp_id_in]
1894 def test_hairpinning(self):
1895 """NAT44EI hairpinning - 1:1 NAPT"""
# host and server are two different remote hosts on the same interface
# (pg0.remote_hosts[0] and [1]).
1897 host = self.pg0.remote_hosts[0]
1898 server = self.pg0.remote_hosts[1]
1901 server_in_port = 5678
1902 server_out_port = 8765
# Setup: self.nat_addr is passed to nat44_add_address; pg0 is enabled
# with NAT44_EI_IF_INSIDE and pg1 without flags.
1904 self.nat44_add_address(self.nat_addr)
1905 flags = self.config_flags.NAT44_EI_IF_INSIDE
1906 self.vapi.nat44_ei_interface_add_del_feature(
1907 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1909 self.vapi.nat44_ei_interface_add_del_feature(
1910 sw_if_index=self.pg1.sw_if_index, is_add=1
1913 # add static mapping for server
1914 self.nat44_add_static_mapping(
1919 proto=IP_PROTOS.tcp,
# Snapshot of the hairpinning statistics taken before any traffic, so the
# assertions below can compare deltas.
1922 cnt = self.statistics["/nat44-ei/hairpinning"]
1923 # send packet from host to server
1925 Ether(src=host.mac, dst=self.pg0.local_mac)
1926 / IP(src=host.ip4, dst=self.nat_addr)
1927 / TCP(sport=host_in_port, dport=server_out_port)
1929 self.pg0.add_stream(p)
1930 self.pg_enable_capture(self.pg_interfaces)
# The translated packet is captured on pg0 again, i.e. on the interface it
# was sent on.
1932 capture = self.pg0.get_capture(1)
1937 self.assertEqual(ip.src, self.nat_addr)
1938 self.assertEqual(ip.dst, server.ip4)
1939 self.assertNotEqual(tcp.sport, host_in_port)
1940 self.assertEqual(tcp.dport, server_in_port)
1941 self.assert_packet_checksums_valid(p)
# The translated source port is remembered; the server's reply below is
# addressed to it.
1942 host_out_port = tcp.sport
1944 self.logger.error(ppp("Unexpected or invalid packet:", p))
# After the first packet the counter summed over column if_idx must have
# grown by exactly 1.
1947 after = self.statistics["/nat44-ei/hairpinning"]
1948 if_idx = self.pg0.sw_if_index
1949 self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
1951 # send reply from server to host
1953 Ether(src=server.mac, dst=self.pg0.local_mac)
1954 / IP(src=server.ip4, dst=self.nat_addr)
1955 / TCP(sport=server_in_port, dport=host_out_port)
1957 self.pg0.add_stream(p)
1958 self.pg_enable_capture(self.pg_interfaces)
1960 capture = self.pg0.get_capture(1)
1965 self.assertEqual(ip.src, self.nat_addr)
1966 self.assertEqual(ip.dst, host.ip4)
1967 self.assertEqual(tcp.sport, server_out_port)
1968 self.assertEqual(tcp.dport, host_in_port)
1969 self.assert_packet_checksums_valid(p)
1971 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Total expected delta after the reply: 2, plus 1 more when
# vpp_worker_count > 0.
1974 after = self.statistics["/nat44-ei/hairpinning"]
1975 if_idx = self.pg0.sw_if_index
1977 after[:, if_idx].sum() - cnt[:, if_idx].sum(),
1978 2 + (1 if self.vpp_worker_count > 0 else 0),
1981 def test_hairpinning2(self):
1982 """NAT44EI hairpinning - 1:1 NAT"""
# host, server1 and server2 are three remote hosts on the same interface
# (pg0.remote_hosts[0..2]); every stream below is sent and captured on pg0.
1984 server1_nat_ip = "10.0.0.10"
1985 server2_nat_ip = "10.0.0.11"
1986 host = self.pg0.remote_hosts[0]
1987 server1 = self.pg0.remote_hosts[1]
1988 server2 = self.pg0.remote_hosts[2]
1989 server_tcp_port = 22
1990 server_udp_port = 20
1992 self.nat44_add_address(self.nat_addr)
1993 flags = self.config_flags.NAT44_EI_IF_INSIDE
1994 self.vapi.nat44_ei_interface_add_del_feature(
1995 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1997 self.vapi.nat44_ei_interface_add_del_feature(
1998 sw_if_index=self.pg1.sw_if_index, is_add=1
2001 # add static mapping for servers
2002 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2003 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# Section 1 - host to server1: TCP, UDP and ICMP packets addressed to
# server1_nat_ip.
2008 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2009 / IP(src=host.ip4, dst=server1_nat_ip)
2010 / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
2014 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2015 / IP(src=host.ip4, dst=server1_nat_ip)
2016 / UDP(sport=self.udp_port_in, dport=server_udp_port)
2020 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2021 / IP(src=host.ip4, dst=server1_nat_ip)
2022 / ICMP(id=self.icmp_id_in, type="echo-request")
2025 self.pg0.add_stream(pkts)
2026 self.pg_enable_capture(self.pg_interfaces)
2028 capture = self.pg0.get_capture(len(pkts))
# Expected: source rewritten to nat_addr with a changed port / ICMP id,
# destination rewritten to server1.ip4. The translated ports and id are
# stored on self for building the replies.
2029 for packet in capture:
2031 self.assertEqual(packet[IP].src, self.nat_addr)
2032 self.assertEqual(packet[IP].dst, server1.ip4)
2033 if packet.haslayer(TCP):
2034 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2035 self.assertEqual(packet[TCP].dport, server_tcp_port)
2036 self.tcp_port_out = packet[TCP].sport
2037 self.assert_packet_checksums_valid(packet)
2038 elif packet.haslayer(UDP):
2039 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2040 self.assertEqual(packet[UDP].dport, server_udp_port)
2041 self.udp_port_out = packet[UDP].sport
2043 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2044 self.icmp_id_out = packet[ICMP].id
2046 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Section 2 - server1 replies towards nat_addr using the translated
# ports / id recorded above.
2052 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2053 / IP(src=server1.ip4, dst=self.nat_addr)
2054 / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
2058 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2059 / IP(src=server1.ip4, dst=self.nat_addr)
2060 / UDP(sport=server_udp_port, dport=self.udp_port_out)
2064 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2065 / IP(src=server1.ip4, dst=self.nat_addr)
2066 / ICMP(id=self.icmp_id_out, type="echo-reply")
2069 self.pg0.add_stream(pkts)
2070 self.pg_enable_capture(self.pg_interfaces)
2072 capture = self.pg0.get_capture(len(pkts))
# Expected: source shown as server1_nat_ip, destination host.ip4, and the
# original inside ports / id restored.
2073 for packet in capture:
2075 self.assertEqual(packet[IP].src, server1_nat_ip)
2076 self.assertEqual(packet[IP].dst, host.ip4)
2077 if packet.haslayer(TCP):
2078 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2079 self.assertEqual(packet[TCP].sport, server_tcp_port)
2080 self.assert_packet_checksums_valid(packet)
2081 elif packet.haslayer(UDP):
2082 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2083 self.assertEqual(packet[UDP].sport, server_udp_port)
2085 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2087 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2090 # server2 to server1
2093 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2094 / IP(src=server2.ip4, dst=server1_nat_ip)
2095 / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
2099 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2100 / IP(src=server2.ip4, dst=server1_nat_ip)
2101 / UDP(sport=self.udp_port_in, dport=server_udp_port)
2105 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2106 / IP(src=server2.ip4, dst=server1_nat_ip)
2107 / ICMP(id=self.icmp_id_in, type="echo-request")
2110 self.pg0.add_stream(pkts)
2111 self.pg_enable_capture(self.pg_interfaces)
2113 capture = self.pg0.get_capture(len(pkts))
# Unlike section 1, the source here becomes server2_nat_ip and the source
# ports / ICMP id are expected to stay equal to the inside values.
2114 for packet in capture:
2116 self.assertEqual(packet[IP].src, server2_nat_ip)
2117 self.assertEqual(packet[IP].dst, server1.ip4)
2118 if packet.haslayer(TCP):
2119 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2120 self.assertEqual(packet[TCP].dport, server_tcp_port)
2121 self.tcp_port_out = packet[TCP].sport
2122 self.assert_packet_checksums_valid(packet)
2123 elif packet.haslayer(UDP):
2124 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2125 self.assertEqual(packet[UDP].dport, server_udp_port)
2126 self.udp_port_out = packet[UDP].sport
2128 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2129 self.icmp_id_out = packet[ICMP].id
2131 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2134 # server1 to server2
2137 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2138 / IP(src=server1.ip4, dst=server2_nat_ip)
2139 / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
2143 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2144 / IP(src=server1.ip4, dst=server2_nat_ip)
2145 / UDP(sport=server_udp_port, dport=self.udp_port_out)
2149 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2150 / IP(src=server1.ip4, dst=server2_nat_ip)
2151 / ICMP(id=self.icmp_id_out, type="echo-reply")
2154 self.pg0.add_stream(pkts)
2155 self.pg_enable_capture(self.pg_interfaces)
2157 capture = self.pg0.get_capture(len(pkts))
# Expected: source shown as server1_nat_ip and destination rewritten to
# server2.ip4, with the inside ports / id on the destination side.
2158 for packet in capture:
2160 self.assertEqual(packet[IP].src, server1_nat_ip)
2161 self.assertEqual(packet[IP].dst, server2.ip4)
2162 if packet.haslayer(TCP):
2163 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2164 self.assertEqual(packet[TCP].sport, server_tcp_port)
2165 self.assert_packet_checksums_valid(packet)
2166 elif packet.haslayer(UDP):
2167 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2168 self.assertEqual(packet[UDP].sport, server_udp_port)
2170 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2172 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2175 def test_hairpinning_avoid_inf_loop(self):
2176 """NAT44EI hairpinning - 1:1 NAPT avoid infinite loop"""
# host and server are two different remote hosts on the same interface
# (pg0.remote_hosts[0] and [1]).
2178 host = self.pg0.remote_hosts[0]
2179 server = self.pg0.remote_hosts[1]
2182 server_in_port = 5678
2183 server_out_port = 8765
# Setup: self.nat_addr is passed to nat44_add_address; pg0 is enabled
# with NAT44_EI_IF_INSIDE and pg1 without flags.
2185 self.nat44_add_address(self.nat_addr)
2186 flags = self.config_flags.NAT44_EI_IF_INSIDE
2187 self.vapi.nat44_ei_interface_add_del_feature(
2188 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2190 self.vapi.nat44_ei_interface_add_del_feature(
2191 sw_if_index=self.pg1.sw_if_index, is_add=1
2194 # add static mapping for server
2195 self.nat44_add_static_mapping(
2200 proto=IP_PROTOS.tcp,
2203 # add another static mapping that maps pg0.local_ip4 address to itself
2204 self.nat44_add_static_mapping(self.pg0.local_ip4, self.pg0.local_ip4)
2206 # send packet from host to VPP (the packet should get dropped)
# This first packet targets pg0.local_ip4, the address that was mapped to
# itself just above.
2208 Ether(src=host.mac, dst=self.pg0.local_mac)
2209 / IP(src=host.ip4, dst=self.pg0.local_ip4)
2210 / TCP(sport=host_in_port, dport=server_out_port)
2212 self.pg0.add_stream(p)
2213 self.pg_enable_capture(self.pg_interfaces)
2215 # Here VPP used to crash due to an infinite loop
# The statistics snapshot is taken only after the self-mapped packet, so
# the deltas asserted below cover the two packets that follow.
2217 cnt = self.statistics["/nat44-ei/hairpinning"]
2218 # send packet from host to server
2220 Ether(src=host.mac, dst=self.pg0.local_mac)
2221 / IP(src=host.ip4, dst=self.nat_addr)
2222 / TCP(sport=host_in_port, dport=server_out_port)
2224 self.pg0.add_stream(p)
2225 self.pg_enable_capture(self.pg_interfaces)
2227 capture = self.pg0.get_capture(1)
2232 self.assertEqual(ip.src, self.nat_addr)
2233 self.assertEqual(ip.dst, server.ip4)
2234 self.assertNotEqual(tcp.sport, host_in_port)
2235 self.assertEqual(tcp.dport, server_in_port)
2236 self.assert_packet_checksums_valid(p)
# The translated source port is remembered; the server's reply below is
# addressed to it.
2237 host_out_port = tcp.sport
2239 self.logger.error(ppp("Unexpected or invalid packet:", p))
# After the host -> server packet the counter summed over column if_idx
# must have grown by exactly 1.
2242 after = self.statistics["/nat44-ei/hairpinning"]
2243 if_idx = self.pg0.sw_if_index
2244 self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
2246 # send reply from server to host
2248 Ether(src=server.mac, dst=self.pg0.local_mac)
2249 / IP(src=server.ip4, dst=self.nat_addr)
2250 / TCP(sport=server_in_port, dport=host_out_port)
2252 self.pg0.add_stream(p)
2253 self.pg_enable_capture(self.pg_interfaces)
2255 capture = self.pg0.get_capture(1)
2260 self.assertEqual(ip.src, self.nat_addr)
2261 self.assertEqual(ip.dst, host.ip4)
2262 self.assertEqual(tcp.sport, server_out_port)
2263 self.assertEqual(tcp.dport, host_in_port)
2264 self.assert_packet_checksums_valid(p)
2266 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Total expected delta after the reply: 2, plus 1 more when
# vpp_worker_count > 0.
2269 after = self.statistics["/nat44-ei/hairpinning"]
2270 if_idx = self.pg0.sw_if_index
2272 after[:, if_idx].sum() - cnt[:, if_idx].sum(),
2273 2 + (1 if self.vpp_worker_count > 0 else 0),
2276 def test_interface_addr(self):
2277 """NAT44EI acquire addresses from interface"""
# Registers pg7 via nat44_ei_add_del_interface_addr and then checks that the
# NAT44EI address dump tracks pg7's IPv4 configuration: empty before the
# address exists, exactly pg7.local_ip4 while it is configured, and empty
# again once it has been removed.
2278 self.vapi.nat44_ei_add_del_interface_addr(
2279 is_add=1, sw_if_index=self.pg7.sw_if_index
2282 # no address in NAT pool
2283 addresses = self.vapi.nat44_ei_address_dump()
2284 self.assertEqual(0, len(addresses))
2286 # configure interface address and check NAT address pool
2287 self.pg7.config_ip4()
2288 addresses = self.vapi.nat44_ei_address_dump()
2289 self.assertEqual(1, len(addresses))
# the dumped address is converted to str before comparing with pg7.local_ip4
2290 self.assertEqual(str(addresses[0].ip_address), self.pg7.local_ip4)
2292 # remove interface address and check NAT address pool
2293 self.pg7.unconfig_ip4()
2294 addresses = self.vapi.nat44_ei_address_dump()
2295 self.assertEqual(0, len(addresses))
2297 def test_interface_addr_static_mapping(self):
2298 """NAT44EI Static mapping with addresses from interface"""
# Follows a static mapping for local address 1.2.3.4 that is bound to pg7 by
# sw_if_index while pg7 gains, loses and regains its IPv4 address, and
# finally deletes the mapping.
# NOTE(review): `tag` and `resolved` are used below but their assignments are
# not among the lines shown here - confirm they are set before use.
2301 self.vapi.nat44_ei_add_del_interface_addr(
2302 is_add=1, sw_if_index=self.pg7.sw_if_index
2304 self.nat44_add_static_mapping(
2305 "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag
2308 # static mappings with external interface
# before pg7 has an address only the interface-bound entry is dumped
2309 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2310 self.assertEqual(1, len(static_mappings))
2311 self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
2312 self.assertEqual(static_mappings[0].tag, tag)
2314 # configure interface address and check static mappings
2315 self.pg7.config_ip4()
2316 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
# a second entry is expected once pg7 has an address
2317 self.assertEqual(2, len(static_mappings))
2319 for sm in static_mappings:
# entries whose external_sw_if_index is 0xFFFFFFFF must carry pg7's address
# and the same tag as the interface-bound entry
2320 if sm.external_sw_if_index == 0xFFFFFFFF:
2321 self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
2322 self.assertEqual(sm.tag, tag)
2324 self.assertTrue(resolved)
2326 # remove interface address and check static mappings
2327 self.pg7.unconfig_ip4()
2328 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2329 self.assertEqual(1, len(static_mappings))
2330 self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
2331 self.assertEqual(static_mappings[0].tag, tag)
2333 # configure interface address again and check static mappings
2334 self.pg7.config_ip4()
2335 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2336 self.assertEqual(2, len(static_mappings))
2338 for sm in static_mappings:
2339 if sm.external_sw_if_index == 0xFFFFFFFF:
2340 self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
2341 self.assertEqual(sm.tag, tag)
2343 self.assertTrue(resolved)
2345 # remove static mapping
# deleting the interface-bound mapping is expected to leave the dump empty
2346 self.nat44_add_static_mapping(
2347 "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag, is_add=0
2349 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2350 self.assertEqual(0, len(static_mappings))
2352 def test_interface_addr_identity_nat(self):
2353 """NAT44EI Identity NAT with addresses from interface"""
2356 self.vapi.nat44_ei_add_del_interface_addr(
2357 is_add=1, sw_if_index=self.pg7.sw_if_index
2359 self.vapi.nat44_ei_add_del_identity_mapping(
2361 sw_if_index=self.pg7.sw_if_index,
2363 protocol=IP_PROTOS.tcp,
2367 # identity mappings with external interface
2368 identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2369 self.assertEqual(1, len(identity_mappings))
2370 self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
2372 # configure interface address and check identity mappings
2373 self.pg7.config_ip4()
2374 identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2376 self.assertEqual(2, len(identity_mappings))
2377 for sm in identity_mappings:
2378 if sm.sw_if_index == 0xFFFFFFFF:
2380 str(identity_mappings[0].ip_address), self.pg7.local_ip4
2382 self.assertEqual(port, identity_mappings[0].port)
2383 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2385 self.assertTrue(resolved)
2387 # remove interface address and check identity mappings
2388 self.pg7.unconfig_ip4()
2389 identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2390 self.assertEqual(1, len(identity_mappings))
2391 self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
2393 def test_ipfix_nat44_sess(self):
2394 """NAT44EI IPFIX logging NAT44EI session created/deleted"""
2395 self.ipfix_domain_id = 10
2396 self.ipfix_src_port = 20202
2397 collector_port = 30303
2398 bind_layers(UDP, IPFIX, dport=30303)
2399 self.nat44_add_address(self.nat_addr)
2400 flags = self.config_flags.NAT44_EI_IF_INSIDE
2401 self.vapi.nat44_ei_interface_add_del_feature(
2402 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2404 self.vapi.nat44_ei_interface_add_del_feature(
2405 sw_if_index=self.pg1.sw_if_index, is_add=1
2407 self.vapi.set_ipfix_exporter(
2408 collector_address=self.pg3.remote_ip4,
2409 src_address=self.pg3.local_ip4,
2411 template_interval=10,
2412 collector_port=collector_port,
2414 self.vapi.nat44_ei_ipfix_enable_disable(
2415 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2418 pkts = self.create_stream_in(self.pg0, self.pg1)
2419 self.pg0.add_stream(pkts)
2420 self.pg_enable_capture(self.pg_interfaces)
2422 capture = self.pg1.get_capture(len(pkts))
2423 self.verify_capture_out(capture)
2424 self.nat44_add_address(self.nat_addr, is_add=0)
2425 self.vapi.ipfix_flush()
2426 capture = self.pg3.get_capture(7)
2427 ipfix = IPFIXDecoder()
2428 # first load template
2430 self.assertTrue(p.haslayer(IPFIX))
2431 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2432 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2433 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2434 self.assertEqual(p[UDP].dport, collector_port)
2435 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2436 if p.haslayer(Template):
2437 ipfix.add_template(p.getlayer(Template))
2438 # verify events in data set
2440 if p.haslayer(Data):
2441 data = ipfix.decode_data_set(p.getlayer(Set))
2442 self.verify_ipfix_nat44_ses(data)
2444 def test_ipfix_addr_exhausted(self):
2445 """NAT44EI IPFIX logging NAT addresses exhausted"""
# No NAT address is added in the lines shown here; the test sends one packet
# pg0 -> pg1, expects it not to be forwarded, and then verifies the IPFIX
# export received on pg3.
2446 flags = self.config_flags.NAT44_EI_IF_INSIDE
2447 self.vapi.nat44_ei_interface_add_del_feature(
2448 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2450 self.vapi.nat44_ei_interface_add_del_feature(
2451 sw_if_index=self.pg1.sw_if_index, is_add=1
2453 self.vapi.set_ipfix_exporter(
2454 collector_address=self.pg3.remote_ip4,
2455 src_address=self.pg3.local_ip4,
2457 template_interval=10,
2459 self.vapi.nat44_ei_ipfix_enable_disable(
2460 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2464 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2465 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2468 self.pg0.add_stream(p)
2469 self.pg_enable_capture(self.pg_interfaces)
# the packet must not appear on pg1
2471 self.pg1.assert_nothing_captured()
2472 self.vapi.ipfix_flush()
# exactly 7 exported packets are expected on the collector interface
2473 capture = self.pg3.get_capture(7)
2474 ipfix = IPFIXDecoder()
2475 # first load template
2477 self.assertTrue(p.haslayer(IPFIX))
2478 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2479 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2480 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# no collector_port is visible in the exporter call above; the export is
# expected on UDP port 4739 - presumably the exporter default, TODO confirm
2481 self.assertEqual(p[UDP].dport, 4739)
2482 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2483 if p.haslayer(Template):
2484 ipfix.add_template(p.getlayer(Template))
2485 # verify events in data set
2487 if p.haslayer(Data):
2488 data = ipfix.decode_data_set(p.getlayer(Set))
2489 self.verify_ipfix_addr_exhausted(data)
2491 def test_ipfix_max_sessions(self):
2492 """NAT44EI IPFIX logging maximum session entries exceeded"""
# Sends max_sessions packets with distinct sources (all expected on pg1),
# then enables IPFIX, sends one more packet that must not be forwarded, and
# verifies the resulting export on pg3.
2493 self.nat44_add_address(self.nat_addr)
2494 flags = self.config_flags.NAT44_EI_IF_INSIDE
2495 self.vapi.nat44_ei_interface_add_del_feature(
2496 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2498 self.vapi.nat44_ei_interface_add_del_feature(
2499 sw_if_index=self.pg1.sw_if_index, is_add=1
# the total is the per-thread limit multiplied by the worker count, with a
# minimum multiplier of 1 when there are no workers
2502 max_sessions_per_thread = self.max_translations
2503 max_sessions = max(1, self.vpp_worker_count) * max_sessions_per_thread
2506 for i in range(0, max_sessions):
# spread i over the last two octets so every packet has a distinct source
2507 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2509 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2510 / IP(src=src, dst=self.pg1.remote_ip4)
2514 self.pg0.add_stream(pkts)
2515 self.pg_enable_capture(self.pg_interfaces)
# every one of the max_sessions packets is expected to be forwarded
2518 self.pg1.get_capture(max_sessions)
2519 self.vapi.set_ipfix_exporter(
2520 collector_address=self.pg3.remote_ip4,
2521 src_address=self.pg3.local_ip4,
2523 template_interval=10,
2525 self.vapi.nat44_ei_ipfix_enable_disable(
2526 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2530 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2531 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2534 self.pg0.add_stream(p)
2535 self.pg_enable_capture(self.pg_interfaces)
# the extra packet must not appear on pg1
2537 self.pg1.assert_nothing_captured()
2538 self.vapi.ipfix_flush()
2539 capture = self.pg3.get_capture(7)
2540 ipfix = IPFIXDecoder()
2541 # first load template
2543 self.assertTrue(p.haslayer(IPFIX))
2544 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2545 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2546 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# no collector_port is visible in the exporter call above; the export is
# expected on UDP port 4739 - presumably the exporter default, TODO confirm
2547 self.assertEqual(p[UDP].dport, 4739)
2548 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2549 if p.haslayer(Template):
2550 ipfix.add_template(p.getlayer(Template))
2551 # verify events in data set
2553 if p.haslayer(Data):
2554 data = ipfix.decode_data_set(p.getlayer(Set))
# the per-thread limit (not the total) is what the verifier is given
2555 self.verify_ipfix_max_sessions(data, max_sessions_per_thread)
2557 def test_syslog_apmap(self):
2558 """NAT44EI syslog address and port mapping creation and deletion"""
# Sends one TCP packet pg0 -> pg1 and checks the syslog message captured on
# pg3, then removes the NAT address and checks a second syslog message.
2559 self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2560 self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2561 self.nat44_add_address(self.nat_addr)
2562 flags = self.config_flags.NAT44_EI_IF_INSIDE
2563 self.vapi.nat44_ei_interface_add_del_feature(
2564 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2566 self.vapi.nat44_ei_interface_add_del_feature(
2567 sw_if_index=self.pg1.sw_if_index, is_add=1
2571 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2572 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2573 / TCP(sport=self.tcp_port_in, dport=20)
2575 self.pg0.add_stream(p)
2576 self.pg_enable_capture(self.pg_interfaces)
2578 capture = self.pg1.get_capture(1)
# remember the translated source port on self before verifying the syslog
# message - presumably verify_syslog_apmap reads it, TODO confirm
2579 self.tcp_port_out = capture[0][TCP].sport
2580 capture = self.pg3.get_capture(1)
2581 self.verify_syslog_apmap(capture[0][Raw].load)
2583 self.pg_enable_capture(self.pg_interfaces)
# removing the NAT address is expected to produce one more syslog message;
# it is verified with the second argument set to False
2585 self.nat44_add_address(self.nat_addr, is_add=0)
2586 capture = self.pg3.get_capture(1)
2587 self.verify_syslog_apmap(capture[0][Raw].load, False)
2589 def test_pool_addr_fib(self):
2590 """NAT44EI add pool addresses to FIB"""
2591 static_addr = "10.0.0.10"
2592 self.nat44_add_address(self.nat_addr)
2593 flags = self.config_flags.NAT44_EI_IF_INSIDE
2594 self.vapi.nat44_ei_interface_add_del_feature(
2595 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2597 self.vapi.nat44_ei_interface_add_del_feature(
2598 sw_if_index=self.pg1.sw_if_index, is_add=1
2600 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2603 p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2606 psrc=self.pg1.remote_ip4,
2607 hwsrc=self.pg1.remote_mac,
2609 self.pg1.add_stream(p)
2610 self.pg_enable_capture(self.pg_interfaces)
2612 capture = self.pg1.get_capture(1)
2613 self.assertTrue(capture[0].haslayer(ARP))
2614 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2617 p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2620 psrc=self.pg1.remote_ip4,
2621 hwsrc=self.pg1.remote_mac,
2623 self.pg1.add_stream(p)
2624 self.pg_enable_capture(self.pg_interfaces)
2626 capture = self.pg1.get_capture(1)
2627 self.assertTrue(capture[0].haslayer(ARP))
2628 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2630 # send ARP to non-NAT44EI interface
2631 p = Ether(src=self.pg2.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2634 psrc=self.pg2.remote_ip4,
2635 hwsrc=self.pg2.remote_mac,
2637 self.pg2.add_stream(p)
2638 self.pg_enable_capture(self.pg_interfaces)
2640 self.pg1.assert_nothing_captured()
2642 # remove addresses and verify
2643 self.nat44_add_address(self.nat_addr, is_add=0)
2644 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr, is_add=0)
2646 p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2649 psrc=self.pg1.remote_ip4,
2650 hwsrc=self.pg1.remote_mac,
2652 self.pg1.add_stream(p)
2653 self.pg_enable_capture(self.pg_interfaces)
2655 self.pg1.assert_nothing_captured()
2657 p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2660 psrc=self.pg1.remote_ip4,
2661 hwsrc=self.pg1.remote_mac,
2663 self.pg1.add_stream(p)
2664 self.pg_enable_capture(self.pg_interfaces)
2666 self.pg1.assert_nothing_captured()
2668 def test_vrf_mode(self):
2669 """NAT44EI tenant VRF aware address pool mode"""
# Puts pg0 and pg1 into two separate IP tables, adds one NAT address per
# table, and checks that traffic from each inside interface towards pg2 is
# translated to the address added for its table. The interfaces are moved
# back to table 0 and both tables are deleted at the end.
# NOTE(review): `vrf_id1` / `vrf_id2` are used below but their assignments
# are not among the lines shown here - confirm their values.
2673 nat_ip1 = "10.0.0.10"
2674 nat_ip2 = "10.0.0.11"
# addresses are removed before the table change and re-applied after it
2676 self.pg0.unconfig_ip4()
2677 self.pg1.unconfig_ip4()
2678 self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
2679 self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
2680 self.pg0.set_table_ip4(vrf_id1)
2681 self.pg1.set_table_ip4(vrf_id2)
2682 self.pg0.config_ip4()
2683 self.pg1.config_ip4()
2684 self.pg0.resolve_arp()
2685 self.pg1.resolve_arp()
2687 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2688 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
# pg0 and pg1 get the INSIDE flag; pg2 is enabled without it
2689 flags = self.config_flags.NAT44_EI_IF_INSIDE
2690 self.vapi.nat44_ei_interface_add_del_feature(
2691 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2693 self.vapi.nat44_ei_interface_add_del_feature(
2694 sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
2696 self.vapi.nat44_ei_interface_add_del_feature(
2697 sw_if_index=self.pg2.sw_if_index, is_add=1
# traffic entering on pg0 must leave pg2 translated to nat_ip1
2702 pkts = self.create_stream_in(self.pg0, self.pg2)
2703 self.pg0.add_stream(pkts)
2704 self.pg_enable_capture(self.pg_interfaces)
2706 capture = self.pg2.get_capture(len(pkts))
2707 self.verify_capture_out(capture, nat_ip1)
# traffic entering on pg1 must leave pg2 translated to nat_ip2
2710 pkts = self.create_stream_in(self.pg1, self.pg2)
2711 self.pg1.add_stream(pkts)
2712 self.pg_enable_capture(self.pg_interfaces)
2714 capture = self.pg2.get_capture(len(pkts))
2715 self.verify_capture_out(capture, nat_ip2)
# restore: move both interfaces back to table 0, then delete the two tables
2718 self.pg0.unconfig_ip4()
2719 self.pg1.unconfig_ip4()
2720 self.pg0.set_table_ip4(0)
2721 self.pg1.set_table_ip4(0)
2722 self.pg0.config_ip4()
2723 self.pg1.config_ip4()
2724 self.pg0.resolve_arp()
2725 self.pg1.resolve_arp()
2726 self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id1})
2727 self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id2})
2729 def test_vrf_feature_independent(self):
2730 """NAT44EI tenant VRF independent address pool mode"""
# Adds nat_ip1 without a vrf_id and nat_ip2 with vrf_id=99, then checks that
# traffic from both inside interfaces (pg0 and pg1) towards pg2 is
# translated to nat_ip1.
2732 nat_ip1 = "10.0.0.10"
2733 nat_ip2 = "10.0.0.11"
2735 self.nat44_add_address(nat_ip1)
2736 self.nat44_add_address(nat_ip2, vrf_id=99)
2737 flags = self.config_flags.NAT44_EI_IF_INSIDE
2738 self.vapi.nat44_ei_interface_add_del_feature(
2739 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2741 self.vapi.nat44_ei_interface_add_del_feature(
2742 sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
2744 self.vapi.nat44_ei_interface_add_del_feature(
2745 sw_if_index=self.pg2.sw_if_index, is_add=1
2749 pkts = self.create_stream_in(self.pg0, self.pg2)
2750 self.pg0.add_stream(pkts)
2751 self.pg_enable_capture(self.pg_interfaces)
2753 capture = self.pg2.get_capture(len(pkts))
2754 self.verify_capture_out(capture, nat_ip1)
# the second inside interface is expected to use nat_ip1 as well, not nat_ip2
2757 pkts = self.create_stream_in(self.pg1, self.pg2)
2758 self.pg1.add_stream(pkts)
2759 self.pg_enable_capture(self.pg_interfaces)
2761 capture = self.pg2.get_capture(len(pkts))
2762 self.verify_capture_out(capture, nat_ip1)
2764 def test_dynamic_ipless_interfaces(self):
2765 """NAT44EI interfaces without configured IP address"""
# Dynamic translation between pg7 (INSIDE flag) and pg8 (no flag), relying on
# create_routes_and_neigbors() for reachability; traffic is checked in both
# directions.
2766 self.create_routes_and_neigbors()
2767 self.nat44_add_address(self.nat_addr)
2768 flags = self.config_flags.NAT44_EI_IF_INSIDE
2769 self.vapi.nat44_ei_interface_add_del_feature(
2770 sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2772 self.vapi.nat44_ei_interface_add_del_feature(
2773 sw_if_index=self.pg8.sw_if_index, is_add=1
# pg7 -> pg8
2777 pkts = self.create_stream_in(self.pg7, self.pg8)
2778 self.pg7.add_stream(pkts)
2779 self.pg_enable_capture(self.pg_interfaces)
2781 capture = self.pg8.get_capture(len(pkts))
2782 self.verify_capture_out(capture)
# pg8 -> pg7, addressed to the NAT address
2785 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2786 self.pg8.add_stream(pkts)
2787 self.pg_enable_capture(self.pg_interfaces)
2789 capture = self.pg7.get_capture(len(pkts))
2790 self.verify_capture_in(capture, self.pg7)
2792 def test_static_ipless_interfaces(self):
2793 """NAT44EI interfaces without configured IP address - 1:1 NAT"""
# Same topology as the dynamic variant (pg7 with INSIDE flag, pg8 without),
# but using a static mapping pg7.remote_ip4 <-> nat_addr instead of a pool
# address. The pg8 -> pg7 direction is exercised first.
2795 self.create_routes_and_neigbors()
2796 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2797 flags = self.config_flags.NAT44_EI_IF_INSIDE
2798 self.vapi.nat44_ei_interface_add_del_feature(
2799 sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2801 self.vapi.nat44_ei_interface_add_del_feature(
2802 sw_if_index=self.pg8.sw_if_index, is_add=1
# pg8 -> pg7
2806 pkts = self.create_stream_out(self.pg8)
2807 self.pg8.add_stream(pkts)
2808 self.pg_enable_capture(self.pg_interfaces)
2810 capture = self.pg7.get_capture(len(pkts))
2811 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8; the verifier is given nat_addr and True as extra arguments
# (NOTE(review): meaning of the True flag is defined by verify_capture_out)
2814 pkts = self.create_stream_in(self.pg7, self.pg8)
2815 self.pg7.add_stream(pkts)
2816 self.pg_enable_capture(self.pg_interfaces)
2818 capture = self.pg8.get_capture(len(pkts))
2819 self.verify_capture_out(capture, self.nat_addr, True)
2821 def test_static_with_port_ipless_interfaces(self):
2822 """NAT44EI interfaces without configured IP address - 1:1 NAPT"""
# Adds three static mappings for pg7.remote_ip4, one each for TCP, UDP and
# ICMP, after fixing the expected outside ports / ICMP id on self, then
# checks traffic pg8 -> pg7 and pg7 -> pg8.
# NOTE(review): the address/port arguments of the three mappings are not
# among the lines shown here - confirm they use the *_out values set below.
2824 self.tcp_port_out = 30606
2825 self.udp_port_out = 30607
2826 self.icmp_id_out = 30608
2828 self.create_routes_and_neigbors()
2829 self.nat44_add_address(self.nat_addr)
2830 self.nat44_add_static_mapping(
2831 self.pg7.remote_ip4,
2835 proto=IP_PROTOS.tcp,
2837 self.nat44_add_static_mapping(
2838 self.pg7.remote_ip4,
2842 proto=IP_PROTOS.udp,
2844 self.nat44_add_static_mapping(
2845 self.pg7.remote_ip4,
2849 proto=IP_PROTOS.icmp,
2851 flags = self.config_flags.NAT44_EI_IF_INSIDE
2852 self.vapi.nat44_ei_interface_add_del_feature(
2853 sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2855 self.vapi.nat44_ei_interface_add_del_feature(
2856 sw_if_index=self.pg8.sw_if_index, is_add=1
# pg8 -> pg7
2860 pkts = self.create_stream_out(self.pg8)
2861 self.pg8.add_stream(pkts)
2862 self.pg_enable_capture(self.pg_interfaces)
2864 capture = self.pg7.get_capture(len(pkts))
2865 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8
2868 pkts = self.create_stream_in(self.pg7, self.pg8)
2869 self.pg7.add_stream(pkts)
2870 self.pg_enable_capture(self.pg_interfaces)
2872 capture = self.pg8.get_capture(len(pkts))
2873 self.verify_capture_out(capture)
2875 def test_static_unknown_proto(self):
2876 """NAT44EI 1:1 translate packet with unknown protocol"""
# With a static mapping pg0.remote_ip4 <-> 10.0.0.10, sends an encapsulated
# IP/TCP packet in each direction and checks that only the outer IP address
# is rewritten and that a GRE layer is present in what is captured.
# NOTE(review): the layer between the outer and inner IP headers is not
# among the lines shown here; the assertions below expect GRE.
2877 nat_ip = "10.0.0.10"
2878 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2879 flags = self.config_flags.NAT44_EI_IF_INSIDE
2880 self.vapi.nat44_ei_interface_add_del_feature(
2881 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2883 self.vapi.nat44_ei_interface_add_del_feature(
2884 sw_if_index=self.pg1.sw_if_index, is_add=1
# pg0 -> pg1
2889 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2890 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2892 / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
2893 / TCP(sport=1234, dport=1234)
2895 self.pg0.add_stream(p)
2896 self.pg_enable_capture(self.pg_interfaces)
2898 p = self.pg1.get_capture(1)
# the outer source must have been replaced by the mapped address
2901 self.assertEqual(packet[IP].src, nat_ip)
2902 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2903 self.assertEqual(packet.haslayer(GRE), 1)
2904 self.assert_packet_checksums_valid(packet)
2906 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0, addressed to the mapped address
2911 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2912 / IP(src=self.pg1.remote_ip4, dst=nat_ip)
2914 / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
2915 / TCP(sport=1234, dport=1234)
2917 self.pg1.add_stream(p)
2918 self.pg_enable_capture(self.pg_interfaces)
2920 p = self.pg0.get_capture(1)
# the outer destination must have been restored to the inside host
2923 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2924 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2925 self.assertEqual(packet.haslayer(GRE), 1)
2926 self.assert_packet_checksums_valid(packet)
2928 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2931 def test_hairpinning_static_unknown_proto(self):
2932 """NAT44EI 1:1 translate packet with unknown protocol - hairpinning"""
# Host and server are both remote hosts on pg0 and each has its own static
# mapping. Packets are sent host -> server_nat_ip and server -> host_nat_ip;
# both are expected back on pg0 with source and destination rewritten.
2934 host = self.pg0.remote_hosts[0]
2935 server = self.pg0.remote_hosts[1]
2937 host_nat_ip = "10.0.0.10"
2938 server_nat_ip = "10.0.0.11"
2940 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2941 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2942 flags = self.config_flags.NAT44_EI_IF_INSIDE
2943 self.vapi.nat44_ei_interface_add_del_feature(
2944 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2946 self.vapi.nat44_ei_interface_add_del_feature(
2947 sw_if_index=self.pg1.sw_if_index, is_add=1
# host -> server, addressed to the server's mapped address
2952 Ether(dst=self.pg0.local_mac, src=host.mac)
2953 / IP(src=host.ip4, dst=server_nat_ip)
2955 / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
2956 / TCP(sport=1234, dport=1234)
2958 self.pg0.add_stream(p)
2959 self.pg_enable_capture(self.pg_interfaces)
# the packet is captured on pg0 again, the interface it was sent on
2961 p = self.pg0.get_capture(1)
2964 self.assertEqual(packet[IP].src, host_nat_ip)
2965 self.assertEqual(packet[IP].dst, server.ip4)
2966 self.assertEqual(packet.haslayer(GRE), 1)
2967 self.assert_packet_checksums_valid(packet)
2969 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host, addressed to the host's mapped address
2974 Ether(dst=self.pg0.local_mac, src=server.mac)
2975 / IP(src=server.ip4, dst=host_nat_ip)
2977 / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
2978 / TCP(sport=1234, dport=1234)
2980 self.pg0.add_stream(p)
2981 self.pg_enable_capture(self.pg_interfaces)
2983 p = self.pg0.get_capture(1)
2986 self.assertEqual(packet[IP].src, server_nat_ip)
2987 self.assertEqual(packet[IP].dst, host.ip4)
2988 self.assertEqual(packet.haslayer(GRE), 1)
2989 self.assert_packet_checksums_valid(packet)
2991 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2994 def test_output_feature(self):
2995 """NAT44EI output feature (in2out postrouting)"""
# Registers pg3 as a NAT44EI output interface and checks three flows:
# pg0 -> pg3 (translated), pg3 -> pg0 (translated back) and pg2 -> pg0
# (expected to pass without translation).
2996 self.nat44_add_address(self.nat_addr)
2997 self.vapi.nat44_ei_add_del_output_interface(
2998 sw_if_index=self.pg3.sw_if_index, is_add=1
# pg0 -> pg3
3002 pkts = self.create_stream_in(self.pg0, self.pg3)
3003 self.pg0.add_stream(pkts)
3004 self.pg_enable_capture(self.pg_interfaces)
3006 capture = self.pg3.get_capture(len(pkts))
3007 self.verify_capture_out(capture)
# pg3 -> pg0
3010 pkts = self.create_stream_out(self.pg3)
3011 self.pg3.add_stream(pkts)
3012 self.pg_enable_capture(self.pg_interfaces)
3014 capture = self.pg0.get_capture(len(pkts))
3015 self.verify_capture_in(capture, self.pg0)
3017 # from non-NAT interface to NAT inside interface
3018 pkts = self.create_stream_in(self.pg2, self.pg0)
3019 self.pg2.add_stream(pkts)
3020 self.pg_enable_capture(self.pg_interfaces)
3022 capture = self.pg0.get_capture(len(pkts))
3023 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3025 def test_output_feature_vrf_aware(self):
3026 """NAT44EI output feature VRF aware (in2out postrouting)"""
# With pg3 as the output interface and one NAT address per VRF (10 and 20),
# checks that traffic from pg4 is translated to the VRF 10 address and
# traffic from pg6 to the VRF 20 address, and that replies sent to each
# address reach the matching interface.
# NOTE(review): the statements that build and install the two routes are
# only partially shown here (destination pg3.remote_ip4 via pg3) - confirm
# which tables they are added to.
3027 nat_ip_vrf10 = "10.0.0.10"
3028 nat_ip_vrf20 = "10.0.0.20"
3032 self.pg3.remote_ip4,
3034 [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
3039 self.pg3.remote_ip4,
3041 [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
3047 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3048 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3049 self.vapi.nat44_ei_add_del_output_interface(
3050 sw_if_index=self.pg3.sw_if_index, is_add=1
# pg4 -> pg3, expected to use the VRF 10 address
3054 pkts = self.create_stream_in(self.pg4, self.pg3)
3055 self.pg4.add_stream(pkts)
3056 self.pg_enable_capture(self.pg_interfaces)
3058 capture = self.pg3.get_capture(len(pkts))
3059 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> pg4 via the VRF 10 address
3062 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3063 self.pg3.add_stream(pkts)
3064 self.pg_enable_capture(self.pg_interfaces)
3066 capture = self.pg4.get_capture(len(pkts))
3067 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3, expected to use the VRF 20 address
3070 pkts = self.create_stream_in(self.pg6, self.pg3)
3071 self.pg6.add_stream(pkts)
3072 self.pg_enable_capture(self.pg_interfaces)
3074 capture = self.pg3.get_capture(len(pkts))
3075 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> pg6 via the VRF 20 address
3078 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3079 self.pg3.add_stream(pkts)
3080 self.pg_enable_capture(self.pg_interfaces)
3082 capture = self.pg6.get_capture(len(pkts))
3083 self.verify_capture_in(capture, self.pg6)
3085 def test_output_feature_hairpinning(self):
3086 """NAT44EI output feature hairpinning (in2out postrouting)"""
# Host and server are both remote hosts on pg0; pg0 and pg1 are registered as
# output interfaces. The host talks to the server through nat_addr and the
# server replies; both packets are expected back on pg0.
# NOTE(review): `host_in_port` is used below but its assignment is not among
# the lines shown here - confirm its value.
3087 host = self.pg0.remote_hosts[0]
3088 server = self.pg0.remote_hosts[1]
3091 server_in_port = 5678
3092 server_out_port = 8765
3094 self.nat44_add_address(self.nat_addr)
3095 self.vapi.nat44_ei_add_del_output_interface(
3096 sw_if_index=self.pg0.sw_if_index, is_add=1
3098 self.vapi.nat44_ei_add_del_output_interface(
3099 sw_if_index=self.pg1.sw_if_index, is_add=1
3102 # add static mapping for server
3103 self.nat44_add_static_mapping(
3108 proto=IP_PROTOS.tcp,
3111 # send packet from host to server
3113 Ether(src=host.mac, dst=self.pg0.local_mac)
3114 / IP(src=host.ip4, dst=self.nat_addr)
3115 / TCP(sport=host_in_port, dport=server_out_port)
3117 self.pg0.add_stream(p)
3118 self.pg_enable_capture(self.pg_interfaces)
3120 capture = self.pg0.get_capture(1)
# source becomes nat_addr with a different port; destination becomes the
# server's real address and inside port
3125 self.assertEqual(ip.src, self.nat_addr)
3126 self.assertEqual(ip.dst, server.ip4)
3127 self.assertNotEqual(tcp.sport, host_in_port)
3128 self.assertEqual(tcp.dport, server_in_port)
3129 self.assert_packet_checksums_valid(p)
# the allocated port is reused as the destination port of the reply below
3130 host_out_port = tcp.sport
3132 self.logger.error(ppp("Unexpected or invalid packet:", p))
3135 # send reply from server to host
3137 Ether(src=server.mac, dst=self.pg0.local_mac)
3138 / IP(src=server.ip4, dst=self.nat_addr)
3139 / TCP(sport=server_in_port, dport=host_out_port)
3141 self.pg0.add_stream(p)
3142 self.pg_enable_capture(self.pg_interfaces)
3144 capture = self.pg0.get_capture(1)
# the reply reaches the host from nat_addr and the server's outside port
3149 self.assertEqual(ip.src, self.nat_addr)
3150 self.assertEqual(ip.dst, host.ip4)
3151 self.assertEqual(tcp.sport, server_out_port)
3152 self.assertEqual(tcp.dport, host_in_port)
3153 self.assert_packet_checksums_valid(p)
3155 self.logger.error(ppp("Unexpected or invalid packet:", p))
3158 def test_one_armed_nat44(self):
3159 """NAT44EI One armed NAT"""
# A single interface (pg9) is enabled for NAT44EI twice, once without flags
# and once with the INSIDE flag. A TCP packet from local_host to remote_host
# and the reply both enter and leave on pg9; afterwards the classify node's
# "next in2out" / "next out2in" error counters must each be 1.
3160 remote_host = self.pg9.remote_hosts[0]
3161 local_host = self.pg9.remote_hosts[1]
3164 self.nat44_add_address(self.nat_addr)
3165 flags = self.config_flags.NAT44_EI_IF_INSIDE
3166 self.vapi.nat44_ei_interface_add_del_feature(
3167 sw_if_index=self.pg9.sw_if_index, is_add=1
3169 self.vapi.nat44_ei_interface_add_del_feature(
3170 sw_if_index=self.pg9.sw_if_index, flags=flags, is_add=1
# local_host -> remote_host
3175 Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
3176 / IP(src=local_host.ip4, dst=remote_host.ip4)
3177 / TCP(sport=12345, dport=80)
3179 self.pg9.add_stream(p)
3180 self.pg_enable_capture(self.pg_interfaces)
3182 capture = self.pg9.get_capture(1)
3187 self.assertEqual(ip.src, self.nat_addr)
3188 self.assertEqual(ip.dst, remote_host.ip4)
3189 self.assertNotEqual(tcp.sport, 12345)
# the allocated port is reused as the destination port of the reply below
3190 external_port = tcp.sport
3191 self.assertEqual(tcp.dport, 80)
3192 self.assert_packet_checksums_valid(p)
3194 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host -> nat_addr:external_port
3199 Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
3200 / IP(src=remote_host.ip4, dst=self.nat_addr)
3201 / TCP(sport=80, dport=external_port)
3203 self.pg9.add_stream(p)
3204 self.pg_enable_capture(self.pg_interfaces)
3206 capture = self.pg9.get_capture(1)
3211 self.assertEqual(ip.src, remote_host.ip4)
3212 self.assertEqual(ip.dst, local_host.ip4)
3213 self.assertEqual(tcp.sport, 80)
3214 self.assertEqual(tcp.dport, 12345)
3215 self.assert_packet_checksums_valid(p)
3217 self.logger.error(ppp("Unexpected or invalid packet:", p))
# the counter path depends on the node name, which differs when more than
# one worker is configured
3220 if self.vpp_worker_count > 1:
3221 node = "nat44-ei-handoff-classify"
3223 node = "nat44-ei-classify"
3225 err = self.statistics.get_err_counter("/err/%s/next in2out" % node)
3226 self.assertEqual(err, 1)
3227 err = self.statistics.get_err_counter("/err/%s/next out2in" % node)
3228 self.assertEqual(err, 1)
3230 def test_del_session(self):
3231 """NAT44EI delete session"""
# Creates sessions with a pg0 -> pg1 stream, deletes one by its inside
# address/port and one by its outside address/port, checks that the user's
# session count dropped by two, deletes one more and finally expects no
# NAT44 user to remain.
3232 self.nat44_add_address(self.nat_addr)
3233 flags = self.config_flags.NAT44_EI_IF_INSIDE
3234 self.vapi.nat44_ei_interface_add_del_feature(
3235 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3237 self.vapi.nat44_ei_interface_add_del_feature(
3238 sw_if_index=self.pg1.sw_if_index, is_add=1
3241 pkts = self.create_stream_in(self.pg0, self.pg1)
3242 self.pg0.add_stream(pkts)
3243 self.pg_enable_capture(self.pg_interfaces)
3245 self.pg1.get_capture(len(pkts))
3247 sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3248 nsessions = len(sessions)
# delete by inside address/port; the INSIDE flag accompanies the inside key
3250 self.vapi.nat44_ei_del_session(
3251 address=sessions[0].inside_ip_address,
3252 port=sessions[0].inside_port,
3253 protocol=sessions[0].protocol,
3254 flags=self.config_flags.NAT44_EI_IF_INSIDE,
# delete by outside address/port; no flags argument is visible for this call
3257 self.vapi.nat44_ei_del_session(
3258 address=sessions[1].outside_ip_address,
3259 port=sessions[1].outside_port,
3260 protocol=sessions[1].protocol,
3263 sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3264 self.assertEqual(nsessions - len(sessions), 2)
# delete the first session of the refreshed dump as well
3266 self.vapi.nat44_ei_del_session(
3267 address=sessions[0].inside_ip_address,
3268 port=sessions[0].inside_port,
3269 protocol=sessions[0].protocol,
3270 flags=self.config_flags.NAT44_EI_IF_INSIDE,
3273 self.verify_no_nat44_user()
3275 def test_frag_in_order(self):
3276 """NAT44EI translate fragments arriving in order"""
# Configures pg0 (INSIDE flag) and pg1 (no flag) with one NAT address and
# runs the shared frag_in_order helper once each for TCP, UDP and ICMP.
3278 self.nat44_add_address(self.nat_addr)
3279 flags = self.config_flags.NAT44_EI_IF_INSIDE
3280 self.vapi.nat44_ei_interface_add_del_feature(
3281 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3283 self.vapi.nat44_ei_interface_add_del_feature(
3284 sw_if_index=self.pg1.sw_if_index, is_add=1
3287 self.frag_in_order(proto=IP_PROTOS.tcp)
3288 self.frag_in_order(proto=IP_PROTOS.udp)
3289 self.frag_in_order(proto=IP_PROTOS.icmp)
3291 def test_frag_forwarding(self):
3292 """NAT44EI forwarding fragment test"""
# With NAT44EI forwarding enabled and pg1 registered through
# nat44_ei_add_del_interface_addr, sends a fragmented UDP stream from pg1 to
# pg0.remote_ip4 and checks that the reassembled packet keeps its addresses,
# ports (4789) and payload.
3293 self.vapi.nat44_ei_add_del_interface_addr(
3294 is_add=1, sw_if_index=self.pg1.sw_if_index
3296 flags = self.config_flags.NAT44_EI_IF_INSIDE
3297 self.vapi.nat44_ei_interface_add_del_feature(
3298 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3300 self.vapi.nat44_ei_interface_add_del_feature(
3301 sw_if_index=self.pg1.sw_if_index, is_add=1
3303 self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
# 35-byte payload made of three recognisable runs
3305 data = b"A" * 16 + b"B" * 16 + b"C" * 3
3306 pkts = self.create_stream_frag(
3307 self.pg1, self.pg0.remote_ip4, 4789, 4789, data, proto=IP_PROTOS.udp
3309 self.pg1.add_stream(pkts)
3310 self.pg_enable_capture(self.pg_interfaces)
3312 frags = self.pg0.get_capture(len(pkts))
# reassembly is verified against the original, untranslated addresses
3313 p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
3314 self.assertEqual(p[UDP].sport, 4789)
3315 self.assertEqual(p[UDP].dport, 4789)
3316 self.assertEqual(data, p[Raw].load)
3318 def test_reass_hairpinning(self):
3319 """NAT44EI fragments hairpinning"""
# Picks random ports, adds static mappings for the server (one with TCP, one
# with UDP and one address-only), and runs the shared reass_hairpinning
# helper once each for TCP, UDP and ICMP.
# NOTE(review): the positional arguments of the mappings and helper calls
# are not among the lines shown here - confirm they use the ports below.
3321 server_addr = self.pg0.remote_hosts[1].ip4
# ports are drawn from 1025..65535 inclusive, so they differ between runs
3322 host_in_port = random.randint(1025, 65535)
3323 server_in_port = random.randint(1025, 65535)
3324 server_out_port = random.randint(1025, 65535)
3326 self.nat44_add_address(self.nat_addr)
3327 flags = self.config_flags.NAT44_EI_IF_INSIDE
3328 self.vapi.nat44_ei_interface_add_del_feature(
3329 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3331 self.vapi.nat44_ei_interface_add_del_feature(
3332 sw_if_index=self.pg1.sw_if_index, is_add=1
3334 # add static mapping for server
3335 self.nat44_add_static_mapping(
3340 proto=IP_PROTOS.tcp,
3342 self.nat44_add_static_mapping(
3347 proto=IP_PROTOS.udp,
# address-only mapping: no port or protocol is given here
3349 self.nat44_add_static_mapping(server_addr, self.nat_addr)
3351 self.reass_hairpinning(
3356 proto=IP_PROTOS.tcp,
3358 self.reass_hairpinning(
3363 proto=IP_PROTOS.udp,
3365 self.reass_hairpinning(
3370 proto=IP_PROTOS.icmp,
3373 def test_frag_out_of_order(self):
3374 """NAT44EI translate fragments arriving out of order"""
# Adds the NAT address, marks pg0 as inside and adds pg1 without flags, then
# delegates to the frag_out_of_order helper for TCP, UDP and ICMP in turn.
3376 self.nat44_add_address(self.nat_addr)
3377 flags = self.config_flags.NAT44_EI_IF_INSIDE
3378 self.vapi.nat44_ei_interface_add_del_feature(
3379 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3381 self.vapi.nat44_ei_interface_add_del_feature(
3382 sw_if_index=self.pg1.sw_if_index, is_add=1
# the per-protocol checks themselves live in the helper
3385 self.frag_out_of_order(proto=IP_PROTOS.tcp)
3386 self.frag_out_of_order(proto=IP_PROTOS.udp)
3387 self.frag_out_of_order(proto=IP_PROTOS.icmp)
3389 def test_port_restricted(self):
3390 """NAT44EI Port restricted NAT44EI (MAP-E CE)"""
# Configures the address/port allocation algorithm with alg=1 and a PSID,
# sends one TCP packet from pg0 to pg1 and checks the translated source port.
3391 self.nat44_add_address(self.nat_addr)
3392 flags = self.config_flags.NAT44_EI_IF_INSIDE
3393 self.vapi.nat44_ei_interface_add_del_feature(
3394 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3396 self.vapi.nat44_ei_interface_add_del_feature(
3397 sw_if_index=self.pg1.sw_if_index, is_add=1
# psid=10 with psid_offset=6 / psid_length=6 is what the port check at the
# end of the test refers to
3399 self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
3400 alg=1, psid_offset=6, psid_length=6, psid=10
# single TCP packet, source port 4567, destination port 22
3404 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3405 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3406 / TCP(sport=4567, dport=22)
3408 self.pg0.add_stream(p)
3409 self.pg_enable_capture(self.pg_interfaces)
3411 capture = self.pg1.get_capture(1)
# the source address must be the NAT address and the source port must have
# been rewritten (differ from 4567)
3416 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3417 self.assertEqual(ip.src, self.nat_addr)
3418 self.assertEqual(tcp.dport, 22)
3419 self.assertNotEqual(tcp.sport, 4567)
# bits 6..11 of the translated source port must equal 10, the psid value
# configured above
3420 self.assertEqual((tcp.sport >> 6) & 63, 10)
3421 self.assert_packet_checksums_valid(p)
3423 self.logger.error(ppp("Unexpected or invalid packet:", p))
3426 def test_port_range(self):
3427 """NAT44EI External address port range"""
# Restricts allocation to ports 1025..1027 (alg=2) and checks that every
# captured packet has a translated TCP source port inside that range.
3428 self.nat44_add_address(self.nat_addr)
3429 flags = self.config_flags.NAT44_EI_IF_INSIDE
3430 self.vapi.nat44_ei_interface_add_del_feature(
3431 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3433 self.vapi.nat44_ei_interface_add_del_feature(
3434 sw_if_index=self.pg1.sw_if_index, is_add=1
3436 self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
3437 alg=2, start_port=1025, end_port=1027
# the loop runs five times (source ports 1125..1129) while only 3 packets
# are expected in the capture below, which matches the three ports in the
# configured range
3441 for port in range(0, 5):
3443 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3444 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3445 / TCP(sport=1125 + port)
3448 self.pg0.add_stream(pkts)
3449 self.pg_enable_capture(self.pg_interfaces)
3451 capture = self.pg1.get_capture(3)
# inclusive bounds on the translated source port
3454 self.assertGreaterEqual(tcp.sport, 1025)
3455 self.assertLessEqual(tcp.sport, 1027)
3457 def test_multiple_outside_vrf(self):
3458 """NAT44EI Multiple outside VRF"""
# Moves pg1 and pg2 into two separate IP tables, adds both as NAT interfaces
# without flags (pg0 carries the inside flag) and verifies translation in
# both directions through each of them. The interfaces are moved back to
# table 0 at the end.
# vrf_id1 / vrf_id2 are assigned on lines not visible in this excerpt.
# IPv4 config is removed before the table change and re-applied afterwards
3462 self.pg1.unconfig_ip4()
3463 self.pg2.unconfig_ip4()
3464 self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
3465 self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
3466 self.pg1.set_table_ip4(vrf_id1)
3467 self.pg2.set_table_ip4(vrf_id2)
3468 self.pg1.config_ip4()
3469 self.pg2.config_ip4()
3470 self.pg1.resolve_arp()
3471 self.pg2.resolve_arp()
3473 self.nat44_add_address(self.nat_addr)
3474 flags = self.config_flags.NAT44_EI_IF_INSIDE
3475 self.vapi.nat44_ei_interface_add_del_feature(
3476 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3478 self.vapi.nat44_ei_interface_add_del_feature(
3479 sw_if_index=self.pg1.sw_if_index, is_add=1
3481 self.vapi.nat44_ei_interface_add_del_feature(
3482 sw_if_index=self.pg2.sw_if_index, is_add=1
# first outside interface: pg0 -> pg1, then the return direction
3487 pkts = self.create_stream_in(self.pg0, self.pg1)
3488 self.pg0.add_stream(pkts)
3489 self.pg_enable_capture(self.pg_interfaces)
3491 capture = self.pg1.get_capture(len(pkts))
3492 self.verify_capture_out(capture, self.nat_addr)
3494 pkts = self.create_stream_out(self.pg1, self.nat_addr)
3495 self.pg1.add_stream(pkts)
3496 self.pg_enable_capture(self.pg_interfaces)
3498 capture = self.pg0.get_capture(len(pkts))
3499 self.verify_capture_in(capture, self.pg0)
# different inside ports / ICMP id are set on the instance before the
# second round
3501 self.tcp_port_in = 60303
3502 self.udp_port_in = 60304
3503 self.icmp_id_in = 60305
# second outside interface: pg0 -> pg2, then the return direction
3506 pkts = self.create_stream_in(self.pg0, self.pg2)
3507 self.pg0.add_stream(pkts)
3508 self.pg_enable_capture(self.pg_interfaces)
3510 capture = self.pg2.get_capture(len(pkts))
3511 self.verify_capture_out(capture, self.nat_addr)
3513 pkts = self.create_stream_out(self.pg2, self.nat_addr)
3514 self.pg2.add_stream(pkts)
3515 self.pg_enable_capture(self.pg_interfaces)
3517 capture = self.pg0.get_capture(len(pkts))
3518 self.verify_capture_in(capture, self.pg0)
# cleanup: drop the NAT address and return pg1/pg2 to table 0
3521 self.nat44_add_address(self.nat_addr, is_add=0)
3522 self.pg1.unconfig_ip4()
3523 self.pg2.unconfig_ip4()
3524 self.pg1.set_table_ip4(0)
3525 self.pg2.set_table_ip4(0)
3526 self.pg1.config_ip4()
3527 self.pg2.config_ip4()
3528 self.pg1.resolve_arp()
3529 self.pg2.resolve_arp()
3531 def test_mss_clamping(self):
3532 """NAT44EI TCP MSS clamping"""
# Sends the same TCP packet (MSS option 1400) three times under different
# clamping settings and checks the MSS seen on pg1 after each pass.
3533 self.nat44_add_address(self.nat_addr)
3534 flags = self.config_flags.NAT44_EI_IF_INSIDE
3535 self.vapi.nat44_ei_interface_add_del_feature(
3536 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3538 self.vapi.nat44_ei_interface_add_del_feature(
3539 sw_if_index=self.pg1.sw_if_index, is_add=1
# probe packet from pg0 towards pg1 advertising MSS 1400
3543 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3544 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3546 sport=self.tcp_port_in,
3547 dport=self.tcp_external_port,
3549 options=[("MSS", 1400)],
# pass 1: clamping enabled at 1000, below the advertised 1400
3553 self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1000)
3554 self.pg0.add_stream(p)
3555 self.pg_enable_capture(self.pg_interfaces)
3557 capture = self.pg1.get_capture(1)
3558 # Negotiated MSS value greater than configured - changed
3559 self.verify_mss_value(capture[0], 1000)
# pass 2: clamping disabled (the mss_value of 1500 is passed along anyway)
3561 self.vapi.nat44_ei_set_mss_clamping(enable=0, mss_value=1500)
3562 self.pg0.add_stream(p)
3563 self.pg_enable_capture(self.pg_interfaces)
3565 capture = self.pg1.get_capture(1)
3566 # MSS clamping disabled - negotiated MSS unchanged
3567 self.verify_mss_value(capture[0], 1400)
# pass 3: clamping enabled at 1500, above the advertised 1400
3569 self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1500)
3570 self.pg0.add_stream(p)
3571 self.pg_enable_capture(self.pg_interfaces)
3573 capture = self.pg1.get_capture(1)
3574 # Negotiated MSS value smaller than configured - unchanged
3575 self.verify_mss_value(capture[0], 1400)
3577 def test_ha_send(self):
3578 """NAT44EI Send HA session synchronization events (active)"""
# Drives the active side of HA: sessions are created by traffic from pg0,
# and the test inspects the state-sync packets emitted on pg3 together with
# the /nat44-ei/ha/* statistics for add, del, retry and refresh events and
# for received ACKs.
3579 flags = self.config_flags.NAT44_EI_IF_INSIDE
3580 self.vapi.nat44_ei_interface_add_del_feature(
3581 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3583 self.vapi.nat44_ei_interface_add_del_feature(
3584 sw_if_index=self.pg1.sw_if_index, is_add=1
3586 self.nat44_add_address(self.nat_addr)
# listener on pg3's local address, port 12345; failover peer is pg3's
# remote address, port 12346, with session_refresh_interval=10
3588 self.vapi.nat44_ei_ha_set_listener(
3589 ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
3591 self.vapi.nat44_ei_ha_set_failover(
3592 ip_address=self.pg3.remote_ip4, port=12346, session_refresh_interval=10
# have scapy dissect UDP with source port 12345 as HANATStateSync
3594 bind_layers(UDP, HANATStateSync, sport=12345)
3597 pkts = self.create_stream_in(self.pg0, self.pg1)
3598 self.pg0.add_stream(pkts)
3599 self.pg_enable_capture(self.pg_interfaces)
3601 capture = self.pg1.get_capture(len(pkts))
3602 self.verify_capture_out(capture)
3603 # active send HA events
3604 self.vapi.nat44_ei_ha_flush()
# counters are summed over the first index (stats[:, 0]); three add events
# are expected after the flush
3605 stats = self.statistics["/nat44-ei/ha/add-event-send"]
3606 self.assertEqual(stats[:, 0].sum(), 3)
3607 capture = self.pg3.get_capture(1)
3609 self.assert_packet_checksums_valid(p)
3613 hanat = p[HANATStateSync]
3615 self.logger.error(ppp("Invalid packet:", p))
3618 self.assertEqual(ip.src, self.pg3.local_ip4)
3619 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3620 self.assertEqual(udp.sport, 12345)
3621 self.assertEqual(udp.dport, 12346)
3622 self.assertEqual(hanat.version, 1)
3623 # self.assertEqual(hanat.thread_index, 0)
3624 self.assertEqual(hanat.count, 3)
# remembered so the ACK built below can echo the same sequence number
3625 seq = hanat.sequence_number
# event_type 1 is "add" in the Event field enum at the top of the file
3626 for event in hanat.events:
3627 self.assertEqual(event.event_type, 1)
3628 self.assertEqual(event.in_addr, self.pg0.remote_ip4)
3629 self.assertEqual(event.out_addr, self.nat_addr)
3630 self.assertEqual(event.fib_index, 0)
3632 # ACK received events
3634 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3635 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3636 / UDP(sport=12346, dport=12345)
3638 sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
3641 self.pg3.add_stream(ack)
3643 stats = self.statistics["/nat44-ei/ha/ack-recv"]
3644 self.assertEqual(stats[:, 0].sum(), 1)
3646 # delete one session
3647 self.pg_enable_capture(self.pg_interfaces)
3648 self.vapi.nat44_ei_del_session(
3649 address=self.pg0.remote_ip4,
3650 port=self.tcp_port_in,
3651 protocol=IP_PROTOS.tcp,
3652 flags=self.config_flags.NAT44_EI_IF_INSIDE,
3654 self.vapi.nat44_ei_ha_flush()
3655 stats = self.statistics["/nat44-ei/ha/del-event-send"]
3656 self.assertEqual(stats[:, 0].sum(), 1)
3657 capture = self.pg3.get_capture(1)
3660 hanat = p[HANATStateSync]
3662 self.logger.error(ppp("Invalid packet:", p))
# the delete notification must carry a later sequence number than the add
3665 self.assertGreater(hanat.sequence_number, seq)
3667 # do not send ACK, active retry send HA event again
3668 self.pg_enable_capture(self.pg_interfaces)
# after 12 (virtual) seconds without an ACK the counters must show
# 3 retries and 1 missed event, and pg3 must have seen 3 packets equal to p
3669 self.virtual_sleep(12)
3670 stats = self.statistics["/nat44-ei/ha/retry-count"]
3671 self.assertEqual(stats[:, 0].sum(), 3)
3672 stats = self.statistics["/nat44-ei/ha/missed-count"]
3673 self.assertEqual(stats[:, 0].sum(), 1)
3674 capture = self.pg3.get_capture(3)
3675 for packet in capture:
3676 self.assertEqual(packet, p)
3678 # session counters refresh
3679 pkts = self.create_stream_out(self.pg1)
3680 self.pg1.add_stream(pkts)
3681 self.pg_enable_capture(self.pg_interfaces)
# only two packets are expected back on pg0 (one session was deleted above)
3683 self.pg0.get_capture(2)
3684 self.vapi.nat44_ei_ha_flush()
3685 stats = self.statistics["/nat44-ei/ha/refresh-event-send"]
3686 self.assertEqual(stats[:, 0].sum(), 2)
3687 capture = self.pg3.get_capture(1)
3689 self.assert_packet_checksums_valid(p)
3693 hanat = p[HANATStateSync]
3695 self.logger.error(ppp("Invalid packet:", p))
3698 self.assertEqual(ip.src, self.pg3.local_ip4)
3699 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3700 self.assertEqual(udp.sport, 12345)
3701 self.assertEqual(udp.dport, 12346)
3702 self.assertEqual(hanat.version, 1)
3703 self.assertEqual(hanat.count, 2)
3704 seq = hanat.sequence_number
# event_type 3 is "refresh" in the Event field enum
3705 for event in hanat.events:
3706 self.assertEqual(event.event_type, 3)
3707 self.assertEqual(event.out_addr, self.nat_addr)
3708 self.assertEqual(event.fib_index, 0)
3709 self.assertEqual(event.total_pkts, 2)
3710 self.assertGreater(event.total_bytes, 0)
# NOTE(review): this read has no visible use before `stats` is reassigned
# further down - looks redundant, confirm
3712 stats = self.statistics["/nat44-ei/ha/ack-recv"]
3714 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3715 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3716 / UDP(sport=12346, dport=12345)
3718 sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
3721 self.pg3.add_stream(ack)
# second ACK overall, hence a cumulative count of 2
3723 stats = self.statistics["/nat44-ei/ha/ack-recv"]
3724 self.assertEqual(stats[:, 0].sum(), 2)
3726 def test_ha_recv(self):
3727 """NAT44EI Receive HA session synchronization events (passive)"""
# Drives the passive side of HA: add, delete and refresh events are injected
# on pg3, the ACKs and statistics are checked after each step, and finally a
# TCP packet is sent from pg1 through the session that was created by HA.
3728 self.nat44_add_address(self.nat_addr)
3729 flags = self.config_flags.NAT44_EI_IF_INSIDE
3730 self.vapi.nat44_ei_interface_add_del_feature(
3731 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3733 self.vapi.nat44_ei_interface_add_del_feature(
3734 sw_if_index=self.pg1.sw_if_index, is_add=1
3736 self.vapi.nat44_ei_ha_set_listener(
3737 ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
3739 bind_layers(UDP, HANATStateSync, sport=12345)
3741 # this is a bit tricky - HA dictates thread index due to how it's
3742 # designed, but once we use HA to create a session, we also want
3743 # to pass a packet through said session. so the session must end
3744 # up on the correct thread from both directions - in2out (based on
3745 # IP address) and out2in (based on outside port)
3747 # first choose a thread index which is correct for IP
3748 thread_index = get_nat44_ei_in2out_worker_index(
3749 self.pg0.remote_ip4, self.vpp_worker_count
3752 # now pick a port which is correct for given thread
# ports above 1024 are split evenly between workers (max(1, ...) avoids a
# division by zero when there are no workers); a random port is taken from
# the first slice and, with workers present, shifted by
# port_per_thread * (thread_index - 1)
3753 port_per_thread = int((0xFFFF - 1024) / max(1, self.vpp_worker_count))
3754 self.tcp_port_out = 1024 + random.randint(1, port_per_thread)
3755 self.udp_port_out = 1024 + random.randint(1, port_per_thread)
3756 if self.vpp_worker_count > 0:
3757 self.tcp_port_out += port_per_thread * (thread_index - 1)
3758 self.udp_port_out += port_per_thread * (thread_index - 1)
3760 # send HA session add events to failover/passive
# two events follow, one using the TCP ports and one using the UDP ports;
# the remaining constructor arguments are not visible in this excerpt
3762 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3763 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3764 / UDP(sport=12346, dport=12345)
3771 in_addr=self.pg0.remote_ip4,
3772 out_addr=self.nat_addr,
3773 in_port=self.tcp_port_in,
3774 out_port=self.tcp_port_out,
3775 eh_addr=self.pg1.remote_ip4,
3776 ehn_addr=self.pg1.remote_ip4,
3777 eh_port=self.tcp_external_port,
3778 ehn_port=self.tcp_external_port,
3784 in_addr=self.pg0.remote_ip4,
3785 out_addr=self.nat_addr,
3786 in_port=self.udp_port_in,
3787 out_port=self.udp_port_out,
3788 eh_addr=self.pg1.remote_ip4,
3789 ehn_addr=self.pg1.remote_ip4,
3790 eh_port=self.udp_external_port,
3791 ehn_port=self.udp_external_port,
3795 thread_index=thread_index,
3799 self.pg3.add_stream(p)
3800 self.pg_enable_capture(self.pg_interfaces)
# exactly one packet is expected back on pg3; it is checked to be an ACK
# for sequence number 1 carrying the same thread index
3803 capture = self.pg3.get_capture(1)
3806 hanat = p[HANATStateSync]
3808 self.logger.error(ppp("Invalid packet:", p))
3811 self.assertEqual(hanat.sequence_number, 1)
3812 self.assertEqual(hanat.flags, "ACK")
3813 self.assertEqual(hanat.version, 1)
3814 self.assertEqual(hanat.thread_index, thread_index)
3815 stats = self.statistics["/nat44-ei/ha/ack-send"]
3816 self.assertEqual(stats[:, 0].sum(), 1)
3817 stats = self.statistics["/nat44-ei/ha/add-event-recv"]
3818 self.assertEqual(stats[:, 0].sum(), 2)
3819 users = self.statistics["/nat44-ei/total-users"]
3820 self.assertEqual(users[:, 0].sum(), 1)
3821 sessions = self.statistics["/nat44-ei/total-sessions"]
3822 self.assertEqual(sessions[:, 0].sum(), 2)
# `users` / `sessions` are rebound from statistics counters to API dump
# results from here on
3823 users = self.vapi.nat44_ei_user_dump()
3824 self.assertEqual(len(users), 1)
3825 self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3826 # there should be 2 sessions created by HA
3827 sessions = self.vapi.nat44_ei_user_session_dump(
3828 users[0].ip_address, users[0].vrf_id
3830 self.assertEqual(len(sessions), 2)
3831 for session in sessions:
3832 self.assertEqual(str(session.inside_ip_address), self.pg0.remote_ip4)
3833 self.assertEqual(str(session.outside_ip_address), self.nat_addr)
3834 self.assertIn(session.inside_port, [self.tcp_port_in, self.udp_port_in])
3835 self.assertIn(session.outside_port, [self.tcp_port_out, self.udp_port_out])
3836 self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
3838 # send HA session delete event to failover/passive
# the event addresses the UDP session (udp_port_in / udp_port_out)
3840 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3841 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3842 / UDP(sport=12346, dport=12345)
3849 in_addr=self.pg0.remote_ip4,
3850 out_addr=self.nat_addr,
3851 in_port=self.udp_port_in,
3852 out_port=self.udp_port_out,
3853 eh_addr=self.pg1.remote_ip4,
3854 ehn_addr=self.pg1.remote_ip4,
3855 eh_port=self.udp_external_port,
3856 ehn_port=self.udp_external_port,
3860 thread_index=thread_index,
3864 self.pg3.add_stream(p)
3865 self.pg_enable_capture(self.pg_interfaces)
3868 capture = self.pg3.get_capture(1)
3871 hanat = p[HANATStateSync]
3873 self.logger.error(ppp("Invalid packet:", p))
3876 self.assertEqual(hanat.sequence_number, 2)
3877 self.assertEqual(hanat.flags, "ACK")
3878 self.assertEqual(hanat.version, 1)
3879 users = self.vapi.nat44_ei_user_dump()
3880 self.assertEqual(len(users), 1)
3881 self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3882 # now we should have only 1 session, 1 deleted by HA
3883 sessions = self.vapi.nat44_ei_user_session_dump(
3884 users[0].ip_address, users[0].vrf_id
3886 self.assertEqual(len(sessions), 1)
3887 stats = self.statistics["/nat44-ei/ha/del-event-recv"]
3888 self.assertEqual(stats[:, 0].sum(), 1)
# two HA packets (add batch + delete) have been processed so far
3890 stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
3891 self.assertEqual(stats, 2)
3893 # send HA session refresh event to failover/passive
# the event addresses the remaining TCP session; the counter values it
# carries are on lines not visible here, the assertions below expect
# total_bytes == 1024 and total_pkts == 2
3895 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3896 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3897 / UDP(sport=12346, dport=12345)
3902 event_type="refresh",
3904 in_addr=self.pg0.remote_ip4,
3905 out_addr=self.nat_addr,
3906 in_port=self.tcp_port_in,
3907 out_port=self.tcp_port_out,
3908 eh_addr=self.pg1.remote_ip4,
3909 ehn_addr=self.pg1.remote_ip4,
3910 eh_port=self.tcp_external_port,
3911 ehn_port=self.tcp_external_port,
3917 thread_index=thread_index,
3920 self.pg3.add_stream(p)
3921 self.pg_enable_capture(self.pg_interfaces)
3924 capture = self.pg3.get_capture(1)
3927 hanat = p[HANATStateSync]
3929 self.logger.error(ppp("Invalid packet:", p))
3932 self.assertEqual(hanat.sequence_number, 3)
3933 self.assertEqual(hanat.flags, "ACK")
3934 self.assertEqual(hanat.version, 1)
3935 users = self.vapi.nat44_ei_user_dump()
3936 self.assertEqual(len(users), 1)
3937 self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3938 sessions = self.vapi.nat44_ei_user_session_dump(
3939 users[0].ip_address, users[0].vrf_id
3941 self.assertEqual(len(sessions), 1)
3942 session = sessions[0]
3943 self.assertEqual(session.total_bytes, 1024)
3944 self.assertEqual(session.total_pkts, 2)
3945 stats = self.statistics["/nat44-ei/ha/refresh-event-recv"]
3946 self.assertEqual(stats[:, 0].sum(), 1)
3948 stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
3949 self.assertEqual(stats, 3)
3951 # send packet to test session created by HA
# outside -> inside TCP packet aimed at the NAT address and the outside
# port chosen above; it must come out on pg0 with the inside port restored
3953 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3954 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3955 / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out)
3957 self.pg1.add_stream(p)
3958 self.pg_enable_capture(self.pg_interfaces)
3960 capture = self.pg0.get_capture(1)
3966 self.logger.error(ppp("Invalid packet:", p))
3969 self.assertEqual(ip.src, self.pg1.remote_ip4)
3970 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3971 self.assertEqual(tcp.sport, self.tcp_external_port)
3972 self.assertEqual(tcp.dport, self.tcp_port_in)
def reconfigure_frame_queue_nelts(self, frame_queue_nelts):
    """Set the handoff frame queue size and return the value reported back.

    The plugin is disabled, the frame queue option is written, the plugin
    is enabled again via ``self.plugin_enable()`` and the option is read
    back through ``nat44_ei_show_fq_options``.

    :param frame_queue_nelts: value passed to ``nat44_ei_set_fq_options``
    :returns: ``frame_queue_nelts`` field of the show-options reply
    """
    self.vapi.nat44_ei_plugin_enable_disable(enable=0)
    self.vapi.nat44_ei_set_fq_options(frame_queue_nelts=frame_queue_nelts)
    # keep plugin configuration persistent
    self.plugin_enable()
    fq_options = self.vapi.nat44_ei_show_fq_options()
    return fq_options.frame_queue_nelts
3981 def test_set_frame_queue_nelts(self):
3982 """NAT44EI API test - worker handoff frame queue elements"""
3983 self.assertEqual(self.reconfigure_frame_queue_nelts(512), 512)
def show_commands_at_teardown(self):
    """Log the output of the NAT44EI "show" CLI commands.

    Every command is run through ``self.vapi.cli`` and its result is
    written to ``self.logger`` at info level, in the order listed below.
    """
    commands = (
        "show nat44 ei timeouts",
        "show nat44 ei addresses",
        "show nat44 ei interfaces",
        "show nat44 ei static mappings",
        "show nat44 ei interface address",
        "show nat44 ei sessions detail",
        "show nat44 ei hash tables detail",
        "show nat44 ei ha",
        "show nat44 ei addr-port-assignment-alg",
    )
    for command in commands:
        self.logger.info(self.vapi.cli(command))
3996 def test_outside_address_distribution(self):
3997 """Outside address distribution based on source address"""
# Adds a range of NAT addresses, sends one UDP packet per generated pg0
# remote host and checks that each translated source address is the pool
# entry derived from the sender's inside address.
# x (the host / address count) and the construction of each address `a` are
# on lines not visible in this excerpt.
4002 for i in range(1, x):
4004 nat_addresses.append(a)
4006 flags = self.config_flags.NAT44_EI_IF_INSIDE
4007 self.vapi.nat44_ei_interface_add_del_feature(
4008 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4010 self.vapi.nat44_ei_interface_add_del_feature(
4011 sw_if_index=self.pg1.sw_if_index, is_add=1
# the pool is added as one contiguous range, first to last list element
4014 self.vapi.nat44_ei_add_del_address_range(
4015 first_ip_address=nat_addresses[0],
4016 last_ip_address=nat_addresses[-1],
4021 self.pg0.generate_remote_hosts(x)
# each packet carries a payload built from its packet info so it can be
# matched back to the sent packet after capture
4025 info = self.create_packet_info(self.pg0, self.pg1)
4026 payload = self.info_to_payload(info)
4028 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4029 / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
4030 / UDP(sport=7000 + i, dport=8000 + i)
4036 self.pg0.add_stream(pkts)
4037 self.pg_enable_capture(self.pg_interfaces)
4039 recvd = self.pg1.get_capture(len(pkts))
4040 for p_recvd in recvd:
4041 payload_info = self.payload_to_info(p_recvd[Raw])
4042 packet_index = payload_info.index
4043 info = self._packet_infos[packet_index]
4044 self.assertTrue(info is not None)
4045 self.assertEqual(packet_index, info.index)
# expected address: the sender's IPv4 address is unpacked as a big-endian
# 32-bit integer, passed through socket.htonl, and (value - 1) modulo the
# pool size selects the entry in nat_addresses
4047 packed = socket.inet_aton(p_sent[IP].src)
4048 numeric = struct.unpack("!L", packed)[0]
4049 numeric = socket.htonl(numeric)
4050 a = nat_addresses[(numeric - 1) % len(nat_addresses)]
4054 "Invalid packet (src IP %s translated to %s, but expected %s)"
4055 % (p_sent[IP].src, p_recvd[IP].src, a),
4058 def test_default_user_sessions(self):
4059 """NAT44EI default per-user session limit is used and reported"""
4060 nat44_ei_config = self.vapi.nat44_ei_show_running_config()
4061 # a nonzero default should be reported for user_sessions
4062 self.assertNotEqual(nat44_ei_config.user_sessions, 0)
4064 def test_delete_interface(self):
4065 """NAT44EI delete nat interface"""
# Enables NAT on four loopback interfaces in four different ways, then for
# each one deletes the interface, creates a new loopback that gets the same
# sw_if_index and checks the new interface is not listed as NAT-enabled.
4067 self.nat44_add_address(self.nat_addr)
4069 interfaces = self.create_loopback_interfaces(4)
# [0]: feature without flags, [1]: inside flag, [2]: inside | outside flags,
# [3]: output interface
4071 self.vapi.nat44_ei_interface_add_del_feature(
4072 sw_if_index=interfaces[0].sw_if_index, is_add=1
4074 flags = self.config_flags.NAT44_EI_IF_INSIDE
4075 self.vapi.nat44_ei_interface_add_del_feature(
4076 sw_if_index=interfaces[1].sw_if_index, flags=flags, is_add=1
4078 flags |= self.config_flags.NAT44_EI_IF_OUTSIDE
4079 self.vapi.nat44_ei_interface_add_del_feature(
4080 sw_if_index=interfaces[2].sw_if_index, flags=flags, is_add=1
4082 self.vapi.nat44_ei_add_del_output_interface(
4083 sw_if_index=interfaces[3].sw_if_index, is_add=1
# entries from the interface dump and from the output-interface getter are
# combined; all four loopbacks must be present
4086 nat_sw_if_indices = [
4088 for i in self.vapi.nat44_ei_interface_dump()
4089 + list(self.vapi.vpp.details_iter(self.vapi.nat44_ei_output_interface_get))
4091 self.assertEqual(len(nat_sw_if_indices), len(interfaces))
4094 for i in interfaces:
4095 # delete nat-enabled interface
4096 self.assertIn(i.sw_if_index, nat_sw_if_indices)
4097 i.remove_vpp_config()
4099 # create interface with the same index
4100 lo = VppLoInterface(self)
4101 loopbacks.append(lo)
4102 self.assertEqual(lo.sw_if_index, i.sw_if_index)
4104 # check interface is not nat-enabled
# the list is rebuilt from fresh dumps on every iteration
4105 nat_sw_if_indices = [
4107 for i in self.vapi.nat44_ei_interface_dump()
4109 self.vapi.vpp.details_iter(self.vapi.nat44_ei_output_interface_get)
4112 self.assertNotIn(lo.sw_if_index, nat_sw_if_indices)
# presumably the cleanup loop over the replacement loopbacks - the loop
# header is not visible here
4115 i.remove_vpp_config()
4118 class TestNAT44Out2InDPO(MethodHolder):
4119 """NAT44EI Test Cases using out2in DPO"""
# Test class that enables the plugin with the NAT44_EI_OUT2IN_DPO flag for
# every test. Both 464XLAT tests are currently skipped via unittest.skip.
# Several short lines (decorators, some `def` lines) are not visible in this
# excerpt.
4122 def setUpClass(cls):
4123 super(TestNAT44Out2InDPO, cls).setUpClass()
4124 cls.vapi.cli("set log class nat44-ei level debug")
# class-wide ports / ICMP id and addresses used by the tests below
4126 cls.tcp_port_in = 6303
4127 cls.tcp_port_out = 6303
4128 cls.udp_port_in = 6304
4129 cls.udp_port_out = 6304
4130 cls.icmp_id_in = 6305
4131 cls.icmp_id_out = 6305
4132 cls.nat_addr = "10.0.0.3"
4133 cls.dst_ip4 = "192.168.70.1"
# two pg interfaces: pg0 gets IPv4, pg1 gets IPv6
4135 cls.create_pg_interfaces(range(2))
4138 cls.pg0.config_ip4()
4139 cls.pg0.resolve_arp()
4142 cls.pg1.config_ip6()
4143 cls.pg1.resolve_ndp()
# route path via pg1's IPv6 remote address; the prefix it is attached to is
# not visible here
4149 [VppRoutePath(cls.pg1.remote_ip6, cls.pg1.sw_if_index)],
# per-test setup: enable the plugin with the out2in DPO flag
4155 super(TestNAT44Out2InDPO, self).setUp()
4156 flags = self.config_flags.NAT44_EI_OUT2IN_DPO
4157 self.vapi.nat44_ei_plugin_enable_disable(enable=1, flags=flags)
# per-test teardown: disable the plugin and clear logging unless VPP died
4160 super(TestNAT44Out2InDPO, self).tearDown()
4161 if not self.vpp_dead:
4162 self.vapi.nat44_ei_plugin_enable_disable(enable=0)
4163 self.vapi.cli("clear logging")
# Stores the /96 IPv6 prefixes "1:2:3::" (destination) and "4:5:6::" (source)
# on the instance, in text and packed form, and passes them to map_add_domain.
4165 def configure_xlat(self):
4166 self.dst_ip6_pfx = "1:2:3::"
4167 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.dst_ip6_pfx)
4168 self.dst_ip6_pfx_len = 96
4169 self.src_ip6_pfx = "4:5:6::"
4170 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.src_ip6_pfx)
4171 self.src_ip6_pfx_len = 96
4172 self.vapi.map_add_domain(
4174 self.dst_ip6_pfx_len,
4176 self.src_ip6_pfx_len,
4181 @unittest.skip("Temporary disabled")
4182 def test_464xlat_ce(self):
4183 """Test 464XLAT CE with NAT44EI"""
4185 self.configure_xlat()
4187 flags = self.config_flags.NAT44_EI_IF_INSIDE
4188 self.vapi.nat44_ei_interface_add_del_feature(
4189 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
# NOTE(review): nat_addr_n is not assigned in the visible setUpClass lines
# (only nat_addr is) - confirm it is defined elsewhere before re-enabling
# this test
4191 self.vapi.nat44_ei_add_del_address_range(
4192 first_ip_address=self.nat_addr_n,
4193 last_ip_address=self.nat_addr_n,
# IPv6 addresses expected on the wire, composed from an IPv4 address and
# the prefixes set in configure_xlat
4198 out_src_ip6 = self.compose_ip6(
4199 self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
4201 out_dst_ip6 = self.compose_ip6(
4202 self.nat_addr, self.src_ip6_pfx, self.src_ip6_pfx_len
# IPv4 in on pg0 -> IPv6 out on pg1, then the reverse direction
4206 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4207 self.pg0.add_stream(pkts)
4208 self.pg_enable_capture(self.pg_interfaces)
4210 capture = self.pg1.get_capture(len(pkts))
4211 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6, dst_ip=out_src_ip6)
4213 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4214 self.pg1.add_stream(pkts)
4215 self.pg_enable_capture(self.pg_interfaces)
4217 capture = self.pg0.get_capture(len(pkts))
4218 self.verify_capture_in(capture, self.pg0)
# the same API calls are issued again, this time without is_add=1 on the
# visible lines - presumably to remove the configuration; confirm against
# the API defaults
4220 self.vapi.nat44_ei_interface_add_del_feature(
4221 sw_if_index=self.pg0.sw_if_index, flags=flags
4223 self.vapi.nat44_ei_add_del_address_range(
4224 first_ip_address=self.nat_addr_n,
4225 last_ip_address=self.nat_addr_n,
4229 @unittest.skip("Temporary disabled")
4230 def test_464xlat_ce_no_nat(self):
4231 """Test 464XLAT CE without NAT44EI"""
4233 self.configure_xlat()
# without NAT the second composed address is built from pg0's own remote
# address instead of the NAT address
4235 out_src_ip6 = self.compose_ip6(
4236 self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
4238 out_dst_ip6 = self.compose_ip6(
4239 self.pg0.remote_ip4, self.src_ip6_pfx, self.src_ip6_pfx_len
4242 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4243 self.pg0.add_stream(pkts)
4244 self.pg_enable_capture(self.pg_interfaces)
4246 capture = self.pg1.get_capture(len(pkts))
# same_port=True is passed to the verifier for this no-NAT case
4247 self.verify_capture_out_ip6(
4248 capture, dst_ip=out_src_ip6, nat_ip=out_dst_ip6, same_port=True
4251 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4252 self.pg1.add_stream(pkts)
4253 self.pg_enable_capture(self.pg_interfaces)
4255 capture = self.pg0.get_capture(len(pkts))
4256 self.verify_capture_in(capture, self.pg0)
4259 class TestNAT44EIMW(MethodHolder):
4260 """NAT44EI Test Cases (multiple workers)"""
# class-level settings: vpp_worker_count is fixed at 2; max_translations is
# the value handed to the plugin as `sessions` when it is enabled in setUp
4262 vpp_worker_count = 2
4263 max_translations = 10240
4267 def setUpClass(cls):
# One-time fixture: sets class-wide ports and addresses, creates ten pg
# interfaces and configures three groups of them (pg0-3, pg4-6 with
# overlapping addresses in tables 10/20, and pg9).
4268 super(TestNAT44EIMW, cls).setUpClass()
# NOTE(review): TestNAT44Out2InDPO uses log class "nat44-ei" here - confirm
# that "nat" is the intended class name
4269 cls.vapi.cli("set log class nat level debug")
4271 cls.tcp_port_in = 6303
4272 cls.tcp_port_out = 6303
4273 cls.udp_port_in = 6304
4274 cls.udp_port_out = 6304
4275 cls.icmp_id_in = 6305
4276 cls.icmp_id_out = 6305
4277 cls.nat_addr = "10.0.0.3"
4278 cls.ipfix_src_port = 4739
4279 cls.ipfix_domain_id = 1
4280 cls.tcp_external_port = 80
4281 cls.udp_external_port = 69
4283 cls.create_pg_interfaces(range(10))
4284 cls.interfaces = list(cls.pg_interfaces[0:4])
# the body of this loop is not visible in this excerpt
4286 for i in cls.interfaces:
4291 cls.pg0.generate_remote_hosts(3)
4292 cls.pg0.configure_ipv4_neighbors()
4294 cls.pg1.generate_remote_hosts(1)
4295 cls.pg1.configure_ipv4_neighbors()
# NOTE(review): the nested list(list(...)) makes two copies; a single list()
# would give the same result
4297 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
4298 cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
4299 cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
# pg4 and pg6 are given identical local/remote addresses but sit in
# different tables (10 and 20); pg5 shares table 10 with pg4
4301 cls.pg4._local_ip4 = "172.16.255.1"
4302 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
4303 cls.pg4.set_table_ip4(10)
4304 cls.pg5._local_ip4 = "172.17.255.3"
4305 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
4306 cls.pg5.set_table_ip4(10)
4307 cls.pg6._local_ip4 = "172.16.255.1"
4308 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
4309 cls.pg6.set_table_ip4(20)
# the body of this loop is not visible in this excerpt
4310 for i in cls.overlapping_interfaces:
# pg9 gets an extra 10.0.0.1/24 address; its two remote hosts are then made
# to share one IPv4 address, which is finally set to "10.0.0.2" (also stored
# as pg4's remote address)
4318 cls.pg9.generate_remote_hosts(2)
4319 cls.pg9.config_ip4()
4320 cls.vapi.sw_interface_add_del_address(
4321 sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
4325 cls.pg9.resolve_arp()
4326 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
4327 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
4328 cls.pg9.resolve_arp()
4331 super(TestNAT44EIMW, self).setUp()
# per-test setup (the `def` line is not visible in this excerpt): the plugin
# is enabled with the class's session and user limits
4332 self.vapi.nat44_ei_plugin_enable_disable(
4333 sessions=self.max_translations, users=self.max_users, enable=1
4337 super(TestNAT44EIMW, self).tearDown()
# per-test teardown (the `def` line is not visible in this excerpt); nothing
# is sent to VPP once it is marked dead
4338 if not self.vpp_dead:
4339 self.vapi.nat44_ei_ipfix_enable_disable(
4340 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
# IPFIX settings are put back to the values assigned in setUpClass
4342 self.ipfix_src_port = 4739
4343 self.ipfix_domain_id = 1
4345 self.vapi.nat44_ei_plugin_enable_disable(enable=0)
4346 self.vapi.cli("clear logging")
4348 def test_hairpinning(self):
4349 """NAT44EI hairpinning - 1:1 NAPT"""
# A host and a server both sit behind pg0. The host reaches the server via
# the NAT address and a static TCP mapping; the test checks the translated
# packet in each direction and the per-worker /nat44-ei/hairpinning counter.
# host_in_port, worker_1 and worker_2 are assigned on lines not visible in
# this excerpt.
4351 host = self.pg0.remote_hosts[0]
4352 server = self.pg0.remote_hosts[1]
4355 server_in_port = 5678
4356 server_out_port = 8765
4360 self.nat44_add_address(self.nat_addr)
4361 flags = self.config_flags.NAT44_EI_IF_INSIDE
4362 self.vapi.nat44_ei_interface_add_del_feature(
4363 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4365 self.vapi.nat44_ei_interface_add_del_feature(
4366 sw_if_index=self.pg1.sw_if_index, is_add=1
4369 # add static mapping for server
4370 self.nat44_add_static_mapping(
4375 proto=IP_PROTOS.tcp,
# baseline counter snapshot taken before any traffic is sent
4378 cnt = self.statistics["/nat44-ei/hairpinning"]
4379 # send packet from host to server
4381 Ether(src=host.mac, dst=self.pg0.local_mac)
4382 / IP(src=host.ip4, dst=self.nat_addr)
4383 / TCP(sport=host_in_port, dport=server_out_port)
4385 self.pg0.add_stream(p)
4386 self.pg_enable_capture(self.pg_interfaces)
# the hairpinned packet comes back out of pg0 itself
4388 capture = self.pg0.get_capture(1)
4393 self.assertEqual(ip.src, self.nat_addr)
4394 self.assertEqual(ip.dst, server.ip4)
4395 self.assertNotEqual(tcp.sport, host_in_port)
4396 self.assertEqual(tcp.dport, server_in_port)
4397 self.assert_packet_checksums_valid(p)
# the allocated outside port is needed to address the reply below
4398 host_out_port = tcp.sport
4400 self.logger.error(ppp("Unexpected or invalid packet:", p))
4403 after = self.statistics["/nat44-ei/hairpinning"]
4405 if_idx = self.pg0.sw_if_index
# NOTE(review): this subtracts the worker_1 baseline from the worker_2
# counter, whereas the checks at the end of the test pair the same worker on
# both sides - confirm whether cnt[worker_2] was intended
4406 self.assertEqual(after[worker_2][if_idx] - cnt[worker_1][if_idx], 1)
4408 # send reply from server to host
4410 Ether(src=server.mac, dst=self.pg0.local_mac)
4411 / IP(src=server.ip4, dst=self.nat_addr)
4412 / TCP(sport=server_in_port, dport=host_out_port)
4414 self.pg0.add_stream(p)
4415 self.pg_enable_capture(self.pg_interfaces)
4417 capture = self.pg0.get_capture(1)
4422 self.assertEqual(ip.src, self.nat_addr)
4423 self.assertEqual(ip.dst, host.ip4)
4424 self.assertEqual(tcp.sport, server_out_port)
4425 self.assertEqual(tcp.dport, host_in_port)
4426 self.assert_packet_checksums_valid(p)
4428 self.logger.error(ppp("Unexpected or invalid packet:", p))
# cumulative deltas against the baseline: 1 for worker_1, 2 for worker_2
4431 after = self.statistics["/nat44-ei/hairpinning"]
4432 if_idx = self.pg0.sw_if_index
4433 self.assertEqual(after[worker_1][if_idx] - cnt[worker_1][if_idx], 1)
4434 self.assertEqual(after[worker_2][if_idx] - cnt[worker_2][if_idx], 2)
def test_hairpinning2(self):
    """NAT44EI hairpinning - 1:1 NAT"""
    # Scenario: one host and two servers are all remote hosts of pg0.
    # Each server gets a 1:1 static mapping; the host has none and is
    # expected to be translated to self.nat_addr.  Four exchanges are
    # injected on pg0 and each one is expected back on pg0 with both the
    # source and the destination address rewritten:
    #   1. host    -> server1 (via server1's NAT address)
    #   2. server1 -> host    (replies, addressed to self.nat_addr)
    #   3. server2 -> server1 (via server1's NAT address)
    #   4. server1 -> server2 (replies, via server2's NAT address)
    #
    # NOTE(review): self.pg_start() and the try/except/raise scaffolding
    # around the per-packet checks were restored from context; confirm
    # them against the upstream test before merging.

    server1_nat_ip = "10.0.0.10"
    server2_nat_ip = "10.0.0.11"
    host = self.pg0.remote_hosts[0]
    server1 = self.pg0.remote_hosts[1]
    server2 = self.pg0.remote_hosts[2]
    server_tcp_port = 22
    server_udp_port = 20

    def send_on_pg0(src_ip, dst_ip, l4_layers):
        """Inject one packet per L4 layer on pg0 and return pg0's capture.

        Every packet is Ether / IP(src_ip -> dst_ip) / <l4 layer>, built in
        the order given, and exactly len(l4_layers) packets are expected
        back on pg0.
        """
        pkts = [
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=src_ip, dst=dst_ip)
            / l4
            for l4 in l4_layers
        ]
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return self.pg0.get_capture(len(pkts))

    def verify_replies(capture, src_ip, dst_ip):
        """Check reply packets carry the original inside ports / ICMP id.

        Each captured packet must go from src_ip to dst_ip; TCP and UDP
        must target the original inside port from the server port, and
        ICMP must carry the original inside id.
        """
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, src_ip)
                self.assertEqual(packet[IP].dst, dst_ip)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                    self.assertEqual(packet[TCP].sport, server_tcp_port)
                    self.assert_packet_checksums_valid(packet)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                    self.assertEqual(packet[UDP].sport, server_udp_port)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except Exception:
                # Dump the offending packet, then let the failure propagate.
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT44_EI_IF_INSIDE
    self.vapi.nat44_ei_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
    )
    self.vapi.nat44_ei_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index, is_add=1
    )

    # add static mapping for servers
    self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
    self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)

    # host to server1
    capture = send_on_pg0(
        host.ip4,
        server1_nat_ip,
        [
            TCP(sport=self.tcp_port_in, dport=server_tcp_port),
            UDP(sport=self.udp_port_in, dport=server_udp_port),
            ICMP(id=self.icmp_id_in, type="echo-request"),
        ],
    )
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, self.nat_addr)
            self.assertEqual(packet[IP].dst, server1.ip4)
            # The host has no static mapping, so its source port / ICMP id
            # must have been rewritten; remember the outside values so the
            # replies below can be addressed to them.
            if packet.haslayer(TCP):
                self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to host
    capture = send_on_pg0(
        server1.ip4,
        self.nat_addr,
        [
            TCP(sport=server_tcp_port, dport=self.tcp_port_out),
            UDP(sport=server_udp_port, dport=self.udp_port_out),
            ICMP(id=self.icmp_id_out, type="echo-reply"),
        ],
    )
    verify_replies(capture, server1_nat_ip, host.ip4)

    # server2 to server1
    capture = send_on_pg0(
        server2.ip4,
        server1_nat_ip,
        [
            TCP(sport=self.tcp_port_in, dport=server_tcp_port),
            UDP(sport=self.udp_port_in, dport=server_udp_port),
            ICMP(id=self.icmp_id_in, type="echo-request"),
        ],
    )
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, server2_nat_ip)
            self.assertEqual(packet[IP].dst, server1.ip4)
            # server2 has a 1:1 mapping, so only its address changes: the
            # source port / ICMP id must come through untouched.
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to server2
    capture = send_on_pg0(
        server1.ip4,
        server2_nat_ip,
        [
            TCP(sport=server_tcp_port, dport=self.tcp_port_out),
            UDP(sport=server_udp_port, dport=self.udp_port_out),
            ICMP(id=self.icmp_id_out, type="echo-reply"),
        ],
    )
    verify_replies(capture, server1_nat_ip, server2.ip4)
# Allow this test module to be executed directly, using the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)