11 from framework import VppTestCase, VppLoInterface
12 from asfframework import VppTestRunner, tag_fixme_debian11, is_distro_debian11
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.all import (
27 from scapy.data import IP_PROTOS
28 from scapy.layers.inet import IP, TCP, UDP, ICMP
29 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
30 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
31 from scapy.layers.l2 import Ether, ARP, GRE
32 from scapy.packet import Raw
33 from syslog_rfc5424_parser import SyslogMessage, ParseError
34 from syslog_rfc5424_parser.constants import SyslogSeverity
36 from vpp_ip_route import VppIpRoute, VppRoutePath
37 from vpp_neighbor import VppNeighbor
38 from vpp_papi import VppEnum
41 # NAT HA protocol event data
45 ByteEnumField("event_type", None, {1: "add", 2: "del", 3: "refresh"}),
46 ByteEnumField("protocol", None, {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
47 ShortField("flags", 0),
48 IPField("in_addr", None),
49 IPField("out_addr", None),
50 ShortField("in_port", None),
51 ShortField("out_port", None),
52 IPField("eh_addr", None),
53 IPField("ehn_addr", None),
54 ShortField("eh_port", None),
55 ShortField("ehn_port", None),
56 IntField("fib_index", None),
57 IntField("total_pkts", 0),
58 LongField("total_bytes", 0),
61 def extract_padding(self, s):
65 # NAT HA protocol header
class HANATStateSync(Packet):
    """Scapy layer for the NAT HA state-sync header followed by its events."""

    name = "HA NAT state sync"
    fields_desc = [
        XByteField("version", 1),
        # 8-bit flags field; only the first bit is named ("ACK")
        FlagsField("flags", 0, 8, ["ACK"]),
        # left as None so it is derived from the number of "events" entries
        FieldLenField("count", None, count_of="events"),
        IntField("sequence_number", 1),
        IntField("thread_index", 0),
        # on dissection, "count" tells how many Event records follow
        PacketListField("events", [], Event, count_from=lambda pkt: pkt.count),
    ]
78 class MethodHolder(VppTestCase):
79 """NAT create capture and verify method holder"""
def config_flags(self):
    """
    NAT44EI configuration flags enum taken from the VPP API enum table.

    Other code in this file reads it as an attribute
    (``self.config_flags.NAT44_EI_IF_INSIDE``).
    """
    flags_enum = VppEnum.vl_api_nat44_ei_config_flags_t
    return flags_enum
def SYSLOG_SEVERITY(self):
    """Syslog severity enum taken from the VPP API enum table."""
    severity_enum = VppEnum.vl_api_syslog_severity_t
    return severity_enum
def nat44_add_static_mapping(
    self,
    local_ip,
    external_ip="0.0.0.0",
    local_port=0,
    external_port=0,
    vrf_id=0,
    is_add=1,
    external_sw_if_index=0xFFFFFFFF,
    proto=0,
    tag="",
    flags=0,
):
    """
    Add/delete NAT44EI static mapping

    :param local_ip: Local IP address
    :param external_ip: External IP address
    :param local_port: Local port number (Optional)
    :param external_port: External port number (Optional)
    :param vrf_id: VRF ID (Default 0)
    :param is_add: 1 if add, 0 if delete (Default add)
    :param external_sw_if_index: External interface instead of IP address
    :param proto: IP protocol (Mandatory if port specified)
    :param tag: Opaque string tag
    :param flags: NAT configuration flags
    """
    # Unless both ports are given, the mapping is flagged as address-only.
    ports_given = bool(local_port and external_port)
    if not ports_given:
        flags |= self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING

    request = {
        "is_add": is_add,
        "local_ip_address": local_ip,
        "external_ip_address": external_ip,
        "external_sw_if_index": external_sw_if_index,
        "local_port": local_port,
        "external_port": external_port,
        "vrf_id": vrf_id,
        "protocol": proto,
        "tag": tag,
        "flags": flags,
    }
    self.vapi.nat44_ei_add_del_static_mapping(**request)
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
    """
    Add/delete NAT44EI address

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID passed through to the API (Default 0xFFFFFFFF)
    """
    # A single address is expressed as a range whose first and last are equal.
    single_address_range = {"first_ip_address": ip, "last_ip_address": ip}
    self.vapi.nat44_ei_add_del_address_range(
        vrf_id=vrf_id, is_add=is_add, **single_address_range
    )
144 def create_routes_and_neigbors(self):
149 [VppRoutePath(self.pg7.remote_ip4, self.pg7.sw_if_index)],
155 [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)],
162 self.pg7.sw_if_index,
169 self.pg8.sw_if_index,
def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param dst_ip: Destination address (Default: remote address of out_if)
    :param ttl: TTL of generated packets
    :returns: list of packets: the TCP packet twice, then UDP, then ICMP
    """
    target_ip = out_if.remote_ip4 if dst_ip is None else dst_ip

    def with_l4(l4_header):
        # Every packet shares the same L2/L3 envelope; only L4 differs.
        return (
            Ether(dst=in_if.local_mac, src=in_if.remote_mac)
            / IP(src=in_if.remote_ip4, dst=target_ip, ttl=ttl)
            / l4_header
        )

    tcp_pkt = with_l4(TCP(sport=self.tcp_port_in, dport=20))
    udp_pkt = with_l4(UDP(sport=self.udp_port_in, dport=20))
    icmp_pkt = with_l4(ICMP(id=self.icmp_id_in, type="echo-request"))

    # The same TCP packet object is queued twice.
    return [tcp_pkt, tcp_pkt, udp_pkt, icmp_pkt]
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96)
    :returns: IPv4-embedded IPv6 addresses
    :raises ValueError: if plen is not one of the supported prefix lengths
    """
    # Byte positions inside the 16-byte IPv6 address that receive the four
    # IPv4 octets for each prefix length (RFC 6052, section 2.2). Byte 8 is
    # never written: it is the reserved "u" octet and is skipped over.
    embed_offsets = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    try:
        offsets = embed_offsets[plen]
    except KeyError:
        # Previously an unsupported length silently returned the bare prefix.
        raise ValueError(
            f"Unsupported IPv6 prefix length {plen}; "
            f"expected one of {sorted(embed_offsets)}"
        ) from None

    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = socket.inet_pton(socket.AF_INET, ip4)
    for offset, octet in zip(offsets, ip4_n):
        pref_n[offset] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :param use_inside_ports: Use inside NAT ports as destination ports
           instead of outside ports
    :returns: list of packets: the TCP packet twice, then UDP, then ICMP
    """
    target_ip = self.nat_addr if dst_ip is None else dst_ip

    if use_inside_ports:
        tcp_port = self.tcp_port_in
        udp_port = self.udp_port_in
        icmp_id = self.icmp_id_in
    else:
        tcp_port = self.tcp_port_out
        udp_port = self.udp_port_out
        icmp_id = self.icmp_id_out

    def with_l4(l4_header):
        # Shared L2/L3 envelope for every packet of the stream.
        return (
            Ether(dst=out_if.local_mac, src=out_if.remote_mac)
            / IP(src=out_if.remote_ip4, dst=target_ip, ttl=ttl)
            / l4_header
        )

    tcp_pkt = with_l4(TCP(dport=tcp_port, sport=20))
    udp_pkt = with_l4(UDP(dport=udp_port, sport=20))
    icmp_pkt = with_l4(ICMP(id=icmp_id, type="echo-reply"))

    # The same TCP packet object is queued twice.
    return [tcp_pkt, tcp_pkt, udp_pkt, icmp_pkt]
307 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
309 Create packet stream for outside network
311 :param out_if: Outside interface
312 :param dst_ip: Destination IP address (Default use global NAT address)
313 :param hl: HL of generated packets
318 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
319 / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
320 / TCP(dport=self.tcp_port_out, sport=20)
326 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
327 / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
328 / UDP(dport=self.udp_port_out, sport=20)
334 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
335 / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
336 / ICMPv6EchoReply(id=self.icmp_id_out)
def verify_capture_out(
    self,
    capture,
    nat_ip=None,
    same_port=False,
    dst_ip=None,
    is_ip6=False,
    ignore_port=False,
):
    """
    Verify captured packets on outside network

    Also records the observed source port / ICMP id of each packet in
    ``self.tcp_port_out``, ``self.udp_port_out`` and ``self.icmp_id_out``.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global NAT address)
    :param same_port: Source port number is not translated (Default False)
    :param dst_ip: Destination IP address (Default do not verify)
    :param is_ip6: If L3 protocol is IPv6 (Default False)
    :param ignore_port: Skip the source port / ICMP id comparison
    """
    IP46, ICMP46 = (IPv6, ICMPv6EchoRequest) if is_ip6 else (IP, ICMP)
    expected_src = self.nat_addr if nat_ip is None else nat_ip
    # Untranslated ports must match the inside value; translated must differ.
    compare_port = self.assertEqual if same_port else self.assertNotEqual

    for packet in capture:
        try:
            if not is_ip6:
                self.assert_packet_checksums_valid(packet)
            self.assertEqual(packet[IP46].src, expected_src)
            if dst_ip is not None:
                self.assertEqual(packet[IP46].dst, dst_ip)

            if packet.haslayer(TCP):
                if not ignore_port:
                    compare_port(packet[TCP].sport, self.tcp_port_in)
                self.tcp_port_out = packet[TCP].sport
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                if not ignore_port:
                    compare_port(packet[UDP].sport, self.udp_port_in)
                self.udp_port_out = packet[UDP].sport
            else:
                if not ignore_port:
                    compare_port(packet[ICMP46].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP46].id
                self.assert_packet_checksums_valid(packet)
        except:
            self.logger.error(
                ppp("Unexpected or invalid packet (outside network):", packet)
            )
            raise
def verify_capture_out_ip6(self, capture, nat_ip, same_port=False, dst_ip=None):
    """
    Verify captured packets on outside network

    IPv6 convenience wrapper around verify_capture_out.

    :param capture: Captured packets
    :param nat_ip: Translated IP address
    :param same_port: Source port number is not translated (Default False)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    ipv6_selected = True
    result = self.verify_capture_out(
        capture, nat_ip, same_port, dst_ip, ipv6_selected
    )
    return result
def verify_capture_in(self, capture, in_if):
    """
    Verify captured packets on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    """
    for packet in capture:
        try:
            self.assert_packet_checksums_valid(packet)
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            # Pick the field that must carry the original inside value.
            if packet.haslayer(TCP):
                observed, expected = packet[TCP].dport, self.tcp_port_in
            elif packet.haslayer(UDP):
                observed, expected = packet[UDP].dport, self.udp_port_in
            else:
                observed, expected = packet[ICMP].id, self.icmp_id_in
            self.assertEqual(observed, expected)
        except:
            self.logger.error(
                ppp("Unexpected or invalid packet (inside network):", packet)
            )
            raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packets that must not have been translated

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    for packet in capture:
        try:
            ip_hdr = packet[IP]
            self.assertEqual(ip_hdr.src, ingress_if.remote_ip4)
            self.assertEqual(ip_hdr.dst, egress_if.remote_ip4)
            # Source port / ICMP id must still be the inside value.
            if packet.haslayer(TCP):
                observed, expected = packet[TCP].sport, self.tcp_port_in
            elif packet.haslayer(UDP):
                observed, expected = packet[UDP].sport, self.udp_port_in
            else:
                observed, expected = packet[ICMP].id, self.icmp_id_in
            self.assertEqual(observed, expected)
        except:
            self.logger.error(
                ppp("Unexpected or invalid packet (inside network):", packet)
            )
            raise
def verify_capture_out_with_icmp_errors(self, capture, src_ip=None, icmp_type=11):
    """
    Verify captured packets with ICMP errors on outside network

    :param capture: Captured packets
    :param src_ip: Translated IP address or IP address of VPP
                   (Default use global NAT address)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    expected_src = self.nat_addr if src_ip is None else src_ip
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, expected_src)
            self.assertEqual(packet.haslayer(ICMP), 1)
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            # The quoted (embedded) packet must carry the outside values.
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                observed, expected = inner_ip[TCPerror].dport, self.tcp_port_out
            elif inner_ip.haslayer(UDPerror):
                observed, expected = inner_ip[UDPerror].dport, self.udp_port_out
            else:
                observed, expected = inner_ip[ICMPerror].id, self.icmp_id_out
            self.assertEqual(observed, expected)
        except:
            self.logger.error(
                ppp("Unexpected or invalid packet (outside network):", packet)
            )
            raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
    """
    Verify captured packets with ICMP errors on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    for packet in capture:
        try:
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            self.assertEqual(packet.haslayer(ICMP), 1)
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            # The quoted (embedded) packet must carry the inside values.
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                observed, expected = inner_ip[TCPerror].sport, self.tcp_port_in
            elif inner_ip.haslayer(UDPerror):
                observed, expected = inner_ip[UDPerror].sport, self.udp_port_in
            else:
                observed, expected = inner_ip[ICMPerror].id, self.icmp_id_in
            self.assertEqual(observed, expected)
        except:
            self.logger.error(
                ppp("Unexpected or invalid packet (inside network):", packet)
            )
            raise
def create_stream_frag(
    self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
):
    """
    Create fragmented packet stream

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source port
    :param dport: Destination port
    :param data: Payload data
    :param proto: protocol (TCP, UDP, ICMP)
    :param echo_reply: use echo_reply if protocol is ICMP
    :returns: Fragments (three packets: offsets 0, 3 and 5)
    """
    is_tcp = proto == IP_PROTOS.tcp
    if is_tcp:
        # Build and re-parse the unfragmented packet so scapy fills in the
        # TCP checksum over the full payload; the first fragment reuses it.
        whole = (
            IP(src=src_if.remote_ip4, dst=dst)
            / TCP(sport=sport, dport=dport)
            / Raw(data)
        )
        whole = whole.__class__(scapy.compat.raw(whole))
        proto_header = TCP(sport=sport, dport=dport, chksum=whole[TCP].chksum)
    elif proto == IP_PROTOS.udp:
        proto_header = UDP(sport=sport, dport=dport)
    elif proto == IP_PROTOS.icmp:
        icmp_kind = "echo-reply" if echo_reply else "echo-request"
        proto_header = ICMP(id=sport, type=icmp_kind)
    else:
        raise Exception("Unsupported protocol")

    ip_id = random.randint(0, 65535)
    # Payload split points differ for TCP than for the other protocols.
    first_cut, second_cut = (4, 20) if is_tcp else (16, 32)

    def ether():
        return Ether(src=src_if.remote_mac, dst=src_if.local_mac)

    first = (
        ether()
        / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=ip_id)
        / proto_header
        / Raw(data[0:first_cut])
    )
    middle = (
        ether()
        / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=ip_id, proto=proto)
        / Raw(data[first_cut:second_cut])
    )
    last = (
        ether()
        / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=ip_id)
        / Raw(data[second_cut:])
    )
    return [first, middle, last]
def reass_frags_and_verify(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    Each fragment is checked for the expected addresses and a valid IP
    checksum, then its payload is written into a buffer at the byte
    position given by its fragment offset.

    :param frags: Captured fragments
    :param src: Source IPv4 address to verify
    :param dst: Destination IPv4 address to verify

    :returns: Reassembled IPv4 packet
    """
    buffer = BytesIO()
    for p in frags:
        self.assertEqual(p[IP].src, src)
        self.assertEqual(p[IP].dst, dst)
        self.assert_ip_checksum_valid(p)
        # the IP fragment offset field counts 8-byte units
        buffer.seek(p[IP].frag * 8)
        buffer.write(bytes(p[IP].payload))
    # header fields of the rebuilt packet are copied from the first fragment
    ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
    if ip.proto == IP_PROTOS.tcp:
        p = ip / TCP(buffer.getvalue())
        self.logger.debug(ppp("Reassembled:", p))
        self.assert_tcp_checksum_valid(p)
    elif ip.proto == IP_PROTOS.udp:
        # first 8 bytes are parsed as the UDP header, the rest kept as Raw
        p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
    elif ip.proto == IP_PROTOS.icmp:
        p = ip / ICMP(buffer.getvalue())
    # NOTE(review): for any other protocol no branch rebinds p, so the value
    # returned is the last fragment from the loop above, not a reassembled
    # packet.
    return p
def verify_ipfix_nat44_ses(self, data):
    """
    Verify IPFIX NAT44EI session create/delete event

    Expects six records: three create events (natEvent 4) and three
    delete events (natEvent 5).

    :param data: Decoded IPFIX data records
    """
    created = 0
    deleted = 0
    self.assertEqual(6, len(data))
    for record in data:
        # natEvent
        event = scapy.compat.orb(record[230])
        self.assertIn(event, [4, 5])
        if event == 4:
            created += 1
        else:
            deleted += 1
        # sourceIPv4Address
        self.assertEqual(self.pg0.remote_ip4, str(ipaddress.IPv4Address(record[8])))
        # postNATSourceIPv4Address
        self.assertEqual(
            socket.inet_pton(socket.AF_INET, self.nat_addr), record[225]
        )
        # ingressVRFID
        self.assertEqual(struct.pack("!I", 0), record[234])
        # protocolIdentifier/sourceTransportPort
        # /postNAPTSourceTransportPort
        protocol = scapy.compat.orb(record[4])
        if IP_PROTOS.icmp == protocol:
            inside, outside = self.icmp_id_in, self.icmp_id_out
        elif IP_PROTOS.tcp == protocol:
            inside, outside = self.tcp_port_in, self.tcp_port_out
        elif IP_PROTOS.udp == protocol:
            inside, outside = self.udp_port_in, self.udp_port_out
        else:
            self.fail(f"Invalid protocol {protocol}")
        self.assertEqual(struct.pack("!H", inside), record[7])
        self.assertEqual(struct.pack("!H", outside), record[227])
    self.assertEqual(3, created)
    self.assertEqual(3, deleted)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify the single IPFIX record reported for address exhaustion

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    (record,) = data[:1]
    # natEvent
    nat_event = scapy.compat.orb(record[230])
    self.assertEqual(nat_event, 3)
    # natPoolID
    expected_pool_id = struct.pack("!I", 0)
    self.assertEqual(expected_pool_id, record[283])
def verify_ipfix_max_sessions(self, data, limit):
    """
    Verify the single IPFIX record reported when the session limit is hit

    :param data: Decoded IPFIX data records
    :param limit: Session limit expected in the record
    """
    self.assertEqual(1, len(data))
    (record,) = data[:1]
    # natEvent
    nat_event = scapy.compat.orb(record[230])
    self.assertEqual(nat_event, 13)
    # natQuotaExceededEvent
    self.assertEqual(struct.pack("!I", 1), record[466])
    # maxSessionEntries
    expected_limit = struct.pack("!I", limit)
    self.assertEqual(expected_limit, record[471])
def verify_no_nat44_user(self):
    """Verify that there is no NAT44EI user"""
    # The API dump must be empty ...
    self.assertEqual(len(self.vapi.nat44_ei_user_dump()), 0)
    # ... and both statistics counters must read zero at index [0][0].
    for counter in ("/nat44-ei/total-users", "/nat44-ei/total-sessions"):
        values = self.statistics[counter]
        self.assertEqual(values[0][0], 0)
def verify_syslog_apmap(self, data, is_add=True):
    """
    Verify a NAT address/port mapping syslog message

    :param data: Raw syslog payload (UTF-8 encoded bytes)
    :param is_add: expect an "APMADD" message if True, "APMDEL" otherwise
    """
    text = data.decode("utf-8")
    try:
        message = SyslogMessage.parse(text)
    except ParseError as e:
        self.logger.error(e)
        raise

    expected_msgid = "APMADD" if is_add else "APMDEL"
    self.assertEqual(message.severity, SyslogSeverity.info)
    self.assertEqual(message.appname, "NAT")
    self.assertEqual(message.msgid, expected_msgid)

    sd_params = message.sd.get("napmap")
    self.assertTrue(sd_params is not None)
    self.assertEqual(sd_params.get("IATYP"), "IPv4")
    self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
    self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
    self.assertEqual(sd_params.get("XATYP"), "IPv4")
    self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
    self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
    self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
    self.assertTrue(sd_params.get("SSUBIX") is not None)
    self.assertEqual(sd_params.get("SVLAN"), "0")
def verify_mss_value(self, pkt, mss):
    """
    Verify the TCP MSS option of a packet

    For every "MSS" entry in the TCP options the value is compared with
    mss and the TCP checksum is validated. A packet without an MSS option
    passes without any assertion.

    :param pkt: Packet to inspect
    :param mss: Expected MSS value
    :raises TypeError: if pkt has no IP layer or no TCP layer
    """
    if not (pkt.haslayer(IP) and pkt.haslayer(TCP)):
        raise TypeError("Not a TCP/IP packet")

    for tcp_option in pkt[TCP].options:
        if tcp_option[0] != "MSS":
            continue
        self.assertEqual(tcp_option[1], mss)
        self.assert_tcp_checksum_valid(pkt)
def proto2layer(proto):
    """
    Map an IP protocol number to the matching scapy layer class

    Takes no ``self`` although callers in this file invoke it as
    ``self.proto2layer(proto)``.

    :param proto: IP protocol (TCP, UDP or ICMP)
    :returns: TCP, UDP or ICMP layer class
    :raises Exception: for any other protocol
    """
    if proto == IP_PROTOS.tcp:
        return TCP
    if proto == IP_PROTOS.udp:
        return UDP
    if proto == IP_PROTOS.icmp:
        return ICMP
    raise Exception("Unsupported protocol")
732 self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
734 layer = self.proto2layer(proto)
736 if proto == IP_PROTOS.tcp:
737 data = b"A" * 4 + b"B" * 16 + b"C" * 3
739 data = b"A" * 16 + b"B" * 16 + b"C" * 3
740 self.port_in = random.randint(1025, 65535)
743 pkts = self.create_stream_frag(
744 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
746 self.pg0.add_stream(pkts)
747 self.pg_enable_capture(self.pg_interfaces)
749 frags = self.pg1.get_capture(len(pkts))
750 if not dont_translate:
751 p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
753 p = self.reass_frags_and_verify(
754 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
756 if proto != IP_PROTOS.icmp:
757 if not dont_translate:
758 self.assertEqual(p[layer].dport, 20)
760 self.assertNotEqual(p[layer].sport, self.port_in)
762 self.assertEqual(p[layer].sport, self.port_in)
765 if not dont_translate:
766 self.assertNotEqual(p[layer].id, self.port_in)
768 self.assertEqual(p[layer].id, self.port_in)
769 self.assertEqual(data, p[Raw].load)
772 if not dont_translate:
773 dst_addr = self.nat_addr
775 dst_addr = self.pg0.remote_ip4
776 if proto != IP_PROTOS.icmp:
778 dport = p[layer].sport
782 pkts = self.create_stream_frag(
783 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
785 self.pg1.add_stream(pkts)
786 self.pg_enable_capture(self.pg_interfaces)
788 frags = self.pg0.get_capture(len(pkts))
789 p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
790 if proto != IP_PROTOS.icmp:
791 self.assertEqual(p[layer].sport, 20)
792 self.assertEqual(p[layer].dport, self.port_in)
794 self.assertEqual(p[layer].id, self.port_in)
795 self.assertEqual(data, p[Raw].load)
797 def reass_hairpinning(
806 layer = self.proto2layer(proto)
808 if proto == IP_PROTOS.tcp:
809 data = b"A" * 4 + b"B" * 16 + b"C" * 3
811 data = b"A" * 16 + b"B" * 16 + b"C" * 3
813 # send packet from host to server
814 pkts = self.create_stream_frag(
815 self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
817 self.pg0.add_stream(pkts)
818 self.pg_enable_capture(self.pg_interfaces)
820 frags = self.pg0.get_capture(len(pkts))
821 p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
822 if proto != IP_PROTOS.icmp:
824 self.assertNotEqual(p[layer].sport, host_in_port)
825 self.assertEqual(p[layer].dport, server_in_port)
828 self.assertNotEqual(p[layer].id, host_in_port)
829 self.assertEqual(data, p[Raw].load)
831 def frag_out_of_order(
832 self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
834 layer = self.proto2layer(proto)
836 if proto == IP_PROTOS.tcp:
837 data = b"A" * 4 + b"B" * 16 + b"C" * 3
839 data = b"A" * 16 + b"B" * 16 + b"C" * 3
840 self.port_in = random.randint(1025, 65535)
844 pkts = self.create_stream_frag(
845 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
848 self.pg0.add_stream(pkts)
849 self.pg_enable_capture(self.pg_interfaces)
851 frags = self.pg1.get_capture(len(pkts))
852 if not dont_translate:
853 p = self.reass_frags_and_verify(
854 frags, self.nat_addr, self.pg1.remote_ip4
857 p = self.reass_frags_and_verify(
858 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
860 if proto != IP_PROTOS.icmp:
861 if not dont_translate:
862 self.assertEqual(p[layer].dport, 20)
864 self.assertNotEqual(p[layer].sport, self.port_in)
866 self.assertEqual(p[layer].sport, self.port_in)
869 if not dont_translate:
870 self.assertNotEqual(p[layer].id, self.port_in)
872 self.assertEqual(p[layer].id, self.port_in)
873 self.assertEqual(data, p[Raw].load)
876 if not dont_translate:
877 dst_addr = self.nat_addr
879 dst_addr = self.pg0.remote_ip4
880 if proto != IP_PROTOS.icmp:
882 dport = p[layer].sport
886 pkts = self.create_stream_frag(
887 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
890 self.pg1.add_stream(pkts)
891 self.pg_enable_capture(self.pg_interfaces)
893 frags = self.pg0.get_capture(len(pkts))
894 p = self.reass_frags_and_verify(
895 frags, self.pg1.remote_ip4, self.pg0.remote_ip4
897 if proto != IP_PROTOS.icmp:
898 self.assertEqual(p[layer].sport, 20)
899 self.assertEqual(p[layer].dport, self.port_in)
901 self.assertEqual(p[layer].id, self.port_in)
902 self.assertEqual(data, p[Raw].load)
def get_nat44_ei_in2out_worker_index(ip, vpp_worker_count):
    """
    Compute the worker index for an inside IPv4 address

    :param ip: IPv4 address in dotted-quad notation
    :param vpp_worker_count: Number of VPP workers
    :returns: 0 when there are no workers, otherwise a value in
              1..vpp_worker_count
    """
    if vpp_worker_count == 0:
        return 0
    # Read the four address bytes as a native-order 32-bit integer.
    (numeric,) = struct.unpack("=L", socket.inet_aton(ip))
    h = sum(numeric >> shift for shift in (0, 8, 16, 24))
    return 1 + h % vpp_worker_count
916 class TestNAT44EI(MethodHolder):
917 """NAT44EI Test Cases"""
919 max_translations = 10240
924 super(TestNAT44EI, cls).setUpClass()
925 if is_distro_debian11 == True and not hasattr(cls, "vpp"):
927 cls.vapi.cli("set log class nat44-ei level debug")
929 cls.tcp_port_in = 6303
930 cls.tcp_port_out = 6303
931 cls.udp_port_in = 6304
932 cls.udp_port_out = 6304
933 cls.icmp_id_in = 6305
934 cls.icmp_id_out = 6305
935 cls.nat_addr = "10.0.0.3"
936 cls.ipfix_src_port = 4739
937 cls.ipfix_domain_id = 1
938 cls.tcp_external_port = 80
939 cls.udp_external_port = 69
941 cls.create_pg_interfaces(range(10))
942 cls.interfaces = list(cls.pg_interfaces[0:4])
944 for i in cls.interfaces:
949 cls.pg0.generate_remote_hosts(3)
950 cls.pg0.configure_ipv4_neighbors()
952 cls.pg1.generate_remote_hosts(1)
953 cls.pg1.configure_ipv4_neighbors()
955 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
956 cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
957 cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
959 cls.pg4._local_ip4 = "172.16.255.1"
960 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
961 cls.pg4.set_table_ip4(10)
962 cls.pg5._local_ip4 = "172.17.255.3"
963 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
964 cls.pg5.set_table_ip4(10)
965 cls.pg6._local_ip4 = "172.16.255.1"
966 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
967 cls.pg6.set_table_ip4(20)
968 for i in cls.overlapping_interfaces:
976 cls.pg9.generate_remote_hosts(2)
978 cls.vapi.sw_interface_add_del_address(
979 sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
983 cls.pg9.resolve_arp()
984 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
985 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
986 cls.pg9.resolve_arp()
def plugin_enable(self):
    """Enable the NAT44EI plugin with the class-level session and user limits."""
    limits = {"sessions": self.max_translations, "users": self.max_users}
    self.vapi.nat44_ei_plugin_enable_disable(enable=1, **limits)
994 super(TestNAT44EI, self).setUp()
998 super(TestNAT44EI, self).tearDown()
999 if not self.vpp_dead:
1000 self.vapi.nat44_ei_ipfix_enable_disable(
1001 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
1003 self.ipfix_src_port = 4739
1004 self.ipfix_domain_id = 1
1006 self.vapi.nat44_ei_plugin_enable_disable(enable=0)
1007 self.vapi.cli("clear logging")
def test_clear_sessions(self):
    """NAT44EI session clearing test"""

    # pg0 is configured as the inside interface; pg1 gets the feature
    # without the inside flag.
    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT44_EI_IF_INSIDE
    self.vapi.nat44_ei_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
    )
    self.vapi.nat44_ei_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index, is_add=1
    )

    # Send inside traffic so that sessions get created.
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # The assertion sums column 0 over all rows; the log line only prints
    # the first row.
    sessions = self.statistics["/nat44-ei/total-sessions"]
    self.assertGreater(sessions[:, 0].sum(), 0, "Session count invalid")
    self.logger.info("sessions before clearing: %s" % sessions[0][0])

    self.vapi.cli("clear nat44 ei sessions")

    # After the CLI clear the summed session counter must be back to zero.
    sessions = self.statistics["/nat44-ei/total-sessions"]
    self.assertEqual(sessions[:, 0].sum(), 0, "Session count invalid")
    self.logger.info("sessions after clearing: %s" % sessions[0][0])
def test_dynamic(self):
    """NAT44EI dynamic translation test"""
    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT44_EI_IF_INSIDE
    self.vapi.nat44_ei_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
    )
    self.vapi.nat44_ei_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index, is_add=1
    )

    # in2out
    # Snapshot the slow-path counters first; the checks below are deltas.
    tcpn = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
    udpn = self.statistics["/nat44-ei/in2out/slowpath/udp"]
    icmpn = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
    drops = self.statistics["/nat44-ei/in2out/slowpath/drops"]

    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # Counters are indexed by the ingress interface (pg0) and summed over
    # all rows. Expected deltas: 2 TCP, 1 UDP, 1 ICMP, 0 drops.
    if_idx = self.pg0.sw_if_index
    cnt = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
    self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
    cnt = self.statistics["/nat44-ei/in2out/slowpath/udp"]
    self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
    cnt = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
    self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
    cnt = self.statistics["/nat44-ei/in2out/slowpath/drops"]
    self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)

    # out2in
    # Same procedure for the return direction, indexed by pg1.
    tcpn = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
    udpn = self.statistics["/nat44-ei/out2in/slowpath/udp"]
    icmpn = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
    drops = self.statistics["/nat44-ei/out2in/slowpath/drops"]

    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    if_idx = self.pg1.sw_if_index
    cnt = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
    self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
    cnt = self.statistics["/nat44-ei/out2in/slowpath/udp"]
    self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
    cnt = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
    self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
    cnt = self.statistics["/nat44-ei/out2in/slowpath/drops"]
    self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)

    # One user with three sessions is expected in total.
    users = self.statistics["/nat44-ei/total-users"]
    self.assertEqual(users[:, 0].sum(), 1)
    sessions = self.statistics["/nat44-ei/total-sessions"]
    self.assertEqual(sessions[:, 0].sum(), 3)
def test_dynamic_icmp_errors_in2out_ttl_1(self):
    """NAT44EI handling of client packets with TTL=1"""

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT44_EI_IF_INSIDE
    self.vapi.nat44_ei_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
    )
    self.vapi.nat44_ei_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index, is_add=1
    )

    # Client side - generate traffic
    # Packets are sent on pg0 and the replies are captured on pg0 as well.
    pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
    capture = self.send_and_expect_some(self.pg0, pkts, self.pg0)

    # Client side - verify ICMP type 11 packets
    self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_1(self):
    """NAT44EI handling of server packets with TTL=1"""

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT44_EI_IF_INSIDE
    self.vapi.nat44_ei_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
    )
    self.vapi.nat44_ei_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index, is_add=1
    )

    # Client side - create sessions
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - generate traffic
    # verify_capture_out also records the translated ports/ids that
    # create_stream_out uses as destinations.
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    pkts = self.create_stream_out(self.pg1, ttl=1)
    capture = self.send_and_expect_some(self.pg1, pkts, self.pg1)

    # Server side - verify ICMP type 11 packets
    # The errors are expected to come from pg1's own address rather than
    # from the NAT address.
    self.verify_capture_out_with_icmp_errors(capture, src_ip=self.pg1.local_ip4)
def test_dynamic_icmp_errors_in2out_ttl_2(self):
    """NAT44EI handling of error responses to client packets with TTL=2"""

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT44_EI_IF_INSIDE
    self.vapi.nat44_ei_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
    )
    self.vapi.nat44_ei_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index, is_add=1
    )

    # Client side - generate traffic
    pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - simulate ICMP type 11 response
    # Each captured (translated) packet is quoted inside an ICMP error
    # addressed to the NAT address.
    capture = self.pg1.get_capture(len(pkts))
    pkts = [
        Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
        / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
        / ICMP(type=11)
        / packet[IP]
        for packet in capture
    ]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - verify ICMP type 11 packets
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_2(self):
    """NAT44EI handling of error responses to server packets with TTL=2"""

    self.nat44_add_address(self.nat_addr)
    flags = self.config_flags.NAT44_EI_IF_INSIDE
    self.vapi.nat44_ei_interface_add_del_feature(
        sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
    )
    self.vapi.nat44_ei_interface_add_del_feature(
        sw_if_index=self.pg1.sw_if_index, is_add=1
    )

    # Client side - create sessions
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - generate traffic
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    pkts = self.create_stream_out(self.pg1, ttl=2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - simulate ICMP type 11 response
    # Each packet delivered to the inside host is quoted inside an ICMP
    # error sent back towards the outside host.
    capture = self.pg0.get_capture(len(pkts))
    pkts = [
        Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
        / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
        / ICMP(type=11)
        / packet[IP]
        for packet in capture
    ]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - verify ICMP type 11 packets
    # With no src_ip given, the NAT address is the expected source.
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out_with_icmp_errors(capture)
1224 def test_ping_out_interface_from_outside(self):
1225 """NAT44EI ping out interface from outside network"""
# Sends an ICMP echo request from pg1's remote host to pg1's own local
# address and checks the reply captured back on pg1.
# NOTE(review): the embedded line numbers have gaps; statements on the
# omitted lines (e.g. where `pkts` is assigned) are not visible here.
# Setup: register self.nat_addr through the nat44_add_address helper, then
# enable the NAT44-EI feature on pg0 with NAT44_EI_IF_INSIDE and on pg1
# without flags (presumably the outside interface -- TODO confirm).
1227 self.nat44_add_address(self.nat_addr)
1228 flags = self.config_flags.NAT44_EI_IF_INSIDE
1229 self.vapi.nat44_ei_interface_add_del_feature(
1230 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1232 self.vapi.nat44_ei_interface_add_del_feature(
1233 sw_if_index=self.pg1.sw_if_index, is_add=1
# Echo request pg1.remote_ip4 -> pg1.local_ip4 with ICMP id icmp_id_out.
1237 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1238 / IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4)
1239 / ICMP(id=self.icmp_id_out, type="echo-request")
1242 self.pg1.add_stream(pkts)
1243 self.pg_enable_capture(self.pg_interfaces)
# The reply is expected on pg1 itself, one packet per request sent.
1245 capture = self.pg1.get_capture(len(pkts))
# Expected reply: addresses swapped relative to the request, ICMP type 0,
# and ICMP id equal to self.icmp_id_in (not the icmp_id_out that was sent).
1248 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1249 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1250 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1251 self.assertEqual(packet[ICMP].type, 0) # echo reply
# Diagnostic text for an offending packet; the statement that uses it is
# on a line not shown here (presumably a failure handler -- TODO confirm).
1254 ppp("Unexpected or invalid packet (outside network):", packet)
1258 def test_ping_internal_host_from_outside(self):
1259 """NAT44EI ping internal host from outside network"""
# An echo request sent from pg1 to self.nat_addr must reach pg0, and the
# echo reply sent back from pg0 must reach pg1.
# NOTE(review): the embedded line numbers have gaps; the lines that assign
# `pkt` are not visible in this listing.
# Setup: static mapping between pg0.remote_ip4 and self.nat_addr, then the
# NAT44-EI feature on pg0 with NAT44_EI_IF_INSIDE and on pg1 without
# flags (presumably the outside interface -- TODO confirm).
1261 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1262 flags = self.config_flags.NAT44_EI_IF_INSIDE
1263 self.vapi.nat44_ei_interface_add_del_feature(
1264 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1266 self.vapi.nat44_ei_interface_add_del_feature(
1267 sw_if_index=self.pg1.sw_if_index, is_add=1
# Request direction: pg1.remote_ip4 -> self.nat_addr, injected on pg1.
1272 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1273 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64)
1274 / ICMP(id=self.icmp_id_out, type="echo-request")
1276 self.pg1.add_stream(pkt)
1277 self.pg_enable_capture(self.pg_interfaces)
# Exactly one packet is expected on pg0 and it must be ICMP.
1279 capture = self.pg0.get_capture(1)
1280 self.verify_capture_in(capture, self.pg0)
1281 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Reply direction: pg0.remote_ip4 -> pg1.remote_ip4, injected on pg0.
1285 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1286 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
1287 / ICMP(id=self.icmp_id_in, type="echo-reply")
1289 self.pg0.add_stream(pkt)
1290 self.pg_enable_capture(self.pg_interfaces)
# Exactly one ICMP packet is expected on pg1; the capture is verified
# with same_port=True.
1292 capture = self.pg1.get_capture(1)
1293 self.verify_capture_out(capture, same_port=True)
1294 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1296 def test_forwarding(self):
1297 """NAT44EI forwarding test"""
# Exercises traffic in both directions with NAT44-EI forwarding enabled,
# first for a host covered by a static mapping and then for one that is
# not.
# NOTE(review): the embedded line numbers have gaps; some arguments of the
# static-mapping API calls (e.g. is_add/flags) are not visible here.
# Setup: NAT44-EI feature on pg0 with NAT44_EI_IF_INSIDE and on pg1
# without flags (presumably the outside interface -- TODO confirm), then
# forwarding is switched on.
1299 flags = self.config_flags.NAT44_EI_IF_INSIDE
1300 self.vapi.nat44_ei_interface_add_del_feature(
1301 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1303 self.vapi.nat44_ei_interface_add_del_feature(
1304 sw_if_index=self.pg1.sw_if_index, is_add=1
1306 self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
# Static mapping between pg0's first remote address (local side) and
# self.nat_addr (external side); external_sw_if_index is 0xFFFFFFFF.
1308 real_ip = self.pg0.remote_ip4
1309 alias_ip = self.nat_addr
1310 flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1311 self.vapi.nat44_ei_add_del_static_mapping(
1313 local_ip_address=real_ip,
1314 external_ip_address=alias_ip,
1315 external_sw_if_index=0xFFFFFFFF,
1320 # static mapping match
# pg1 -> pg0 first, then pg0 -> pg1 (verified with same_port=True).
1322 pkts = self.create_stream_out(self.pg1)
1323 self.pg1.add_stream(pkts)
1324 self.pg_enable_capture(self.pg_interfaces)
1326 capture = self.pg0.get_capture(len(pkts))
1327 self.verify_capture_in(capture, self.pg0)
1329 pkts = self.create_stream_in(self.pg0, self.pg1)
1330 self.pg0.add_stream(pkts)
1331 self.pg_enable_capture(self.pg_interfaces)
1333 capture = self.pg1.get_capture(len(pkts))
1334 self.verify_capture_out(capture, same_port=True)
1336 # no static mapping match
# remote_hosts[0] is temporarily replaced by remote_hosts[1] (the original
# entry is kept in host0 and restored below); presumably this makes
# pg0.remote_ip4 resolve to the second host -- TODO confirm.
1338 host0 = self.pg0.remote_hosts[0]
1339 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
# Outside stream addressed directly to pg0.remote_ip4, using inside ports.
1341 pkts = self.create_stream_out(
1342 self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1344 self.pg1.add_stream(pkts)
1345 self.pg_enable_capture(self.pg_interfaces)
1347 capture = self.pg0.get_capture(len(pkts))
1348 self.verify_capture_in(capture, self.pg0)
1350 pkts = self.create_stream_in(self.pg0, self.pg1)
1351 self.pg0.add_stream(pkts)
1352 self.pg_enable_capture(self.pg_interfaces)
# On the way out the expected source address is pg0.remote_ip4 itself and
# ports are expected unchanged (same_port=True).
1354 capture = self.pg1.get_capture(len(pkts))
1355 self.verify_capture_out(
1356 capture, nat_ip=self.pg0.remote_ip4, same_port=True
# Put the original first remote host back.
1359 self.pg0.remote_hosts[0] = host0
# Teardown: forwarding off, then a second static-mapping API call with the
# same addresses (presumably the removal; is_add is not visible here).
1362 self.vapi.nat44_ei_forwarding_enable_disable(enable=0)
1363 flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1364 self.vapi.nat44_ei_add_del_static_mapping(
1366 local_ip_address=real_ip,
1367 external_ip_address=alias_ip,
1368 external_sw_if_index=0xFFFFFFFF,
1372 def test_static_in(self):
1373 """NAT44EI 1:1 NAT initialized from inside network"""
# Static mapping pg0.remote_ip4 <-> 10.0.0.10; traffic is sent from pg0
# first and from pg1 afterwards.
# NOTE(review): the embedded line numbers have gaps; statements on the
# omitted lines are not visible in this listing.
# Outside port/id values stored on self for this test.
1375 nat_ip = "10.0.0.10"
1376 self.tcp_port_out = 6303
1377 self.udp_port_out = 6304
1378 self.icmp_id_out = 6305
# Setup: the static mapping, then the NAT44-EI feature on pg0 with
# NAT44_EI_IF_INSIDE and on pg1 without flags (presumably the outside
# interface -- TODO confirm).
1380 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1381 flags = self.config_flags.NAT44_EI_IF_INSIDE
1382 self.vapi.nat44_ei_interface_add_del_feature(
1383 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1385 self.vapi.nat44_ei_interface_add_del_feature(
1386 sw_if_index=self.pg1.sw_if_index, is_add=1
# The dump must contain exactly this mapping: empty tag, and protocol,
# local port and external port all zero.
1388 sm = self.vapi.nat44_ei_static_mapping_dump()
1389 self.assertEqual(len(sm), 1)
1390 self.assertEqual(sm[0].tag, "")
1391 self.assertEqual(sm[0].protocol, 0)
1392 self.assertEqual(sm[0].local_port, 0)
1393 self.assertEqual(sm[0].external_port, 0)
# pg0 -> pg1; the capture is verified against nat_ip (the positional True
# is presumably same_port -- TODO confirm against verify_capture_out).
1396 pkts = self.create_stream_in(self.pg0, self.pg1)
1397 self.pg0.add_stream(pkts)
1398 self.pg_enable_capture(self.pg_interfaces)
1400 capture = self.pg1.get_capture(len(pkts))
1401 self.verify_capture_out(capture, nat_ip, True)
# pg1 -> pg0, with the stream created for nat_ip.
1404 pkts = self.create_stream_out(self.pg1, nat_ip)
1405 self.pg1.add_stream(pkts)
1406 self.pg_enable_capture(self.pg_interfaces)
1408 capture = self.pg0.get_capture(len(pkts))
1409 self.verify_capture_in(capture, self.pg0)
1411 def test_static_out(self):
1412 """NAT44EI 1:1 NAT initialized from outside network"""
# Static mapping pg0.remote_ip4 <-> 10.0.0.20 created with a tag; traffic
# is sent from pg1 first and from pg0 afterwards.
# NOTE(review): the embedded line numbers have gaps; `tag` is assigned on
# a line that is not visible in this listing.
# Outside port/id values stored on self for this test.
1414 nat_ip = "10.0.0.20"
1415 self.tcp_port_out = 6303
1416 self.udp_port_out = 6304
1417 self.icmp_id_out = 6305
# Setup: the tagged static mapping, then the NAT44-EI feature on pg0 with
# NAT44_EI_IF_INSIDE and on pg1 without flags (presumably the outside
# interface -- TODO confirm).
1420 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1421 flags = self.config_flags.NAT44_EI_IF_INSIDE
1422 self.vapi.nat44_ei_interface_add_del_feature(
1423 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1425 self.vapi.nat44_ei_interface_add_del_feature(
1426 sw_if_index=self.pg1.sw_if_index, is_add=1
# The dump must contain exactly one mapping and report the same tag.
1428 sm = self.vapi.nat44_ei_static_mapping_dump()
1429 self.assertEqual(len(sm), 1)
1430 self.assertEqual(sm[0].tag, tag)
# pg1 -> pg0, with the stream created for nat_ip.
1433 pkts = self.create_stream_out(self.pg1, nat_ip)
1434 self.pg1.add_stream(pkts)
1435 self.pg_enable_capture(self.pg_interfaces)
1437 capture = self.pg0.get_capture(len(pkts))
1438 self.verify_capture_in(capture, self.pg0)
# pg0 -> pg1; the capture is verified against nat_ip (the positional True
# is presumably same_port -- TODO confirm against verify_capture_out).
1441 pkts = self.create_stream_in(self.pg0, self.pg1)
1442 self.pg0.add_stream(pkts)
1443 self.pg_enable_capture(self.pg_interfaces)
1445 capture = self.pg1.get_capture(len(pkts))
1446 self.verify_capture_out(capture, nat_ip, True)
1448 def test_static_with_port_in(self):
1449 """NAT44EI 1:1 NAPT initialized from inside network"""
# Three static mappings for pg0.remote_ip4 (one each for TCP, UDP and
# ICMP); traffic is sent from pg0 first and from pg1 afterwards.
# NOTE(review): the embedded line numbers have gaps; the address/port
# arguments of the three mapping calls are not visible in this listing.
# Outside port/id values stored on self for this test.
1451 self.tcp_port_out = 3606
1452 self.udp_port_out = 3607
1453 self.icmp_id_out = 3608
# self.nat_addr is registered through the nat44_add_address helper before
# the per-protocol mappings are added.
1455 self.nat44_add_address(self.nat_addr)
1456 self.nat44_add_static_mapping(
1457 self.pg0.remote_ip4,
1461 proto=IP_PROTOS.tcp,
1463 self.nat44_add_static_mapping(
1464 self.pg0.remote_ip4,
1468 proto=IP_PROTOS.udp,
1470 self.nat44_add_static_mapping(
1471 self.pg0.remote_ip4,
1475 proto=IP_PROTOS.icmp,
# NAT44-EI feature on pg0 with NAT44_EI_IF_INSIDE and on pg1 without flags
# (presumably the outside interface -- TODO confirm).
1477 flags = self.config_flags.NAT44_EI_IF_INSIDE
1478 self.vapi.nat44_ei_interface_add_del_feature(
1479 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1481 self.vapi.nat44_ei_interface_add_del_feature(
1482 sw_if_index=self.pg1.sw_if_index, is_add=1
# pg0 -> pg1, verified with verify_capture_out's default arguments.
1486 pkts = self.create_stream_in(self.pg0, self.pg1)
1487 self.pg0.add_stream(pkts)
1488 self.pg_enable_capture(self.pg_interfaces)
1490 capture = self.pg1.get_capture(len(pkts))
1491 self.verify_capture_out(capture)
# pg1 -> pg0.
1494 pkts = self.create_stream_out(self.pg1)
1495 self.pg1.add_stream(pkts)
1496 self.pg_enable_capture(self.pg_interfaces)
1498 capture = self.pg0.get_capture(len(pkts))
1499 self.verify_capture_in(capture, self.pg0)
1501 def test_static_with_port_out(self):
1502 """NAT44EI 1:1 NAPT initialized from outside network"""
# Three static mappings for pg0.remote_ip4 (one each for TCP, UDP and
# ICMP); traffic is sent from pg1 first and from pg0 afterwards.
# NOTE(review): the embedded line numbers have gaps; the address/port
# arguments of the three mapping calls are not visible in this listing.
# Outside port/id values stored on self for this test.
1504 self.tcp_port_out = 30606
1505 self.udp_port_out = 30607
1506 self.icmp_id_out = 30608
# self.nat_addr is registered through the nat44_add_address helper before
# the per-protocol mappings are added.
1508 self.nat44_add_address(self.nat_addr)
1509 self.nat44_add_static_mapping(
1510 self.pg0.remote_ip4,
1514 proto=IP_PROTOS.tcp,
1516 self.nat44_add_static_mapping(
1517 self.pg0.remote_ip4,
1521 proto=IP_PROTOS.udp,
1523 self.nat44_add_static_mapping(
1524 self.pg0.remote_ip4,
1528 proto=IP_PROTOS.icmp,
# NAT44-EI feature on pg0 with NAT44_EI_IF_INSIDE and on pg1 without flags
# (presumably the outside interface -- TODO confirm).
1530 flags = self.config_flags.NAT44_EI_IF_INSIDE
1531 self.vapi.nat44_ei_interface_add_del_feature(
1532 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1534 self.vapi.nat44_ei_interface_add_del_feature(
1535 sw_if_index=self.pg1.sw_if_index, is_add=1
# pg1 -> pg0.
1539 pkts = self.create_stream_out(self.pg1)
1540 self.pg1.add_stream(pkts)
1541 self.pg_enable_capture(self.pg_interfaces)
1543 capture = self.pg0.get_capture(len(pkts))
1544 self.verify_capture_in(capture, self.pg0)
# pg0 -> pg1, verified with verify_capture_out's default arguments.
1547 pkts = self.create_stream_in(self.pg0, self.pg1)
1548 self.pg0.add_stream(pkts)
1549 self.pg_enable_capture(self.pg_interfaces)
1551 capture = self.pg1.get_capture(len(pkts))
1552 self.verify_capture_out(capture)
1554 def test_static_vrf_aware(self):
1555 """NAT44EI 1:1 NAT VRF awareness"""
# Two static mappings are created with vrf_id=10, one for a pg4 host and
# one for a pg0 host; pg4 traffic must come out on pg3 while pg0 traffic
# must produce nothing on pg3.
# NOTE(review): the embedded line numbers have gaps; statements on the
# omitted lines are not visible in this listing.
1557 nat_ip1 = "10.0.0.30"
1558 nat_ip2 = "10.0.0.40"
1559 self.tcp_port_out = 6303
1560 self.udp_port_out = 6304
1561 self.icmp_id_out = 6305
# Both mappings are created with vrf_id=10.
1563 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1, vrf_id=10)
1564 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2, vrf_id=10)
# NAT44-EI feature on pg3 without flags (presumably the outside
# interface -- TODO confirm) and on pg0 and pg4 with NAT44_EI_IF_INSIDE.
1565 flags = self.config_flags.NAT44_EI_IF_INSIDE
1566 self.vapi.nat44_ei_interface_add_del_feature(
1567 sw_if_index=self.pg3.sw_if_index, is_add=1
1569 self.vapi.nat44_ei_interface_add_del_feature(
1570 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1572 self.vapi.nat44_ei_interface_add_del_feature(
1573 sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
1576 # inside interface VRF match NAT44EI static mapping VRF
# pg4 -> pg3 is verified against nat_ip1 (the positional True is
# presumably same_port -- TODO confirm against verify_capture_out).
1577 pkts = self.create_stream_in(self.pg4, self.pg3)
1578 self.pg4.add_stream(pkts)
1579 self.pg_enable_capture(self.pg_interfaces)
1581 capture = self.pg3.get_capture(len(pkts))
1582 self.verify_capture_out(capture, nat_ip1, True)
1584 # inside interface VRF don't match NAT44EI static mapping VRF (packets
# (the continuation of the comment above is not present in this listing)
# pg0 -> pg3 must leave pg3's capture empty.
1586 pkts = self.create_stream_in(self.pg0, self.pg3)
1587 self.pg0.add_stream(pkts)
1588 self.pg_enable_capture(self.pg_interfaces)
1590 self.pg3.assert_nothing_captured()
1592 def test_dynamic_to_static(self):
1593 """NAT44EI Switch from dynamic translation to 1:1NAT"""
# Traffic is first translated with only self.nat_addr registered; a static
# mapping to 10.0.0.10 is then added for the same host and the same
# traffic is expected to be translated to that address instead.
# NOTE(review): the embedded line numbers have gaps; statements on the
# omitted lines are not visible in this listing.
1594 nat_ip = "10.0.0.10"
1595 self.tcp_port_out = 6303
1596 self.udp_port_out = 6304
1597 self.icmp_id_out = 6305
# Setup: register self.nat_addr through the nat44_add_address helper, then
# enable the NAT44-EI feature on pg0 with NAT44_EI_IF_INSIDE and on pg1
# without flags (presumably the outside interface -- TODO confirm).
1599 self.nat44_add_address(self.nat_addr)
1600 flags = self.config_flags.NAT44_EI_IF_INSIDE
1601 self.vapi.nat44_ei_interface_add_del_feature(
1602 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1604 self.vapi.nat44_ei_interface_add_del_feature(
1605 sw_if_index=self.pg1.sw_if_index, is_add=1
# First pass: pg0 -> pg1, verified with verify_capture_out's defaults.
1609 pkts = self.create_stream_in(self.pg0, self.pg1)
1610 self.pg0.add_stream(pkts)
1611 self.pg_enable_capture(self.pg_interfaces)
1613 capture = self.pg1.get_capture(len(pkts))
1614 self.verify_capture_out(capture)
# After the static mapping is added, the session dump for pg0.remote_ip4
# (second argument 0, presumably the VRF id -- TODO confirm) must be
# empty, i.e. no session from the first pass may remain.
1617 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1618 sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1619 self.assertEqual(len(sessions), 0)
# Second pass: the same stream is now verified against nat_ip (the
# positional True is presumably same_port -- TODO confirm).
1620 pkts = self.create_stream_in(self.pg0, self.pg1)
1621 self.pg0.add_stream(pkts)
1622 self.pg_enable_capture(self.pg_interfaces)
1624 capture = self.pg1.get_capture(len(pkts))
1625 self.verify_capture_out(capture, nat_ip, True)
1627 def test_identity_nat(self):
1628 """NAT44EI Identity NAT"""
# An identity mapping is configured for pg0.remote_ip4; a TCP packet sent
# from pg1 straight to that address must arrive on pg0 with addresses and
# ports unchanged and must not create a session.
# NOTE(review): the embedded line numbers have gaps; some arguments of the
# identity-mapping calls and the lines assigning `p`, `ip` and `tcp` are
# not visible in this listing.
1629 flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1630 self.vapi.nat44_ei_add_del_identity_mapping(
1631 ip_address=self.pg0.remote_ip4,
1632 sw_if_index=0xFFFFFFFF,
# NAT44-EI feature on pg0 with NAT44_EI_IF_INSIDE and on pg1 without flags
# (presumably the outside interface -- TODO confirm).
1636 flags = self.config_flags.NAT44_EI_IF_INSIDE
1637 self.vapi.nat44_ei_interface_add_del_feature(
1638 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1640 self.vapi.nat44_ei_interface_add_del_feature(
1641 sw_if_index=self.pg1.sw_if_index, is_add=1
# TCP packet pg1.remote_ip4:12345 -> pg0.remote_ip4:56789, sent on pg1.
1645 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1646 / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
1647 / TCP(sport=12345, dport=56789)
1649 self.pg1.add_stream(p)
1650 self.pg_enable_capture(self.pg_interfaces)
# Exactly one packet is expected on pg0, with the same addresses and
# ports that were sent.
1652 capture = self.pg0.get_capture(1)
1657 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1658 self.assertEqual(ip.src, self.pg1.remote_ip4)
1659 self.assertEqual(tcp.dport, 56789)
1660 self.assertEqual(tcp.sport, 12345)
1661 self.assert_packet_checksums_valid(p)
1663 self.logger.error(ppp("Unexpected or invalid packet:", p))
# No session may exist for pg0.remote_ip4 (second argument 0, presumably
# the VRF id -- TODO confirm).
1666 sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1667 self.assertEqual(len(sessions), 0)
# A second identity-mapping call for the same address follows (its
# remaining arguments are not visible); two identity mappings are
# expected in the dump afterwards.
1668 flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1669 self.vapi.nat44_ei_add_del_identity_mapping(
1670 ip_address=self.pg0.remote_ip4,
1671 sw_if_index=0xFFFFFFFF,
1676 identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
1677 self.assertEqual(len(identity_mappings), 2)
1679 def test_multiple_inside_interfaces(self):
1680 """NAT44EI multiple non-overlapping address space inside interfaces"""
# pg0 and pg1 are both configured with NAT44_EI_IF_INSIDE and pg3 without
# flags (presumably the outside interface -- TODO confirm); pg2 gets no
# NAT44-EI feature call in this test.
# NOTE(review): the embedded line numbers have gaps; statements on the
# omitted lines are not visible in this listing.
1682 self.nat44_add_address(self.nat_addr)
1683 flags = self.config_flags.NAT44_EI_IF_INSIDE
1684 self.vapi.nat44_ei_interface_add_del_feature(
1685 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1687 self.vapi.nat44_ei_interface_add_del_feature(
1688 sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
1690 self.vapi.nat44_ei_interface_add_del_feature(
1691 sw_if_index=self.pg3.sw_if_index, is_add=1
1694 # between two NAT44EI inside interfaces (no translation)
1695 pkts = self.create_stream_in(self.pg0, self.pg1)
1696 self.pg0.add_stream(pkts)
1697 self.pg_enable_capture(self.pg_interfaces)
1699 capture = self.pg1.get_capture(len(pkts))
1700 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1702 # from inside to interface without translation
1703 pkts = self.create_stream_in(self.pg0, self.pg2)
1704 self.pg0.add_stream(pkts)
1705 self.pg_enable_capture(self.pg_interfaces)
1707 capture = self.pg2.get_capture(len(pkts))
1708 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1710 # in2out 1st interface
1711 pkts = self.create_stream_in(self.pg0, self.pg3)
1712 self.pg0.add_stream(pkts)
1713 self.pg_enable_capture(self.pg_interfaces)
1715 capture = self.pg3.get_capture(len(pkts))
1716 self.verify_capture_out(capture)
1718 # out2in 1st interface
1719 pkts = self.create_stream_out(self.pg3)
1720 self.pg3.add_stream(pkts)
1721 self.pg_enable_capture(self.pg_interfaces)
1723 capture = self.pg0.get_capture(len(pkts))
1724 self.verify_capture_in(capture, self.pg0)
1726 # in2out 2nd interface
1727 pkts = self.create_stream_in(self.pg1, self.pg3)
1728 self.pg1.add_stream(pkts)
1729 self.pg_enable_capture(self.pg_interfaces)
1731 capture = self.pg3.get_capture(len(pkts))
1732 self.verify_capture_out(capture)
1734 # out2in 2nd interface
# The return traffic from pg3 is expected on pg1 this time.
1735 pkts = self.create_stream_out(self.pg3)
1736 self.pg3.add_stream(pkts)
1737 self.pg_enable_capture(self.pg_interfaces)
1739 capture = self.pg1.get_capture(len(pkts))
1740 self.verify_capture_in(capture, self.pg1)
1742 def test_inside_overlapping_interfaces(self):
1743 """NAT44EI multiple inside interfaces with overlapping address space"""
# pg4, pg5 and pg6 are configured with NAT44_EI_IF_INSIDE and pg3 without
# flags (presumably the outside interface -- TODO confirm). pg6's host
# additionally gets a static mapping created with vrf_id=20.
# NOTE(review): the embedded line numbers have gaps; several statements
# (e.g. the assignments of `p`, `ip`, `tcp` and a loop header over
# `users`) are not visible in this listing.
1745 static_nat_ip = "10.0.0.10"
1746 self.nat44_add_address(self.nat_addr)
1747 flags = self.config_flags.NAT44_EI_IF_INSIDE
1748 self.vapi.nat44_ei_interface_add_del_feature(
1749 sw_if_index=self.pg3.sw_if_index, is_add=1
1751 self.vapi.nat44_ei_interface_add_del_feature(
1752 sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
1754 self.vapi.nat44_ei_interface_add_del_feature(
1755 sw_if_index=self.pg5.sw_if_index, flags=flags, is_add=1
1757 self.vapi.nat44_ei_interface_add_del_feature(
1758 sw_if_index=self.pg6.sw_if_index, flags=flags, is_add=1
1760 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip, vrf_id=20)
1762 # between NAT44EI inside interfaces with same VRF (no translation)
1763 pkts = self.create_stream_in(self.pg4, self.pg5)
1764 self.pg4.add_stream(pkts)
1765 self.pg_enable_capture(self.pg_interfaces)
1767 capture = self.pg5.get_capture(len(pkts))
1768 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1770 # between NAT44EI inside interfaces with different VRF (hairpinning)
# TCP packet from pg4's host to static_nat_ip; it is expected on pg6 with
# source self.nat_addr, destination pg6.remote_ip4, a changed source port
# and the original destination port.
1772 Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
1773 / IP(src=self.pg4.remote_ip4, dst=static_nat_ip)
1774 / TCP(sport=1234, dport=5678)
1776 self.pg4.add_stream(p)
1777 self.pg_enable_capture(self.pg_interfaces)
1779 capture = self.pg6.get_capture(1)
1784 self.assertEqual(ip.src, self.nat_addr)
1785 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1786 self.assertNotEqual(tcp.sport, 1234)
1787 self.assertEqual(tcp.dport, 5678)
1789 self.logger.error(ppp("Unexpected or invalid packet:", p))
1792 # in2out 1st interface
1793 pkts = self.create_stream_in(self.pg4, self.pg3)
1794 self.pg4.add_stream(pkts)
1795 self.pg_enable_capture(self.pg_interfaces)
1797 capture = self.pg3.get_capture(len(pkts))
1798 self.verify_capture_out(capture)
1800 # out2in 1st interface
1801 pkts = self.create_stream_out(self.pg3)
1802 self.pg3.add_stream(pkts)
1803 self.pg_enable_capture(self.pg_interfaces)
1805 capture = self.pg4.get_capture(len(pkts))
1806 self.verify_capture_in(capture, self.pg4)
1808 # in2out 2nd interface
1809 pkts = self.create_stream_in(self.pg5, self.pg3)
1810 self.pg5.add_stream(pkts)
1811 self.pg_enable_capture(self.pg_interfaces)
1813 capture = self.pg3.get_capture(len(pkts))
1814 self.verify_capture_out(capture)
1816 # out2in 2nd interface
1817 pkts = self.create_stream_out(self.pg3)
1818 self.pg3.add_stream(pkts)
1819 self.pg_enable_capture(self.pg_interfaces)
1821 capture = self.pg5.get_capture(len(pkts))
1822 self.verify_capture_in(capture, self.pg5)
# Session checks for pg5's host (second dump argument 10, presumably the
# VRF id -- TODO confirm): one address in the dump, exactly three
# sessions, none flagged NAT44_EI_STATIC_MAPPING, all using that address
# on the outside.
1825 addresses = self.vapi.nat44_ei_address_dump()
1826 self.assertEqual(len(addresses), 1)
1827 sessions = self.vapi.nat44_ei_user_session_dump(self.pg5.remote_ip4, 10)
1828 self.assertEqual(len(sessions), 3)
1829 for session in sessions:
1830 self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1831 self.assertEqual(str(session.inside_ip_address), self.pg5.remote_ip4)
1832 self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
# The three sessions are expected in TCP, UDP, ICMP order, with the
# inside and outside port/id values stored on self.
1833 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1834 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1835 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1836 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1837 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1838 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1839 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1840 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1841 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1843 # in2out 3rd interface
# pg6 traffic is verified against static_nat_ip (the positional True is
# presumably same_port -- TODO confirm against verify_capture_out).
1844 pkts = self.create_stream_in(self.pg6, self.pg3)
1845 self.pg6.add_stream(pkts)
1846 self.pg_enable_capture(self.pg_interfaces)
1848 capture = self.pg3.get_capture(len(pkts))
1849 self.verify_capture_out(capture, static_nat_ip, True)
1851 # out2in 3rd interface
1852 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1853 self.pg3.add_stream(pkts)
1854 self.pg_enable_capture(self.pg_interfaces)
1856 capture = self.pg6.get_capture(len(pkts))
1857 self.verify_capture_in(capture, self.pg6)
1859 # general user and session dump verifications
# At least three users and exactly one address are expected.
1860 users = self.vapi.nat44_ei_user_dump()
1861 self.assertGreaterEqual(len(users), 3)
1862 addresses = self.vapi.nat44_ei_address_dump()
1863 self.assertEqual(len(addresses), 1)
# Per-user checks: every session belongs to the user's address, has
# total_bytes > total_pkts > 0, and a TCP/UDP/ICMP protocol.
1865 sessions = self.vapi.nat44_ei_user_session_dump(
1866 user.ip_address, user.vrf_id
1868 for session in sessions:
1869 self.assertEqual(user.ip_address, session.inside_ip_address)
1870 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1872 session.protocol in [IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp]
# pg4's host: at least four sessions, none flagged as static mapping.
1876 sessions = self.vapi.nat44_ei_user_session_dump(self.pg4.remote_ip4, 10)
1877 self.assertGreaterEqual(len(sessions), 4)
1878 for session in sessions:
1879 self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1880 self.assertEqual(str(session.inside_ip_address), self.pg4.remote_ip4)
1881 self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
# pg6's host (dump argument 20): at least three sessions, all flagged as
# static mapping and using static_nat_ip on the outside.
1884 sessions = self.vapi.nat44_ei_user_session_dump(self.pg6.remote_ip4, 20)
1885 self.assertGreaterEqual(len(sessions), 3)
1886 for session in sessions:
1887 self.assertTrue(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1888 self.assertEqual(str(session.inside_ip_address), self.pg6.remote_ip4)
1889 self.assertEqual(str(session.outside_ip_address), static_nat_ip)
1892 in [self.tcp_port_in, self.udp_port_in, self.icmp_id_in]
1895 def test_hairpinning(self):
1896 """NAT44EI hairpinning - 1:1 NAPT"""
# Host and server are both remote hosts of pg0; the host reaches the
# server through self.nat_addr, so both directions enter and leave on
# pg0. The "/nat44-ei/hairpinning" counter is checked after each step.
# NOTE(review): the embedded line numbers have gaps; `host_in_port`, `p`,
# `ip`, `tcp` and most arguments of the static mapping call are assigned
# on lines that are not visible in this listing.
1898 host = self.pg0.remote_hosts[0]
1899 server = self.pg0.remote_hosts[1]
1902 server_in_port = 5678
1903 server_out_port = 8765
# Setup: register self.nat_addr through the nat44_add_address helper, then
# enable the NAT44-EI feature on pg0 with NAT44_EI_IF_INSIDE and on pg1
# without flags (presumably the outside interface -- TODO confirm).
1905 self.nat44_add_address(self.nat_addr)
1906 flags = self.config_flags.NAT44_EI_IF_INSIDE
1907 self.vapi.nat44_ei_interface_add_del_feature(
1908 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1910 self.vapi.nat44_ei_interface_add_del_feature(
1911 sw_if_index=self.pg1.sw_if_index, is_add=1
1914 # add static mapping for server
1915 self.nat44_add_static_mapping(
1920 proto=IP_PROTOS.tcp,
# Counter snapshot taken before any hairpinned traffic is sent.
1923 cnt = self.statistics["/nat44-ei/hairpinning"]
1924 # send packet from host to server
1926 Ether(src=host.mac, dst=self.pg0.local_mac)
1927 / IP(src=host.ip4, dst=self.nat_addr)
1928 / TCP(sport=host_in_port, dport=server_out_port)
1930 self.pg0.add_stream(p)
1931 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: source self.nat_addr with a changed source port,
# destination server.ip4:server_in_port. The observed source port is
# kept in host_out_port for the reply.
1933 capture = self.pg0.get_capture(1)
1938 self.assertEqual(ip.src, self.nat_addr)
1939 self.assertEqual(ip.dst, server.ip4)
1940 self.assertNotEqual(tcp.sport, host_in_port)
1941 self.assertEqual(tcp.dport, server_in_port)
1942 self.assert_packet_checksums_valid(p)
1943 host_out_port = tcp.sport
1945 self.logger.error(ppp("Unexpected or invalid packet:", p))
# The counter column for pg0, summed over rows, must have grown by 1.
1948 after = self.statistics["/nat44-ei/hairpinning"]
1949 if_idx = self.pg0.sw_if_index
1950 self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
1952 # send reply from server to host
1954 Ether(src=server.mac, dst=self.pg0.local_mac)
1955 / IP(src=server.ip4, dst=self.nat_addr)
1956 / TCP(sport=server_in_port, dport=host_out_port)
1958 self.pg0.add_stream(p)
1959 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: source self.nat_addr:server_out_port, destination
# host.ip4:host_in_port.
1961 capture = self.pg0.get_capture(1)
1966 self.assertEqual(ip.src, self.nat_addr)
1967 self.assertEqual(ip.dst, host.ip4)
1968 self.assertEqual(tcp.sport, server_out_port)
1969 self.assertEqual(tcp.dport, host_in_port)
1970 self.assert_packet_checksums_valid(p)
1972 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Total growth since the first snapshot: 2, or 3 when vpp_worker_count is
# greater than zero (the reason for the extra count is not visible here).
1975 after = self.statistics["/nat44-ei/hairpinning"]
1976 if_idx = self.pg0.sw_if_index
1978 after[:, if_idx].sum() - cnt[:, if_idx].sum(),
1979 2 + (1 if self.vpp_worker_count > 0 else 0),
1982 def test_hairpinning2(self):
1983 """NAT44EI hairpinning - 1:1 NAT"""
# A host and two servers all sit behind pg0. Both servers get a static
# mapping; four exchanges are checked, each with TCP, UDP and ICMP:
# host -> server1, server1 -> host, server2 -> server1, server1 -> server2.
# NOTE(review): the embedded line numbers have gaps; the lines that build
# the `pkts` lists and some branch/handler lines are not visible here.
1985 server1_nat_ip = "10.0.0.10"
1986 server2_nat_ip = "10.0.0.11"
1987 host = self.pg0.remote_hosts[0]
1988 server1 = self.pg0.remote_hosts[1]
1989 server2 = self.pg0.remote_hosts[2]
1990 server_tcp_port = 22
1991 server_udp_port = 20
# Setup: register self.nat_addr through the nat44_add_address helper, then
# enable the NAT44-EI feature on pg0 with NAT44_EI_IF_INSIDE and on pg1
# without flags (presumably the outside interface -- TODO confirm).
1993 self.nat44_add_address(self.nat_addr)
1994 flags = self.config_flags.NAT44_EI_IF_INSIDE
1995 self.vapi.nat44_ei_interface_add_del_feature(
1996 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1998 self.vapi.nat44_ei_interface_add_del_feature(
1999 sw_if_index=self.pg1.sw_if_index, is_add=1
2002 # add static mapping for servers
2003 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2004 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# Step 1, host -> server1: the packets target server1_nat_ip.
2009 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2010 / IP(src=host.ip4, dst=server1_nat_ip)
2011 / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
2015 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2016 / IP(src=host.ip4, dst=server1_nat_ip)
2017 / UDP(sport=self.udp_port_in, dport=server_udp_port)
2021 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2022 / IP(src=host.ip4, dst=server1_nat_ip)
2023 / ICMP(id=self.icmp_id_in, type="echo-request")
2026 self.pg0.add_stream(pkts)
2027 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: source self.nat_addr, destination server1.ip4, and a
# changed source port / ICMP id. The observed values are stored in
# tcp_port_out / udp_port_out / icmp_id_out for the replies.
2029 capture = self.pg0.get_capture(len(pkts))
2030 for packet in capture:
2032 self.assertEqual(packet[IP].src, self.nat_addr)
2033 self.assertEqual(packet[IP].dst, server1.ip4)
2034 if packet.haslayer(TCP):
2035 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2036 self.assertEqual(packet[TCP].dport, server_tcp_port)
2037 self.tcp_port_out = packet[TCP].sport
2038 self.assert_packet_checksums_valid(packet)
2039 elif packet.haslayer(UDP):
2040 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2041 self.assertEqual(packet[UDP].dport, server_udp_port)
2042 self.udp_port_out = packet[UDP].sport
2044 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2045 self.icmp_id_out = packet[ICMP].id
2047 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Step 2, server1 -> host: the replies target self.nat_addr on the
# ports/id recorded in step 1.
2053 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2054 / IP(src=server1.ip4, dst=self.nat_addr)
2055 / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
2059 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2060 / IP(src=server1.ip4, dst=self.nat_addr)
2061 / UDP(sport=server_udp_port, dport=self.udp_port_out)
2065 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2066 / IP(src=server1.ip4, dst=self.nat_addr)
2067 / ICMP(id=self.icmp_id_out, type="echo-reply")
2070 self.pg0.add_stream(pkts)
2071 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: source server1_nat_ip, destination host.ip4, and the
# host's original ports / ICMP id restored.
2073 capture = self.pg0.get_capture(len(pkts))
2074 for packet in capture:
2076 self.assertEqual(packet[IP].src, server1_nat_ip)
2077 self.assertEqual(packet[IP].dst, host.ip4)
2078 if packet.haslayer(TCP):
2079 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2080 self.assertEqual(packet[TCP].sport, server_tcp_port)
2081 self.assert_packet_checksums_valid(packet)
2082 elif packet.haslayer(UDP):
2083 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2084 self.assertEqual(packet[UDP].sport, server_udp_port)
2086 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2088 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2091 # server2 to server1
# Step 3: server2 sends to server1_nat_ip.
2094 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2095 / IP(src=server2.ip4, dst=server1_nat_ip)
2096 / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
2100 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2101 / IP(src=server2.ip4, dst=server1_nat_ip)
2102 / UDP(sport=self.udp_port_in, dport=server_udp_port)
2106 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2107 / IP(src=server2.ip4, dst=server1_nat_ip)
2108 / ICMP(id=self.icmp_id_in, type="echo-request")
2111 self.pg0.add_stream(pkts)
2112 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: source server2_nat_ip, destination server1.ip4; unlike
# step 1, source ports and ICMP id are expected to stay unchanged.
2114 capture = self.pg0.get_capture(len(pkts))
2115 for packet in capture:
2117 self.assertEqual(packet[IP].src, server2_nat_ip)
2118 self.assertEqual(packet[IP].dst, server1.ip4)
2119 if packet.haslayer(TCP):
2120 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2121 self.assertEqual(packet[TCP].dport, server_tcp_port)
2122 self.tcp_port_out = packet[TCP].sport
2123 self.assert_packet_checksums_valid(packet)
2124 elif packet.haslayer(UDP):
2125 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2126 self.assertEqual(packet[UDP].dport, server_udp_port)
2127 self.udp_port_out = packet[UDP].sport
2129 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2130 self.icmp_id_out = packet[ICMP].id
2132 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2135 # server1 to server2
# Step 4: server1 replies to server2_nat_ip on the ports/id recorded in
# step 3.
2138 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2139 / IP(src=server1.ip4, dst=server2_nat_ip)
2140 / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
2144 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2145 / IP(src=server1.ip4, dst=server2_nat_ip)
2146 / UDP(sport=server_udp_port, dport=self.udp_port_out)
2150 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2151 / IP(src=server1.ip4, dst=server2_nat_ip)
2152 / ICMP(id=self.icmp_id_out, type="echo-reply")
2155 self.pg0.add_stream(pkts)
2156 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: source server1_nat_ip, destination server2.ip4, with
# the ports / ICMP id used in step 3.
2158 capture = self.pg0.get_capture(len(pkts))
2159 for packet in capture:
2161 self.assertEqual(packet[IP].src, server1_nat_ip)
2162 self.assertEqual(packet[IP].dst, server2.ip4)
2163 if packet.haslayer(TCP):
2164 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2165 self.assertEqual(packet[TCP].sport, server_tcp_port)
2166 self.assert_packet_checksums_valid(packet)
2167 elif packet.haslayer(UDP):
2168 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2169 self.assertEqual(packet[UDP].sport, server_udp_port)
2171 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2173 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2176 def test_hairpinning_avoid_inf_loop(self):
2177 """NAT44EI hairpinning - 1:1 NAPT avoid infinite loop"""
# Regression scenario: besides the server's static mapping, pg0.local_ip4
# is mapped to itself. A packet sent to pg0.local_ip4 is injected first;
# normal hairpinned host <-> server traffic is then checked.
# NOTE(review): the embedded line numbers have gaps; `host_in_port`, `p`,
# `ip`, `tcp` and most arguments of the server's static mapping call are
# on lines that are not visible in this listing.
2179 host = self.pg0.remote_hosts[0]
2180 server = self.pg0.remote_hosts[1]
2183 server_in_port = 5678
2184 server_out_port = 8765
# Setup: register self.nat_addr through the nat44_add_address helper, then
# enable the NAT44-EI feature on pg0 with NAT44_EI_IF_INSIDE and on pg1
# without flags (presumably the outside interface -- TODO confirm).
2186 self.nat44_add_address(self.nat_addr)
2187 flags = self.config_flags.NAT44_EI_IF_INSIDE
2188 self.vapi.nat44_ei_interface_add_del_feature(
2189 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2191 self.vapi.nat44_ei_interface_add_del_feature(
2192 sw_if_index=self.pg1.sw_if_index, is_add=1
2195 # add static mapping for server
2196 self.nat44_add_static_mapping(
2201 proto=IP_PROTOS.tcp,
2204 # add another static mapping that maps pg0.local_ip4 address to itself
2205 self.nat44_add_static_mapping(self.pg0.local_ip4, self.pg0.local_ip4)
2207 # send packet from host to VPP (the packet should get dropped)
2209 Ether(src=host.mac, dst=self.pg0.local_mac)
2210 / IP(src=host.ip4, dst=self.pg0.local_ip4)
2211 / TCP(sport=host_in_port, dport=server_out_port)
2213 self.pg0.add_stream(p)
2214 self.pg_enable_capture(self.pg_interfaces)
2216 # Here VPP used to crash due to an infinite loop
# Counter snapshot taken before the regular hairpinned traffic is sent.
2218 cnt = self.statistics["/nat44-ei/hairpinning"]
2219 # send packet from host to server
2221 Ether(src=host.mac, dst=self.pg0.local_mac)
2222 / IP(src=host.ip4, dst=self.nat_addr)
2223 / TCP(sport=host_in_port, dport=server_out_port)
2225 self.pg0.add_stream(p)
2226 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: source self.nat_addr with a changed source port,
# destination server.ip4:server_in_port. The observed source port is
# kept in host_out_port for the reply.
2228 capture = self.pg0.get_capture(1)
2233 self.assertEqual(ip.src, self.nat_addr)
2234 self.assertEqual(ip.dst, server.ip4)
2235 self.assertNotEqual(tcp.sport, host_in_port)
2236 self.assertEqual(tcp.dport, server_in_port)
2237 self.assert_packet_checksums_valid(p)
2238 host_out_port = tcp.sport
2240 self.logger.error(ppp("Unexpected or invalid packet:", p))
# The counter column for pg0, summed over rows, must have grown by 1.
2243 after = self.statistics["/nat44-ei/hairpinning"]
2244 if_idx = self.pg0.sw_if_index
2245 self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
2247 # send reply from server to host
2249 Ether(src=server.mac, dst=self.pg0.local_mac)
2250 / IP(src=server.ip4, dst=self.nat_addr)
2251 / TCP(sport=server_in_port, dport=host_out_port)
2253 self.pg0.add_stream(p)
2254 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: source self.nat_addr:server_out_port, destination
# host.ip4:host_in_port.
2256 capture = self.pg0.get_capture(1)
2261 self.assertEqual(ip.src, self.nat_addr)
2262 self.assertEqual(ip.dst, host.ip4)
2263 self.assertEqual(tcp.sport, server_out_port)
2264 self.assertEqual(tcp.dport, host_in_port)
2265 self.assert_packet_checksums_valid(p)
2267 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Total growth since the snapshot: 2, or 3 when vpp_worker_count is
# greater than zero (the reason for the extra count is not visible here).
2270 after = self.statistics["/nat44-ei/hairpinning"]
2271 if_idx = self.pg0.sw_if_index
2273 after[:, if_idx].sum() - cnt[:, if_idx].sum(),
2274 2 + (1 if self.vpp_worker_count > 0 else 0),
2277 def test_interface_addr(self):
2278 """NAT44EI acquire addresses from interface"""
# Registers pg7 via nat44_ei_add_del_interface_addr and then checks that
# the NAT44EI address dump follows pg7's IPv4 configuration:
# empty -> one entry after config_ip4() -> empty again after unconfig_ip4().
2279 self.vapi.nat44_ei_add_del_interface_addr(
2280 is_add=1, sw_if_index=self.pg7.sw_if_index
2283 # no address in NAT pool
# config_ip4() has not been called on pg7 by this test yet
2284 addresses = self.vapi.nat44_ei_address_dump()
2285 self.assertEqual(0, len(addresses))
2287 # configure interface address and check NAT address pool
2288 self.pg7.config_ip4()
2289 addresses = self.vapi.nat44_ei_address_dump()
2290 self.assertEqual(1, len(addresses))
# the single dumped entry must equal pg7's local IPv4 address (compared as strings)
2291 self.assertEqual(str(addresses[0].ip_address), self.pg7.local_ip4)
2293 # remove interface address and check NAT address pool
2294 self.pg7.unconfig_ip4()
2295 addresses = self.vapi.nat44_ei_address_dump()
2296 self.assertEqual(0, len(addresses))
2298 def test_interface_addr_static_mapping(self):
2299 """NAT44EI Static mapping with addresses from interface"""
# Adds a static mapping for local address "1.2.3.4" whose external side is
# given as an interface (pg7) rather than an address, then checks how the
# static mapping dump changes as pg7's IPv4 address is configured, removed
# and configured again.
# NOTE(review): `tag` and `resolved` are used below but their assignments
# are not visible in this listing.
2302 self.vapi.nat44_ei_add_del_interface_addr(
2303 is_add=1, sw_if_index=self.pg7.sw_if_index
2305 self.nat44_add_static_mapping(
2306 "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag
2309 # static mappings with external interface
# before pg7 has an address only the interface-bound entry is dumped
2310 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2311 self.assertEqual(1, len(static_mappings))
2312 self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
2313 self.assertEqual(static_mappings[0].tag, tag)
2315 # configure interface address and check static mappings
2316 self.pg7.config_ip4()
2317 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
# a second entry is expected once pg7 has an address
2318 self.assertEqual(2, len(static_mappings))
# entries whose external_sw_if_index is 0xFFFFFFFF must carry pg7's local
# IPv4 address as external address and the same tag
2320 for sm in static_mappings:
2321 if sm.external_sw_if_index == 0xFFFFFFFF:
2322 self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
2323 self.assertEqual(sm.tag, tag)
2325 self.assertTrue(resolved)
2327 # remove interface address and check static mappings
2328 self.pg7.unconfig_ip4()
2329 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
# back to the single interface-bound entry
2330 self.assertEqual(1, len(static_mappings))
2331 self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
2332 self.assertEqual(static_mappings[0].tag, tag)
2334 # configure interface address again and check static mappings
2335 self.pg7.config_ip4()
2336 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2337 self.assertEqual(2, len(static_mappings))
2339 for sm in static_mappings:
2340 if sm.external_sw_if_index == 0xFFFFFFFF:
2341 self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
2342 self.assertEqual(sm.tag, tag)
2344 self.assertTrue(resolved)
2346 # remove static mapping
# deleting the interface-based mapping must leave the dump empty,
# i.e. the entry with 0xFFFFFFFF as external_sw_if_index disappears as well
2347 self.nat44_add_static_mapping(
2348 "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag, is_add=0
2350 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2351 self.assertEqual(0, len(static_mappings))
2353 def test_interface_addr_identity_nat(self):
2354 """NAT44EI Identity NAT with addresses from interface"""
# Adds a TCP identity mapping bound to interface pg7 and checks how the
# identity mapping dump changes when pg7's IPv4 address is configured and
# removed.
# NOTE(review): `port` and `resolved` are used below but their assignments
# are not visible in this listing.
2357 self.vapi.nat44_ei_add_del_interface_addr(
2358 is_add=1, sw_if_index=self.pg7.sw_if_index
2360 self.vapi.nat44_ei_add_del_identity_mapping(
2362 sw_if_index=self.pg7.sw_if_index,
2364 protocol=IP_PROTOS.tcp,
2368 # identity mappings with external interface
# before pg7 has an address only the interface-bound entry is dumped
2369 identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2370 self.assertEqual(1, len(identity_mappings))
2371 self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
2373 # configure interface address and check identity mappings
2374 self.pg7.config_ip4()
2375 identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2377 self.assertEqual(2, len(identity_mappings))
# NOTE(review): the loop selects entries `sm` with sw_if_index 0xFFFFFFFF,
# but the assertions below read identity_mappings[0] instead of `sm`.
# If the matching entry is not first in the dump, the wrong entry is
# checked - confirm whether `sm` was intended.
2378 for sm in identity_mappings:
2379 if sm.sw_if_index == 0xFFFFFFFF:
2381 str(identity_mappings[0].ip_address), self.pg7.local_ip4
2383 self.assertEqual(port, identity_mappings[0].port)
2384 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2386 self.assertTrue(resolved)
2388 # remove interface address and check identity mappings
2389 self.pg7.unconfig_ip4()
2390 identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
# back to the single interface-bound entry
2391 self.assertEqual(1, len(identity_mappings))
2392 self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
2394 def test_ipfix_nat44_sess(self):
2395 """NAT44EI IPFIX logging NAT44EI session created/deleted"""
# Translates a stream from pg0 (inside) to pg1, removes the NAT address,
# flushes IPFIX and verifies the packets exported towards pg3.
2396 self.ipfix_domain_id = 10
2397 self.ipfix_src_port = 20202
2398 collector_port = 30303
# NOTE(review): the literal 30303 repeats collector_port from the line
# above; passing the variable would keep the two in sync.
2399 bind_layers(UDP, IPFIX, dport=30303)
2400 self.nat44_add_address(self.nat_addr)
# pg0 is flagged as the inside interface; pg1 is added without flags
2401 flags = self.config_flags.NAT44_EI_IF_INSIDE
2402 self.vapi.nat44_ei_interface_add_del_feature(
2403 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2405 self.vapi.nat44_ei_interface_add_del_feature(
2406 sw_if_index=self.pg1.sw_if_index, is_add=1
# the IPFIX exporter sends from pg3's local address to pg3's remote address
2408 self.vapi.set_ipfix_exporter(
2409 collector_address=self.pg3.remote_ip4,
2410 src_address=self.pg3.local_ip4,
2412 template_interval=10,
2413 collector_port=collector_port,
2415 self.vapi.nat44_ei_ipfix_enable_disable(
2416 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2419 pkts = self.create_stream_in(self.pg0, self.pg1)
2420 self.pg0.add_stream(pkts)
2421 self.pg_enable_capture(self.pg_interfaces)
2423 capture = self.pg1.get_capture(len(pkts))
2424 self.verify_capture_out(capture)
# the NAT address is removed before the flush - presumably so that
# session-deleted records are exported as well; confirm
2425 self.nat44_add_address(self.nat_addr, is_add=0)
2426 self.vapi.ipfix_flush()
# exactly 7 packets are expected on pg3
2427 capture = self.pg3.get_capture(7)
2428 ipfix = IPFIXDecoder()
2429 # first load template
# every exported packet is checked for addresses, ports and observation
# domain; Template layers are registered with the decoder
2431 self.assertTrue(p.haslayer(IPFIX))
2432 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2433 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2434 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2435 self.assertEqual(p[UDP].dport, collector_port)
2436 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2437 if p.haslayer(Template):
2438 ipfix.add_template(p.getlayer(Template))
2439 # verify events in data set
# data sets are decoded with the templates collected above
2441 if p.haslayer(Data):
2442 data = ipfix.decode_data_set(p.getlayer(Set))
2443 self.verify_ipfix_nat44_ses(data)
2445 def test_ipfix_addr_exhausted(self):
2446 """NAT44EI IPFIX logging NAT addresses exhausted"""
# Sends a packet from pg0 towards pg1 and expects nothing to leave pg1;
# exactly one addresses-exhausted event must then be found in the IPFIX
# export captured on pg3. No nat44_add_address call is visible in this test.
2447 flags = self.config_flags.NAT44_EI_IF_INSIDE
2448 self.vapi.nat44_ei_interface_add_del_feature(
2449 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2451 self.vapi.nat44_ei_interface_add_del_feature(
2452 sw_if_index=self.pg1.sw_if_index, is_add=1
2454 self.vapi.set_ipfix_exporter(
2455 collector_address=self.pg3.remote_ip4,
2456 src_address=self.pg3.local_ip4,
2458 template_interval=10,
# NOTE(review): self.ipfix_domain_id / self.ipfix_src_port are read here
# but not assigned in the visible part of this test - presumably set
# elsewhere in the class; confirm
2460 self.vapi.nat44_ei_ipfix_enable_disable(
2461 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2465 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2466 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2469 self.pg0.add_stream(p)
2470 self.pg_enable_capture(self.pg_interfaces)
# the packet must not be forwarded out of pg1
2472 self.pg1.assert_nothing_captured()
2473 self.vapi.ipfix_flush()
2474 capture = self.pg3.get_capture(7)
2475 ipfix = IPFIXDecoder()
2476 # first load template
2478 self.assertTrue(p.haslayer(IPFIX))
2479 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2480 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2481 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# the export is expected on UDP destination port 4739
2482 self.assertEqual(p[UDP].dport, 4739)
2483 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2484 if p.haslayer(Template):
2485 ipfix.add_template(p.getlayer(Template))
2486 # verify events in data set
# event_count accumulates the return values of verify_ipfix_addr_exhausted;
# its initialisation is not visible in this listing
2489 if p.haslayer(Data):
2490 data = ipfix.decode_data_set(p.getlayer(Set))
2491 event_count += self.verify_ipfix_addr_exhausted(data)
2492 self.assertEqual(event_count, 1)
2494 def test_ipfix_max_sessions(self):
2495 """NAT44EI IPFIX logging maximum session entries exceeded"""
# Fills the session table with max_sessions flows from distinct source
# addresses, enables IPFIX logging, sends one more packet that must not be
# forwarded, and expects exactly one max-sessions event in the export.
2496 self.nat44_add_address(self.nat_addr)
2497 flags = self.config_flags.NAT44_EI_IF_INSIDE
2498 self.vapi.nat44_ei_interface_add_del_feature(
2499 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2501 self.vapi.nat44_ei_interface_add_del_feature(
2502 sw_if_index=self.pg1.sw_if_index, is_add=1
# the limit is scaled by the worker count, treating 0 workers as 1
2505 max_sessions_per_thread = self.max_translations
2506 max_sessions = max(1, self.vpp_worker_count) * max_sessions_per_thread
# one packet per session; the source address is 10.10.<high byte of i>.<low byte of i>
2509 for i in range(0, max_sessions):
2510 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2512 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2513 / IP(src=src, dst=self.pg1.remote_ip4)
2517 self.pg0.add_stream(pkts)
2518 self.pg_enable_capture(self.pg_interfaces)
# all max_sessions packets are expected to be forwarded to pg1
2521 self.pg1.get_capture(max_sessions)
# IPFIX export is configured only after the table has been filled
2522 self.vapi.set_ipfix_exporter(
2523 collector_address=self.pg3.remote_ip4,
2524 src_address=self.pg3.local_ip4,
2526 template_interval=10,
2528 self.vapi.nat44_ei_ipfix_enable_disable(
2529 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
# one more packet, this time from pg0's own remote address
2533 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2534 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2537 self.pg0.add_stream(p)
2538 self.pg_enable_capture(self.pg_interfaces)
# the extra packet must not be forwarded
2540 self.pg1.assert_nothing_captured()
2541 self.vapi.ipfix_flush()
2542 capture = self.pg3.get_capture(7)
2543 ipfix = IPFIXDecoder()
2544 # first load template
2546 self.assertTrue(p.haslayer(IPFIX))
2547 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2548 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2549 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2550 self.assertEqual(p[UDP].dport, 4739)
2551 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2552 if p.haslayer(Template):
2553 ipfix.add_template(p.getlayer(Template))
2554 # verify events in data set
# event_count accumulates the return values of verify_ipfix_max_sessions,
# which is given the per-thread limit; its initialisation is not visible
# in this listing
2557 if p.haslayer(Data):
2558 data = ipfix.decode_data_set(p.getlayer(Set))
2559 event_count += self.verify_ipfix_max_sessions(
2560 data, max_sessions_per_thread
2562 self.assertEqual(event_count, 1)
2564 def test_syslog_apmap(self):
2565 """NAT44EI syslog address and port mapping creation and deletion"""
# Creates one TCP session pg0 -> pg1 and verifies the syslog message
# captured on pg3, then removes the NAT address and verifies the syslog
# message captured after that.
# syslog filter is set to INFO severity; sender is pg3 local -> pg3 remote
2566 self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2567 self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2568 self.nat44_add_address(self.nat_addr)
2569 flags = self.config_flags.NAT44_EI_IF_INSIDE
2570 self.vapi.nat44_ei_interface_add_del_feature(
2571 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2573 self.vapi.nat44_ei_interface_add_del_feature(
2574 sw_if_index=self.pg1.sw_if_index, is_add=1
2578 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2579 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2580 / TCP(sport=self.tcp_port_in, dport=20)
2582 self.pg0.add_stream(p)
2583 self.pg_enable_capture(self.pg_interfaces)
2585 capture = self.pg1.get_capture(1)
# remember the translated source port on self - presumably read by
# verify_syslog_apmap; confirm
2586 self.tcp_port_out = capture[0][TCP].sport
2587 capture = self.pg3.get_capture(1)
2588 self.verify_syslog_apmap(capture[0][Raw].load)
2590 self.pg_enable_capture(self.pg_interfaces)
# removing the NAT address is expected to produce one more syslog message
2592 self.nat44_add_address(self.nat_addr, is_add=0)
2593 capture = self.pg3.get_capture(1)
# second argument False - presumably selects the "deleted" variant; confirm
2594 self.verify_syslog_apmap(capture[0][Raw].load, False)
2596 def test_pool_addr_fib(self):
2597 """NAT44EI add pool addresses to FIB"""
# Sends ARP packets on pg1 and pg2 and checks when pg1 answers: an ARP
# reply is expected while the pool address and the static mapping exist,
# and nothing once both have been removed.
# NOTE(review): several keyword arguments of each ARP(...) below are not
# visible in this listing, so the queried addresses cannot be read here.
2598 static_addr = "10.0.0.10"
2599 self.nat44_add_address(self.nat_addr)
2600 flags = self.config_flags.NAT44_EI_IF_INSIDE
2601 self.vapi.nat44_ei_interface_add_del_feature(
2602 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2604 self.vapi.nat44_ei_interface_add_del_feature(
2605 sw_if_index=self.pg1.sw_if_index, is_add=1
2607 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
# first broadcast ARP from pg1's remote host
2610 p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2613 psrc=self.pg1.remote_ip4,
2614 hwsrc=self.pg1.remote_mac,
2616 self.pg1.add_stream(p)
2617 self.pg_enable_capture(self.pg_interfaces)
2619 capture = self.pg1.get_capture(1)
2620 self.assertTrue(capture[0].haslayer(ARP))
# NOTE(review): assertTrue(first, msg) treats ARP.is_at as the failure
# message, so this only checks that op is truthy; assertEqual was
# probably intended.
2621 self.assertTrue(capture[0][ARP].op, ARP.is_at)
# second broadcast ARP from pg1's remote host
2624 p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2627 psrc=self.pg1.remote_ip4,
2628 hwsrc=self.pg1.remote_mac,
2630 self.pg1.add_stream(p)
2631 self.pg_enable_capture(self.pg_interfaces)
2633 capture = self.pg1.get_capture(1)
2634 self.assertTrue(capture[0].haslayer(ARP))
# NOTE(review): same assertTrue/assertEqual concern as above
2635 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2637 # send ARP to non-NAT44EI interface
2638 p = Ether(src=self.pg2.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2641 psrc=self.pg2.remote_ip4,
2642 hwsrc=self.pg2.remote_mac,
2644 self.pg2.add_stream(p)
2645 self.pg_enable_capture(self.pg_interfaces)
# the ARP sent on pg2 must not produce anything on pg1
2647 self.pg1.assert_nothing_captured()
2649 # remove addresses and verify
2650 self.nat44_add_address(self.nat_addr, is_add=0)
2651 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr, is_add=0)
# with both the pool address and the static mapping removed, the two ARP
# packets below must stay unanswered on pg1
2653 p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2656 psrc=self.pg1.remote_ip4,
2657 hwsrc=self.pg1.remote_mac,
2659 self.pg1.add_stream(p)
2660 self.pg_enable_capture(self.pg_interfaces)
2662 self.pg1.assert_nothing_captured()
2664 p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2667 psrc=self.pg1.remote_ip4,
2668 hwsrc=self.pg1.remote_mac,
2670 self.pg1.add_stream(p)
2671 self.pg_enable_capture(self.pg_interfaces)
2673 self.pg1.assert_nothing_captured()
2675 def test_vrf_mode(self):
2676 """NAT44EI tenant VRF aware address pool mode"""
# Moves pg0 and pg1 into two separate IP tables, adds one NAT address per
# table and checks that traffic from each inside interface towards pg2 is
# translated to the address added for that interface's table.
# NOTE(review): vrf_id1 and vrf_id2 are used below but their assignments
# are not visible in this listing.
2680 nat_ip1 = "10.0.0.10"
2681 nat_ip2 = "10.0.0.11"
# addresses are removed before the table change and configured again afterwards
2683 self.pg0.unconfig_ip4()
2684 self.pg1.unconfig_ip4()
2685 self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
2686 self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
2687 self.pg0.set_table_ip4(vrf_id1)
2688 self.pg1.set_table_ip4(vrf_id2)
2689 self.pg0.config_ip4()
2690 self.pg1.config_ip4()
2691 self.pg0.resolve_arp()
2692 self.pg1.resolve_arp()
# one NAT address per VRF
2694 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2695 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
# pg0 and pg1 are both inside interfaces; pg2 is added without flags
2696 flags = self.config_flags.NAT44_EI_IF_INSIDE
2697 self.vapi.nat44_ei_interface_add_del_feature(
2698 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2700 self.vapi.nat44_ei_interface_add_del_feature(
2701 sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
2703 self.vapi.nat44_ei_interface_add_del_feature(
2704 sw_if_index=self.pg2.sw_if_index, is_add=1
# traffic from pg0 (table vrf_id1) must leave pg2 translated to nat_ip1
2709 pkts = self.create_stream_in(self.pg0, self.pg2)
2710 self.pg0.add_stream(pkts)
2711 self.pg_enable_capture(self.pg_interfaces)
2713 capture = self.pg2.get_capture(len(pkts))
2714 self.verify_capture_out(capture, nat_ip1)
# traffic from pg1 (table vrf_id2) must leave pg2 translated to nat_ip2
2717 pkts = self.create_stream_in(self.pg1, self.pg2)
2718 self.pg1.add_stream(pkts)
2719 self.pg_enable_capture(self.pg_interfaces)
2721 capture = self.pg2.get_capture(len(pkts))
2722 self.verify_capture_out(capture, nat_ip2)
# restore: put pg0/pg1 back into table 0, then delete the two tables
2725 self.pg0.unconfig_ip4()
2726 self.pg1.unconfig_ip4()
2727 self.pg0.set_table_ip4(0)
2728 self.pg1.set_table_ip4(0)
2729 self.pg0.config_ip4()
2730 self.pg1.config_ip4()
2731 self.pg0.resolve_arp()
2732 self.pg1.resolve_arp()
2733 self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id1})
2734 self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id2})
2736 def test_vrf_feature_independent(self):
2737 """NAT44EI tenant VRF independent address pool mode"""
# Adds nat_ip1 without a VRF and nat_ip2 with vrf_id=99, then checks that
# traffic from both inside interfaces (pg0 and pg1) towards pg2 is
# translated to nat_ip1.
2739 nat_ip1 = "10.0.0.10"
2740 nat_ip2 = "10.0.0.11"
2742 self.nat44_add_address(nat_ip1)
2743 self.nat44_add_address(nat_ip2, vrf_id=99)
# pg0 and pg1 are both inside interfaces; pg2 is added without flags
2744 flags = self.config_flags.NAT44_EI_IF_INSIDE
2745 self.vapi.nat44_ei_interface_add_del_feature(
2746 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2748 self.vapi.nat44_ei_interface_add_del_feature(
2749 sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
2751 self.vapi.nat44_ei_interface_add_del_feature(
2752 sw_if_index=self.pg2.sw_if_index, is_add=1
# from pg0
2756 pkts = self.create_stream_in(self.pg0, self.pg2)
2757 self.pg0.add_stream(pkts)
2758 self.pg_enable_capture(self.pg_interfaces)
2760 capture = self.pg2.get_capture(len(pkts))
2761 self.verify_capture_out(capture, nat_ip1)
# from pg1 - nat_ip1 is expected here as well, not nat_ip2
2764 pkts = self.create_stream_in(self.pg1, self.pg2)
2765 self.pg1.add_stream(pkts)
2766 self.pg_enable_capture(self.pg_interfaces)
2768 capture = self.pg2.get_capture(len(pkts))
2769 self.verify_capture_out(capture, nat_ip1)
2771 def test_dynamic_ipless_interfaces(self):
2772 """NAT44EI interfaces without configured IP address"""
# Dynamic translation between pg7 (inside) and pg8, with routes and
# neighbors prepared by create_routes_and_neigbors().
2773 self.create_routes_and_neigbors()
2774 self.nat44_add_address(self.nat_addr)
2775 flags = self.config_flags.NAT44_EI_IF_INSIDE
2776 self.vapi.nat44_ei_interface_add_del_feature(
2777 sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2779 self.vapi.nat44_ei_interface_add_del_feature(
2780 sw_if_index=self.pg8.sw_if_index, is_add=1
# pg7 -> pg8 first
2784 pkts = self.create_stream_in(self.pg7, self.pg8)
2785 self.pg7.add_stream(pkts)
2786 self.pg_enable_capture(self.pg_interfaces)
2788 capture = self.pg8.get_capture(len(pkts))
2789 self.verify_capture_out(capture)
# then pg8 -> pg7, with the stream created for the NAT address
2792 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2793 self.pg8.add_stream(pkts)
2794 self.pg_enable_capture(self.pg_interfaces)
2796 capture = self.pg7.get_capture(len(pkts))
2797 self.verify_capture_in(capture, self.pg7)
2799 def test_static_ipless_interfaces(self):
2800 """NAT44EI interfaces without configured IP address - 1:1 NAT"""
# Same interface pair as the dynamic variant (pg7 inside, pg8), but with a
# static mapping pg7.remote_ip4 <-> nat_addr instead of a pool address.
2802 self.create_routes_and_neigbors()
2803 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2804 flags = self.config_flags.NAT44_EI_IF_INSIDE
2805 self.vapi.nat44_ei_interface_add_del_feature(
2806 sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2808 self.vapi.nat44_ei_interface_add_del_feature(
2809 sw_if_index=self.pg8.sw_if_index, is_add=1
# pg8 -> pg7 is sent first here, before any pg7 -> pg8 traffic
2813 pkts = self.create_stream_out(self.pg8)
2814 self.pg8.add_stream(pkts)
2815 self.pg_enable_capture(self.pg_interfaces)
2817 capture = self.pg7.get_capture(len(pkts))
2818 self.verify_capture_in(capture, self.pg7)
# then pg7 -> pg8; verify_capture_out is given nat_addr and True as its
# second and third arguments
2821 pkts = self.create_stream_in(self.pg7, self.pg8)
2822 self.pg7.add_stream(pkts)
2823 self.pg_enable_capture(self.pg_interfaces)
2825 capture = self.pg8.get_capture(len(pkts))
2826 self.verify_capture_out(capture, self.nat_addr, True)
2828 def test_static_with_port_ipless_interfaces(self):
2829 """NAT44EI interfaces without configured IP address - 1:1 NAPT"""
# pg7 (inside) / pg8 with one static mapping per protocol (TCP, UDP, ICMP)
# for pg7.remote_ip4; most mapping arguments are not visible in this listing.
# fixed outside port/id values stored on self - presumably consumed by the
# stream/verify helpers and by the mappings below; confirm
2831 self.tcp_port_out = 30606
2832 self.udp_port_out = 30607
2833 self.icmp_id_out = 30608
2835 self.create_routes_and_neigbors()
2836 self.nat44_add_address(self.nat_addr)
2837 self.nat44_add_static_mapping(
2838 self.pg7.remote_ip4,
2842 proto=IP_PROTOS.tcp,
2844 self.nat44_add_static_mapping(
2845 self.pg7.remote_ip4,
2849 proto=IP_PROTOS.udp,
2851 self.nat44_add_static_mapping(
2852 self.pg7.remote_ip4,
2856 proto=IP_PROTOS.icmp,
2858 flags = self.config_flags.NAT44_EI_IF_INSIDE
2859 self.vapi.nat44_ei_interface_add_del_feature(
2860 sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2862 self.vapi.nat44_ei_interface_add_del_feature(
2863 sw_if_index=self.pg8.sw_if_index, is_add=1
# pg8 -> pg7 first
2867 pkts = self.create_stream_out(self.pg8)
2868 self.pg8.add_stream(pkts)
2869 self.pg_enable_capture(self.pg_interfaces)
2871 capture = self.pg7.get_capture(len(pkts))
2872 self.verify_capture_in(capture, self.pg7)
# then pg7 -> pg8
2875 pkts = self.create_stream_in(self.pg7, self.pg8)
2876 self.pg7.add_stream(pkts)
2877 self.pg_enable_capture(self.pg_interfaces)
2879 capture = self.pg8.get_capture(len(pkts))
2880 self.verify_capture_out(capture)
2882 def test_static_unknown_proto(self):
2883 """NAT44EI 1:1 translate packet with unknown protocol"""
# With a static mapping pg0.remote_ip4 <-> nat_ip, sends a packet that
# carries an inner IP/TCP packet in each direction and checks that only
# the outer IP addresses are rewritten and a GRE layer is still present.
# NOTE(review): the layer between the two IP headers is not visible in
# this listing - presumably GRE, given the haslayer(GRE) assertions.
2884 nat_ip = "10.0.0.10"
2885 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2886 flags = self.config_flags.NAT44_EI_IF_INSIDE
2887 self.vapi.nat44_ei_interface_add_del_feature(
2888 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2890 self.vapi.nat44_ei_interface_add_del_feature(
2891 sw_if_index=self.pg1.sw_if_index, is_add=1
# pg0 -> pg1
2896 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2897 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2899 / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
2900 / TCP(sport=1234, dport=1234)
2902 self.pg0.add_stream(p)
2903 self.pg_enable_capture(self.pg_interfaces)
2905 p = self.pg1.get_capture(1)
# outer source must be the mapped address, destination unchanged
2908 self.assertEqual(packet[IP].src, nat_ip)
2909 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2910 self.assertEqual(packet.haslayer(GRE), 1)
2911 self.assert_packet_checksums_valid(packet)
2913 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0, addressed to the mapped address
2918 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2919 / IP(src=self.pg1.remote_ip4, dst=nat_ip)
2921 / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
2922 / TCP(sport=1234, dport=1234)
2924 self.pg1.add_stream(p)
2925 self.pg_enable_capture(self.pg_interfaces)
2927 p = self.pg0.get_capture(1)
# outer destination must be translated back to pg0's remote address
2930 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2931 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2932 self.assertEqual(packet.haslayer(GRE), 1)
2933 self.assert_packet_checksums_valid(packet)
2935 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2938 def test_hairpinning_static_unknown_proto(self):
2939 """NAT44EI 1:1 translate packet with unknown protocol - hairpinning"""
# Host and server are both remote hosts on pg0, each with its own static
# mapping. Packets sent to the peer's mapped address must come back out
# of pg0 with source and destination both translated.
2941 host = self.pg0.remote_hosts[0]
2942 server = self.pg0.remote_hosts[1]
2944 host_nat_ip = "10.0.0.10"
2945 server_nat_ip = "10.0.0.11"
2947 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2948 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2949 flags = self.config_flags.NAT44_EI_IF_INSIDE
2950 self.vapi.nat44_ei_interface_add_del_feature(
2951 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2953 self.vapi.nat44_ei_interface_add_del_feature(
2954 sw_if_index=self.pg1.sw_if_index, is_add=1
# host -> server, addressed to the server's mapped address
2959 Ether(dst=self.pg0.local_mac, src=host.mac)
2960 / IP(src=host.ip4, dst=server_nat_ip)
2962 / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
2963 / TCP(sport=1234, dport=1234)
2965 self.pg0.add_stream(p)
2966 self.pg_enable_capture(self.pg_interfaces)
# the translated packet is captured on pg0 again
2968 p = self.pg0.get_capture(1)
2971 self.assertEqual(packet[IP].src, host_nat_ip)
2972 self.assertEqual(packet[IP].dst, server.ip4)
2973 self.assertEqual(packet.haslayer(GRE), 1)
2974 self.assert_packet_checksums_valid(packet)
2976 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host, addressed to the host's mapped address
2981 Ether(dst=self.pg0.local_mac, src=server.mac)
2982 / IP(src=server.ip4, dst=host_nat_ip)
2984 / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
2985 / TCP(sport=1234, dport=1234)
2987 self.pg0.add_stream(p)
2988 self.pg_enable_capture(self.pg_interfaces)
2990 p = self.pg0.get_capture(1)
2993 self.assertEqual(packet[IP].src, server_nat_ip)
2994 self.assertEqual(packet[IP].dst, host.ip4)
2995 self.assertEqual(packet.haslayer(GRE), 1)
2996 self.assert_packet_checksums_valid(packet)
2998 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3001 def test_output_feature(self):
3002 """NAT44EI output feature (in2out postrouting)"""
# NAT is enabled only as an output interface on pg3; no interface is added
# through nat44_ei_interface_add_del_feature in the visible part of this test.
3003 self.nat44_add_address(self.nat_addr)
3004 self.vapi.nat44_ei_add_del_output_interface(
3005 sw_if_index=self.pg3.sw_if_index, is_add=1
# pg0 -> pg3 is translated
3009 pkts = self.create_stream_in(self.pg0, self.pg3)
3010 self.pg0.add_stream(pkts)
3011 self.pg_enable_capture(self.pg_interfaces)
3013 capture = self.pg3.get_capture(len(pkts))
3014 self.verify_capture_out(capture)
# pg3 -> pg0 return traffic
3017 pkts = self.create_stream_out(self.pg3)
3018 self.pg3.add_stream(pkts)
3019 self.pg_enable_capture(self.pg_interfaces)
3021 capture = self.pg0.get_capture(len(pkts))
3022 self.verify_capture_in(capture, self.pg0)
3024 # from non-NAT interface to NAT inside interface
# pg2 -> pg0 must arrive untranslated
3025 pkts = self.create_stream_in(self.pg2, self.pg0)
3026 self.pg2.add_stream(pkts)
3027 self.pg_enable_capture(self.pg_interfaces)
3029 capture = self.pg0.get_capture(len(pkts))
3030 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3032 def test_output_feature_vrf_aware(self):
3033 """NAT44EI output feature VRF aware (in2out postrouting)"""
# Output-feature NAT on pg3 with one NAT address per VRF (10 and 20).
# Traffic from pg4 must use the VRF 10 address and traffic from pg6 the
# VRF 20 address - presumably pg4/pg6 are bound to those VRFs in the
# class setup; confirm.
3034 nat_ip_vrf10 = "10.0.0.10"
3035 nat_ip_vrf20 = "10.0.0.20"
# two routes towards pg3.remote_ip4 via pg3; the calls that build them and
# their remaining arguments are not visible in this listing
3039 self.pg3.remote_ip4,
3041 [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
3046 self.pg3.remote_ip4,
3048 [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
3054 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3055 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3056 self.vapi.nat44_ei_add_del_output_interface(
3057 sw_if_index=self.pg3.sw_if_index, is_add=1
# pg4 -> pg3, expected source nat_ip_vrf10
3061 pkts = self.create_stream_in(self.pg4, self.pg3)
3062 self.pg4.add_stream(pkts)
3063 self.pg_enable_capture(self.pg_interfaces)
3065 capture = self.pg3.get_capture(len(pkts))
3066 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> pg4 return traffic addressed to nat_ip_vrf10
3069 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3070 self.pg3.add_stream(pkts)
3071 self.pg_enable_capture(self.pg_interfaces)
3073 capture = self.pg4.get_capture(len(pkts))
3074 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3, expected source nat_ip_vrf20
3077 pkts = self.create_stream_in(self.pg6, self.pg3)
3078 self.pg6.add_stream(pkts)
3079 self.pg_enable_capture(self.pg_interfaces)
3081 capture = self.pg3.get_capture(len(pkts))
3082 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> pg6 return traffic addressed to nat_ip_vrf20
3085 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3086 self.pg3.add_stream(pkts)
3087 self.pg_enable_capture(self.pg_interfaces)
3089 capture = self.pg6.get_capture(len(pkts))
3090 self.verify_capture_in(capture, self.pg6)
3092 def test_output_feature_hairpinning(self):
3093 """NAT44EI output feature hairpinning (in2out postrouting)"""
# Host and server are both remote hosts on pg0; pg0 and pg1 are NAT output
# interfaces. The host reaches the server through the NAT address and the
# reply is translated back, both packets being captured on pg0.
# NOTE(review): host_in_port is used below but its assignment is not
# visible in this listing.
3094 host = self.pg0.remote_hosts[0]
3095 server = self.pg0.remote_hosts[1]
3098 server_in_port = 5678
3099 server_out_port = 8765
3101 self.nat44_add_address(self.nat_addr)
3102 self.vapi.nat44_ei_add_del_output_interface(
3103 sw_if_index=self.pg0.sw_if_index, is_add=1
3105 self.vapi.nat44_ei_add_del_output_interface(
3106 sw_if_index=self.pg1.sw_if_index, is_add=1
3109 # add static mapping for server
3110 self.nat44_add_static_mapping(
3115 proto=IP_PROTOS.tcp,
3118 # send packet from host to server
3120 Ether(src=host.mac, dst=self.pg0.local_mac)
3121 / IP(src=host.ip4, dst=self.nat_addr)
3122 / TCP(sport=host_in_port, dport=server_out_port)
3124 self.pg0.add_stream(p)
3125 self.pg_enable_capture(self.pg_interfaces)
3127 capture = self.pg0.get_capture(1)
# source becomes the NAT address with a new port; destination becomes the
# server's real address and inside port
3132 self.assertEqual(ip.src, self.nat_addr)
3133 self.assertEqual(ip.dst, server.ip4)
3134 self.assertNotEqual(tcp.sport, host_in_port)
3135 self.assertEqual(tcp.dport, server_in_port)
3136 self.assert_packet_checksums_valid(p)
# the allocated outside port is needed to address the reply
3137 host_out_port = tcp.sport
3139 self.logger.error(ppp("Unexpected or invalid packet:", p))
3142 # send reply from server to host
3144 Ether(src=server.mac, dst=self.pg0.local_mac)
3145 / IP(src=server.ip4, dst=self.nat_addr)
3146 / TCP(sport=server_in_port, dport=host_out_port)
3148 self.pg0.add_stream(p)
3149 self.pg_enable_capture(self.pg_interfaces)
3151 capture = self.pg0.get_capture(1)
# the reply appears to come from the NAT address and the server's outside
# port, and is delivered to the host's original address and port
3156 self.assertEqual(ip.src, self.nat_addr)
3157 self.assertEqual(ip.dst, host.ip4)
3158 self.assertEqual(tcp.sport, server_out_port)
3159 self.assertEqual(tcp.dport, host_in_port)
3160 self.assert_packet_checksums_valid(p)
3162 self.logger.error(ppp("Unexpected or invalid packet:", p))
3165 def test_one_armed_nat44(self):
3166 """NAT44EI One armed NAT"""
# A single interface, pg9, is added to NAT44EI twice: once without flags
# and once with NAT44_EI_IF_INSIDE. Both hosts are remote hosts on pg9.
3167 remote_host = self.pg9.remote_hosts[0]
3168 local_host = self.pg9.remote_hosts[1]
3171 self.nat44_add_address(self.nat_addr)
3172 flags = self.config_flags.NAT44_EI_IF_INSIDE
3173 self.vapi.nat44_ei_interface_add_del_feature(
3174 sw_if_index=self.pg9.sw_if_index, is_add=1
3176 self.vapi.nat44_ei_interface_add_del_feature(
3177 sw_if_index=self.pg9.sw_if_index, flags=flags, is_add=1
# local_host -> remote_host
3182 Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
3183 / IP(src=local_host.ip4, dst=remote_host.ip4)
3184 / TCP(sport=12345, dport=80)
3186 self.pg9.add_stream(p)
3187 self.pg_enable_capture(self.pg_interfaces)
3189 capture = self.pg9.get_capture(1)
# source is rewritten to the NAT address with a new port
3194 self.assertEqual(ip.src, self.nat_addr)
3195 self.assertEqual(ip.dst, remote_host.ip4)
3196 self.assertNotEqual(tcp.sport, 12345)
# remembered to address the return packet
3197 external_port = tcp.sport
3198 self.assertEqual(tcp.dport, 80)
3199 self.assert_packet_checksums_valid(p)
3201 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host -> NAT address / allocated port
3206 Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
3207 / IP(src=remote_host.ip4, dst=self.nat_addr)
3208 / TCP(sport=80, dport=external_port)
3210 self.pg9.add_stream(p)
3211 self.pg_enable_capture(self.pg_interfaces)
3213 capture = self.pg9.get_capture(1)
# destination is translated back to local_host and its original port
3218 self.assertEqual(ip.src, remote_host.ip4)
3219 self.assertEqual(ip.dst, local_host.ip4)
3220 self.assertEqual(tcp.sport, 80)
3221 self.assertEqual(tcp.dport, 12345)
3222 self.assert_packet_checksums_valid(p)
3224 self.logger.error(ppp("Unexpected or invalid packet:", p))
# the classify node name depends on the worker count: the handoff variant
# is used only when more than one worker is configured
3227 if self.vpp_worker_count > 1:
3228 node = "nat44-ei-handoff-classify"
3230 node = "nat44-ei-classify"
# exactly one packet must have been counted in each direction
3232 err = self.statistics.get_err_counter("/err/%s/next in2out" % node)
3233 self.assertEqual(err, 1)
3234 err = self.statistics.get_err_counter("/err/%s/next out2in" % node)
3235 self.assertEqual(err, 1)
3237 def test_del_session(self):
3238 """NAT44EI delete session"""
# Creates sessions with a pg0 -> pg1 stream, deletes two of them through
# nat44_ei_del_session (one by inside tuple, one by outside tuple) and
# checks that the user session dump shrinks by two.
3239 self.nat44_add_address(self.nat_addr)
3240 flags = self.config_flags.NAT44_EI_IF_INSIDE
3241 self.vapi.nat44_ei_interface_add_del_feature(
3242 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3244 self.vapi.nat44_ei_interface_add_del_feature(
3245 sw_if_index=self.pg1.sw_if_index, is_add=1
3248 pkts = self.create_stream_in(self.pg0, self.pg1)
3249 self.pg0.add_stream(pkts)
3250 self.pg_enable_capture(self.pg_interfaces)
3252 self.pg1.get_capture(len(pkts))
# session count before any deletion
3254 sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3255 nsessions = len(sessions)
# first deletion: inside address/port, with the NAT44_EI_IF_INSIDE flag
3257 self.vapi.nat44_ei_del_session(
3258 address=sessions[0].inside_ip_address,
3259 port=sessions[0].inside_port,
3260 protocol=sessions[0].protocol,
3261 flags=self.config_flags.NAT44_EI_IF_INSIDE,
# second deletion: outside address/port; no flags argument is visible here
3264 self.vapi.nat44_ei_del_session(
3265 address=sessions[1].outside_ip_address,
3266 port=sessions[1].outside_port,
3267 protocol=sessions[1].protocol,
3270 sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3271 self.assertEqual(nsessions - len(sessions), 2)
# delete the first remaining session as well, then expect no NAT user
3273 self.vapi.nat44_ei_del_session(
3274 address=sessions[0].inside_ip_address,
3275 port=sessions[0].inside_port,
3276 protocol=sessions[0].protocol,
3277 flags=self.config_flags.NAT44_EI_IF_INSIDE,
3280 self.verify_no_nat44_user()
3282 def test_frag_in_order(self):
3283 """NAT44EI translate fragments arriving in order"""
# Configures pg0 (inside) / pg1 and runs the shared frag_in_order helper
# once per protocol: TCP, UDP and ICMP.
3285 self.nat44_add_address(self.nat_addr)
3286 flags = self.config_flags.NAT44_EI_IF_INSIDE
3287 self.vapi.nat44_ei_interface_add_del_feature(
3288 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3290 self.vapi.nat44_ei_interface_add_del_feature(
3291 sw_if_index=self.pg1.sw_if_index, is_add=1
3294 self.frag_in_order(proto=IP_PROTOS.tcp)
3295 self.frag_in_order(proto=IP_PROTOS.udp)
3296 self.frag_in_order(proto=IP_PROTOS.icmp)
3298 def test_frag_forwarding(self):
3299 """NAT44EI forwarding fragment test"""
# With forwarding enabled, fragmented UDP traffic sent from pg1 directly to
# pg0's remote address must reach pg0 with addresses, ports and payload
# unchanged.
# the NAT address is taken from interface pg1
3300 self.vapi.nat44_ei_add_del_interface_addr(
3301 is_add=1, sw_if_index=self.pg1.sw_if_index
3303 flags = self.config_flags.NAT44_EI_IF_INSIDE
3304 self.vapi.nat44_ei_interface_add_del_feature(
3305 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3307 self.vapi.nat44_ei_interface_add_del_feature(
3308 sw_if_index=self.pg1.sw_if_index, is_add=1
3310 self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
# 35-byte payload: 16 x "A", 16 x "B", 3 x "C"
3312 data = b"A" * 16 + b"B" * 16 + b"C" * 3
3313 pkts = self.create_stream_frag(
3314 self.pg1, self.pg0.remote_ip4, 4789, 4789, data, proto=IP_PROTOS.udp
3316 self.pg1.add_stream(pkts)
3317 self.pg_enable_capture(self.pg_interfaces)
# as many fragments as were sent are expected on pg0
3319 frags = self.pg0.get_capture(len(pkts))
# reassemble; the helper is given the untranslated source and destination
3320 p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
3321 self.assertEqual(p[UDP].sport, 4789)
3322 self.assertEqual(p[UDP].dport, 4789)
3323 self.assertEqual(data, p[Raw].load)
3325 def test_reass_hairpinning(self):
3326 """NAT44EI fragments hairpinning"""
# Sets up static mappings for a server on pg0 and runs the shared
# reass_hairpinning helper once per protocol: TCP, UDP and ICMP.
# Most arguments of the mapping and helper calls are not visible in this
# listing.
3328 server_addr = self.pg0.remote_hosts[1].ip4
# ports are chosen at random from 1025..65535 on every run
3329 host_in_port = random.randint(1025, 65535)
3330 server_in_port = random.randint(1025, 65535)
3331 server_out_port = random.randint(1025, 65535)
3333 self.nat44_add_address(self.nat_addr)
3334 flags = self.config_flags.NAT44_EI_IF_INSIDE
3335 self.vapi.nat44_ei_interface_add_del_feature(
3336 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3338 self.vapi.nat44_ei_interface_add_del_feature(
3339 sw_if_index=self.pg1.sw_if_index, is_add=1
3341 # add static mapping for server
# one mapping each for TCP and UDP, plus one passing only the addresses
3342 self.nat44_add_static_mapping(
3347 proto=IP_PROTOS.tcp,
3349 self.nat44_add_static_mapping(
3354 proto=IP_PROTOS.udp,
3356 self.nat44_add_static_mapping(server_addr, self.nat_addr)
3358 self.reass_hairpinning(
3363 proto=IP_PROTOS.tcp,
3365 self.reass_hairpinning(
3370 proto=IP_PROTOS.udp,
3372 self.reass_hairpinning(
3377 proto=IP_PROTOS.icmp,
3380 def test_frag_out_of_order(self):
3381 """NAT44EI translate fragments arriving out of order"""
# Scenario: configure one NAT address, register pg0 with the INSIDE flag and
# pg1 with no flags, then run self.frag_out_of_order() for TCP, UDP and ICMP.
3383 self.nat44_add_address(self.nat_addr)
3384 flags = self.config_flags.NAT44_EI_IF_INSIDE
3385 self.vapi.nat44_ei_interface_add_del_feature(
3386 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3388 self.vapi.nat44_ei_interface_add_del_feature(
3389 sw_if_index=self.pg1.sw_if_index, is_add=1
# Same check repeated for each protocol.
3392 self.frag_out_of_order(proto=IP_PROTOS.tcp)
3393 self.frag_out_of_order(proto=IP_PROTOS.udp)
3394 self.frag_out_of_order(proto=IP_PROTOS.icmp)
3396 def test_port_restricted(self):
3397 """NAT44EI Port restricted NAT44EI (MAP-E CE)"""
# Scenario: select allocation algorithm 1 with psid_offset=6, psid_length=6,
# psid=10, send one TCP packet pg0 -> pg1 and check the translated source
# port.
# NOTE(review): the embedded line numbers skip values in this listing (e.g.
# 3410, 3417, 3419-3422), so packet construction and the lines that bind
# ip/tcp are only partly visible.
3398 self.nat44_add_address(self.nat_addr)
# pg0 is registered with the INSIDE flag; pg1 is registered with no flags.
3399 flags = self.config_flags.NAT44_EI_IF_INSIDE
3400 self.vapi.nat44_ei_interface_add_del_feature(
3401 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3403 self.vapi.nat44_ei_interface_add_del_feature(
3404 sw_if_index=self.pg1.sw_if_index, is_add=1
3406 self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
3407 alg=1, psid_offset=6, psid_length=6, psid=10
# TCP packet from pg0's remote host to pg1's remote host, sport 4567, dport 22.
3411 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3412 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3413 / TCP(sport=4567, dport=22)
3415 self.pg0.add_stream(p)
3416 self.pg_enable_capture(self.pg_interfaces)
3418 capture = self.pg1.get_capture(1)
# Destination and dport must be untouched; the source address must be the
# NAT address and the source port must differ from the original 4567.
3423 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3424 self.assertEqual(ip.src, self.nat_addr)
3425 self.assertEqual(tcp.dport, 22)
3426 self.assertNotEqual(tcp.sport, 4567)
# The 6 bits starting at bit 6 of the translated port must equal the
# configured psid (10).
3427 self.assertEqual((tcp.sport >> 6) & 63, 10)
3428 self.assert_packet_checksums_valid(p)
3430 self.logger.error(ppp("Unexpected or invalid packet:", p))
3433 def test_port_range(self):
3434 """NAT44EI External address port range"""
# Scenario: select allocation algorithm 2 restricted to ports 1025..1027,
# offer TCP packets with five different source ports and expect exactly
# three packets on pg1, each translated into that range.
# NOTE(review): the embedded line numbers skip values in this listing (e.g.
# 3447, 3449, 3457), so list setup and per-packet lines are partly hidden.
3435 self.nat44_add_address(self.nat_addr)
# pg0 is registered with the INSIDE flag; pg1 is registered with no flags.
3436 flags = self.config_flags.NAT44_EI_IF_INSIDE
3437 self.vapi.nat44_ei_interface_add_del_feature(
3438 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3440 self.vapi.nat44_ei_interface_add_del_feature(
3441 sw_if_index=self.pg1.sw_if_index, is_add=1
3443 self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
3444 alg=2, start_port=1025, end_port=1027
# Source ports 1125..1129: five flows for a three-port range.
3448 for port in range(0, 5):
3450 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3451 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3452 / TCP(sport=1125 + port)
3455 self.pg0.add_stream(pkts)
3456 self.pg_enable_capture(self.pg_interfaces)
# Only three captures are expected, matching the size of the port range.
3458 capture = self.pg1.get_capture(3)
3461 self.assertGreaterEqual(tcp.sport, 1025)
3462 self.assertLessEqual(tcp.sport, 1027)
3464 def test_multiple_outside_vrf(self):
3465 """NAT44EI Multiple outside VRF"""
# Scenario: pg1 and pg2 are moved into two separate IP tables, both are
# registered with NAT44EI without flags, and in/out traffic is verified
# first via pg1 and then via pg2. Afterwards the interfaces are put back
# into table 0.
# NOTE(review): vrf_id1 / vrf_id2 are assigned on lines that are absent from
# this listing (embedded numbers 3467-3468 are skipped), as are other short
# lines.
3469 self.pg1.unconfig_ip4()
3470 self.pg2.unconfig_ip4()
3471 self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
3472 self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
3473 self.pg1.set_table_ip4(vrf_id1)
3474 self.pg2.set_table_ip4(vrf_id2)
3475 self.pg1.config_ip4()
3476 self.pg2.config_ip4()
3477 self.pg1.resolve_arp()
3478 self.pg2.resolve_arp()
# NAT setup: one address, pg0 with the INSIDE flag, pg1 and pg2 with no flags.
3480 self.nat44_add_address(self.nat_addr)
3481 flags = self.config_flags.NAT44_EI_IF_INSIDE
3482 self.vapi.nat44_ei_interface_add_del_feature(
3483 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3485 self.vapi.nat44_ei_interface_add_del_feature(
3486 sw_if_index=self.pg1.sw_if_index, is_add=1
3488 self.vapi.nat44_ei_interface_add_del_feature(
3489 sw_if_index=self.pg2.sw_if_index, is_add=1
# First pass: pg0 -> pg1, then the return direction pg1 -> pg0.
3494 pkts = self.create_stream_in(self.pg0, self.pg1)
3495 self.pg0.add_stream(pkts)
3496 self.pg_enable_capture(self.pg_interfaces)
3498 capture = self.pg1.get_capture(len(pkts))
3499 self.verify_capture_out(capture, self.nat_addr)
3501 pkts = self.create_stream_out(self.pg1, self.nat_addr)
3502 self.pg1.add_stream(pkts)
3503 self.pg_enable_capture(self.pg_interfaces)
3505 capture = self.pg0.get_capture(len(pkts))
3506 self.verify_capture_in(capture, self.pg0)
# Switch to different inside ports / ICMP id before the second pass.
3508 self.tcp_port_in = 60303
3509 self.udp_port_in = 60304
3510 self.icmp_id_in = 60305
# Second pass: pg0 -> pg2, then the return direction pg2 -> pg0.
3513 pkts = self.create_stream_in(self.pg0, self.pg2)
3514 self.pg0.add_stream(pkts)
3515 self.pg_enable_capture(self.pg_interfaces)
3517 capture = self.pg2.get_capture(len(pkts))
3518 self.verify_capture_out(capture, self.nat_addr)
3520 pkts = self.create_stream_out(self.pg2, self.nat_addr)
3521 self.pg2.add_stream(pkts)
3522 self.pg_enable_capture(self.pg_interfaces)
3524 capture = self.pg0.get_capture(len(pkts))
3525 self.verify_capture_in(capture, self.pg0)
# Cleanup: remove the NAT address and return pg1/pg2 to table 0.
3528 self.nat44_add_address(self.nat_addr, is_add=0)
3529 self.pg1.unconfig_ip4()
3530 self.pg2.unconfig_ip4()
3531 self.pg1.set_table_ip4(0)
3532 self.pg2.set_table_ip4(0)
3533 self.pg1.config_ip4()
3534 self.pg2.config_ip4()
3535 self.pg1.resolve_arp()
3536 self.pg2.resolve_arp()
3538 def test_mss_clamping(self):
3539 """NAT44EI TCP MSS clamping"""
# Scenario: the same TCP packet carrying an MSS option of 1400 is sent three
# times under different clamping settings, and the MSS seen on pg1 is
# checked each time (1000, 1400, 1400).
# NOTE(review): the embedded line numbers skip values in this listing (e.g.
# 3549, 3552, 3563), so parts of the packet construction are not visible.
3540 self.nat44_add_address(self.nat_addr)
# pg0 is registered with the INSIDE flag; pg1 is registered with no flags.
3541 flags = self.config_flags.NAT44_EI_IF_INSIDE
3542 self.vapi.nat44_ei_interface_add_del_feature(
3543 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3545 self.vapi.nat44_ei_interface_add_del_feature(
3546 sw_if_index=self.pg1.sw_if_index, is_add=1
3550 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3551 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3553 sport=self.tcp_port_in,
3554 dport=self.tcp_external_port,
3556 options=[("MSS", 1400)],
# Case 1: clamping enabled at 1000, below the packet's 1400.
3560 self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1000)
3561 self.pg0.add_stream(p)
3562 self.pg_enable_capture(self.pg_interfaces)
3564 capture = self.pg1.get_capture(1)
3565 # Negotiated MSS value greater than configured - changed
3566 self.verify_mss_value(capture[0], 1000)
# Case 2: clamping disabled.
3568 self.vapi.nat44_ei_set_mss_clamping(enable=0, mss_value=1500)
3569 self.pg0.add_stream(p)
3570 self.pg_enable_capture(self.pg_interfaces)
3572 capture = self.pg1.get_capture(1)
3573 # MSS clamping disabled - negotiated MSS unchanged
3574 self.verify_mss_value(capture[0], 1400)
# Case 3: clamping enabled at 1500, above the packet's 1400.
3576 self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1500)
3577 self.pg0.add_stream(p)
3578 self.pg_enable_capture(self.pg_interfaces)
3580 capture = self.pg1.get_capture(1)
3581 # Negotiated MSS value smaller than configured - unchanged
3582 self.verify_mss_value(capture[0], 1400)
3584 def test_ha_send(self):
3585 """NAT44EI Send HA session synchronization events (active)"""
# Scenario (active side): create sessions with in2out traffic, flush HA and
# check the "add" events sent on pg3; ACK them; delete one session and check
# the "del" event and its retries when no ACK is sent; finally send out2in
# traffic and check the "refresh" events, then ACK those too.
# NOTE(review): the embedded line numbers skip values in this listing, so
# short lines (packet wrappers, try/except, etc.) are not visible here.
# pg0 is registered with the INSIDE flag; pg1 is registered with no flags.
3586 flags = self.config_flags.NAT44_EI_IF_INSIDE
3587 self.vapi.nat44_ei_interface_add_del_feature(
3588 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3590 self.vapi.nat44_ei_interface_add_del_feature(
3591 sw_if_index=self.pg1.sw_if_index, is_add=1
3593 self.nat44_add_address(self.nat_addr)
# HA endpoints: listener on pg3's local address port 12345, failover peer on
# pg3's remote address port 12346.
3595 self.vapi.nat44_ei_ha_set_listener(
3596 ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
3598 self.vapi.nat44_ei_ha_set_failover(
3599 ip_address=self.pg3.remote_ip4, port=12346, session_refresh_interval=10
# Make scapy dissect UDP payloads with source port 12345 as HANATStateSync.
3601 bind_layers(UDP, HANATStateSync, sport=12345)
3604 pkts = self.create_stream_in(self.pg0, self.pg1)
3605 self.pg0.add_stream(pkts)
3606 self.pg_enable_capture(self.pg_interfaces)
3608 capture = self.pg1.get_capture(len(pkts))
3609 self.verify_capture_out(capture)
3610 # active send HA events
3611 self.vapi.nat44_ei_ha_flush()
# The counter is summed over its rows; three add events are expected.
3612 stats = self.statistics["/nat44-ei/ha/add-event-send"]
3613 self.assertEqual(stats[:, 0].sum(), 3)
3614 capture = self.pg3.get_capture(1)
3616 self.assert_packet_checksums_valid(p)
3620 hanat = p[HANATStateSync]
3622 self.logger.error(ppp("Invalid packet:", p))
3625 self.assertEqual(ip.src, self.pg3.local_ip4)
3626 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3627 self.assertEqual(udp.sport, 12345)
3628 self.assertEqual(udp.dport, 12346)
3629 self.assertEqual(hanat.version, 1)
3630 # self.assertEqual(hanat.thread_index, 0)
3631 self.assertEqual(hanat.count, 3)
# Remember the sequence number so it can be ACKed and compared later.
3632 seq = hanat.sequence_number
3633 for event in hanat.events:
# event_type 1 is "add" in the Event field enum.
3634 self.assertEqual(event.event_type, 1)
3635 self.assertEqual(event.in_addr, self.pg0.remote_ip4)
3636 self.assertEqual(event.out_addr, self.nat_addr)
3637 self.assertEqual(event.fib_index, 0)
3639 # ACK received events
3641 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3642 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3643 / UDP(sport=12346, dport=12345)
3645 sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
3648 self.pg3.add_stream(ack)
3650 stats = self.statistics["/nat44-ei/ha/ack-recv"]
3651 self.assertEqual(stats[:, 0].sum(), 1)
3653 # delete one session
3654 self.pg_enable_capture(self.pg_interfaces)
3655 self.vapi.nat44_ei_del_session(
3656 address=self.pg0.remote_ip4,
3657 port=self.tcp_port_in,
3658 protocol=IP_PROTOS.tcp,
3659 flags=self.config_flags.NAT44_EI_IF_INSIDE,
3661 self.vapi.nat44_ei_ha_flush()
3662 stats = self.statistics["/nat44-ei/ha/del-event-send"]
3663 self.assertEqual(stats[:, 0].sum(), 1)
3664 capture = self.pg3.get_capture(1)
3667 hanat = p[HANATStateSync]
3669 self.logger.error(ppp("Invalid packet:", p))
# The delete message must use a later sequence number than the add message.
3672 self.assertGreater(hanat.sequence_number, seq)
3674 # do not send ACK, active retry send HA event again
3675 self.pg_enable_capture(self.pg_interfaces)
3676 self.virtual_sleep(12)
# After the sleep: three retries counted, one message counted as missed, and
# three captured packets that each compare equal to the delete message p.
3677 stats = self.statistics["/nat44-ei/ha/retry-count"]
3678 self.assertEqual(stats[:, 0].sum(), 3)
3679 stats = self.statistics["/nat44-ei/ha/missed-count"]
3680 self.assertEqual(stats[:, 0].sum(), 1)
3681 capture = self.pg3.get_capture(3)
3682 for packet in capture:
3683 self.assertEqual(packet, p)
3685 # session counters refresh
3686 pkts = self.create_stream_out(self.pg1)
3687 self.pg1.add_stream(pkts)
3688 self.pg_enable_capture(self.pg_interfaces)
3690 self.pg0.get_capture(2)
3691 self.vapi.nat44_ei_ha_flush()
3692 stats = self.statistics["/nat44-ei/ha/refresh-event-send"]
3693 self.assertEqual(stats[:, 0].sum(), 2)
3694 capture = self.pg3.get_capture(1)
3696 self.assert_packet_checksums_valid(p)
3700 hanat = p[HANATStateSync]
3702 self.logger.error(ppp("Invalid packet:", p))
3705 self.assertEqual(ip.src, self.pg3.local_ip4)
3706 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3707 self.assertEqual(udp.sport, 12345)
3708 self.assertEqual(udp.dport, 12346)
3709 self.assertEqual(hanat.version, 1)
3710 self.assertEqual(hanat.count, 2)
3711 seq = hanat.sequence_number
3712 for event in hanat.events:
# event_type 3 is "refresh" in the Event field enum.
3713 self.assertEqual(event.event_type, 3)
3714 self.assertEqual(event.out_addr, self.nat_addr)
3715 self.assertEqual(event.fib_index, 0)
3716 self.assertEqual(event.total_pkts, 2)
3717 self.assertGreater(event.total_bytes, 0)
# NOTE(review): in the visible lines this read is overwritten by the second
# "ack-recv" read below before it is used - confirm whether it is needed.
3719 stats = self.statistics["/nat44-ei/ha/ack-recv"]
3721 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3722 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3723 / UDP(sport=12346, dport=12345)
3725 sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
3728 self.pg3.add_stream(ack)
# Second ACK overall, so the counter is expected to sum to 2.
3730 stats = self.statistics["/nat44-ei/ha/ack-recv"]
3731 self.assertEqual(stats[:, 0].sum(), 2)
3733 def test_ha_recv(self):
3734 """NAT44EI Receive HA session synchronization events (passive)"""
# Scenario (passive side): HA messages are injected on pg3 in three steps
# (two "add" events, one "del" event, one "refresh" event). After each step
# the ACK sent back on pg3, the HA counters and the user/session dumps are
# checked. Finally an out2in TCP packet is sent through the session that HA
# created.
# NOTE(review): the embedded line numbers skip values in this listing, so
# short lines (packet wrappers, some event fields, try/except) are hidden.
3735 self.nat44_add_address(self.nat_addr)
# pg0 is registered with the INSIDE flag; pg1 is registered with no flags.
3736 flags = self.config_flags.NAT44_EI_IF_INSIDE
3737 self.vapi.nat44_ei_interface_add_del_feature(
3738 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3740 self.vapi.nat44_ei_interface_add_del_feature(
3741 sw_if_index=self.pg1.sw_if_index, is_add=1
3743 self.vapi.nat44_ei_ha_set_listener(
3744 ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
# Make scapy dissect UDP payloads with source port 12345 as HANATStateSync.
3746 bind_layers(UDP, HANATStateSync, sport=12345)
3748 # this is a bit tricky - HA dictates thread index due to how it's
3749 # designed, but once we use HA to create a session, we also want
3750 # to pass a packet through said session. so the session must end
3751 # up on the correct thread from both directions - in2out (based on
3752 # IP address) and out2in (based on outside port)
3754 # first choose a thread index which is correct for IP
3755 thread_index = get_nat44_ei_in2out_worker_index(
3756 self.pg0.remote_ip4, self.vpp_worker_count
3759 # now pick a port which is correct for given thread
# Ports above 1024 are split evenly between workers (at least one share);
# a random port is taken from the first share and, when workers exist,
# shifted by (thread_index - 1) shares.
3760 port_per_thread = int((0xFFFF - 1024) / max(1, self.vpp_worker_count))
3761 self.tcp_port_out = 1024 + random.randint(1, port_per_thread)
3762 self.udp_port_out = 1024 + random.randint(1, port_per_thread)
3763 if self.vpp_worker_count > 0:
3764 self.tcp_port_out += port_per_thread * (thread_index - 1)
3765 self.udp_port_out += port_per_thread * (thread_index - 1)
3767 # send HA session add events to failover/passive
3769 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3770 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3771 / UDP(sport=12346, dport=12345)
# First event: uses the TCP in/out/external ports.
3778 in_addr=self.pg0.remote_ip4,
3779 out_addr=self.nat_addr,
3780 in_port=self.tcp_port_in,
3781 out_port=self.tcp_port_out,
3782 eh_addr=self.pg1.remote_ip4,
3783 ehn_addr=self.pg1.remote_ip4,
3784 eh_port=self.tcp_external_port,
3785 ehn_port=self.tcp_external_port,
# Second event: uses the UDP in/out/external ports.
3791 in_addr=self.pg0.remote_ip4,
3792 out_addr=self.nat_addr,
3793 in_port=self.udp_port_in,
3794 out_port=self.udp_port_out,
3795 eh_addr=self.pg1.remote_ip4,
3796 ehn_addr=self.pg1.remote_ip4,
3797 eh_port=self.udp_external_port,
3798 ehn_port=self.udp_external_port,
3802 thread_index=thread_index,
3806 self.pg3.add_stream(p)
3807 self.pg_enable_capture(self.pg_interfaces)
# One packet is expected back on pg3; it is checked as an ACK below.
3810 capture = self.pg3.get_capture(1)
3813 hanat = p[HANATStateSync]
3815 self.logger.error(ppp("Invalid packet:", p))
3818 self.assertEqual(hanat.sequence_number, 1)
3819 self.assertEqual(hanat.flags, "ACK")
3820 self.assertEqual(hanat.version, 1)
3821 self.assertEqual(hanat.thread_index, thread_index)
# Expected state after the add message: 1 ACK sent, 2 add events received,
# 1 user, 2 sessions.
3822 stats = self.statistics["/nat44-ei/ha/ack-send"]
3823 self.assertEqual(stats[:, 0].sum(), 1)
3824 stats = self.statistics["/nat44-ei/ha/add-event-recv"]
3825 self.assertEqual(stats[:, 0].sum(), 2)
3826 users = self.statistics["/nat44-ei/total-users"]
3827 self.assertEqual(users[:, 0].sum(), 1)
3828 sessions = self.statistics["/nat44-ei/total-sessions"]
3829 self.assertEqual(sessions[:, 0].sum(), 2)
3830 users = self.vapi.nat44_ei_user_dump()
3831 self.assertEqual(len(users), 1)
3832 self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3833 # there should be 2 sessions created by HA
3834 sessions = self.vapi.nat44_ei_user_session_dump(
3835 users[0].ip_address, users[0].vrf_id
3837 self.assertEqual(len(sessions), 2)
3838 for session in sessions:
3839 self.assertEqual(str(session.inside_ip_address), self.pg0.remote_ip4)
3840 self.assertEqual(str(session.outside_ip_address), self.nat_addr)
3841 self.assertIn(session.inside_port, [self.tcp_port_in, self.udp_port_in])
3842 self.assertIn(session.outside_port, [self.tcp_port_out, self.udp_port_out])
3843 self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
3845 # send HA session delete event to failover/passive
3847 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3848 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3849 / UDP(sport=12346, dport=12345)
# The delete event names the UDP session's ports.
3856 in_addr=self.pg0.remote_ip4,
3857 out_addr=self.nat_addr,
3858 in_port=self.udp_port_in,
3859 out_port=self.udp_port_out,
3860 eh_addr=self.pg1.remote_ip4,
3861 ehn_addr=self.pg1.remote_ip4,
3862 eh_port=self.udp_external_port,
3863 ehn_port=self.udp_external_port,
3867 thread_index=thread_index,
3871 self.pg3.add_stream(p)
3872 self.pg_enable_capture(self.pg_interfaces)
3875 capture = self.pg3.get_capture(1)
3878 hanat = p[HANATStateSync]
3880 self.logger.error(ppp("Invalid packet:", p))
3883 self.assertEqual(hanat.sequence_number, 2)
3884 self.assertEqual(hanat.flags, "ACK")
3885 self.assertEqual(hanat.version, 1)
3886 users = self.vapi.nat44_ei_user_dump()
3887 self.assertEqual(len(users), 1)
3888 self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3889 # now we should have only 1 session, 1 deleted by HA
3890 sessions = self.vapi.nat44_ei_user_session_dump(
3891 users[0].ip_address, users[0].vrf_id
3893 self.assertEqual(len(sessions), 1)
3894 stats = self.statistics["/nat44-ei/ha/del-event-recv"]
3895 self.assertEqual(stats[:, 0].sum(), 1)
# Two HA packets have been injected so far.
3897 stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
3898 self.assertEqual(stats, 2)
3900 # send HA session refresh event to failover/passive
3902 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3903 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3904 / UDP(sport=12346, dport=12345)
# The refresh event names the remaining TCP session's ports.
3909 event_type="refresh",
3911 in_addr=self.pg0.remote_ip4,
3912 out_addr=self.nat_addr,
3913 in_port=self.tcp_port_in,
3914 out_port=self.tcp_port_out,
3915 eh_addr=self.pg1.remote_ip4,
3916 ehn_addr=self.pg1.remote_ip4,
3917 eh_port=self.tcp_external_port,
3918 ehn_port=self.tcp_external_port,
3924 thread_index=thread_index,
3927 self.pg3.add_stream(p)
3928 self.pg_enable_capture(self.pg_interfaces)
3931 capture = self.pg3.get_capture(1)
3934 hanat = p[HANATStateSync]
3936 self.logger.error(ppp("Invalid packet:", p))
3939 self.assertEqual(hanat.sequence_number, 3)
3940 self.assertEqual(hanat.flags, "ACK")
3941 self.assertEqual(hanat.version, 1)
3942 users = self.vapi.nat44_ei_user_dump()
3943 self.assertEqual(len(users), 1)
3944 self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3945 sessions = self.vapi.nat44_ei_user_session_dump(
3946 users[0].ip_address, users[0].vrf_id
3948 self.assertEqual(len(sessions), 1)
3949 session = sessions[0]
# Expected counters after the refresh: 1024 bytes / 2 packets (presumably
# the values carried by the refresh event, whose counter fields are on lines
# not visible here - TODO confirm).
3950 self.assertEqual(session.total_bytes, 1024)
3951 self.assertEqual(session.total_pkts, 2)
3952 stats = self.statistics["/nat44-ei/ha/refresh-event-recv"]
3953 self.assertEqual(stats[:, 0].sum(), 1)
# Three HA packets have been injected in total.
3955 stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
3956 self.assertEqual(stats, 3)
3958 # send packet to test session created by HA
3960 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3961 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3962 / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out)
3964 self.pg1.add_stream(p)
3965 self.pg_enable_capture(self.pg_interfaces)
3967 capture = self.pg0.get_capture(1)
3973 self.logger.error(ppp("Invalid packet:", p))
# The packet must arrive on pg0 addressed to the inside host and inside port.
3976 self.assertEqual(ip.src, self.pg1.remote_ip4)
3977 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3978 self.assertEqual(tcp.sport, self.tcp_external_port)
3979 self.assertEqual(tcp.dport, self.tcp_port_in)
def reconfigure_frame_queue_nelts(self, frame_queue_nelts):
    """Set the frame queue element count and return the value VPP reports.

    The plugin is disabled first, the option is set, and the plugin is then
    re-enabled through ``self.plugin_enable()`` before the option is read
    back.

    :param frame_queue_nelts: value passed to ``nat44_ei_set_fq_options``.
    :returns: ``frame_queue_nelts`` from ``nat44_ei_show_fq_options()``.
    """
    self.vapi.nat44_ei_plugin_enable_disable(enable=0)
    self.vapi.nat44_ei_set_fq_options(frame_queue_nelts=frame_queue_nelts)

    # keep plugin configuration persistent
    self.plugin_enable()

    reported_options = self.vapi.nat44_ei_show_fq_options()
    return reported_options.frame_queue_nelts
def test_set_frame_queue_nelts(self):
    """NAT44EI API test - worker handoff frame queue elements"""
    # Request 512 elements and expect the read-back value to match.
    requested_nelts = 512
    reported_nelts = self.reconfigure_frame_queue_nelts(requested_nelts)
    self.assertEqual(reported_nelts, 512)
def show_commands_at_teardown(self):
    """Log the output of the NAT44EI "show" CLI commands at INFO level."""
    # Commands are issued one at a time, in exactly this order.
    cli_commands = (
        "show nat44 ei timeouts",
        "show nat44 ei addresses",
        "show nat44 ei interfaces",
        "show nat44 ei static mappings",
        "show nat44 ei interface address",
        "show nat44 ei sessions detail",
        "show nat44 ei hash tables detail",
        "show nat44 ei ha",
        "show nat44 ei addr-port-assignment-alg",
    )
    for cli_command in cli_commands:
        output = self.vapi.cli(cli_command)
        self.logger.info(output)
4003 def test_outside_address_distribution(self):
4004 """Outside address distribution based on source address"""
# Scenario: a pool of NAT addresses is configured, one UDP packet is sent
# from each generated pg0 remote host, and every translated packet must
# carry the pool address selected from its original source address.
# NOTE(review): x and nat_addresses are set up on lines that are absent from
# this listing (embedded numbers 4006-4008, 4010 are skipped), as are other
# short lines.
4009 for i in range(1, x):
4011 nat_addresses.append(a)
# pg0 is registered with the INSIDE flag; pg1 is registered with no flags.
4013 flags = self.config_flags.NAT44_EI_IF_INSIDE
4014 self.vapi.nat44_ei_interface_add_del_feature(
4015 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4017 self.vapi.nat44_ei_interface_add_del_feature(
4018 sw_if_index=self.pg1.sw_if_index, is_add=1
# The pool is added as one range from the first to the last list entry.
4021 self.vapi.nat44_ei_add_del_address_range(
4022 first_ip_address=nat_addresses[0],
4023 last_ip_address=nat_addresses[-1],
4028 self.pg0.generate_remote_hosts(x)
# Each packet carries a payload built from its packet info so the sent
# packet can be looked up again from the received one.
4032 info = self.create_packet_info(self.pg0, self.pg1)
4033 payload = self.info_to_payload(info)
4035 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4036 / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
4037 / UDP(sport=7000 + i, dport=8000 + i)
4043 self.pg0.add_stream(pkts)
4044 self.pg_enable_capture(self.pg_interfaces)
4046 recvd = self.pg1.get_capture(len(pkts))
4047 for p_recvd in recvd:
# Recover the packet info from the received payload.
4048 payload_info = self.payload_to_info(p_recvd[Raw])
4049 packet_index = payload_info.index
4050 info = self._packet_infos[packet_index]
4051 self.assertTrue(info is not None)
4052 self.assertEqual(packet_index, info.index)
# Expected pool entry: the sent source address is unpacked as a 32-bit
# network-order integer, passed through socket.htonl, and (value - 1) modulo
# the pool size is used as the index.
4054 packed = socket.inet_aton(p_sent[IP].src)
4055 numeric = struct.unpack("!L", packed)[0]
4056 numeric = socket.htonl(numeric)
4057 a = nat_addresses[(numeric - 1) % len(nat_addresses)]
4061 "Invalid packet (src IP %s translated to %s, but expected %s)"
4062 % (p_sent[IP].src, p_recvd[IP].src, a),
def test_default_user_sessions(self):
    """NAT44EI default per-user session limit is used and reported"""
    running_config = self.vapi.nat44_ei_show_running_config()
    # a nonzero default should be reported for user_sessions
    reported_limit = running_config.user_sessions
    self.assertNotEqual(reported_limit, 0)
4071 def test_delete_interface(self):
4072 """NAT44EI delete nat interface"""
# Scenario: four loopbacks are NAT-enabled in four different ways, then each
# one is deleted and a new loopback is created; the new loopback must get
# the same sw_if_index and must no longer be listed as a NAT interface.
# NOTE(review): the embedded line numbers skip values in this listing (e.g.
# 4094, 4100, 4121), so parts of the comprehensions and loops are hidden.
4074 self.nat44_add_address(self.nat_addr)
4076 interfaces = self.create_loopback_interfaces(4)
# Loopback 0: feature added with no flags.
4078 self.vapi.nat44_ei_interface_add_del_feature(
4079 sw_if_index=interfaces[0].sw_if_index, is_add=1
# Loopback 1: feature added with the INSIDE flag.
4081 flags = self.config_flags.NAT44_EI_IF_INSIDE
4082 self.vapi.nat44_ei_interface_add_del_feature(
4083 sw_if_index=interfaces[1].sw_if_index, flags=flags, is_add=1
# Loopback 2: feature added with both INSIDE and OUTSIDE flags.
4085 flags |= self.config_flags.NAT44_EI_IF_OUTSIDE
4086 self.vapi.nat44_ei_interface_add_del_feature(
4087 sw_if_index=interfaces[2].sw_if_index, flags=flags, is_add=1
# Loopback 3: added as an output interface.
4089 self.vapi.nat44_ei_add_del_output_interface(
4090 sw_if_index=interfaces[3].sw_if_index, is_add=1
# Collect entries from the interface dump plus the output-interface details;
# their number must equal the number of loopbacks created.
4093 nat_sw_if_indices = [
4095 for i in self.vapi.nat44_ei_interface_dump()
4096 + list(self.vapi.vpp.details_iter(self.vapi.nat44_ei_output_interface_get))
4098 self.assertEqual(len(nat_sw_if_indices), len(interfaces))
4101 for i in interfaces:
4102 # delete nat-enabled interface
4103 self.assertIn(i.sw_if_index, nat_sw_if_indices)
4104 i.remove_vpp_config()
4106 # create interface with the same index
4107 lo = VppLoInterface(self)
4108 loopbacks.append(lo)
4109 self.assertEqual(lo.sw_if_index, i.sw_if_index)
4111 # check interface is not nat-enabled
4112 nat_sw_if_indices = [
4114 for i in self.vapi.nat44_ei_interface_dump()
4116 self.vapi.vpp.details_iter(self.vapi.nat44_ei_output_interface_get)
4119 self.assertNotIn(lo.sw_if_index, nat_sw_if_indices)
# NOTE(review): presumably cleanup of the loopbacks created above; the loop
# header is on a line not visible here - confirm.
4122 i.remove_vpp_config()
4125 class TestNAT44Out2InDPO(MethodHolder):
4126 """NAT44EI Test Cases using out2in DPO"""
# NOTE(review): the embedded line numbers skip values throughout this class
# (e.g. 4128, 4161, 4166), so decorators and the def lines of setUp /
# tearDown are not visible in this listing.
4129 def setUpClass(cls):
4130 super(TestNAT44Out2InDPO, cls).setUpClass()
4131 cls.vapi.cli("set log class nat44-ei level debug")
# Fixed ports / ICMP id: the inside and outside values are the same.
4133 cls.tcp_port_in = 6303
4134 cls.tcp_port_out = 6303
4135 cls.udp_port_in = 6304
4136 cls.udp_port_out = 6304
4137 cls.icmp_id_in = 6305
4138 cls.icmp_id_out = 6305
4139 cls.nat_addr = "10.0.0.3"
4140 cls.dst_ip4 = "192.168.70.1"
# Two pg interfaces: pg0 is configured for IPv4, pg1 for IPv6.
4142 cls.create_pg_interfaces(range(2))
4145 cls.pg0.config_ip4()
4146 cls.pg0.resolve_arp()
4149 cls.pg1.config_ip6()
4150 cls.pg1.resolve_ndp()
# Route path via pg1's IPv6 remote address; the enclosing call is on lines
# not visible here.
4156 [VppRoutePath(cls.pg1.remote_ip6, cls.pg1.sw_if_index)],
# Presumably the body of setUp(): enables the plugin with the OUT2IN_DPO flag.
4162 super(TestNAT44Out2InDPO, self).setUp()
4163 flags = self.config_flags.NAT44_EI_OUT2IN_DPO
4164 self.vapi.nat44_ei_plugin_enable_disable(enable=1, flags=flags)
# Presumably the body of tearDown(): disables the plugin and clears logging
# unless VPP has died.
4167 super(TestNAT44Out2InDPO, self).tearDown()
4168 if not self.vpp_dead:
4169 self.vapi.nat44_ei_plugin_enable_disable(enable=0)
4170 self.vapi.cli("clear logging")
4172 def configure_xlat(self):
# Stores two IPv6 prefixes (text form, packed form, length 96) on self and
# passes the lengths to map_add_domain; the remaining arguments are on
# lines not visible here.
4173 self.dst_ip6_pfx = "1:2:3::"
4174 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.dst_ip6_pfx)
4175 self.dst_ip6_pfx_len = 96
4176 self.src_ip6_pfx = "4:5:6::"
4177 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.src_ip6_pfx)
4178 self.src_ip6_pfx_len = 96
4179 self.vapi.map_add_domain(
4181 self.dst_ip6_pfx_len,
4183 self.src_ip6_pfx_len,
4188 @unittest.skip("Temporary disabled")
4189 def test_464xlat_ce(self):
4190 """Test 464XLAT CE with NAT44EI"""
4192 self.configure_xlat()
4194 flags = self.config_flags.NAT44_EI_IF_INSIDE
4195 self.vapi.nat44_ei_interface_add_del_feature(
4196 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
# NOTE(review): self.nat_addr_n is not assigned anywhere in the visible
# lines of this class (only nat_addr is) - confirm before re-enabling the
# test.
4198 self.vapi.nat44_ei_add_del_address_range(
4199 first_ip_address=self.nat_addr_n,
4200 last_ip_address=self.nat_addr_n,
# IPv6 addresses composed from the IPv4 destination / NAT address and the
# two configured prefixes.
4205 out_src_ip6 = self.compose_ip6(
4206 self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
4208 out_dst_ip6 = self.compose_ip6(
4209 self.nat_addr, self.src_ip6_pfx, self.src_ip6_pfx_len
# IPv4 in on pg0, IPv6 expected on pg1; then the IPv6 return direction.
4213 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4214 self.pg0.add_stream(pkts)
4215 self.pg_enable_capture(self.pg_interfaces)
4217 capture = self.pg1.get_capture(len(pkts))
4218 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6, dst_ip=out_src_ip6)
4220 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4221 self.pg1.add_stream(pkts)
4222 self.pg_enable_capture(self.pg_interfaces)
4224 capture = self.pg0.get_capture(len(pkts))
4225 self.verify_capture_in(capture, self.pg0)
# Called without is_add here (presumably removing the feature and the
# address range - confirm the API default).
4227 self.vapi.nat44_ei_interface_add_del_feature(
4228 sw_if_index=self.pg0.sw_if_index, flags=flags
4230 self.vapi.nat44_ei_add_del_address_range(
4231 first_ip_address=self.nat_addr_n,
4232 last_ip_address=self.nat_addr_n,
4236 @unittest.skip("Temporary disabled")
4237 def test_464xlat_ce_no_nat(self):
4238 """Test 464XLAT CE without NAT44EI"""
4240 self.configure_xlat()
# No NAT feature is configured in this test; the IPv6 destination is
# composed from pg0's remote IPv4 address instead of the NAT address.
4242 out_src_ip6 = self.compose_ip6(
4243 self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
4245 out_dst_ip6 = self.compose_ip6(
4246 self.pg0.remote_ip4, self.src_ip6_pfx, self.src_ip6_pfx_len
4249 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4250 self.pg0.add_stream(pkts)
4251 self.pg_enable_capture(self.pg_interfaces)
4253 capture = self.pg1.get_capture(len(pkts))
4254 self.verify_capture_out_ip6(
4255 capture, dst_ip=out_src_ip6, nat_ip=out_dst_ip6, same_port=True
4258 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4259 self.pg1.add_stream(pkts)
4260 self.pg_enable_capture(self.pg_interfaces)
4262 capture = self.pg0.get_capture(len(pkts))
4263 self.verify_capture_in(capture, self.pg0)
4266 class TestNAT44EIMW(MethodHolder):
4267 """NAT44EI Test Cases (multiple workers)"""
# Class-level settings: two VPP workers (presumably consumed by the test
# framework when starting VPP - confirm) and the session limit that is
# passed as sessions= when the plugin is enabled.
4269 vpp_worker_count = 2
4270 max_translations = 10240
4274 def setUpClass(cls):
# Class-wide fixture: sets default ports and addresses, creates ten pg
# interfaces, prepares remote hosts on pg0/pg1, places pg4-pg6 in IP tables
# 10/20 with hand-assigned addresses, and gives pg9 an extra address.
# NOTE(review): the embedded line numbers skip values in this listing (e.g.
# 4273, 4294-4297, 4318-4324), so the decorator and loop bodies are hidden.
4275 super(TestNAT44EIMW, cls).setUpClass()
# NOTE(review): this uses log class "nat", while the Out2InDPO class uses
# "nat44-ei" - confirm which name is intended.
4276 cls.vapi.cli("set log class nat level debug")
4278 cls.tcp_port_in = 6303
4279 cls.tcp_port_out = 6303
4280 cls.udp_port_in = 6304
4281 cls.udp_port_out = 6304
4282 cls.icmp_id_in = 6305
4283 cls.icmp_id_out = 6305
4284 cls.nat_addr = "10.0.0.3"
4285 cls.ipfix_src_port = 4739
4286 cls.ipfix_domain_id = 1
4287 cls.tcp_external_port = 80
4288 cls.udp_external_port = 69
# Ten pg interfaces; the first four form cls.interfaces.
4290 cls.create_pg_interfaces(range(10))
4291 cls.interfaces = list(cls.pg_interfaces[0:4])
4293 for i in cls.interfaces:
4298 cls.pg0.generate_remote_hosts(3)
4299 cls.pg0.configure_ipv4_neighbors()
4301 cls.pg1.generate_remote_hosts(1)
4302 cls.pg1.configure_ipv4_neighbors()
# NOTE(review): the outer list() wraps an already-built list, so it only
# makes a second copy.
4304 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
4305 cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
4306 cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
# pg4 and pg6 are given identical local/remote addresses but sit in
# different tables (10 and 20); pg5 shares table 10 with pg4.
4308 cls.pg4._local_ip4 = "172.16.255.1"
4309 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
4310 cls.pg4.set_table_ip4(10)
4311 cls.pg5._local_ip4 = "172.17.255.3"
4312 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
4313 cls.pg5.set_table_ip4(10)
4314 cls.pg6._local_ip4 = "172.16.255.1"
4315 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
4316 cls.pg6.set_table_ip4(20)
4317 for i in cls.overlapping_interfaces:
# pg9 gets two remote hosts and an additional 10.0.0.1/24 address.
4325 cls.pg9.generate_remote_hosts(2)
4326 cls.pg9.config_ip4()
4327 cls.vapi.sw_interface_add_del_address(
4328 sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
4332 cls.pg9.resolve_arp()
# Host 1 takes over host 0's original address, then host 0 (and pg4's
# remote address) is set to 10.0.0.2 before ARP is resolved again.
4333 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
4334 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
4335 cls.pg9.resolve_arp()
# Presumably the body of setUp() (its def line is not visible in this
# listing): enables the NAT44EI plugin with the class session and user limits.
4338 super(TestNAT44EIMW, self).setUp()
4339 self.vapi.nat44_ei_plugin_enable_disable(
4340 sessions=self.max_translations, users=self.max_users, enable=1
# Presumably the body of tearDown() (its def line is not visible in this
# listing). Unless VPP has died: disable IPFIX with the current domain id and
# source port, reset both to 4739 / 1, disable the plugin and clear logging.
4344 super(TestNAT44EIMW, self).tearDown()
4345 if not self.vpp_dead:
4346 self.vapi.nat44_ei_ipfix_enable_disable(
4347 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
4349 self.ipfix_src_port = 4739
4350 self.ipfix_domain_id = 1
4352 self.vapi.nat44_ei_plugin_enable_disable(enable=0)
4353 self.vapi.cli("clear logging")
4355 def test_hairpinning(self):
4356 """NAT44EI hairpinning - 1:1 NAPT"""
# Scenario: host and server are both remote hosts on pg0. The host sends TCP
# to the NAT address and server_out_port, the packet must come back out of
# pg0 towards the server, and the server's reply must be translated back to
# the host. The per-worker hairpinning counter for pg0 is checked after each
# direction.
# NOTE(review): host_in_port, worker_1 and worker_2 are assigned on lines
# that are absent from this listing (gaps in the embedded numbering).
4358 host = self.pg0.remote_hosts[0]
4359 server = self.pg0.remote_hosts[1]
4362 server_in_port = 5678
4363 server_out_port = 8765
4367 self.nat44_add_address(self.nat_addr)
# pg0 is registered with the INSIDE flag; pg1 is registered with no flags.
4368 flags = self.config_flags.NAT44_EI_IF_INSIDE
4369 self.vapi.nat44_ei_interface_add_del_feature(
4370 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4372 self.vapi.nat44_ei_interface_add_del_feature(
4373 sw_if_index=self.pg1.sw_if_index, is_add=1
4376 # add static mapping for server
4377 self.nat44_add_static_mapping(
4382 proto=IP_PROTOS.tcp,
# Baseline counter snapshot taken before any traffic.
4385 cnt = self.statistics["/nat44-ei/hairpinning"]
4386 # send packet from host to server
4388 Ether(src=host.mac, dst=self.pg0.local_mac)
4389 / IP(src=host.ip4, dst=self.nat_addr)
4390 / TCP(sport=host_in_port, dport=server_out_port)
4392 self.pg0.add_stream(p)
4393 self.pg_enable_capture(self.pg_interfaces)
4395 capture = self.pg0.get_capture(1)
# Towards the server: source is the NAT address with a new source port,
# destination is the server's address and inside port.
4400 self.assertEqual(ip.src, self.nat_addr)
4401 self.assertEqual(ip.dst, server.ip4)
4402 self.assertNotEqual(tcp.sport, host_in_port)
4403 self.assertEqual(tcp.dport, server_in_port)
4404 self.assert_packet_checksums_valid(p)
# Remember the allocated port; the server's reply is addressed to it.
4405 host_out_port = tcp.sport
4407 self.logger.error(ppp("Unexpected or invalid packet:", p))
4410 after = self.statistics["/nat44-ei/hairpinning"]
4412 if_idx = self.pg0.sw_if_index
# NOTE(review): this subtracts the worker_1 baseline from the worker_2
# value, whereas the assertions at the end compare the same worker on both
# sides - confirm that the mix is intended.
4413 self.assertEqual(after[worker_2][if_idx] - cnt[worker_1][if_idx], 1)
4415 # send reply from server to host
4417 Ether(src=server.mac, dst=self.pg0.local_mac)
4418 / IP(src=server.ip4, dst=self.nat_addr)
4419 / TCP(sport=server_in_port, dport=host_out_port)
4421 self.pg0.add_stream(p)
4422 self.pg_enable_capture(self.pg_interfaces)
4424 capture = self.pg0.get_capture(1)
# Towards the host: source is the NAT address and the server's outside
# port, destination is the host's original address and port.
4429 self.assertEqual(ip.src, self.nat_addr)
4430 self.assertEqual(ip.dst, host.ip4)
4431 self.assertEqual(tcp.sport, server_out_port)
4432 self.assertEqual(tcp.dport, host_in_port)
4433 self.assert_packet_checksums_valid(p)
4435 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Final counters relative to the baseline: +1 on worker_1, +2 on worker_2.
4438 after = self.statistics["/nat44-ei/hairpinning"]
4439 if_idx = self.pg0.sw_if_index
4440 self.assertEqual(after[worker_1][if_idx] - cnt[worker_1][if_idx], 1)
4441 self.assertEqual(after[worker_2][if_idx] - cnt[worker_2][if_idx], 2)
4443 def test_hairpinning2(self):
4444 """NAT44EI hairpinning - 1:1 NAT"""
# Hairpinning through 1:1 (address-only) static mappings: every stream is
# injected on pg0 and the translated packets are captured on pg0 as well.
# NOTE(review): the embedded line numbers skip values (4445, 4453, 4458,
# 4468-4469, 4473-4475, ...), so some statements of this test are not
# visible in this listing. The comments below describe only the lines shown.
#
# External addresses the two servers are statically mapped to.
4446 server1_nat_ip = "10.0.0.10"
4447 server2_nat_ip = "10.0.0.11"
# All three endpoints are remote hosts attached to the same interface (pg0).
4448 host = self.pg0.remote_hosts[0]
4449 server1 = self.pg0.remote_hosts[1]
4450 server2 = self.pg0.remote_hosts[2]
4451 server_tcp_port = 22
4452 server_udp_port = 20
# Register self.nat_addr through the nat44_add_address helper; the host's
# translated packets are later expected to carry it as source address.
4454 self.nat44_add_address(self.nat_addr)
# pg0 is enabled with the NAT44_EI_IF_INSIDE flag.
4455 flags = self.config_flags.NAT44_EI_IF_INSIDE
4456 self.vapi.nat44_ei_interface_add_del_feature(
4457 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
# pg1 is enabled without a flags argument.
# NOTE(review): presumably this makes pg1 the outside interface - confirm
# against the API default for `flags`.
4459 self.vapi.nat44_ei_interface_add_del_feature(
4460 sw_if_index=self.pg1.sw_if_index, is_add=1
4463 # add static mapping for servers
# Only addresses are passed (no ports/protocol), i.e. whole-address mappings.
4464 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
4465 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# --- host -> server1 ---
# TCP, UDP and ICMP echo-request from the host to server1's mapped address.
# NOTE(review): the lines that bind these expressions and collect them
# (presumably into `pkts`, which is used below) are not visible here.
4470 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4471 / IP(src=host.ip4, dst=server1_nat_ip)
4472 / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
4476 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4477 / IP(src=host.ip4, dst=server1_nat_ip)
4478 / UDP(sport=self.udp_port_in, dport=server_udp_port)
4482 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4483 / IP(src=host.ip4, dst=server1_nat_ip)
4484 / ICMP(id=self.icmp_id_in, type="echo-request")
4487 self.pg0.add_stream(pkts)
4488 self.pg_enable_capture(self.pg_interfaces)
# Exactly as many packets as were sent are expected back on pg0.
4490 capture = self.pg0.get_capture(len(pkts))
4491 for packet in capture:
# Source must be rewritten to the NAT address and destination to server1's
# real address.
4493 self.assertEqual(packet[IP].src, self.nat_addr)
4494 self.assertEqual(packet[IP].dst, server1.ip4)
4495 if packet.haslayer(TCP):
# The source port must differ from the inside port; the translated value is
# remembered so the reply below can be addressed to it.
4496 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
4497 self.assertEqual(packet[TCP].dport, server_tcp_port)
4498 self.tcp_port_out = packet[TCP].sport
4499 self.assert_packet_checksums_valid(packet)
4500 elif packet.haslayer(UDP):
4501 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
4502 self.assertEqual(packet[UDP].dport, server_udp_port)
4503 self.udp_port_out = packet[UDP].sport
# NOTE(review): presumably the fall-through (ICMP) branch - its header line
# is not visible. The ICMP id plays the role of the port here.
4505 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
4506 self.icmp_id_out = packet[ICMP].id
# Dump the offending packet to the log for diagnosis.
4508 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# --- server1 -> host (replies) ---
# Replies go to the NAT address and the translated ports/id recorded above.
4514 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4515 / IP(src=server1.ip4, dst=self.nat_addr)
4516 / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
4520 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4521 / IP(src=server1.ip4, dst=self.nat_addr)
4522 / UDP(sport=server_udp_port, dport=self.udp_port_out)
4526 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4527 / IP(src=server1.ip4, dst=self.nat_addr)
4528 / ICMP(id=self.icmp_id_out, type="echo-reply")
4531 self.pg0.add_stream(pkts)
4532 self.pg_enable_capture(self.pg_interfaces)
4534 capture = self.pg0.get_capture(len(pkts))
4535 for packet in capture:
# The host must see server1's mapped address as source, and the original
# inside ports/id restored as destination.
4537 self.assertEqual(packet[IP].src, server1_nat_ip)
4538 self.assertEqual(packet[IP].dst, host.ip4)
4539 if packet.haslayer(TCP):
4540 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
4541 self.assertEqual(packet[TCP].sport, server_tcp_port)
4542 self.assert_packet_checksums_valid(packet)
4543 elif packet.haslayer(UDP):
4544 self.assertEqual(packet[UDP].dport, self.udp_port_in)
4545 self.assertEqual(packet[UDP].sport, server_udp_port)
4547 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4549 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4552 # server2 to server1
# Both endpoints have static mappings: only the addresses are expected to
# change, while the source port / ICMP id must stay equal to the inside value.
4555 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4556 / IP(src=server2.ip4, dst=server1_nat_ip)
4557 / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
4561 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4562 / IP(src=server2.ip4, dst=server1_nat_ip)
4563 / UDP(sport=self.udp_port_in, dport=server_udp_port)
4567 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4568 / IP(src=server2.ip4, dst=server1_nat_ip)
4569 / ICMP(id=self.icmp_id_in, type="echo-request")
4572 self.pg0.add_stream(pkts)
4573 self.pg_enable_capture(self.pg_interfaces)
4575 capture = self.pg0.get_capture(len(pkts))
4576 for packet in capture:
# Source is server2's mapped address, destination is server1's real address.
4578 self.assertEqual(packet[IP].src, server2_nat_ip)
4579 self.assertEqual(packet[IP].dst, server1.ip4)
4580 if packet.haslayer(TCP):
4581 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
4582 self.assertEqual(packet[TCP].dport, server_tcp_port)
4583 self.tcp_port_out = packet[TCP].sport
4584 self.assert_packet_checksums_valid(packet)
4585 elif packet.haslayer(UDP):
4586 self.assertEqual(packet[UDP].sport, self.udp_port_in)
4587 self.assertEqual(packet[UDP].dport, server_udp_port)
4588 self.udp_port_out = packet[UDP].sport
4590 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4591 self.icmp_id_out = packet[ICMP].id
4593 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4596 # server1 to server2
# Replies are addressed to server2's mapped address, using the ports/id
# recorded from the previous capture.
4599 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4600 / IP(src=server1.ip4, dst=server2_nat_ip)
4601 / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
4605 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4606 / IP(src=server1.ip4, dst=server2_nat_ip)
4607 / UDP(sport=server_udp_port, dport=self.udp_port_out)
4611 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4612 / IP(src=server1.ip4, dst=server2_nat_ip)
4613 / ICMP(id=self.icmp_id_out, type="echo-reply")
4616 self.pg0.add_stream(pkts)
4617 self.pg_enable_capture(self.pg_interfaces)
4619 capture = self.pg0.get_capture(len(pkts))
4620 for packet in capture:
# server2 must see server1's mapped address as source and its own real
# address as destination, with the inside ports/id unchanged.
4622 self.assertEqual(packet[IP].src, server1_nat_ip)
4623 self.assertEqual(packet[IP].dst, server2.ip4)
4624 if packet.haslayer(TCP):
4625 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
4626 self.assertEqual(packet[TCP].sport, server_tcp_port)
4627 self.assert_packet_checksums_valid(packet)
4628 elif packet.haslayer(UDP):
4629 self.assertEqual(packet[UDP].dport, self.udp_port_in)
4630 self.assertEqual(packet[UDP].sport, server_udp_port)
4632 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4634 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Script entry point: when this module is executed directly, run its tests
# through unittest using VppTestRunner as the test runner.
4638 if __name__ == "__main__":
4639 unittest.main(testRunner=VppTestRunner)