11 from framework import VppTestCase, VppTestRunner
12 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
13 from scapy.all import (
26 from scapy.data import IP_PROTOS
27 from scapy.layers.inet import IP, TCP, UDP, ICMP
28 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
29 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
30 from scapy.layers.l2 import Ether, ARP, GRE
31 from scapy.packet import Raw
32 from syslog_rfc5424_parser import SyslogMessage, ParseError
33 from syslog_rfc5424_parser.constants import SyslogSeverity
35 from vpp_ip_route import VppIpRoute, VppRoutePath
36 from vpp_neighbor import VppNeighbor
37 from vpp_papi import VppEnum
40 # NAT HA protocol event data
44 ByteEnumField("event_type", None, {1: "add", 2: "del", 3: "refresh"}),
45 ByteEnumField("protocol", None, {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
46 ShortField("flags", 0),
47 IPField("in_addr", None),
48 IPField("out_addr", None),
49 ShortField("in_port", None),
50 ShortField("out_port", None),
51 IPField("eh_addr", None),
52 IPField("ehn_addr", None),
53 ShortField("eh_port", None),
54 ShortField("ehn_port", None),
55 IntField("fib_index", None),
56 IntField("total_pkts", 0),
57 LongField("total_bytes", 0),
60 def extract_padding(self, s):
64 # NAT HA protocol header
65 class HANATStateSync(Packet):
66 name = "HA NAT state sync"
68 XByteField("version", 1),
69 FlagsField("flags", 0, 8, ["ACK"]),
70 FieldLenField("count", None, count_of="events"),
71 IntField("sequence_number", 1),
72 IntField("thread_index", 0),
73 PacketListField("events", [], Event, count_from=lambda pkt: pkt.count),
77 class MethodHolder(VppTestCase):
78 """NAT create capture and verify method holder"""
def config_flags(self):
    """
    Return the NAT44EI configuration flags enum exposed by VppEnum

    NOTE(review): other code in this file reads members straight off
    ``self.config_flags`` (e.g. ``NAT44_EI_IF_INSIDE``), so this accessor
    is presumably wrapped by ``@property`` on a line not shown here.
    """
    nat44_ei_flags = VppEnum.vl_api_nat44_ei_config_flags_t
    return nat44_ei_flags
def SYSLOG_SEVERITY(self):
    """
    Return the syslog severity enum exposed by VppEnum

    NOTE(review): the upper-case name suggests attribute-style access,
    so this is presumably wrapped by ``@property`` on a line not shown
    here - confirm.
    """
    severity_enum = VppEnum.vl_api_syslog_severity_t
    return severity_enum
88 def nat44_add_static_mapping(
91 external_ip="0.0.0.0",
96 external_sw_if_index=0xFFFFFFFF,
102 Add/delete NAT44EI static mapping
104 :param local_ip: Local IP address
105 :param external_ip: External IP address
106 :param local_port: Local port number (Optional)
107 :param external_port: External port number (Optional)
108 :param vrf_id: VRF ID (Default 0)
109 :param is_add: 1 if add, 0 if delete (Default add)
110 :param external_sw_if_index: External interface instead of IP address
111 :param proto: IP protocol (Mandatory if port specified)
112 :param tag: Opaque string tag
113 :param flags: NAT configuration flags
116 if not (local_port and external_port):
117 flags |= self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
119 self.vapi.nat44_ei_add_del_static_mapping(
121 local_ip_address=local_ip,
122 external_ip_address=external_ip,
123 external_sw_if_index=external_sw_if_index,
124 local_port=local_port,
125 external_port=external_port,
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
    """
    Add/delete NAT44EI address

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID passed through to the API call
                   (Default 0xFFFFFFFF)
    """
    # The API takes a range; a single address is a range whose first and
    # last elements are the same IP.
    request = {
        "first_ip_address": ip,
        "last_ip_address": ip,
        "vrf_id": vrf_id,
        "is_add": is_add,
    }
    self.vapi.nat44_ei_add_del_address_range(**request)
143 def create_routes_and_neigbors(self):
148 [VppRoutePath(self.pg7.remote_ip4, self.pg7.sw_if_index)],
154 [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)],
161 self.pg7.sw_if_index,
168 self.pg8.sw_if_index,
176 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
178 Create packet stream for inside network
180 :param in_if: Inside interface
181 :param out_if: Outside interface
182 :param dst_ip: Destination address
183 :param ttl: TTL of generated packets
186 dst_ip = out_if.remote_ip4
191 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
192 / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
193 / TCP(sport=self.tcp_port_in, dport=20)
199 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
200 / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
201 / UDP(sport=self.udp_port_in, dport=20)
207 Ether(dst=in_if.local_mac, src=in_if.remote_mac)
208 / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl)
209 / ICMP(id=self.icmp_id_in, type="echo-request")
215 def compose_ip6(self, ip4, pref, plen):
217 Compose IPv4-embedded IPv6 addresses
219 :param ip4: IPv4 address
220 :param pref: IPv6 prefix
221 :param plen: IPv6 prefix length
222 :returns: IPv4-embedded IPv6 addresses
224 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
225 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
240 pref_n[10] = ip4_n[3]
244 pref_n[10] = ip4_n[2]
245 pref_n[11] = ip4_n[3]
248 pref_n[10] = ip4_n[1]
249 pref_n[11] = ip4_n[2]
250 pref_n[12] = ip4_n[3]
252 pref_n[12] = ip4_n[0]
253 pref_n[13] = ip4_n[1]
254 pref_n[14] = ip4_n[2]
255 pref_n[15] = ip4_n[3]
256 packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n])
257 return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
259 def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False):
261 Create packet stream for outside network
263 :param out_if: Outside interface
264 :param dst_ip: Destination IP address (Default use global NAT address)
265 :param ttl: TTL of generated packets
266 :param use_inside_ports: Use inside NAT ports as destination ports
267 instead of outside ports
270 dst_ip = self.nat_addr
271 if not use_inside_ports:
272 tcp_port = self.tcp_port_out
273 udp_port = self.udp_port_out
274 icmp_id = self.icmp_id_out
276 tcp_port = self.tcp_port_in
277 udp_port = self.udp_port_in
278 icmp_id = self.icmp_id_in
282 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
283 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
284 / TCP(dport=tcp_port, sport=20)
290 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
291 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
292 / UDP(dport=udp_port, sport=20)
298 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
299 / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl)
300 / ICMP(id=icmp_id, type="echo-reply")
306 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
308 Create packet stream for outside network
310 :param out_if: Outside interface
311 :param dst_ip: Destination IP address (Default use global NAT address)
312 :param hl: HL of generated packets
317 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
318 / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
319 / TCP(dport=self.tcp_port_out, sport=20)
325 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
326 / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
327 / UDP(dport=self.udp_port_out, sport=20)
333 Ether(dst=out_if.local_mac, src=out_if.remote_mac)
334 / IPv6(src=src_ip, dst=dst_ip, hlim=hl)
335 / ICMPv6EchoReply(id=self.icmp_id_out)
341 def verify_capture_out(
351 Verify captured packets on outside network
353 :param capture: Captured packets
354 :param nat_ip: Translated IP address (Default use global NAT address)
355 :param same_port: Source port number is not translated (Default False)
356 :param dst_ip: Destination IP address (Default do not verify)
357 :param is_ip6: If L3 protocol is IPv6 (Default False)
361 ICMP46 = ICMPv6EchoRequest
366 nat_ip = self.nat_addr
367 for packet in capture:
370 self.assert_packet_checksums_valid(packet)
371 self.assertEqual(packet[IP46].src, nat_ip)
372 if dst_ip is not None:
373 self.assertEqual(packet[IP46].dst, dst_ip)
374 if packet.haslayer(TCP):
377 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
379 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
380 self.tcp_port_out = packet[TCP].sport
381 self.assert_packet_checksums_valid(packet)
382 elif packet.haslayer(UDP):
385 self.assertEqual(packet[UDP].sport, self.udp_port_in)
387 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
388 self.udp_port_out = packet[UDP].sport
392 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
394 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
395 self.icmp_id_out = packet[ICMP46].id
396 self.assert_packet_checksums_valid(packet)
399 ppp("Unexpected or invalid packet (outside network):", packet)
def verify_capture_out_ip6(self, capture, nat_ip, same_port=False, dst_ip=None):
    """
    Verify captured packets on outside network

    Forwards all arguments to verify_capture_out and appends True as the
    fifth positional argument (presumably its is_ip6 flag - confirm
    against that method's signature).

    :param capture: Captured packets
    :param nat_ip: Translated IP address
    :param same_port: Source port number is not translated (Default False)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    forwarded = (capture, nat_ip, same_port, dst_ip, True)
    return self.verify_capture_out(*forwarded)
414 def verify_capture_in(self, capture, in_if):
416 Verify captured packets on inside network
418 :param capture: Captured packets
419 :param in_if: Inside interface
421 for packet in capture:
423 self.assert_packet_checksums_valid(packet)
424 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
425 if packet.haslayer(TCP):
426 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
427 elif packet.haslayer(UDP):
428 self.assertEqual(packet[UDP].dport, self.udp_port_in)
430 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
433 ppp("Unexpected or invalid packet (inside network):", packet)
437 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
439 Verify captured packet that don't have to be translated
441 :param capture: Captured packets
442 :param ingress_if: Ingress interface
443 :param egress_if: Egress interface
445 for packet in capture:
447 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
448 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
449 if packet.haslayer(TCP):
450 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
451 elif packet.haslayer(UDP):
452 self.assertEqual(packet[UDP].sport, self.udp_port_in)
454 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
457 ppp("Unexpected or invalid packet (inside network):", packet)
461 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None, icmp_type=11):
463 Verify captured packets with ICMP errors on outside network
465 :param capture: Captured packets
466 :param src_ip: Translated IP address or IP address of VPP
467 (Default use global NAT address)
468 :param icmp_type: Type of error ICMP packet
469 we are expecting (Default 11)
472 src_ip = self.nat_addr
473 for packet in capture:
475 self.assertEqual(packet[IP].src, src_ip)
476 self.assertEqual(packet.haslayer(ICMP), 1)
478 self.assertEqual(icmp.type, icmp_type)
479 self.assertTrue(icmp.haslayer(IPerror))
480 inner_ip = icmp[IPerror]
481 if inner_ip.haslayer(TCPerror):
482 self.assertEqual(inner_ip[TCPerror].dport, self.tcp_port_out)
483 elif inner_ip.haslayer(UDPerror):
484 self.assertEqual(inner_ip[UDPerror].dport, self.udp_port_out)
486 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
489 ppp("Unexpected or invalid packet (outside network):", packet)
493 def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
495 Verify captured packets with ICMP errors on inside network
497 :param capture: Captured packets
498 :param in_if: Inside interface
499 :param icmp_type: Type of error ICMP packet
500 we are expecting (Default 11)
502 for packet in capture:
504 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
505 self.assertEqual(packet.haslayer(ICMP), 1)
507 self.assertEqual(icmp.type, icmp_type)
508 self.assertTrue(icmp.haslayer(IPerror))
509 inner_ip = icmp[IPerror]
510 if inner_ip.haslayer(TCPerror):
511 self.assertEqual(inner_ip[TCPerror].sport, self.tcp_port_in)
512 elif inner_ip.haslayer(UDPerror):
513 self.assertEqual(inner_ip[UDPerror].sport, self.udp_port_in)
515 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
518 ppp("Unexpected or invalid packet (inside network):", packet)
522 def create_stream_frag(
523 self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False
526 Create fragmented packet stream
528 :param src_if: Source interface
529 :param dst: Destination IPv4 address
530 :param sport: Source port
531 :param dport: Destination port
532 :param data: Payload data
533 :param proto: protocol (TCP, UDP, ICMP)
534 :param echo_reply: use echo_reply if protocol is ICMP
537 if proto == IP_PROTOS.tcp:
539 IP(src=src_if.remote_ip4, dst=dst)
540 / TCP(sport=sport, dport=dport)
543 p = p.__class__(scapy.compat.raw(p))
544 chksum = p[TCP].chksum
545 proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
546 elif proto == IP_PROTOS.udp:
547 proto_header = UDP(sport=sport, dport=dport)
548 elif proto == IP_PROTOS.icmp:
550 proto_header = ICMP(id=sport, type="echo-request")
552 proto_header = ICMP(id=sport, type="echo-reply")
554 raise Exception("Unsupported protocol")
555 id = random.randint(0, 65535)
557 if proto == IP_PROTOS.tcp:
560 raw = Raw(data[0:16])
562 Ether(src=src_if.remote_mac, dst=src_if.local_mac)
563 / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id)
568 if proto == IP_PROTOS.tcp:
569 raw = Raw(data[4:20])
571 raw = Raw(data[16:32])
573 Ether(src=src_if.remote_mac, dst=src_if.local_mac)
574 / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto)
578 if proto == IP_PROTOS.tcp:
583 Ether(src=src_if.remote_mac, dst=src_if.local_mac)
584 / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id)
590 def reass_frags_and_verify(self, frags, src, dst):
592 Reassemble and verify fragmented packet
594 :param frags: Captured fragments
595 :param src: Source IPv4 address to verify
596 :param dst: Destination IPv4 address to verify
598 :returns: Reassembled IPv4 packet
602 self.assertEqual(p[IP].src, src)
603 self.assertEqual(p[IP].dst, dst)
604 self.assert_ip_checksum_valid(p)
605 buffer.seek(p[IP].frag * 8)
606 buffer.write(bytes(p[IP].payload))
607 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto)
608 if ip.proto == IP_PROTOS.tcp:
609 p = ip / TCP(buffer.getvalue())
610 self.logger.debug(ppp("Reassembled:", p))
611 self.assert_tcp_checksum_valid(p)
612 elif ip.proto == IP_PROTOS.udp:
613 p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])
614 elif ip.proto == IP_PROTOS.icmp:
615 p = ip / ICMP(buffer.getvalue())
618 def verify_ipfix_nat44_ses(self, data):
620 Verify IPFIX NAT44EI session create/delete event
622 :param data: Decoded IPFIX data records
624 nat44_ses_create_num = 0
625 nat44_ses_delete_num = 0
626 self.assertEqual(6, len(data))
629 self.assertIn(scapy.compat.orb(record[230]), [4, 5])
630 if scapy.compat.orb(record[230]) == 4:
631 nat44_ses_create_num += 1
633 nat44_ses_delete_num += 1
635 self.assertEqual(self.pg0.remote_ip4, str(ipaddress.IPv4Address(record[8])))
636 # postNATSourceIPv4Address
638 socket.inet_pton(socket.AF_INET, self.nat_addr), record[225]
641 self.assertEqual(struct.pack("!I", 0), record[234])
642 # protocolIdentifier/sourceTransportPort
643 # /postNAPTSourceTransportPort
644 if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
645 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
646 self.assertEqual(struct.pack("!H", self.icmp_id_out), record[227])
647 elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
648 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
649 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
650 elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
651 self.assertEqual(struct.pack("!H", self.udp_port_in), record[7])
652 self.assertEqual(struct.pack("!H", self.udp_port_out), record[227])
654 self.fail(f"Invalid protocol {scapy.compat.orb(record[4])}")
655 self.assertEqual(3, nat44_ses_create_num)
656 self.assertEqual(3, nat44_ses_delete_num)
658 def verify_ipfix_addr_exhausted(self, data):
659 self.assertEqual(1, len(data))
662 self.assertEqual(scapy.compat.orb(record[230]), 3)
664 self.assertEqual(struct.pack("!I", 0), record[283])
666 def verify_ipfix_max_sessions(self, data, limit):
667 self.assertEqual(1, len(data))
670 self.assertEqual(scapy.compat.orb(record[230]), 13)
671 # natQuotaExceededEvent
672 self.assertEqual(struct.pack("!I", 1), record[466])
674 self.assertEqual(struct.pack("!I", limit), record[471])
def verify_no_nat44_user(self):
    """
    Verify that there is no NAT44EI user

    Checks, in order, that the API user dump is empty and that the
    total-users and total-sessions statistics are zero.

    The counters are summed over the whole first axis at index 0, the
    same way the test cases in this file read them, instead of looking
    only at element [0][0]; a non-zero value in any other row
    (presumably one row per thread - confirm) is therefore caught too.
    """
    dumped_users = self.vapi.nat44_ei_user_dump()
    self.assertEqual(len(dumped_users), 0)

    for counter_path in ("/nat44-ei/total-users", "/nat44-ei/total-sessions"):
        counter = self.statistics[counter_path]
        self.assertEqual(counter[:, 0].sum(), 0)
685 def verify_syslog_apmap(self, data, is_add=True):
686 message = data.decode("utf-8")
688 message = SyslogMessage.parse(message)
689 except ParseError as e:
693 self.assertEqual(message.severity, SyslogSeverity.info)
694 self.assertEqual(message.appname, "NAT")
695 self.assertEqual(message.msgid, "APMADD" if is_add else "APMDEL")
696 sd_params = message.sd.get("napmap")
697 self.assertTrue(sd_params is not None)
698 self.assertEqual(sd_params.get("IATYP"), "IPv4")
699 self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4)
700 self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in)
701 self.assertEqual(sd_params.get("XATYP"), "IPv4")
702 self.assertEqual(sd_params.get("XSADDR"), self.nat_addr)
703 self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out)
704 self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp)
705 self.assertTrue(sd_params.get("SSUBIX") is not None)
706 self.assertEqual(sd_params.get("SVLAN"), "0")
def verify_mss_value(self, pkt, mss):
    """
    Verify the TCP MSS option of a packet and its TCP checksum

    Every option named "MSS" in the packet's TCP options must carry the
    expected value. A packet without such an option passes this part.

    :param pkt: Packet to check
    :param mss: Expected MSS value
    :raises TypeError: if the packet has no IP layer or no TCP layer
    """
    if not (pkt.haslayer(IP) and pkt.haslayer(TCP)):
        raise TypeError("Not a TCP/IP packet")

    for tcp_option in pkt[TCP].options:
        if tcp_option[0] != "MSS":
            continue
        self.assertEqual(tcp_option[1], mss)
    # NOTE(review): indentation was lost in this excerpt; the checksum
    # check is assumed to run once, after the option loop - confirm.
    self.assert_tcp_checksum_valid(pkt)
718 def proto2layer(proto):
719 if proto == IP_PROTOS.tcp:
721 elif proto == IP_PROTOS.udp:
723 elif proto == IP_PROTOS.icmp:
726 raise Exception("Unsupported protocol")
729 self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
731 layer = self.proto2layer(proto)
733 if proto == IP_PROTOS.tcp:
734 data = b"A" * 4 + b"B" * 16 + b"C" * 3
736 data = b"A" * 16 + b"B" * 16 + b"C" * 3
737 self.port_in = random.randint(1025, 65535)
740 pkts = self.create_stream_frag(
741 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
743 self.pg0.add_stream(pkts)
744 self.pg_enable_capture(self.pg_interfaces)
746 frags = self.pg1.get_capture(len(pkts))
747 if not dont_translate:
748 p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4)
750 p = self.reass_frags_and_verify(
751 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
753 if proto != IP_PROTOS.icmp:
754 if not dont_translate:
755 self.assertEqual(p[layer].dport, 20)
757 self.assertNotEqual(p[layer].sport, self.port_in)
759 self.assertEqual(p[layer].sport, self.port_in)
762 if not dont_translate:
763 self.assertNotEqual(p[layer].id, self.port_in)
765 self.assertEqual(p[layer].id, self.port_in)
766 self.assertEqual(data, p[Raw].load)
769 if not dont_translate:
770 dst_addr = self.nat_addr
772 dst_addr = self.pg0.remote_ip4
773 if proto != IP_PROTOS.icmp:
775 dport = p[layer].sport
779 pkts = self.create_stream_frag(
780 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
782 self.pg1.add_stream(pkts)
783 self.pg_enable_capture(self.pg_interfaces)
785 frags = self.pg0.get_capture(len(pkts))
786 p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
787 if proto != IP_PROTOS.icmp:
788 self.assertEqual(p[layer].sport, 20)
789 self.assertEqual(p[layer].dport, self.port_in)
791 self.assertEqual(p[layer].id, self.port_in)
792 self.assertEqual(data, p[Raw].load)
794 def reass_hairpinning(
804 layer = self.proto2layer(proto)
806 if proto == IP_PROTOS.tcp:
807 data = b"A" * 4 + b"B" * 16 + b"C" * 3
809 data = b"A" * 16 + b"B" * 16 + b"C" * 3
811 # send packet from host to server
812 pkts = self.create_stream_frag(
813 self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto
815 self.pg0.add_stream(pkts)
816 self.pg_enable_capture(self.pg_interfaces)
818 frags = self.pg0.get_capture(len(pkts))
819 p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr)
820 if proto != IP_PROTOS.icmp:
822 self.assertNotEqual(p[layer].sport, host_in_port)
823 self.assertEqual(p[layer].dport, server_in_port)
826 self.assertNotEqual(p[layer].id, host_in_port)
827 self.assertEqual(data, p[Raw].load)
829 def frag_out_of_order(
830 self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False
832 layer = self.proto2layer(proto)
834 if proto == IP_PROTOS.tcp:
835 data = b"A" * 4 + b"B" * 16 + b"C" * 3
837 data = b"A" * 16 + b"B" * 16 + b"C" * 3
838 self.port_in = random.randint(1025, 65535)
842 pkts = self.create_stream_frag(
843 self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto
846 self.pg0.add_stream(pkts)
847 self.pg_enable_capture(self.pg_interfaces)
849 frags = self.pg1.get_capture(len(pkts))
850 if not dont_translate:
851 p = self.reass_frags_and_verify(
852 frags, self.nat_addr, self.pg1.remote_ip4
855 p = self.reass_frags_and_verify(
856 frags, self.pg0.remote_ip4, self.pg1.remote_ip4
858 if proto != IP_PROTOS.icmp:
859 if not dont_translate:
860 self.assertEqual(p[layer].dport, 20)
862 self.assertNotEqual(p[layer].sport, self.port_in)
864 self.assertEqual(p[layer].sport, self.port_in)
867 if not dont_translate:
868 self.assertNotEqual(p[layer].id, self.port_in)
870 self.assertEqual(p[layer].id, self.port_in)
871 self.assertEqual(data, p[Raw].load)
874 if not dont_translate:
875 dst_addr = self.nat_addr
877 dst_addr = self.pg0.remote_ip4
878 if proto != IP_PROTOS.icmp:
880 dport = p[layer].sport
884 pkts = self.create_stream_frag(
885 self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True
888 self.pg1.add_stream(pkts)
889 self.pg_enable_capture(self.pg_interfaces)
891 frags = self.pg0.get_capture(len(pkts))
892 p = self.reass_frags_and_verify(
893 frags, self.pg1.remote_ip4, self.pg0.remote_ip4
895 if proto != IP_PROTOS.icmp:
896 self.assertEqual(p[layer].sport, 20)
897 self.assertEqual(p[layer].dport, self.port_in)
899 self.assertEqual(p[layer].id, self.port_in)
900 self.assertEqual(data, p[Raw].load)
def get_nat44_ei_in2out_worker_index(ip, vpp_worker_count):
    """
    Compute the in2out worker index for an inside IPv4 address

    The address is parsed, converted with socket.htonl, and the value
    plus its right-shifts by 8, 16 and 24 bits are added together; the
    sum modulo the worker count selects the worker.

    :param ip: IPv4 address in dotted-quad string form
    :param vpp_worker_count: Number of VPP workers
    :returns: 0 when there are no workers, otherwise an index in the
              range 1..vpp_worker_count
    """
    # NOTE(review): the body of this guard is not visible in the excerpt;
    # returning 0 is assumed. Without a return here the modulo below
    # would divide by zero.
    if vpp_worker_count == 0:
        return 0

    (numeric,) = struct.unpack("!L", socket.inet_aton(ip))
    numeric = socket.htonl(numeric)
    h = sum(numeric >> shift for shift in (0, 8, 16, 24))
    return 1 + h % vpp_worker_count
913 class TestNAT44EI(MethodHolder):
914 """NAT44EI Test Cases"""
916 max_translations = 10240
921 super(TestNAT44EI, cls).setUpClass()
922 cls.vapi.cli("set log class nat44-ei level debug")
924 cls.tcp_port_in = 6303
925 cls.tcp_port_out = 6303
926 cls.udp_port_in = 6304
927 cls.udp_port_out = 6304
928 cls.icmp_id_in = 6305
929 cls.icmp_id_out = 6305
930 cls.nat_addr = "10.0.0.3"
931 cls.ipfix_src_port = 4739
932 cls.ipfix_domain_id = 1
933 cls.tcp_external_port = 80
934 cls.udp_external_port = 69
936 cls.create_pg_interfaces(range(10))
937 cls.interfaces = list(cls.pg_interfaces[0:4])
939 for i in cls.interfaces:
944 cls.pg0.generate_remote_hosts(3)
945 cls.pg0.configure_ipv4_neighbors()
947 cls.pg1.generate_remote_hosts(1)
948 cls.pg1.configure_ipv4_neighbors()
950 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
951 cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
952 cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
954 cls.pg4._local_ip4 = "172.16.255.1"
955 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
956 cls.pg4.set_table_ip4(10)
957 cls.pg5._local_ip4 = "172.17.255.3"
958 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
959 cls.pg5.set_table_ip4(10)
960 cls.pg6._local_ip4 = "172.16.255.1"
961 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
962 cls.pg6.set_table_ip4(20)
963 for i in cls.overlapping_interfaces:
971 cls.pg9.generate_remote_hosts(2)
973 cls.vapi.sw_interface_add_del_address(
974 sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
978 cls.pg9.resolve_arp()
979 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
980 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
981 cls.pg9.resolve_arp()
def plugin_enable(self):
    """
    Enable the NAT44EI plugin

    The session limit comes from ``self.max_translations`` and the user
    limit from ``self.max_users``.
    """
    limits = {"sessions": self.max_translations, "users": self.max_users}
    self.vapi.nat44_ei_plugin_enable_disable(enable=1, **limits)
989 super(TestNAT44EI, self).setUp()
993 super(TestNAT44EI, self).tearDown()
994 if not self.vpp_dead:
995 self.vapi.nat44_ei_ipfix_enable_disable(
996 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
998 self.ipfix_src_port = 4739
999 self.ipfix_domain_id = 1
1001 self.vapi.nat44_ei_plugin_enable_disable(enable=0)
1002 self.vapi.cli("clear logging")
1004 def test_clear_sessions(self):
1005 """NAT44EI session clearing test"""
1007 self.nat44_add_address(self.nat_addr)
1008 flags = self.config_flags.NAT44_EI_IF_INSIDE
1009 self.vapi.nat44_ei_interface_add_del_feature(
1010 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1012 self.vapi.nat44_ei_interface_add_del_feature(
1013 sw_if_index=self.pg1.sw_if_index, is_add=1
1016 pkts = self.create_stream_in(self.pg0, self.pg1)
1017 self.pg0.add_stream(pkts)
1018 self.pg_enable_capture(self.pg_interfaces)
1020 capture = self.pg1.get_capture(len(pkts))
1021 self.verify_capture_out(capture)
1023 sessions = self.statistics["/nat44-ei/total-sessions"]
1024 self.assertGreater(sessions[:, 0].sum(), 0, "Session count invalid")
1025 self.logger.info("sessions before clearing: %s" % sessions[0][0])
1027 self.vapi.cli("clear nat44 ei sessions")
1029 sessions = self.statistics["/nat44-ei/total-sessions"]
1030 self.assertEqual(sessions[:, 0].sum(), 0, "Session count invalid")
1031 self.logger.info("sessions after clearing: %s" % sessions[0][0])
1033 def test_dynamic(self):
1034 """NAT44EI dynamic translation test"""
1035 self.nat44_add_address(self.nat_addr)
1036 flags = self.config_flags.NAT44_EI_IF_INSIDE
1037 self.vapi.nat44_ei_interface_add_del_feature(
1038 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1040 self.vapi.nat44_ei_interface_add_del_feature(
1041 sw_if_index=self.pg1.sw_if_index, is_add=1
1045 tcpn = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
1046 udpn = self.statistics["/nat44-ei/in2out/slowpath/udp"]
1047 icmpn = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
1048 drops = self.statistics["/nat44-ei/in2out/slowpath/drops"]
1050 pkts = self.create_stream_in(self.pg0, self.pg1)
1051 self.pg0.add_stream(pkts)
1052 self.pg_enable_capture(self.pg_interfaces)
1054 capture = self.pg1.get_capture(len(pkts))
1055 self.verify_capture_out(capture)
1057 if_idx = self.pg0.sw_if_index
1058 cnt = self.statistics["/nat44-ei/in2out/slowpath/tcp"]
1059 self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
1060 cnt = self.statistics["/nat44-ei/in2out/slowpath/udp"]
1061 self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
1062 cnt = self.statistics["/nat44-ei/in2out/slowpath/icmp"]
1063 self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
1064 cnt = self.statistics["/nat44-ei/in2out/slowpath/drops"]
1065 self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
1068 tcpn = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
1069 udpn = self.statistics["/nat44-ei/out2in/slowpath/udp"]
1070 icmpn = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
1071 drops = self.statistics["/nat44-ei/out2in/slowpath/drops"]
1073 pkts = self.create_stream_out(self.pg1)
1074 self.pg1.add_stream(pkts)
1075 self.pg_enable_capture(self.pg_interfaces)
1077 capture = self.pg0.get_capture(len(pkts))
1078 self.verify_capture_in(capture, self.pg0)
1080 if_idx = self.pg1.sw_if_index
1081 cnt = self.statistics["/nat44-ei/out2in/slowpath/tcp"]
1082 self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
1083 cnt = self.statistics["/nat44-ei/out2in/slowpath/udp"]
1084 self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
1085 cnt = self.statistics["/nat44-ei/out2in/slowpath/icmp"]
1086 self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
1087 cnt = self.statistics["/nat44-ei/out2in/slowpath/drops"]
1088 self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
1090 users = self.statistics["/nat44-ei/total-users"]
1091 self.assertEqual(users[:, 0].sum(), 1)
1092 sessions = self.statistics["/nat44-ei/total-sessions"]
1093 self.assertEqual(sessions[:, 0].sum(), 3)
def test_dynamic_icmp_errors_in2out_ttl_1(self):
    """NAT44EI handling of client packets with TTL=1"""

    inside_if, outside_if = self.pg0, self.pg1

    # One NAT pool address; pg0 is flagged as the inside interface and
    # pg1 gets the feature without the inside flag.
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_ei_interface_add_del_feature(
        sw_if_index=inside_if.sw_if_index,
        flags=self.config_flags.NAT44_EI_IF_INSIDE,
        is_add=1,
    )
    self.vapi.nat44_ei_interface_add_del_feature(
        sw_if_index=outside_if.sw_if_index, is_add=1
    )

    # Client side - generate traffic
    ttl1_pkts = self.create_stream_in(inside_if, outside_if, ttl=1)
    replies = self.send_and_expect_some(inside_if, ttl1_pkts, inside_if)

    # Client side - verify ICMP type 11 packets
    self.verify_capture_in_with_icmp_errors(replies, inside_if)
1114 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1115 """NAT44EI handling of server packets with TTL=1"""
1117 self.nat44_add_address(self.nat_addr)
1118 flags = self.config_flags.NAT44_EI_IF_INSIDE
1119 self.vapi.nat44_ei_interface_add_del_feature(
1120 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1122 self.vapi.nat44_ei_interface_add_del_feature(
1123 sw_if_index=self.pg1.sw_if_index, is_add=1
1126 # Client side - create sessions
1127 pkts = self.create_stream_in(self.pg0, self.pg1)
1128 self.pg0.add_stream(pkts)
1129 self.pg_enable_capture(self.pg_interfaces)
1132 # Server side - generate traffic
1133 capture = self.pg1.get_capture(len(pkts))
1134 self.verify_capture_out(capture)
1135 pkts = self.create_stream_out(self.pg1, ttl=1)
1136 capture = self.send_and_expect_some(self.pg1, pkts, self.pg1)
1138 # Server side - verify ICMP type 11 packets
1139 self.verify_capture_out_with_icmp_errors(capture, src_ip=self.pg1.local_ip4)
1141 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1142 """NAT44EI handling of error responses to client packets with TTL=2"""
1144 self.nat44_add_address(self.nat_addr)
1145 flags = self.config_flags.NAT44_EI_IF_INSIDE
1146 self.vapi.nat44_ei_interface_add_del_feature(
1147 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1149 self.vapi.nat44_ei_interface_add_del_feature(
1150 sw_if_index=self.pg1.sw_if_index, is_add=1
1153 # Client side - generate traffic
1154 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1155 self.pg0.add_stream(pkts)
1156 self.pg_enable_capture(self.pg_interfaces)
1159 # Server side - simulate ICMP type 11 response
1160 capture = self.pg1.get_capture(len(pkts))
1162 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1163 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
1166 for packet in capture
1168 self.pg1.add_stream(pkts)
1169 self.pg_enable_capture(self.pg_interfaces)
1172 # Client side - verify ICMP type 11 packets
1173 capture = self.pg0.get_capture(len(pkts))
1174 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1176 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1177 """NAT44EI handling of error responses to server packets with TTL=2"""
# Scenario visible in this excerpt: add self.nat_addr through
# nat44_add_address, enable the NAT44EI feature on pg0 with the
# NAT44_EI_IF_INSIDE flag and on pg1 with no flags, create sessions with a
# pg0 stream, then send a pg1 stream built with ttl=2 and answer it from
# pg0 before checking what arrives on pg1.
# NOTE(review): the embedded line numbers have gaps (e.g. 1178, 1183,
# 1192-1193), so some original lines are not visible here; calls opened
# below are closed on lines that are not shown.
1179 self.nat44_add_address(self.nat_addr)
1180 flags = self.config_flags.NAT44_EI_IF_INSIDE
1181 self.vapi.nat44_ei_interface_add_del_feature(
1182 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1184 self.vapi.nat44_ei_interface_add_del_feature(
1185 sw_if_index=self.pg1.sw_if_index, is_add=1
1188 # Client side - create sessions
1189 pkts = self.create_stream_in(self.pg0, self.pg1)
1190 self.pg0.add_stream(pkts)
1191 self.pg_enable_capture(self.pg_interfaces)
1194 # Server side - generate traffic
1195 capture = self.pg1.get_capture(len(pkts))
1196 self.verify_capture_out(capture)
1197 pkts = self.create_stream_out(self.pg1, ttl=2)
1198 self.pg1.add_stream(pkts)
1199 self.pg_enable_capture(self.pg_interfaces)
1202 # Client side - simulate ICMP type 11 response
1203 capture = self.pg0.get_capture(len(pkts))
# The reply stream is built once per packet captured on pg0 and is addressed
# from pg0.remote_ip4 to pg1.remote_ip4. The layers after IP (1207-1208)
# are not visible in this excerpt.
1205 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1206 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
1209 for packet in capture
1211 self.pg0.add_stream(pkts)
1212 self.pg_enable_capture(self.pg_interfaces)
1215 # Server side - verify ICMP type 11 packets
1216 capture = self.pg1.get_capture(len(pkts))
1217 self.verify_capture_out_with_icmp_errors(capture)
1219 def test_ping_out_interface_from_outside(self):
1220 """NAT44EI ping out interface from outside network"""
# Scenario visible in this excerpt: with self.nat_addr added and the
# NAT44EI feature enabled on pg0 (NAT44_EI_IF_INSIDE) and pg1 (no flags),
# an ICMP echo-request is sent on pg1 from pg1.remote_ip4 to pg1.local_ip4,
# and the packet captured back on pg1 is checked field by field.
# NOTE(review): the embedded numbering has gaps (e.g. 1221, 1230-1231,
# 1235-1236); the assignments of `pkts` and `packet` are not visible here.
1222 self.nat44_add_address(self.nat_addr)
1223 flags = self.config_flags.NAT44_EI_IF_INSIDE
1224 self.vapi.nat44_ei_interface_add_del_feature(
1225 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1227 self.vapi.nat44_ei_interface_add_del_feature(
1228 sw_if_index=self.pg1.sw_if_index, is_add=1
# Echo request aimed at the pg1 interface address itself.
1232 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1233 / IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4)
1234 / ICMP(id=self.icmp_id_out, type="echo-request")
1237 self.pg1.add_stream(pkts)
1238 self.pg_enable_capture(self.pg_interfaces)
1240 capture = self.pg1.get_capture(len(pkts))
# The reply must have source and destination addresses swapped relative to
# the request and carry ICMP type 0.
# NOTE(review): the request uses icmp_id_out while the reply is compared
# with icmp_id_in - presumably both hold the same value in this fixture;
# confirm against the class setup.
1243 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1244 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1245 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1246 self.assertEqual(packet[ICMP].type, 0) # echo reply
# Diagnostic text for a bad packet; the call consuming this ppp() result is
# on a line not shown here.
1249 ppp("Unexpected or invalid packet (outside network):", packet)
1253 def test_ping_internal_host_from_outside(self):
1254 """NAT44EI ping internal host from outside network"""
# Scenario visible in this excerpt: a static mapping between
# pg0.remote_ip4 and self.nat_addr is added, the NAT44EI feature is enabled
# on pg0 (NAT44_EI_IF_INSIDE) and pg1 (no flags), an echo-request is sent
# from pg1 to self.nat_addr, and an echo-reply is sent back from pg0.
# NOTE(review): the embedded numbering has gaps (e.g. 1255, 1264-1266,
# 1270); the assignment of `pkt` is not visible in this excerpt.
1256 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1257 flags = self.config_flags.NAT44_EI_IF_INSIDE
1258 self.vapi.nat44_ei_interface_add_del_feature(
1259 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1261 self.vapi.nat44_ei_interface_add_del_feature(
1262 sw_if_index=self.pg1.sw_if_index, is_add=1
# Outside -> inside: echo request addressed to the mapped external address.
1267 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
1268 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64)
1269 / ICMP(id=self.icmp_id_out, type="echo-request")
1271 self.pg1.add_stream(pkt)
1272 self.pg_enable_capture(self.pg_interfaces)
# Exactly one packet is expected on pg0 and it must be ICMP.
1274 capture = self.pg0.get_capture(1)
1275 self.verify_capture_in(capture, self.pg0)
1276 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Inside -> outside: echo reply from the internal host.
1280 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
1281 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64)
1282 / ICMP(id=self.icmp_id_in, type="echo-reply")
1284 self.pg0.add_stream(pkt)
1285 self.pg_enable_capture(self.pg_interfaces)
# Exactly one packet is expected on pg1, verified with same_port=True.
1287 capture = self.pg1.get_capture(1)
1288 self.verify_capture_out(capture, same_port=True)
1289 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1291 def test_forwarding(self):
1292 """NAT44EI forwarding test"""
# Scenario visible in this excerpt: enable the NAT44EI feature on pg0
# (NAT44_EI_IF_INSIDE) and pg1 (no flags), turn forwarding on, install a
# static mapping between pg0.remote_ip4 (real_ip) and self.nat_addr
# (alias_ip), then exercise traffic that matches the mapping and traffic
# that does not, and finally turn forwarding off again.
# NOTE(review): the embedded numbering has gaps (e.g. 1307, 1311-1314,
# 1316, 1355-1356); the remaining arguments of both static-mapping calls
# and any try/finally structure are not visible in this excerpt.
1294 flags = self.config_flags.NAT44_EI_IF_INSIDE
1295 self.vapi.nat44_ei_interface_add_del_feature(
1296 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1298 self.vapi.nat44_ei_interface_add_del_feature(
1299 sw_if_index=self.pg1.sw_if_index, is_add=1
1301 self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
1303 real_ip = self.pg0.remote_ip4
1304 alias_ip = self.nat_addr
1305 flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1306 self.vapi.nat44_ei_add_del_static_mapping(
1308 local_ip_address=real_ip,
1309 external_ip_address=alias_ip,
1310 external_sw_if_index=0xFFFFFFFF,
1315 # static mapping match
1317 pkts = self.create_stream_out(self.pg1)
1318 self.pg1.add_stream(pkts)
1319 self.pg_enable_capture(self.pg_interfaces)
1321 capture = self.pg0.get_capture(len(pkts))
1322 self.verify_capture_in(capture, self.pg0)
1324 pkts = self.create_stream_in(self.pg0, self.pg1)
1325 self.pg0.add_stream(pkts)
1326 self.pg_enable_capture(self.pg_interfaces)
1328 capture = self.pg1.get_capture(len(pkts))
1329 self.verify_capture_out(capture, same_port=True)
1331 # no static mapping match
# The first remote host of pg0 is temporarily replaced by the second one
# and restored from host0 further down (line 1354).
# NOTE(review): presumably pg0.remote_ip4 follows remote_hosts[0], so the
# streams below use an address without a static mapping - confirm.
1333 host0 = self.pg0.remote_hosts[0]
1334 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1336 pkts = self.create_stream_out(
1337 self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True
1339 self.pg1.add_stream(pkts)
1340 self.pg_enable_capture(self.pg_interfaces)
1342 capture = self.pg0.get_capture(len(pkts))
1343 self.verify_capture_in(capture, self.pg0)
1345 pkts = self.create_stream_in(self.pg0, self.pg1)
1346 self.pg0.add_stream(pkts)
1347 self.pg_enable_capture(self.pg_interfaces)
1349 capture = self.pg1.get_capture(len(pkts))
# With no mapping match the expected outside source is the host's own
# address (nat_ip=pg0.remote_ip4) with unchanged ports.
1350 self.verify_capture_out(
1351 capture, nat_ip=self.pg0.remote_ip4, same_port=True
# Restore the original first remote host of pg0.
1354 self.pg0.remote_hosts[0] = host0
# Cleanup: forwarding is switched off and the static-mapping API is called
# again with the same addresses.
# NOTE(review): presumably this second call deletes the mapping; its
# is_add argument is on a line not shown here.
1357 self.vapi.nat44_ei_forwarding_enable_disable(enable=0)
1358 flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1359 self.vapi.nat44_ei_add_del_static_mapping(
1361 local_ip_address=real_ip,
1362 external_ip_address=alias_ip,
1363 external_sw_if_index=0xFFFFFFFF,
1367 def test_static_in(self):
1368 """NAT44EI 1:1 NAT initialized from inside network"""
# Scenario visible in this excerpt: map pg0.remote_ip4 to "10.0.0.10",
# enable the NAT44EI feature on pg0 (NAT44_EI_IF_INSIDE) and pg1 (no
# flags), check the static-mapping dump, then send traffic inside->outside
# first and outside->inside second.
# NOTE(review): the embedded numbering has gaps (e.g. 1369, 1389-1390,
# 1394), so some original lines are not visible in this excerpt.
1370 nat_ip = "10.0.0.10"
# Expected outside ports / ICMP id stored on the test instance.
1371 self.tcp_port_out = 6303
1372 self.udp_port_out = 6304
1373 self.icmp_id_out = 6305
1375 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1376 flags = self.config_flags.NAT44_EI_IF_INSIDE
1377 self.vapi.nat44_ei_interface_add_del_feature(
1378 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1380 self.vapi.nat44_ei_interface_add_del_feature(
1381 sw_if_index=self.pg1.sw_if_index, is_add=1
# The dump must hold exactly one mapping, with an empty tag and zero
# protocol and ports.
1383 sm = self.vapi.nat44_ei_static_mapping_dump()
1384 self.assertEqual(len(sm), 1)
1385 self.assertEqual(sm[0].tag, "")
1386 self.assertEqual(sm[0].protocol, 0)
1387 self.assertEqual(sm[0].local_port, 0)
1388 self.assertEqual(sm[0].external_port, 0)
# Inside -> outside.
1391 pkts = self.create_stream_in(self.pg0, self.pg1)
1392 self.pg0.add_stream(pkts)
1393 self.pg_enable_capture(self.pg_interfaces)
1395 capture = self.pg1.get_capture(len(pkts))
# NOTE(review): the third positional argument is presumably same_port (the
# keyword form is used by other tests in this file) - confirm the signature.
1396 self.verify_capture_out(capture, nat_ip, True)
# Outside -> inside, addressed to the mapped external address.
1399 pkts = self.create_stream_out(self.pg1, nat_ip)
1400 self.pg1.add_stream(pkts)
1401 self.pg_enable_capture(self.pg_interfaces)
1403 capture = self.pg0.get_capture(len(pkts))
1404 self.verify_capture_in(capture, self.pg0)
1406 def test_static_out(self):
1407 """NAT44EI 1:1 NAT initialized from outside network"""
# Scenario visible in this excerpt: map pg0.remote_ip4 to "10.0.0.20" with
# a tag, enable the NAT44EI feature on pg0 (NAT44_EI_IF_INSIDE) and pg1 (no
# flags), check the dump, then send traffic outside->inside first and
# inside->outside second.
# NOTE(review): the embedded numbering has gaps (e.g. 1408, 1413-1414,
# 1426-1427); the assignment of `tag` is not visible in this excerpt.
1409 nat_ip = "10.0.0.20"
# Expected outside ports / ICMP id stored on the test instance.
1410 self.tcp_port_out = 6303
1411 self.udp_port_out = 6304
1412 self.icmp_id_out = 6305
1415 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1416 flags = self.config_flags.NAT44_EI_IF_INSIDE
1417 self.vapi.nat44_ei_interface_add_del_feature(
1418 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1420 self.vapi.nat44_ei_interface_add_del_feature(
1421 sw_if_index=self.pg1.sw_if_index, is_add=1
# The dump must hold exactly one mapping carrying the tag passed above.
1423 sm = self.vapi.nat44_ei_static_mapping_dump()
1424 self.assertEqual(len(sm), 1)
1425 self.assertEqual(sm[0].tag, tag)
# Outside -> inside, addressed to the mapped external address.
1428 pkts = self.create_stream_out(self.pg1, nat_ip)
1429 self.pg1.add_stream(pkts)
1430 self.pg_enable_capture(self.pg_interfaces)
1432 capture = self.pg0.get_capture(len(pkts))
1433 self.verify_capture_in(capture, self.pg0)
# Inside -> outside.
1436 pkts = self.create_stream_in(self.pg0, self.pg1)
1437 self.pg0.add_stream(pkts)
1438 self.pg_enable_capture(self.pg_interfaces)
1440 capture = self.pg1.get_capture(len(pkts))
# NOTE(review): the third positional argument is presumably same_port -
# confirm against the verify_capture_out signature.
1441 self.verify_capture_out(capture, nat_ip, True)
1443 def test_static_with_port_in(self):
1444 """NAT44EI 1:1 NAPT initialized from inside network"""
# Scenario visible in this excerpt: add self.nat_addr, install three
# static mappings for pg0.remote_ip4 (one each for TCP, UDP and ICMP),
# enable the NAT44EI feature on pg0 (NAT44_EI_IF_INSIDE) and pg1 (no
# flags), then send traffic inside->outside first and outside->inside
# second.
# NOTE(review): the embedded numbering has gaps (e.g. 1453-1455,
# 1460-1462, 1467-1469); the arguments between the local address and
# `proto` in each mapping call are not visible in this excerpt.
# Expected outside ports / ICMP id stored on the test instance.
1446 self.tcp_port_out = 3606
1447 self.udp_port_out = 3607
1448 self.icmp_id_out = 3608
1450 self.nat44_add_address(self.nat_addr)
1451 self.nat44_add_static_mapping(
1452 self.pg0.remote_ip4,
1456 proto=IP_PROTOS.tcp,
1458 self.nat44_add_static_mapping(
1459 self.pg0.remote_ip4,
1463 proto=IP_PROTOS.udp,
1465 self.nat44_add_static_mapping(
1466 self.pg0.remote_ip4,
1470 proto=IP_PROTOS.icmp,
1472 flags = self.config_flags.NAT44_EI_IF_INSIDE
1473 self.vapi.nat44_ei_interface_add_del_feature(
1474 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1476 self.vapi.nat44_ei_interface_add_del_feature(
1477 sw_if_index=self.pg1.sw_if_index, is_add=1
# Inside -> outside.
1481 pkts = self.create_stream_in(self.pg0, self.pg1)
1482 self.pg0.add_stream(pkts)
1483 self.pg_enable_capture(self.pg_interfaces)
1485 capture = self.pg1.get_capture(len(pkts))
1486 self.verify_capture_out(capture)
# Outside -> inside.
1489 pkts = self.create_stream_out(self.pg1)
1490 self.pg1.add_stream(pkts)
1491 self.pg_enable_capture(self.pg_interfaces)
1493 capture = self.pg0.get_capture(len(pkts))
1494 self.verify_capture_in(capture, self.pg0)
1496 def test_static_with_port_out(self):
1497 """NAT44EI 1:1 NAPT initialized from outside network"""
# Scenario visible in this excerpt: add self.nat_addr, install three
# static mappings for pg0.remote_ip4 (one each for TCP, UDP and ICMP),
# enable the NAT44EI feature on pg0 (NAT44_EI_IF_INSIDE) and pg1 (no
# flags), then send traffic outside->inside first and inside->outside
# second.
# NOTE(review): the embedded numbering has gaps (e.g. 1506-1508,
# 1513-1515, 1520-1522); the arguments between the local address and
# `proto` in each mapping call are not visible in this excerpt.
# Expected outside ports / ICMP id stored on the test instance.
1499 self.tcp_port_out = 30606
1500 self.udp_port_out = 30607
1501 self.icmp_id_out = 30608
1503 self.nat44_add_address(self.nat_addr)
1504 self.nat44_add_static_mapping(
1505 self.pg0.remote_ip4,
1509 proto=IP_PROTOS.tcp,
1511 self.nat44_add_static_mapping(
1512 self.pg0.remote_ip4,
1516 proto=IP_PROTOS.udp,
1518 self.nat44_add_static_mapping(
1519 self.pg0.remote_ip4,
1523 proto=IP_PROTOS.icmp,
1525 flags = self.config_flags.NAT44_EI_IF_INSIDE
1526 self.vapi.nat44_ei_interface_add_del_feature(
1527 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1529 self.vapi.nat44_ei_interface_add_del_feature(
1530 sw_if_index=self.pg1.sw_if_index, is_add=1
# Outside -> inside.
1534 pkts = self.create_stream_out(self.pg1)
1535 self.pg1.add_stream(pkts)
1536 self.pg_enable_capture(self.pg_interfaces)
1538 capture = self.pg0.get_capture(len(pkts))
1539 self.verify_capture_in(capture, self.pg0)
# Inside -> outside.
1542 pkts = self.create_stream_in(self.pg0, self.pg1)
1543 self.pg0.add_stream(pkts)
1544 self.pg_enable_capture(self.pg_interfaces)
1546 capture = self.pg1.get_capture(len(pkts))
1547 self.verify_capture_out(capture)
1549 def test_static_vrf_aware(self):
1550 """NAT44EI 1:1 NAT VRF awareness"""
# Scenario visible in this excerpt: two static mappings are created with
# vrf_id=10 (pg4.remote_ip4 -> "10.0.0.30", pg0.remote_ip4 -> "10.0.0.40"),
# the NAT44EI feature is enabled on pg3 with no flags and on pg0 and pg4
# with NAT44_EI_IF_INSIDE. Traffic from pg4 must be translated to nat_ip1;
# traffic from pg0 must not reach pg3 at all.
# NOTE(review): the embedded numbering has gaps (e.g. 1551, 1563, 1580),
# so some original lines are not visible in this excerpt.
1552 nat_ip1 = "10.0.0.30"
1553 nat_ip2 = "10.0.0.40"
# Expected outside ports / ICMP id stored on the test instance.
1554 self.tcp_port_out = 6303
1555 self.udp_port_out = 6304
1556 self.icmp_id_out = 6305
1558 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1, vrf_id=10)
1559 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2, vrf_id=10)
1560 flags = self.config_flags.NAT44_EI_IF_INSIDE
1561 self.vapi.nat44_ei_interface_add_del_feature(
1562 sw_if_index=self.pg3.sw_if_index, is_add=1
1564 self.vapi.nat44_ei_interface_add_del_feature(
1565 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1567 self.vapi.nat44_ei_interface_add_del_feature(
1568 sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
1571 # inside interface VRF match NAT44EI static mapping VRF
1572 pkts = self.create_stream_in(self.pg4, self.pg3)
1573 self.pg4.add_stream(pkts)
1574 self.pg_enable_capture(self.pg_interfaces)
1576 capture = self.pg3.get_capture(len(pkts))
1577 self.verify_capture_out(capture, nat_ip1, True)
# The comment on line 1579 continues on line 1580, which is not shown here.
1579 # inside interface VRF don't match NAT44EI static mapping VRF (packets
1581 pkts = self.create_stream_in(self.pg0, self.pg3)
1582 self.pg0.add_stream(pkts)
1583 self.pg_enable_capture(self.pg_interfaces)
1585 self.pg3.assert_nothing_captured()
1587 def test_dynamic_to_static(self):
1588 """NAT44EI Switch from dynamic translation to 1:1NAT"""
# Scenario visible in this excerpt: with only the address pool configured,
# inside traffic is verified with the default verify_capture_out
# expectations. A static mapping for the same host is then added, the
# user's session dump must be empty, and the same traffic must come out
# with nat_ip.
# NOTE(review): the embedded numbering has gaps (e.g. 1593, 1602-1603,
# 1610-1611), so some original lines are not visible in this excerpt.
1589 nat_ip = "10.0.0.10"
# Expected outside ports / ICMP id stored on the test instance.
1590 self.tcp_port_out = 6303
1591 self.udp_port_out = 6304
1592 self.icmp_id_out = 6305
1594 self.nat44_add_address(self.nat_addr)
1595 flags = self.config_flags.NAT44_EI_IF_INSIDE
1596 self.vapi.nat44_ei_interface_add_del_feature(
1597 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1599 self.vapi.nat44_ei_interface_add_del_feature(
1600 sw_if_index=self.pg1.sw_if_index, is_add=1
# Phase 1: translation using the address pool.
1604 pkts = self.create_stream_in(self.pg0, self.pg1)
1605 self.pg0.add_stream(pkts)
1606 self.pg_enable_capture(self.pg_interfaces)
1608 capture = self.pg1.get_capture(len(pkts))
1609 self.verify_capture_out(capture)
# Phase 2: after the static mapping is added, no sessions may remain for
# (pg0.remote_ip4, vrf 0).
1612 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1613 sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1614 self.assertEqual(len(sessions), 0)
1615 pkts = self.create_stream_in(self.pg0, self.pg1)
1616 self.pg0.add_stream(pkts)
1617 self.pg_enable_capture(self.pg_interfaces)
1619 capture = self.pg1.get_capture(len(pkts))
1620 self.verify_capture_out(capture, nat_ip, True)
1622 def test_identity_nat(self):
1623 """NAT44EI Identity NAT"""
# Scenario visible in this excerpt: an identity mapping for pg0.remote_ip4
# is added, the NAT44EI feature is enabled on pg0 (NAT44_EI_IF_INSIDE) and
# pg1 (no flags), and a TCP packet from pg1 to pg0.remote_ip4 must arrive
# on pg0 with addresses and ports unchanged. The identity-mapping API is
# then called a second time and the dump must hold two mappings.
# NOTE(review): the embedded numbering has gaps (e.g. 1628-1630,
# 1648-1651, 1667-1670); the remaining arguments of both identity-mapping
# calls and the assignments of `ip` and `tcp` are not visible here.
1624 flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1625 self.vapi.nat44_ei_add_del_identity_mapping(
1626 ip_address=self.pg0.remote_ip4,
1627 sw_if_index=0xFFFFFFFF,
1631 flags = self.config_flags.NAT44_EI_IF_INSIDE
1632 self.vapi.nat44_ei_interface_add_del_feature(
1633 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1635 self.vapi.nat44_ei_interface_add_del_feature(
1636 sw_if_index=self.pg1.sw_if_index, is_add=1
# Outside packet addressed directly to the inside host's own address.
1640 Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
1641 / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4)
1642 / TCP(sport=12345, dport=56789)
1644 self.pg1.add_stream(p)
1645 self.pg_enable_capture(self.pg_interfaces)
1647 capture = self.pg0.get_capture(1)
# Addresses and ports must be identical to what was sent.
1652 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1653 self.assertEqual(ip.src, self.pg1.remote_ip4)
1654 self.assertEqual(tcp.dport, 56789)
1655 self.assertEqual(tcp.sport, 12345)
1656 self.assert_packet_checksums_valid(p)
1658 self.logger.error(ppp("Unexpected or invalid packet:", p))
# No session may exist for (pg0.remote_ip4, vrf 0) after the packet passed.
1661 sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1662 self.assertEqual(len(sessions), 0)
# NOTE(review): this second call presumably differs from the first in an
# argument on a line not shown here, since two mappings are expected.
1663 flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1664 self.vapi.nat44_ei_add_del_identity_mapping(
1665 ip_address=self.pg0.remote_ip4,
1666 sw_if_index=0xFFFFFFFF,
1671 identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
1672 self.assertEqual(len(identity_mappings), 2)
1674 def test_multiple_inside_interfaces(self):
1675 """NAT44EI multiple non-overlapping address space inside interfaces"""
# Scenario visible in this excerpt: add self.nat_addr, enable the NAT44EI
# feature on pg0 and pg1 with NAT44_EI_IF_INSIDE and on pg3 with no flags.
# pg2 gets no NAT feature in the visible lines. Each section below sends
# one stream and verifies the capture on the receiving interface.
# NOTE(review): the embedded numbering has gaps (e.g. 1676, 1681, 1693),
# so some original lines are not visible in this excerpt.
1677 self.nat44_add_address(self.nat_addr)
1678 flags = self.config_flags.NAT44_EI_IF_INSIDE
1679 self.vapi.nat44_ei_interface_add_del_feature(
1680 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1682 self.vapi.nat44_ei_interface_add_del_feature(
1683 sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
1685 self.vapi.nat44_ei_interface_add_del_feature(
1686 sw_if_index=self.pg3.sw_if_index, is_add=1
1689 # between two NAT44EI inside interfaces (no translation)
1690 pkts = self.create_stream_in(self.pg0, self.pg1)
1691 self.pg0.add_stream(pkts)
1692 self.pg_enable_capture(self.pg_interfaces)
1694 capture = self.pg1.get_capture(len(pkts))
1695 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1697 # from inside to interface without translation
1698 pkts = self.create_stream_in(self.pg0, self.pg2)
1699 self.pg0.add_stream(pkts)
1700 self.pg_enable_capture(self.pg_interfaces)
1702 capture = self.pg2.get_capture(len(pkts))
1703 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1705 # in2out 1st interface
1706 pkts = self.create_stream_in(self.pg0, self.pg3)
1707 self.pg0.add_stream(pkts)
1708 self.pg_enable_capture(self.pg_interfaces)
1710 capture = self.pg3.get_capture(len(pkts))
1711 self.verify_capture_out(capture)
1713 # out2in 1st interface
1714 pkts = self.create_stream_out(self.pg3)
1715 self.pg3.add_stream(pkts)
1716 self.pg_enable_capture(self.pg_interfaces)
1718 capture = self.pg0.get_capture(len(pkts))
1719 self.verify_capture_in(capture, self.pg0)
1721 # in2out 2nd interface
1722 pkts = self.create_stream_in(self.pg1, self.pg3)
1723 self.pg1.add_stream(pkts)
1724 self.pg_enable_capture(self.pg_interfaces)
1726 capture = self.pg3.get_capture(len(pkts))
1727 self.verify_capture_out(capture)
1729 # out2in 2nd interface
1730 pkts = self.create_stream_out(self.pg3)
1731 self.pg3.add_stream(pkts)
1732 self.pg_enable_capture(self.pg_interfaces)
1734 capture = self.pg1.get_capture(len(pkts))
1735 self.verify_capture_in(capture, self.pg1)
1737 def test_inside_overlapping_interfaces(self):
1738 """NAT44EI multiple inside interfaces with overlapping address space"""
# Scenario visible in this excerpt: add self.nat_addr, enable the NAT44EI
# feature on pg3 with no flags and on pg4, pg5 and pg6 with
# NAT44_EI_IF_INSIDE, and add a static mapping pg6.remote_ip4 ->
# "10.0.0.10" with vrf_id=20. Traffic is then sent between the interfaces
# and the user/session dumps are checked.
# NOTE(review): the embedded numbering has gaps (e.g. 1766, 1775-1778,
# 1859, 1866, 1885-1886); the assignments of `ip`, `tcp` and the `user`
# loop header are not visible in this excerpt.
1740 static_nat_ip = "10.0.0.10"
1741 self.nat44_add_address(self.nat_addr)
1742 flags = self.config_flags.NAT44_EI_IF_INSIDE
1743 self.vapi.nat44_ei_interface_add_del_feature(
1744 sw_if_index=self.pg3.sw_if_index, is_add=1
1746 self.vapi.nat44_ei_interface_add_del_feature(
1747 sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1
1749 self.vapi.nat44_ei_interface_add_del_feature(
1750 sw_if_index=self.pg5.sw_if_index, flags=flags, is_add=1
1752 self.vapi.nat44_ei_interface_add_del_feature(
1753 sw_if_index=self.pg6.sw_if_index, flags=flags, is_add=1
1755 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip, vrf_id=20)
1757 # between NAT44EI inside interfaces with same VRF (no translation)
1758 pkts = self.create_stream_in(self.pg4, self.pg5)
1759 self.pg4.add_stream(pkts)
1760 self.pg_enable_capture(self.pg_interfaces)
1762 capture = self.pg5.get_capture(len(pkts))
1763 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1765 # between NAT44EI inside interfaces with different VRF (hairpinning)
# One TCP packet from pg4 addressed to the static external address.
1767 Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac)
1768 / IP(src=self.pg4.remote_ip4, dst=static_nat_ip)
1769 / TCP(sport=1234, dport=5678)
1771 self.pg4.add_stream(p)
1772 self.pg_enable_capture(self.pg_interfaces)
1774 capture = self.pg6.get_capture(1)
# On pg6 the source must be self.nat_addr with a changed source port, and
# the destination must be pg6.remote_ip4 with the original destination
# port.
1779 self.assertEqual(ip.src, self.nat_addr)
1780 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1781 self.assertNotEqual(tcp.sport, 1234)
1782 self.assertEqual(tcp.dport, 5678)
1784 self.logger.error(ppp("Unexpected or invalid packet:", p))
1787 # in2out 1st interface
1788 pkts = self.create_stream_in(self.pg4, self.pg3)
1789 self.pg4.add_stream(pkts)
1790 self.pg_enable_capture(self.pg_interfaces)
1792 capture = self.pg3.get_capture(len(pkts))
1793 self.verify_capture_out(capture)
1795 # out2in 1st interface
1796 pkts = self.create_stream_out(self.pg3)
1797 self.pg3.add_stream(pkts)
1798 self.pg_enable_capture(self.pg_interfaces)
1800 capture = self.pg4.get_capture(len(pkts))
1801 self.verify_capture_in(capture, self.pg4)
1803 # in2out 2nd interface
1804 pkts = self.create_stream_in(self.pg5, self.pg3)
1805 self.pg5.add_stream(pkts)
1806 self.pg_enable_capture(self.pg_interfaces)
1808 capture = self.pg3.get_capture(len(pkts))
1809 self.verify_capture_out(capture)
1811 # out2in 2nd interface
1812 pkts = self.create_stream_out(self.pg3)
1813 self.pg3.add_stream(pkts)
1814 self.pg_enable_capture(self.pg_interfaces)
1816 capture = self.pg5.get_capture(len(pkts))
1817 self.verify_capture_in(capture, self.pg5)
# Session dump for (pg5.remote_ip4, vrf 10): exactly three sessions are
# expected, none flagged as static mapping, all translated to the single
# pool address, and positionally ordered TCP, UDP, ICMP with the
# instance's inside/outside ports and ids.
1820 addresses = self.vapi.nat44_ei_address_dump()
1821 self.assertEqual(len(addresses), 1)
1822 sessions = self.vapi.nat44_ei_user_session_dump(self.pg5.remote_ip4, 10)
1823 self.assertEqual(len(sessions), 3)
1824 for session in sessions:
1825 self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1826 self.assertEqual(str(session.inside_ip_address), self.pg5.remote_ip4)
1827 self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
1828 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1829 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1830 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1831 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1832 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1833 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1834 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1835 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1836 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1838 # in2out 3rd interface
1839 pkts = self.create_stream_in(self.pg6, self.pg3)
1840 self.pg6.add_stream(pkts)
1841 self.pg_enable_capture(self.pg_interfaces)
1843 capture = self.pg3.get_capture(len(pkts))
1844 self.verify_capture_out(capture, static_nat_ip, True)
1846 # out2in 3rd interface
1847 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1848 self.pg3.add_stream(pkts)
1849 self.pg_enable_capture(self.pg_interfaces)
1851 capture = self.pg6.get_capture(len(pkts))
1852 self.verify_capture_in(capture, self.pg6)
1854 # general user and session dump verifications
1855 users = self.vapi.nat44_ei_user_dump()
1856 self.assertGreaterEqual(len(users), 3)
1857 addresses = self.vapi.nat44_ei_address_dump()
1858 self.assertEqual(len(addresses), 1)
# For each `user` (the loop header is on a line not shown here), every
# session must belong to that user's address, have more bytes than packets
# and at least one packet, and use TCP, UDP or ICMP.
1860 sessions = self.vapi.nat44_ei_user_session_dump(
1861 user.ip_address, user.vrf_id
1863 for session in sessions:
1864 self.assertEqual(user.ip_address, session.inside_ip_address)
1865 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1867 session.protocol in [IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp]
# Sessions for (pg4.remote_ip4, vrf 10): at least four, none flagged as
# static mapping, all translated to the pool address.
1871 sessions = self.vapi.nat44_ei_user_session_dump(self.pg4.remote_ip4, 10)
1872 self.assertGreaterEqual(len(sessions), 4)
1873 for session in sessions:
1874 self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1875 self.assertEqual(str(session.inside_ip_address), self.pg4.remote_ip4)
1876 self.assertEqual(session.outside_ip_address, addresses[0].ip_address)
# Sessions for (pg6.remote_ip4, vrf 20): at least three, all flagged as
# static mapping and translated to static_nat_ip.
1879 sessions = self.vapi.nat44_ei_user_session_dump(self.pg6.remote_ip4, 20)
1880 self.assertGreaterEqual(len(sessions), 3)
1881 for session in sessions:
1882 self.assertTrue(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1883 self.assertEqual(str(session.inside_ip_address), self.pg6.remote_ip4)
1884 self.assertEqual(str(session.outside_ip_address), static_nat_ip)
# Right-hand side of a membership check whose left-hand side is on a line
# not shown here.
1887 in [self.tcp_port_in, self.udp_port_in, self.icmp_id_in]
1890 def test_hairpinning(self):
1891 """NAT44EI hairpinning - 1:1 NAPT"""
# Scenario visible in this excerpt: host and server are two remote hosts
# on pg0. The host sends TCP to self.nat_addr:server_out_port and the
# packet must come back out of pg0 towards the server; the server's reply
# to self.nat_addr must come back out of pg0 towards the host. The
# "/nat44-ei/hairpinning" counter is sampled before and after.
# NOTE(review): the embedded numbering has gaps (e.g. 1895-1896,
# 1911-1914, 1929-1932); the assignment of `host_in_port`, the arguments
# of the static mapping and the assignments of `ip` and `tcp` are not
# visible in this excerpt.
1893 host = self.pg0.remote_hosts[0]
1894 server = self.pg0.remote_hosts[1]
1897 server_in_port = 5678
1898 server_out_port = 8765
1900 self.nat44_add_address(self.nat_addr)
1901 flags = self.config_flags.NAT44_EI_IF_INSIDE
1902 self.vapi.nat44_ei_interface_add_del_feature(
1903 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1905 self.vapi.nat44_ei_interface_add_del_feature(
1906 sw_if_index=self.pg1.sw_if_index, is_add=1
1909 # add static mapping for server
1910 self.nat44_add_static_mapping(
1915 proto=IP_PROTOS.tcp,
# Baseline of the hairpinning counter, compared against later samples.
1918 cnt = self.statistics["/nat44-ei/hairpinning"]
1919 # send packet from host to server
1921 Ether(src=host.mac, dst=self.pg0.local_mac)
1922 / IP(src=host.ip4, dst=self.nat_addr)
1923 / TCP(sport=host_in_port, dport=server_out_port)
1925 self.pg0.add_stream(p)
1926 self.pg_enable_capture(self.pg_interfaces)
1928 capture = self.pg0.get_capture(1)
# Towards the server: source rewritten to self.nat_addr with a new port,
# destination rewritten to the server's inside address and port.
1933 self.assertEqual(ip.src, self.nat_addr)
1934 self.assertEqual(ip.dst, server.ip4)
1935 self.assertNotEqual(tcp.sport, host_in_port)
1936 self.assertEqual(tcp.dport, server_in_port)
1937 self.assert_packet_checksums_valid(p)
# Remember the translated source port; the server's reply targets it.
1938 host_out_port = tcp.sport
1940 self.logger.error(ppp("Unexpected or invalid packet:", p))
# The counter column for pg0, summed over all rows, must have grown by 1.
1943 after = self.statistics["/nat44-ei/hairpinning"]
1944 if_idx = self.pg0.sw_if_index
1945 self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
1947 # send reply from server to host
1949 Ether(src=server.mac, dst=self.pg0.local_mac)
1950 / IP(src=server.ip4, dst=self.nat_addr)
1951 / TCP(sport=server_in_port, dport=host_out_port)
1953 self.pg0.add_stream(p)
1954 self.pg_enable_capture(self.pg_interfaces)
1956 capture = self.pg0.get_capture(1)
# Towards the host: source shows the server's external port, destination
# is the host's original address and port.
1961 self.assertEqual(ip.src, self.nat_addr)
1962 self.assertEqual(ip.dst, host.ip4)
1963 self.assertEqual(tcp.sport, server_out_port)
1964 self.assertEqual(tcp.dport, host_in_port)
1965 self.assert_packet_checksums_valid(p)
1967 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Total growth since the baseline is expected to be 2, or 3 when
# vpp_worker_count > 0.
1970 after = self.statistics["/nat44-ei/hairpinning"]
1971 if_idx = self.pg0.sw_if_index
1973 after[:, if_idx].sum() - cnt[:, if_idx].sum(),
1974 2 + (1 if self.vpp_worker_count > 0 else 0),
1977 def test_hairpinning2(self):
1978 """NAT44EI hairpinning - 1:1 NAT"""
# Scenario visible in this excerpt: host, server1 and server2 are three
# remote hosts on pg0. server1 and server2 get 1:1 static mappings to
# "10.0.0.10" and "10.0.0.11". Four exchanges are run, each with a TCP, a
# UDP and an ICMP packet sent on pg0 and captured back on pg0: host ->
# server1, server1 -> host, server2 -> server1, server1 -> server2.
# NOTE(review): the embedded numbering has gaps (e.g. 2000-2003,
# 2007-2009, 2026, 2038, 2041); the code that collects the three packets
# into `pkts` and the try/except/else headers around the checks are not
# visible in this excerpt.
1980 server1_nat_ip = "10.0.0.10"
1981 server2_nat_ip = "10.0.0.11"
1982 host = self.pg0.remote_hosts[0]
1983 server1 = self.pg0.remote_hosts[1]
1984 server2 = self.pg0.remote_hosts[2]
1985 server_tcp_port = 22
1986 server_udp_port = 20
1988 self.nat44_add_address(self.nat_addr)
1989 flags = self.config_flags.NAT44_EI_IF_INSIDE
1990 self.vapi.nat44_ei_interface_add_del_feature(
1991 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
1993 self.vapi.nat44_ei_interface_add_del_feature(
1994 sw_if_index=self.pg1.sw_if_index, is_add=1
1997 # add static mapping for servers
1998 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1999 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# Exchange 1: host -> server1's external address (TCP, UDP, ICMP request).
2004 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2005 / IP(src=host.ip4, dst=server1_nat_ip)
2006 / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
2010 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2011 / IP(src=host.ip4, dst=server1_nat_ip)
2012 / UDP(sport=self.udp_port_in, dport=server_udp_port)
2016 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2017 / IP(src=host.ip4, dst=server1_nat_ip)
2018 / ICMP(id=self.icmp_id_in, type="echo-request")
2021 self.pg0.add_stream(pkts)
2022 self.pg_enable_capture(self.pg_interfaces)
2024 capture = self.pg0.get_capture(len(pkts))
# Expected: source translated to self.nat_addr with a changed port/id,
# destination translated to server1's real address. The translated
# ports/id are stored on the instance for the reply direction.
2025 for packet in capture:
2027 self.assertEqual(packet[IP].src, self.nat_addr)
2028 self.assertEqual(packet[IP].dst, server1.ip4)
2029 if packet.haslayer(TCP):
2030 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2031 self.assertEqual(packet[TCP].dport, server_tcp_port)
2032 self.tcp_port_out = packet[TCP].sport
2033 self.assert_packet_checksums_valid(packet)
2034 elif packet.haslayer(UDP):
2035 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2036 self.assertEqual(packet[UDP].dport, server_udp_port)
2037 self.udp_port_out = packet[UDP].sport
2039 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2040 self.icmp_id_out = packet[ICMP].id
2042 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Exchange 2: server1 -> self.nat_addr, using the translated ports/id
# recorded above (TCP, UDP, ICMP reply).
2048 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2049 / IP(src=server1.ip4, dst=self.nat_addr)
2050 / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
2054 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2055 / IP(src=server1.ip4, dst=self.nat_addr)
2056 / UDP(sport=server_udp_port, dport=self.udp_port_out)
2060 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2061 / IP(src=server1.ip4, dst=self.nat_addr)
2062 / ICMP(id=self.icmp_id_out, type="echo-reply")
2065 self.pg0.add_stream(pkts)
2066 self.pg_enable_capture(self.pg_interfaces)
2068 capture = self.pg0.get_capture(len(pkts))
# Expected: source shown as server1's external address, destination
# restored to the host's address and original ports/id.
2069 for packet in capture:
2071 self.assertEqual(packet[IP].src, server1_nat_ip)
2072 self.assertEqual(packet[IP].dst, host.ip4)
2073 if packet.haslayer(TCP):
2074 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2075 self.assertEqual(packet[TCP].sport, server_tcp_port)
2076 self.assert_packet_checksums_valid(packet)
2077 elif packet.haslayer(UDP):
2078 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2079 self.assertEqual(packet[UDP].sport, server_udp_port)
2081 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2083 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2086 # server2 to server1
2089 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2090 / IP(src=server2.ip4, dst=server1_nat_ip)
2091 / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
2095 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2096 / IP(src=server2.ip4, dst=server1_nat_ip)
2097 / UDP(sport=self.udp_port_in, dport=server_udp_port)
2101 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2102 / IP(src=server2.ip4, dst=server1_nat_ip)
2103 / ICMP(id=self.icmp_id_in, type="echo-request")
2106 self.pg0.add_stream(pkts)
2107 self.pg_enable_capture(self.pg_interfaces)
2109 capture = self.pg0.get_capture(len(pkts))
# Expected: source translated to server2's external address with ports/id
# unchanged (assertEqual here, unlike exchange 1), destination translated
# to server1's real address.
2110 for packet in capture:
2112 self.assertEqual(packet[IP].src, server2_nat_ip)
2113 self.assertEqual(packet[IP].dst, server1.ip4)
2114 if packet.haslayer(TCP):
2115 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2116 self.assertEqual(packet[TCP].dport, server_tcp_port)
2117 self.tcp_port_out = packet[TCP].sport
2118 self.assert_packet_checksums_valid(packet)
2119 elif packet.haslayer(UDP):
2120 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2121 self.assertEqual(packet[UDP].dport, server_udp_port)
2122 self.udp_port_out = packet[UDP].sport
2124 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2125 self.icmp_id_out = packet[ICMP].id
2127 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2130 # server1 to server2
2133 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2134 / IP(src=server1.ip4, dst=server2_nat_ip)
2135 / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
2139 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2140 / IP(src=server1.ip4, dst=server2_nat_ip)
2141 / UDP(sport=server_udp_port, dport=self.udp_port_out)
2145 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2146 / IP(src=server1.ip4, dst=server2_nat_ip)
2147 / ICMP(id=self.icmp_id_out, type="echo-reply")
2150 self.pg0.add_stream(pkts)
2151 self.pg_enable_capture(self.pg_interfaces)
2153 capture = self.pg0.get_capture(len(pkts))
# Expected: source shown as server1's external address, destination
# translated to server2's real address with the original ports/id.
2154 for packet in capture:
2156 self.assertEqual(packet[IP].src, server1_nat_ip)
2157 self.assertEqual(packet[IP].dst, server2.ip4)
2158 if packet.haslayer(TCP):
2159 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2160 self.assertEqual(packet[TCP].sport, server_tcp_port)
2161 self.assert_packet_checksums_valid(packet)
2162 elif packet.haslayer(UDP):
2163 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2164 self.assertEqual(packet[UDP].sport, server_udp_port)
2166 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2168 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2171 def test_hairpinning_avoid_inf_loop(self):
2172 """NAT44EI hairpinning - 1:1 NAPT avoid infinite loop"""
# Scenario visible in this excerpt: same setup as a 1:1 NAPT hairpinning
# test, plus a static mapping of pg0.local_ip4 onto itself. A packet from
# the host to pg0.local_ip4 is sent first (a regression check for a past
# crash, per the comment on line 2211), then the normal host -> server ->
# host hairpinning exchange is run and the "/nat44-ei/hairpinning" counter
# is checked.
# NOTE(review): the embedded numbering has gaps (e.g. 2176-2177,
# 2192-2195, 2210, 2224-2227); the assignment of `host_in_port`, the
# arguments of the server mapping and the assignments of `ip` and `tcp`
# are not visible in this excerpt.
2174 host = self.pg0.remote_hosts[0]
2175 server = self.pg0.remote_hosts[1]
2178 server_in_port = 5678
2179 server_out_port = 8765
2181 self.nat44_add_address(self.nat_addr)
2182 flags = self.config_flags.NAT44_EI_IF_INSIDE
2183 self.vapi.nat44_ei_interface_add_del_feature(
2184 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2186 self.vapi.nat44_ei_interface_add_del_feature(
2187 sw_if_index=self.pg1.sw_if_index, is_add=1
2190 # add static mapping for server
2191 self.nat44_add_static_mapping(
2196 proto=IP_PROTOS.tcp,
2199 # add another static mapping that maps pg0.local_ip4 address to itself
2200 self.nat44_add_static_mapping(self.pg0.local_ip4, self.pg0.local_ip4)
2202 # send packet from host to VPP (the packet should get dropped)
2204 Ether(src=host.mac, dst=self.pg0.local_mac)
2205 / IP(src=host.ip4, dst=self.pg0.local_ip4)
2206 / TCP(sport=host_in_port, dport=server_out_port)
2208 self.pg0.add_stream(p)
2209 self.pg_enable_capture(self.pg_interfaces)
2211 # Here VPP used to crash due to an infinite loop
# Baseline of the hairpinning counter, compared against later samples.
2213 cnt = self.statistics["/nat44-ei/hairpinning"]
2214 # send packet from host to server
2216 Ether(src=host.mac, dst=self.pg0.local_mac)
2217 / IP(src=host.ip4, dst=self.nat_addr)
2218 / TCP(sport=host_in_port, dport=server_out_port)
2220 self.pg0.add_stream(p)
2221 self.pg_enable_capture(self.pg_interfaces)
2223 capture = self.pg0.get_capture(1)
# Towards the server: source rewritten to self.nat_addr with a new port,
# destination rewritten to the server's inside address and port.
2228 self.assertEqual(ip.src, self.nat_addr)
2229 self.assertEqual(ip.dst, server.ip4)
2230 self.assertNotEqual(tcp.sport, host_in_port)
2231 self.assertEqual(tcp.dport, server_in_port)
2232 self.assert_packet_checksums_valid(p)
# Remember the translated source port; the server's reply targets it.
2233 host_out_port = tcp.sport
2235 self.logger.error(ppp("Unexpected or invalid packet:", p))
# The counter column for pg0, summed over all rows, must have grown by 1.
2238 after = self.statistics["/nat44-ei/hairpinning"]
2239 if_idx = self.pg0.sw_if_index
2240 self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
2242 # send reply from server to host
2244 Ether(src=server.mac, dst=self.pg0.local_mac)
2245 / IP(src=server.ip4, dst=self.nat_addr)
2246 / TCP(sport=server_in_port, dport=host_out_port)
2248 self.pg0.add_stream(p)
2249 self.pg_enable_capture(self.pg_interfaces)
2251 capture = self.pg0.get_capture(1)
# Towards the host: source shows the server's external port, destination
# is the host's original address and port.
2256 self.assertEqual(ip.src, self.nat_addr)
2257 self.assertEqual(ip.dst, host.ip4)
2258 self.assertEqual(tcp.sport, server_out_port)
2259 self.assertEqual(tcp.dport, host_in_port)
2260 self.assert_packet_checksums_valid(p)
2262 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Total growth since the baseline is expected to be 2, or 3 when
# vpp_worker_count > 0.
2265 after = self.statistics["/nat44-ei/hairpinning"]
2266 if_idx = self.pg0.sw_if_index
2268 after[:, if_idx].sum() - cnt[:, if_idx].sum(),
2269 2 + (1 if self.vpp_worker_count > 0 else 0),
2272 def test_interface_addr(self):
2273 """NAT44EI acquire addresses from interface"""
# Scenario visible in this excerpt: pg7 is registered through
# nat44_ei_add_del_interface_addr, and the NAT address dump is checked
# before pg7 has an IPv4 address (0 entries), after config_ip4 (1 entry
# equal to pg7.local_ip4) and after unconfig_ip4 (0 entries again).
# NOTE(review): the embedded numbering has gaps (e.g. 2276-2277, 2281),
# so the call opened on line 2274 is closed on a line not shown here.
2274 self.vapi.nat44_ei_add_del_interface_addr(
2275 is_add=1, sw_if_index=self.pg7.sw_if_index
2278 # no address in NAT pool
2279 addresses = self.vapi.nat44_ei_address_dump()
2280 self.assertEqual(0, len(addresses))
2282 # configure interface address and check NAT address pool
2283 self.pg7.config_ip4()
2284 addresses = self.vapi.nat44_ei_address_dump()
2285 self.assertEqual(1, len(addresses))
2286 self.assertEqual(str(addresses[0].ip_address), self.pg7.local_ip4)
2288 # remove interface address and check NAT address pool
2289 self.pg7.unconfig_ip4()
2290 addresses = self.vapi.nat44_ei_address_dump()
2291 self.assertEqual(0, len(addresses))
2293 def test_interface_addr_static_mapping(self):
2294 """NAT44EI Static mapping with addresses from interface"""
# Adds pg7 through nat44_ei_add_del_interface_addr and a static mapping that
# names pg7 as external_sw_if_index, then follows
# nat44_ei_static_mapping_dump() while pg7's IPv4 address is configured,
# removed and configured again, and finally deletes the mapping.
# NOTE(review): `tag` and `resolved` are used below but no visible line binds
# them -- confirm where they are assigned.
2297 self.vapi.nat44_ei_add_del_interface_addr(
2298 is_add=1, sw_if_index=self.pg7.sw_if_index
2300 self.nat44_add_static_mapping(
2301 "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag
2304 # static mappings with external interface
2305 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2306 self.assertEqual(1, len(static_mappings))
2307 self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
2308 self.assertEqual(static_mappings[0].tag, tag)
2310 # configure interface address and check static mappings
2311 self.pg7.config_ip4()
2312 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2313 self.assertEqual(2, len(static_mappings))
# the dump now holds two entries; any entry whose external_sw_if_index is
# 0xFFFFFFFF must carry pg7.local_ip4 as external address and the same tag
2315 for sm in static_mappings:
2316 if sm.external_sw_if_index == 0xFFFFFFFF:
2317 self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
2318 self.assertEqual(sm.tag, tag)
2320 self.assertTrue(resolved)
2322 # remove interface address and check static mappings
2323 self.pg7.unconfig_ip4()
2324 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2325 self.assertEqual(1, len(static_mappings))
2326 self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index)
2327 self.assertEqual(static_mappings[0].tag, tag)
2329 # configure interface address again and check static mappings
2330 self.pg7.config_ip4()
2331 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2332 self.assertEqual(2, len(static_mappings))
# same per-entry check as after the first config_ip4()
2334 for sm in static_mappings:
2335 if sm.external_sw_if_index == 0xFFFFFFFF:
2336 self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4)
2337 self.assertEqual(sm.tag, tag)
2339 self.assertTrue(resolved)
2341 # remove static mapping
2342 self.nat44_add_static_mapping(
2343 "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag, is_add=0
2345 static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2346 self.assertEqual(0, len(static_mappings))
2348 def test_interface_addr_identity_nat(self):
2349 """NAT44EI Identity NAT with addresses from interface"""
2352 self.vapi.nat44_ei_add_del_interface_addr(
2353 is_add=1, sw_if_index=self.pg7.sw_if_index
2355 self.vapi.nat44_ei_add_del_identity_mapping(
2357 sw_if_index=self.pg7.sw_if_index,
2359 protocol=IP_PROTOS.tcp,
2363 # identity mappings with external interface
2364 identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2365 self.assertEqual(1, len(identity_mappings))
2366 self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
2368 # configure interface address and check identity mappings
2369 self.pg7.config_ip4()
2370 identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2372 self.assertEqual(2, len(identity_mappings))
2373 for sm in identity_mappings:
2374 if sm.sw_if_index == 0xFFFFFFFF:
2376 str(identity_mappings[0].ip_address), self.pg7.local_ip4
2378 self.assertEqual(port, identity_mappings[0].port)
2379 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2381 self.assertTrue(resolved)
2383 # remove interface address and check identity mappings
2384 self.pg7.unconfig_ip4()
2385 identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2386 self.assertEqual(1, len(identity_mappings))
2387 self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index)
2389 def test_ipfix_nat44_sess(self):
2390 """NAT44EI IPFIX logging NAT44EI session created/deleted"""
2391 self.ipfix_domain_id = 10
2392 self.ipfix_src_port = 20202
2393 collector_port = 30303
2394 bind_layers(UDP, IPFIX, dport=30303)
2395 self.nat44_add_address(self.nat_addr)
2396 flags = self.config_flags.NAT44_EI_IF_INSIDE
2397 self.vapi.nat44_ei_interface_add_del_feature(
2398 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2400 self.vapi.nat44_ei_interface_add_del_feature(
2401 sw_if_index=self.pg1.sw_if_index, is_add=1
2403 self.vapi.set_ipfix_exporter(
2404 collector_address=self.pg3.remote_ip4,
2405 src_address=self.pg3.local_ip4,
2407 template_interval=10,
2408 collector_port=collector_port,
2410 self.vapi.nat44_ei_ipfix_enable_disable(
2411 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2414 pkts = self.create_stream_in(self.pg0, self.pg1)
2415 self.pg0.add_stream(pkts)
2416 self.pg_enable_capture(self.pg_interfaces)
2418 capture = self.pg1.get_capture(len(pkts))
2419 self.verify_capture_out(capture)
2420 self.nat44_add_address(self.nat_addr, is_add=0)
2421 self.vapi.ipfix_flush()
2422 capture = self.pg3.get_capture(7)
2423 ipfix = IPFIXDecoder()
2424 # first load template
2426 self.assertTrue(p.haslayer(IPFIX))
2427 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2428 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2429 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2430 self.assertEqual(p[UDP].dport, collector_port)
2431 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2432 if p.haslayer(Template):
2433 ipfix.add_template(p.getlayer(Template))
2434 # verify events in data set
2436 if p.haslayer(Data):
2437 data = ipfix.decode_data_set(p.getlayer(Set))
2438 self.verify_ipfix_nat44_ses(data)
2440 def test_ipfix_addr_exhausted(self):
2441 """NAT44EI IPFIX logging NAT addresses exhausted"""
# Enables the NAT44EI feature on pg0 (NAT44_EI_IF_INSIDE) and pg1 (no flags
# argument), configures an IPFIX exporter towards pg3, sends one packet from
# pg0 towards pg1.remote_ip4 and expects nothing on pg1. After ipfix_flush()
# 7 packets are expected on pg3; their data sets are checked by
# verify_ipfix_addr_exhausted().
# NOTE(review): self.ipfix_domain_id / self.ipfix_src_port are read but not
# set in this method -- presumably provided by class or setUp state.
2442 flags = self.config_flags.NAT44_EI_IF_INSIDE
2443 self.vapi.nat44_ei_interface_add_del_feature(
2444 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2446 self.vapi.nat44_ei_interface_add_del_feature(
2447 sw_if_index=self.pg1.sw_if_index, is_add=1
2449 self.vapi.set_ipfix_exporter(
2450 collector_address=self.pg3.remote_ip4,
2451 src_address=self.pg3.local_ip4,
2453 template_interval=10,
2455 self.vapi.nat44_ei_ipfix_enable_disable(
2456 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2460 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
2461 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2464 self.pg0.add_stream(p)
2465 self.pg_enable_capture(self.pg_interfaces)
2467 self.pg1.assert_nothing_captured()
2468 self.vapi.ipfix_flush()
2469 capture = self.pg3.get_capture(7)
2470 ipfix = IPFIXDecoder()
# every inspected packet must be IPFIX from pg3.local_ip4 to pg3.remote_ip4,
# UDP destination port 4739 (no collector_port is passed to the exporter here)
2471 # first load template
2473 self.assertTrue(p.haslayer(IPFIX))
2474 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2475 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2476 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2477 self.assertEqual(p[UDP].dport, 4739)
2478 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2479 if p.haslayer(Template):
2480 ipfix.add_template(p.getlayer(Template))
2481 # verify events in data set
2483 if p.haslayer(Data):
2484 data = ipfix.decode_data_set(p.getlayer(Set))
2485 self.verify_ipfix_addr_exhausted(data)
2487 def test_ipfix_max_sessions(self):
2488 """NAT44EI IPFIX logging maximum session entries exceeded"""
# First sends max_sessions packets, each with a distinct 10.10.x.y source
# address, and expects max_sessions packets on pg1. IPFIX export is enabled
# only after that; one further packet from pg0.remote_ip4 must then produce
# nothing on pg1, and the exported data sets are checked by
# verify_ipfix_max_sessions().
2489 self.nat44_add_address(self.nat_addr)
2490 flags = self.config_flags.NAT44_EI_IF_INSIDE
2491 self.vapi.nat44_ei_interface_add_del_feature(
2492 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2494 self.vapi.nat44_ei_interface_add_del_feature(
2495 sw_if_index=self.pg1.sw_if_index, is_add=1
# total = per-thread limit (self.max_translations) times the worker count,
# with a floor of one thread when vpp_worker_count is 0
2498 max_sessions_per_thread = self.max_translations
2499 max_sessions = max(1, self.vpp_worker_count) * max_sessions_per_thread
# NOTE(review): `pkts` is filled on lines not visible here -- confirm.
2502 for i in range(0, max_sessions):
# source address encodes i: bits 8-15 in the third octet, bits 0-7 in the fourth
2503 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2505 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2506 / IP(src=src, dst=self.pg1.remote_ip4)
2510 self.pg0.add_stream(pkts)
2511 self.pg_enable_capture(self.pg_interfaces)
2514 self.pg1.get_capture(max_sessions)
2515 self.vapi.set_ipfix_exporter(
2516 collector_address=self.pg3.remote_ip4,
2517 src_address=self.pg3.local_ip4,
2519 template_interval=10,
2521 self.vapi.nat44_ei_ipfix_enable_disable(
2522 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1
2526 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2527 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2530 self.pg0.add_stream(p)
2531 self.pg_enable_capture(self.pg_interfaces)
2533 self.pg1.assert_nothing_captured()
2534 self.vapi.ipfix_flush()
2535 capture = self.pg3.get_capture(7)
2536 ipfix = IPFIXDecoder()
2537 # first load template
2539 self.assertTrue(p.haslayer(IPFIX))
2540 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2541 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2542 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2543 self.assertEqual(p[UDP].dport, 4739)
2544 self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
2545 if p.haslayer(Template):
2546 ipfix.add_template(p.getlayer(Template))
2547 # verify events in data set
2549 if p.haslayer(Data):
2550 data = ipfix.decode_data_set(p.getlayer(Set))
# the per-thread limit, not the total, is what the verifier receives
2551 self.verify_ipfix_max_sessions(data, max_sessions_per_thread)
2553 def test_syslog_apmap(self):
2554 """NAT44EI syslog address and port mapping creation and deletion"""
# Sets the syslog filter to SYSLOG_API_SEVERITY_INFO and the sender from pg3's
# local/remote addresses, then sends one TCP packet from pg0 to pg1. One
# syslog packet is expected on pg3 after the translation and another after the
# NAT address is removed; both payloads go through verify_syslog_apmap(), the
# second call with False as extra argument.
2555 self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2556 self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2557 self.nat44_add_address(self.nat_addr)
2558 flags = self.config_flags.NAT44_EI_IF_INSIDE
2559 self.vapi.nat44_ei_interface_add_del_feature(
2560 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2562 self.vapi.nat44_ei_interface_add_del_feature(
2563 sw_if_index=self.pg1.sw_if_index, is_add=1
2567 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2568 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2569 / TCP(sport=self.tcp_port_in, dport=20)
2571 self.pg0.add_stream(p)
2572 self.pg_enable_capture(self.pg_interfaces)
2574 capture = self.pg1.get_capture(1)
# remember the source port seen on pg1 in self.tcp_port_out
2575 self.tcp_port_out = capture[0][TCP].sport
2576 capture = self.pg3.get_capture(1)
2577 self.verify_syslog_apmap(capture[0][Raw].load)
2579 self.pg_enable_capture(self.pg_interfaces)
2581 self.nat44_add_address(self.nat_addr, is_add=0)
2582 capture = self.pg3.get_capture(1)
2583 self.verify_syslog_apmap(capture[0][Raw].load, False)
2585 def test_pool_addr_fib(self):
2586 """NAT44EI add pool addresses to FIB"""
2587 static_addr = "10.0.0.10"
2588 self.nat44_add_address(self.nat_addr)
2589 flags = self.config_flags.NAT44_EI_IF_INSIDE
2590 self.vapi.nat44_ei_interface_add_del_feature(
2591 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2593 self.vapi.nat44_ei_interface_add_del_feature(
2594 sw_if_index=self.pg1.sw_if_index, is_add=1
2596 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2599 p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2602 psrc=self.pg1.remote_ip4,
2603 hwsrc=self.pg1.remote_mac,
2605 self.pg1.add_stream(p)
2606 self.pg_enable_capture(self.pg_interfaces)
2608 capture = self.pg1.get_capture(1)
2609 self.assertTrue(capture[0].haslayer(ARP))
2610 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2613 p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2616 psrc=self.pg1.remote_ip4,
2617 hwsrc=self.pg1.remote_mac,
2619 self.pg1.add_stream(p)
2620 self.pg_enable_capture(self.pg_interfaces)
2622 capture = self.pg1.get_capture(1)
2623 self.assertTrue(capture[0].haslayer(ARP))
2624 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2626 # send ARP to non-NAT44EI interface
2627 p = Ether(src=self.pg2.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2630 psrc=self.pg2.remote_ip4,
2631 hwsrc=self.pg2.remote_mac,
2633 self.pg2.add_stream(p)
2634 self.pg_enable_capture(self.pg_interfaces)
2636 self.pg1.assert_nothing_captured()
2638 # remove addresses and verify
2639 self.nat44_add_address(self.nat_addr, is_add=0)
2640 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr, is_add=0)
2642 p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2645 psrc=self.pg1.remote_ip4,
2646 hwsrc=self.pg1.remote_mac,
2648 self.pg1.add_stream(p)
2649 self.pg_enable_capture(self.pg_interfaces)
2651 self.pg1.assert_nothing_captured()
2653 p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP(
2656 psrc=self.pg1.remote_ip4,
2657 hwsrc=self.pg1.remote_mac,
2659 self.pg1.add_stream(p)
2660 self.pg_enable_capture(self.pg_interfaces)
2662 self.pg1.assert_nothing_captured()
2664 def test_vrf_mode(self):
2665 """NAT44EI tenant VRF aware address pool mode"""
# Moves pg0 into table vrf_id1 and pg1 into table vrf_id2, adds nat_ip1 with
# vrf_id=vrf_id1 and nat_ip2 with vrf_id=vrf_id2, and then checks that traffic
# pg0 -> pg2 is verified against nat_ip1 and traffic pg1 -> pg2 against
# nat_ip2. Afterwards both interfaces are returned to table 0 and the two
# tables are deleted.
# NOTE(review): `vrf_id1` / `vrf_id2` are bound on lines not visible here.
2669 nat_ip1 = "10.0.0.10"
2670 nat_ip2 = "10.0.0.11"
# addresses are removed before the table change and configured again after it
2672 self.pg0.unconfig_ip4()
2673 self.pg1.unconfig_ip4()
2674 self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
2675 self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
2676 self.pg0.set_table_ip4(vrf_id1)
2677 self.pg1.set_table_ip4(vrf_id2)
2678 self.pg0.config_ip4()
2679 self.pg1.config_ip4()
2680 self.pg0.resolve_arp()
2681 self.pg1.resolve_arp()
2683 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2684 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
# pg0 and pg1 are both added with NAT44_EI_IF_INSIDE, pg2 with no flags argument
2685 flags = self.config_flags.NAT44_EI_IF_INSIDE
2686 self.vapi.nat44_ei_interface_add_del_feature(
2687 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2689 self.vapi.nat44_ei_interface_add_del_feature(
2690 sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
2692 self.vapi.nat44_ei_interface_add_del_feature(
2693 sw_if_index=self.pg2.sw_if_index, is_add=1
2698 pkts = self.create_stream_in(self.pg0, self.pg2)
2699 self.pg0.add_stream(pkts)
2700 self.pg_enable_capture(self.pg_interfaces)
2702 capture = self.pg2.get_capture(len(pkts))
2703 self.verify_capture_out(capture, nat_ip1)
2706 pkts = self.create_stream_in(self.pg1, self.pg2)
2707 self.pg1.add_stream(pkts)
2708 self.pg_enable_capture(self.pg_interfaces)
2710 capture = self.pg2.get_capture(len(pkts))
2711 self.verify_capture_out(capture, nat_ip2)
# restore: back to table 0, re-resolve ARP, then delete both tables
2714 self.pg0.unconfig_ip4()
2715 self.pg1.unconfig_ip4()
2716 self.pg0.set_table_ip4(0)
2717 self.pg1.set_table_ip4(0)
2718 self.pg0.config_ip4()
2719 self.pg1.config_ip4()
2720 self.pg0.resolve_arp()
2721 self.pg1.resolve_arp()
2722 self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id1})
2723 self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id2})
2725 def test_vrf_feature_independent(self):
2726 """NAT44EI tenant VRF independent address pool mode"""
# nat_ip1 is added without a vrf_id, nat_ip2 with vrf_id=99. Streams from pg0
# and from pg1 towards pg2 are both verified against nat_ip1; nat_ip2 is never
# expected in this test.
2728 nat_ip1 = "10.0.0.10"
2729 nat_ip2 = "10.0.0.11"
2731 self.nat44_add_address(nat_ip1)
2732 self.nat44_add_address(nat_ip2, vrf_id=99)
# pg0 and pg1 are both added with NAT44_EI_IF_INSIDE, pg2 with no flags argument
2733 flags = self.config_flags.NAT44_EI_IF_INSIDE
2734 self.vapi.nat44_ei_interface_add_del_feature(
2735 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2737 self.vapi.nat44_ei_interface_add_del_feature(
2738 sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1
2740 self.vapi.nat44_ei_interface_add_del_feature(
2741 sw_if_index=self.pg2.sw_if_index, is_add=1
2745 pkts = self.create_stream_in(self.pg0, self.pg2)
2746 self.pg0.add_stream(pkts)
2747 self.pg_enable_capture(self.pg_interfaces)
2749 capture = self.pg2.get_capture(len(pkts))
2750 self.verify_capture_out(capture, nat_ip1)
2753 pkts = self.create_stream_in(self.pg1, self.pg2)
2754 self.pg1.add_stream(pkts)
2755 self.pg_enable_capture(self.pg_interfaces)
2757 capture = self.pg2.get_capture(len(pkts))
2758 self.verify_capture_out(capture, nat_ip1)
2760 def test_dynamic_ipless_interfaces(self):
2761 """NAT44EI interfaces without configured IP address"""
# Uses pg7 (added with NAT44_EI_IF_INSIDE) and pg8 (added with no flags
# argument) after create_routes_and_neigbors(). A stream pg7 -> pg8 is checked
# with verify_capture_out(); a stream sent on pg8 towards self.nat_addr is
# checked on pg7 with verify_capture_in().
2762 self.create_routes_and_neigbors()
2763 self.nat44_add_address(self.nat_addr)
2764 flags = self.config_flags.NAT44_EI_IF_INSIDE
2765 self.vapi.nat44_ei_interface_add_del_feature(
2766 sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2768 self.vapi.nat44_ei_interface_add_del_feature(
2769 sw_if_index=self.pg8.sw_if_index, is_add=1
2773 pkts = self.create_stream_in(self.pg7, self.pg8)
2774 self.pg7.add_stream(pkts)
2775 self.pg_enable_capture(self.pg_interfaces)
2777 capture = self.pg8.get_capture(len(pkts))
2778 self.verify_capture_out(capture)
2781 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2782 self.pg8.add_stream(pkts)
2783 self.pg_enable_capture(self.pg_interfaces)
2785 capture = self.pg7.get_capture(len(pkts))
2786 self.verify_capture_in(capture, self.pg7)
2788 def test_static_ipless_interfaces(self):
2789 """NAT44EI interfaces without configured IP address - 1:1 NAT"""
# Same pg7/pg8 setup as the dynamic variant, but with a static mapping between
# pg7.remote_ip4 and self.nat_addr instead of a pool address. Here the stream
# from pg8 is sent first and the stream from pg7 second; the latter is checked
# with verify_capture_out(capture, self.nat_addr, True).
2791 self.create_routes_and_neigbors()
2792 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2793 flags = self.config_flags.NAT44_EI_IF_INSIDE
2794 self.vapi.nat44_ei_interface_add_del_feature(
2795 sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2797 self.vapi.nat44_ei_interface_add_del_feature(
2798 sw_if_index=self.pg8.sw_if_index, is_add=1
2802 pkts = self.create_stream_out(self.pg8)
2803 self.pg8.add_stream(pkts)
2804 self.pg_enable_capture(self.pg_interfaces)
2806 capture = self.pg7.get_capture(len(pkts))
2807 self.verify_capture_in(capture, self.pg7)
2810 pkts = self.create_stream_in(self.pg7, self.pg8)
2811 self.pg7.add_stream(pkts)
2812 self.pg_enable_capture(self.pg_interfaces)
2814 capture = self.pg8.get_capture(len(pkts))
2815 self.verify_capture_out(capture, self.nat_addr, True)
2817 def test_static_with_port_ipless_interfaces(self):
2818 """NAT44EI interfaces without configured IP address - 1:1 NAPT"""
# Fixes the expected outside TCP/UDP ports and ICMP id on self, adds a pool
# address plus one static mapping per protocol (tcp, udp, icmp) for
# pg7.remote_ip4, and then runs the pg8 -> pg7 and pg7 -> pg8 streams.
# NOTE(review): the remaining arguments of the three static-mapping calls are
# on lines not visible here.
2820 self.tcp_port_out = 30606
2821 self.udp_port_out = 30607
2822 self.icmp_id_out = 30608
2824 self.create_routes_and_neigbors()
2825 self.nat44_add_address(self.nat_addr)
2826 self.nat44_add_static_mapping(
2827 self.pg7.remote_ip4,
2831 proto=IP_PROTOS.tcp,
2833 self.nat44_add_static_mapping(
2834 self.pg7.remote_ip4,
2838 proto=IP_PROTOS.udp,
2840 self.nat44_add_static_mapping(
2841 self.pg7.remote_ip4,
2845 proto=IP_PROTOS.icmp,
2847 flags = self.config_flags.NAT44_EI_IF_INSIDE
2848 self.vapi.nat44_ei_interface_add_del_feature(
2849 sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1
2851 self.vapi.nat44_ei_interface_add_del_feature(
2852 sw_if_index=self.pg8.sw_if_index, is_add=1
2856 pkts = self.create_stream_out(self.pg8)
2857 self.pg8.add_stream(pkts)
2858 self.pg_enable_capture(self.pg_interfaces)
2860 capture = self.pg7.get_capture(len(pkts))
2861 self.verify_capture_in(capture, self.pg7)
2864 pkts = self.create_stream_in(self.pg7, self.pg8)
2865 self.pg7.add_stream(pkts)
2866 self.pg_enable_capture(self.pg_interfaces)
2868 capture = self.pg8.get_capture(len(pkts))
2869 self.verify_capture_out(capture)
2871 def test_static_unknown_proto(self):
2872 """NAT44EI 1:1 translate packet with unknown protocol"""
# With a static mapping between pg0.remote_ip4 and nat_ip, a packet sent from
# pg0 must appear on pg1 with source nat_ip, destination pg1.remote_ip4, a GRE
# layer and valid checksums. The reverse packet (pg1 -> nat_ip) must appear on
# pg0 with source pg1.remote_ip4 and destination pg0.remote_ip4.
# NOTE(review): the layer between the outer and inner IP headers and the
# binding of `packet` are on lines not visible here (presumably GRE, given the
# haslayer(GRE) checks).
2873 nat_ip = "10.0.0.10"
2874 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2875 flags = self.config_flags.NAT44_EI_IF_INSIDE
2876 self.vapi.nat44_ei_interface_add_del_feature(
2877 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2879 self.vapi.nat44_ei_interface_add_del_feature(
2880 sw_if_index=self.pg1.sw_if_index, is_add=1
2885 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
2886 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
2888 / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
2889 / TCP(sport=1234, dport=1234)
2891 self.pg0.add_stream(p)
2892 self.pg_enable_capture(self.pg_interfaces)
2894 p = self.pg1.get_capture(1)
2897 self.assertEqual(packet[IP].src, nat_ip)
2898 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2899 self.assertEqual(packet.haslayer(GRE), 1)
2900 self.assert_packet_checksums_valid(packet)
2902 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2907 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
2908 / IP(src=self.pg1.remote_ip4, dst=nat_ip)
2910 / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
2911 / TCP(sport=1234, dport=1234)
2913 self.pg1.add_stream(p)
2914 self.pg_enable_capture(self.pg_interfaces)
2916 p = self.pg0.get_capture(1)
2919 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2920 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2921 self.assertEqual(packet.haslayer(GRE), 1)
2922 self.assert_packet_checksums_valid(packet)
2924 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2927 def test_hairpinning_static_unknown_proto(self):
2928 """NAT44EI 1:1 translate packet with unknown protocol - hairpinning"""
# host and server are both remote hosts of pg0, each with its own static
# mapping (host_nat_ip / server_nat_ip). A packet from host to server_nat_ip
# must come back out of pg0 with source host_nat_ip and destination
# server.ip4; the packet from server to host_nat_ip must come back with source
# server_nat_ip and destination host.ip4. Both must carry a GRE layer.
# NOTE(review): the binding of `packet` and the layer between the two IP
# headers are on lines not visible here.
2930 host = self.pg0.remote_hosts[0]
2931 server = self.pg0.remote_hosts[1]
2933 host_nat_ip = "10.0.0.10"
2934 server_nat_ip = "10.0.0.11"
2936 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2937 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2938 flags = self.config_flags.NAT44_EI_IF_INSIDE
2939 self.vapi.nat44_ei_interface_add_del_feature(
2940 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
2942 self.vapi.nat44_ei_interface_add_del_feature(
2943 sw_if_index=self.pg1.sw_if_index, is_add=1
2948 Ether(dst=self.pg0.local_mac, src=host.mac)
2949 / IP(src=host.ip4, dst=server_nat_ip)
2951 / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4)
2952 / TCP(sport=1234, dport=1234)
2954 self.pg0.add_stream(p)
2955 self.pg_enable_capture(self.pg_interfaces)
2957 p = self.pg0.get_capture(1)
2960 self.assertEqual(packet[IP].src, host_nat_ip)
2961 self.assertEqual(packet[IP].dst, server.ip4)
2962 self.assertEqual(packet.haslayer(GRE), 1)
2963 self.assert_packet_checksums_valid(packet)
2965 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2970 Ether(dst=self.pg0.local_mac, src=server.mac)
2971 / IP(src=server.ip4, dst=host_nat_ip)
2973 / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4)
2974 / TCP(sport=1234, dport=1234)
2976 self.pg0.add_stream(p)
2977 self.pg_enable_capture(self.pg_interfaces)
2979 p = self.pg0.get_capture(1)
2982 self.assertEqual(packet[IP].src, server_nat_ip)
2983 self.assertEqual(packet[IP].dst, host.ip4)
2984 self.assertEqual(packet.haslayer(GRE), 1)
2985 self.assert_packet_checksums_valid(packet)
2987 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2990 def test_output_feature(self):
2991 """NAT44EI output feature (in2out postrouting)"""
# Only pg3 is configured, via nat44_ei_add_del_output_interface; no
# nat44_ei_interface_add_del_feature call appears in this test. Three streams
# are run: pg0 -> pg3 (verify_capture_out), pg3 -> pg0 (verify_capture_in) and
# pg2 -> pg0, which is checked with verify_capture_no_translation().
2992 self.nat44_add_address(self.nat_addr)
2993 self.vapi.nat44_ei_add_del_output_interface(
2994 sw_if_index=self.pg3.sw_if_index, is_add=1
2998 pkts = self.create_stream_in(self.pg0, self.pg3)
2999 self.pg0.add_stream(pkts)
3000 self.pg_enable_capture(self.pg_interfaces)
3002 capture = self.pg3.get_capture(len(pkts))
3003 self.verify_capture_out(capture)
3006 pkts = self.create_stream_out(self.pg3)
3007 self.pg3.add_stream(pkts)
3008 self.pg_enable_capture(self.pg_interfaces)
3010 capture = self.pg0.get_capture(len(pkts))
3011 self.verify_capture_in(capture, self.pg0)
3013 # from non-NAT interface to NAT inside interface
3014 pkts = self.create_stream_in(self.pg2, self.pg0)
3015 self.pg2.add_stream(pkts)
3016 self.pg_enable_capture(self.pg_interfaces)
3018 capture = self.pg0.get_capture(len(pkts))
3019 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3021 def test_output_feature_vrf_aware(self):
3022 """NAT44EI output feature VRF aware (in2out postrouting)"""
# Adds one pool address per VRF (vrf_id=10 and vrf_id=20) and makes pg3 the
# output interface. Traffic pg4 -> pg3 is verified against nat_ip_vrf10 and
# its return traffic is expected on pg4; traffic pg6 -> pg3 is verified
# against nat_ip_vrf20 and its return traffic is expected on pg6.
# NOTE(review): the two route objects that take the VppRoutePath lists below
# are constructed on lines not visible here.
3023 nat_ip_vrf10 = "10.0.0.10"
3024 nat_ip_vrf20 = "10.0.0.20"
3028 self.pg3.remote_ip4,
3030 [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
3035 self.pg3.remote_ip4,
3037 [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)],
3043 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3044 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3045 self.vapi.nat44_ei_add_del_output_interface(
3046 sw_if_index=self.pg3.sw_if_index, is_add=1
3050 pkts = self.create_stream_in(self.pg4, self.pg3)
3051 self.pg4.add_stream(pkts)
3052 self.pg_enable_capture(self.pg_interfaces)
3054 capture = self.pg3.get_capture(len(pkts))
3055 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3058 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3059 self.pg3.add_stream(pkts)
3060 self.pg_enable_capture(self.pg_interfaces)
3062 capture = self.pg4.get_capture(len(pkts))
3063 self.verify_capture_in(capture, self.pg4)
3066 pkts = self.create_stream_in(self.pg6, self.pg3)
3067 self.pg6.add_stream(pkts)
3068 self.pg_enable_capture(self.pg_interfaces)
3070 capture = self.pg3.get_capture(len(pkts))
3071 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3074 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3075 self.pg3.add_stream(pkts)
3076 self.pg_enable_capture(self.pg_interfaces)
3078 capture = self.pg6.get_capture(len(pkts))
3079 self.verify_capture_in(capture, self.pg6)
3081 def test_output_feature_hairpinning(self):
3082 """NAT44EI output feature hairpinning (in2out postrouting)"""
# host and server are both remote hosts of pg0; pg0 and pg1 are both added as
# output interfaces. The host addresses self.nat_addr:server_out_port and the
# packet must come back out of pg0 rewritten to server.ip4:server_in_port with
# source self.nat_addr and a changed source port. The server's reply to that
# port must reach the host as self.nat_addr:server_out_port ->
# host.ip4:host_in_port.
# NOTE(review): `host_in_port`, the static-mapping arguments other than
# `proto`, and the `ip` / `tcp` bindings are on lines not visible here.
3083 host = self.pg0.remote_hosts[0]
3084 server = self.pg0.remote_hosts[1]
3087 server_in_port = 5678
3088 server_out_port = 8765
3090 self.nat44_add_address(self.nat_addr)
3091 self.vapi.nat44_ei_add_del_output_interface(
3092 sw_if_index=self.pg0.sw_if_index, is_add=1
3094 self.vapi.nat44_ei_add_del_output_interface(
3095 sw_if_index=self.pg1.sw_if_index, is_add=1
3098 # add static mapping for server
3099 self.nat44_add_static_mapping(
3104 proto=IP_PROTOS.tcp,
3107 # send packet from host to server
3109 Ether(src=host.mac, dst=self.pg0.local_mac)
3110 / IP(src=host.ip4, dst=self.nat_addr)
3111 / TCP(sport=host_in_port, dport=server_out_port)
3113 self.pg0.add_stream(p)
3114 self.pg_enable_capture(self.pg_interfaces)
3116 capture = self.pg0.get_capture(1)
3121 self.assertEqual(ip.src, self.nat_addr)
3122 self.assertEqual(ip.dst, server.ip4)
3123 self.assertNotEqual(tcp.sport, host_in_port)
3124 self.assertEqual(tcp.dport, server_in_port)
3125 self.assert_packet_checksums_valid(p)
# the translated source port is the destination port of the reply below
3126 host_out_port = tcp.sport
3128 self.logger.error(ppp("Unexpected or invalid packet:", p))
3131 # send reply from server to host
3133 Ether(src=server.mac, dst=self.pg0.local_mac)
3134 / IP(src=server.ip4, dst=self.nat_addr)
3135 / TCP(sport=server_in_port, dport=host_out_port)
3137 self.pg0.add_stream(p)
3138 self.pg_enable_capture(self.pg_interfaces)
3140 capture = self.pg0.get_capture(1)
3145 self.assertEqual(ip.src, self.nat_addr)
3146 self.assertEqual(ip.dst, host.ip4)
3147 self.assertEqual(tcp.sport, server_out_port)
3148 self.assertEqual(tcp.dport, host_in_port)
3149 self.assert_packet_checksums_valid(p)
3151 self.logger.error(ppp("Unexpected or invalid packet:", p))
3154 def test_one_armed_nat44(self):
3155 """NAT44EI One armed NAT"""
# Both hosts are remote hosts of pg9, and pg9 is added to NAT44EI twice: once
# with no flags argument and once with NAT44_EI_IF_INSIDE. A TCP packet
# local_host:12345 -> remote_host:80 must leave pg9 with source self.nat_addr
# and a different source port; the reply to that port must leave pg9 addressed
# to local_host.ip4:12345. Finally the classify node's "next in2out" and
# "next out2in" error counters must each equal 1.
# NOTE(review): the `ip` / `tcp` bindings are on lines not visible here.
3156 remote_host = self.pg9.remote_hosts[0]
3157 local_host = self.pg9.remote_hosts[1]
3160 self.nat44_add_address(self.nat_addr)
3161 flags = self.config_flags.NAT44_EI_IF_INSIDE
3162 self.vapi.nat44_ei_interface_add_del_feature(
3163 sw_if_index=self.pg9.sw_if_index, is_add=1
3165 self.vapi.nat44_ei_interface_add_del_feature(
3166 sw_if_index=self.pg9.sw_if_index, flags=flags, is_add=1
3171 Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
3172 / IP(src=local_host.ip4, dst=remote_host.ip4)
3173 / TCP(sport=12345, dport=80)
3175 self.pg9.add_stream(p)
3176 self.pg_enable_capture(self.pg_interfaces)
3178 capture = self.pg9.get_capture(1)
3183 self.assertEqual(ip.src, self.nat_addr)
3184 self.assertEqual(ip.dst, remote_host.ip4)
3185 self.assertNotEqual(tcp.sport, 12345)
# the translated source port is the destination port of the reply below
3186 external_port = tcp.sport
3187 self.assertEqual(tcp.dport, 80)
3188 self.assert_packet_checksums_valid(p)
3190 self.logger.error(ppp("Unexpected or invalid packet:", p))
3195 Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac)
3196 / IP(src=remote_host.ip4, dst=self.nat_addr)
3197 / TCP(sport=80, dport=external_port)
3199 self.pg9.add_stream(p)
3200 self.pg_enable_capture(self.pg_interfaces)
3202 capture = self.pg9.get_capture(1)
3207 self.assertEqual(ip.src, remote_host.ip4)
3208 self.assertEqual(ip.dst, local_host.ip4)
3209 self.assertEqual(tcp.sport, 80)
3210 self.assertEqual(tcp.dport, 12345)
3211 self.assert_packet_checksums_valid(p)
3213 self.logger.error(ppp("Unexpected or invalid packet:", p))
# the counter path depends on the node name; with more than one worker the
# handoff variant of the classify node is used
# NOTE(review): the second `node = ...` presumably sits in an else branch that
# is not visible here -- confirm.
3216 if self.vpp_worker_count > 1:
3217 node = "nat44-ei-handoff-classify"
3219 node = "nat44-ei-classify"
3221 err = self.statistics.get_err_counter("/err/%s/next in2out" % node)
3222 self.assertEqual(err, 1)
3223 err = self.statistics.get_err_counter("/err/%s/next out2in" % node)
3224 self.assertEqual(err, 1)
3226 def test_del_session(self):
3227 """NAT44EI delete session"""
# Creates sessions by sending a stream pg0 -> pg1, then deletes them through
# nat44_ei_del_session in two ways: keyed by inside address/port with
# flags=NAT44_EI_IF_INSIDE, and keyed by outside address/port without flags.
# After two deletions the dump must be two entries shorter; after deleting one
# more entry verify_no_nat44_user() is called.
3228 self.nat44_add_address(self.nat_addr)
3229 flags = self.config_flags.NAT44_EI_IF_INSIDE
3230 self.vapi.nat44_ei_interface_add_del_feature(
3231 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3233 self.vapi.nat44_ei_interface_add_del_feature(
3234 sw_if_index=self.pg1.sw_if_index, is_add=1
3237 pkts = self.create_stream_in(self.pg0, self.pg1)
3238 self.pg0.add_stream(pkts)
3239 self.pg_enable_capture(self.pg_interfaces)
3241 self.pg1.get_capture(len(pkts))
3243 sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3244 nsessions = len(sessions)
# delete by inside key
3246 self.vapi.nat44_ei_del_session(
3247 address=sessions[0].inside_ip_address,
3248 port=sessions[0].inside_port,
3249 protocol=sessions[0].protocol,
3250 flags=self.config_flags.NAT44_EI_IF_INSIDE,
# delete by outside key
3253 self.vapi.nat44_ei_del_session(
3254 address=sessions[1].outside_ip_address,
3255 port=sessions[1].outside_port,
3256 protocol=sessions[1].protocol,
3259 sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3260 self.assertEqual(nsessions - len(sessions), 2)
# delete the first entry of the refreshed dump, again by inside key
3262 self.vapi.nat44_ei_del_session(
3263 address=sessions[0].inside_ip_address,
3264 port=sessions[0].inside_port,
3265 protocol=sessions[0].protocol,
3266 flags=self.config_flags.NAT44_EI_IF_INSIDE,
3269 self.verify_no_nat44_user()
3271 def test_frag_in_order(self):
3272 """NAT44EI translate fragments arriving in order"""
# Adds the pool address, adds pg0 with NAT44_EI_IF_INSIDE and pg1 with no
# flags argument, then delegates to self.frag_in_order() once per protocol
# (tcp, udp, icmp).
3274 self.nat44_add_address(self.nat_addr)
3275 flags = self.config_flags.NAT44_EI_IF_INSIDE
3276 self.vapi.nat44_ei_interface_add_del_feature(
3277 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3279 self.vapi.nat44_ei_interface_add_del_feature(
3280 sw_if_index=self.pg1.sw_if_index, is_add=1
3283 self.frag_in_order(proto=IP_PROTOS.tcp)
3284 self.frag_in_order(proto=IP_PROTOS.udp)
3285 self.frag_in_order(proto=IP_PROTOS.icmp)
3287 def test_frag_forwarding(self):
3288 """NAT44EI forwarding fragment test"""
# Takes the NAT address from pg1 (nat44_ei_add_del_interface_addr), enables
# forwarding, and sends a fragmented UDP stream on pg1 addressed to
# pg0.remote_ip4. The fragments captured on pg0 are reassembled and must keep
# source pg1.remote_ip4, destination pg0.remote_ip4, both ports 4789 and the
# original payload.
3289 self.vapi.nat44_ei_add_del_interface_addr(
3290 is_add=1, sw_if_index=self.pg1.sw_if_index
3292 flags = self.config_flags.NAT44_EI_IF_INSIDE
3293 self.vapi.nat44_ei_interface_add_del_feature(
3294 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3296 self.vapi.nat44_ei_interface_add_del_feature(
3297 sw_if_index=self.pg1.sw_if_index, is_add=1
3299 self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
# 35-byte payload: 16 x "A", 16 x "B", 3 x "C"
3301 data = b"A" * 16 + b"B" * 16 + b"C" * 3
3302 pkts = self.create_stream_frag(
3303 self.pg1, self.pg0.remote_ip4, 4789, 4789, data, proto=IP_PROTOS.udp
3305 self.pg1.add_stream(pkts)
3306 self.pg_enable_capture(self.pg_interfaces)
3308 frags = self.pg0.get_capture(len(pkts))
3309 p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4)
3310 self.assertEqual(p[UDP].sport, 4789)
3311 self.assertEqual(p[UDP].dport, 4789)
3312 self.assertEqual(data, p[Raw].load)
3314 def test_reass_hairpinning(self):
3315 """NAT44EI fragments hairpinning"""
# Picks random ports in 1025..65535, adds a pool address, a TCP and a UDP
# static mapping plus an address-only mapping server_addr <-> self.nat_addr,
# and then delegates to self.reass_hairpinning() once per protocol (tcp, udp,
# icmp).
# NOTE(review): the arguments other than `proto` of the static-mapping and
# reass_hairpinning calls are on lines not visible here.
3317 server_addr = self.pg0.remote_hosts[1].ip4
3318 host_in_port = random.randint(1025, 65535)
3319 server_in_port = random.randint(1025, 65535)
3320 server_out_port = random.randint(1025, 65535)
3322 self.nat44_add_address(self.nat_addr)
3323 flags = self.config_flags.NAT44_EI_IF_INSIDE
3324 self.vapi.nat44_ei_interface_add_del_feature(
3325 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3327 self.vapi.nat44_ei_interface_add_del_feature(
3328 sw_if_index=self.pg1.sw_if_index, is_add=1
3330 # add static mapping for server
3331 self.nat44_add_static_mapping(
3336 proto=IP_PROTOS.tcp,
3338 self.nat44_add_static_mapping(
3343 proto=IP_PROTOS.udp,
3345 self.nat44_add_static_mapping(server_addr, self.nat_addr)
3347 self.reass_hairpinning(
3352 proto=IP_PROTOS.tcp,
3354 self.reass_hairpinning(
3359 proto=IP_PROTOS.udp,
3361 self.reass_hairpinning(
3366 proto=IP_PROTOS.icmp,
3369 def test_frag_out_of_order(self):
3370 """NAT44EI translate fragments arriving out of order"""
# Sets up a NAT address with pg0 as inside and pg1 added without flags
# (presumably outside -- confirm), then runs the frag_out_of_order helper for
# TCP, UDP and ICMP in turn.
3372 self.nat44_add_address(self.nat_addr)
3373 flags = self.config_flags.NAT44_EI_IF_INSIDE
3374 self.vapi.nat44_ei_interface_add_del_feature(
3375 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3377 self.vapi.nat44_ei_interface_add_del_feature(
3378 sw_if_index=self.pg1.sw_if_index, is_add=1
# All verification is done inside the helper.
3381 self.frag_out_of_order(proto=IP_PROTOS.tcp)
3382 self.frag_out_of_order(proto=IP_PROTOS.udp)
3383 self.frag_out_of_order(proto=IP_PROTOS.icmp)
3385 def test_port_restricted(self):
3386 """NAT44EI Port restricted NAT44EI (MAP-E CE)"""
# Selects allocation algorithm 1 with a PSID configuration, sends one TCP
# packet from inside to outside and checks that the translated source port
# carries the configured PSID.
3387 self.nat44_add_address(self.nat_addr)
3388 flags = self.config_flags.NAT44_EI_IF_INSIDE
3389 self.vapi.nat44_ei_interface_add_del_feature(
3390 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3392 self.vapi.nat44_ei_interface_add_del_feature(
3393 sw_if_index=self.pg1.sw_if_index, is_add=1
# alg=1 together with psid_* arguments (presumably the MAP-E port-restricted
# algorithm named in the docstring -- confirm the enum value).
3395 self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
3396 alg=1, psid_offset=6, psid_length=6, psid=10
# Single TCP packet from pg0's remote host to pg1's remote host, 4567 -> 22.
3400 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3401 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3402 / TCP(sport=4567, dport=22)
3404 self.pg0.add_stream(p)
3405 self.pg_enable_capture(self.pg_interfaces)
3407 capture = self.pg1.get_capture(1)
# Destination is untouched, source address becomes the NAT address and the
# source port must have been rewritten.
3412 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3413 self.assertEqual(ip.src, self.nat_addr)
3414 self.assertEqual(tcp.dport, 22)
3415 self.assertNotEqual(tcp.sport, 4567)
# The 6-bit field starting at bit 6 of the new source port (shift by 6, mask
# with 63) must equal the configured psid, 10.
3416 self.assertEqual((tcp.sport >> 6) & 63, 10)
3417 self.assert_packet_checksums_valid(p)
# Dump of the offending packet (presumably in an error-handling path -- confirm).
3419 self.logger.error(ppp("Unexpected or invalid packet:", p))
3422 def test_port_range(self):
3423 """NAT44EI External address port range"""
# Restricts the external port range to three ports, offers five TCP flows and
# expects exactly three translated packets, each with a source port inside
# the range.
3424 self.nat44_add_address(self.nat_addr)
3425 flags = self.config_flags.NAT44_EI_IF_INSIDE
3426 self.vapi.nat44_ei_interface_add_del_feature(
3427 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3429 self.vapi.nat44_ei_interface_add_del_feature(
3430 sw_if_index=self.pg1.sw_if_index, is_add=1
# alg=2 with start/end ports; 1025..1027 gives three usable ports if both
# ends are inclusive, as the assertions below imply.
3432 self.vapi.nat44_ei_set_addr_and_port_alloc_alg(
3433 alg=2, start_port=1025, end_port=1027
# Five flows with inside source ports 1125..1129 -- more than the range holds.
3437 for port in range(0, 5):
3439 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
3440 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3441 / TCP(sport=1125 + port)
3444 self.pg0.add_stream(pkts)
3445 self.pg_enable_capture(self.pg_interfaces)
# Only three packets are expected out (presumably the other two fail port
# allocation and are dropped -- confirm).
3447 capture = self.pg1.get_capture(3)
# Every translated source port must lie in 1025..1027.
3450 self.assertGreaterEqual(tcp.sport, 1025)
3451 self.assertLessEqual(tcp.sport, 1027)
3453 def test_multiple_outside_vrf(self):
3454 """NAT44EI Multiple outside VRF"""
# Moves pg1 and pg2 into two separate IP tables, adds both as NAT interfaces
# next to inside interface pg0, and checks translation in both directions
# towards each of them. The interfaces are moved back to table 0 at the end.
# Re-home pg1/pg2: drop their addresses, create the tables, bind, re-address
# and re-resolve ARP.
3458 self.pg1.unconfig_ip4()
3459 self.pg2.unconfig_ip4()
3460 self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1})
3461 self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2})
3462 self.pg1.set_table_ip4(vrf_id1)
3463 self.pg2.set_table_ip4(vrf_id2)
3464 self.pg1.config_ip4()
3465 self.pg2.config_ip4()
3466 self.pg1.resolve_arp()
3467 self.pg2.resolve_arp()
# One NAT address; pg0 inside, pg1 and pg2 added without flags (presumably
# outside -- confirm).
3469 self.nat44_add_address(self.nat_addr)
3470 flags = self.config_flags.NAT44_EI_IF_INSIDE
3471 self.vapi.nat44_ei_interface_add_del_feature(
3472 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3474 self.vapi.nat44_ei_interface_add_del_feature(
3475 sw_if_index=self.pg1.sw_if_index, is_add=1
3477 self.vapi.nat44_ei_interface_add_del_feature(
3478 sw_if_index=self.pg2.sw_if_index, is_add=1
# pg0 -> pg1, then the return direction pg1 -> pg0.
3483 pkts = self.create_stream_in(self.pg0, self.pg1)
3484 self.pg0.add_stream(pkts)
3485 self.pg_enable_capture(self.pg_interfaces)
3487 capture = self.pg1.get_capture(len(pkts))
3488 self.verify_capture_out(capture, self.nat_addr)
3490 pkts = self.create_stream_out(self.pg1, self.nat_addr)
3491 self.pg1.add_stream(pkts)
3492 self.pg_enable_capture(self.pg_interfaces)
3494 capture = self.pg0.get_capture(len(pkts))
3495 self.verify_capture_in(capture, self.pg0)
# Switch to different inside port/id values before repeating towards pg2
# (presumably so the second run creates new sessions instead of reusing the
# pg1 ones -- confirm).
3497 self.tcp_port_in = 60303
3498 self.udp_port_in = 60304
3499 self.icmp_id_in = 60305
# pg0 -> pg2, then pg2 -> pg0.
3502 pkts = self.create_stream_in(self.pg0, self.pg2)
3503 self.pg0.add_stream(pkts)
3504 self.pg_enable_capture(self.pg_interfaces)
3506 capture = self.pg2.get_capture(len(pkts))
3507 self.verify_capture_out(capture, self.nat_addr)
3509 pkts = self.create_stream_out(self.pg2, self.nat_addr)
3510 self.pg2.add_stream(pkts)
3511 self.pg_enable_capture(self.pg_interfaces)
3513 capture = self.pg0.get_capture(len(pkts))
3514 self.verify_capture_in(capture, self.pg0)
# Cleanup: remove the NAT address and return pg1/pg2 to table 0.
# NOTE(review): no matching ip_table_add_del(is_add=0) for the two tables
# appears here -- confirm they are removed elsewhere.
3517 self.nat44_add_address(self.nat_addr, is_add=0)
3518 self.pg1.unconfig_ip4()
3519 self.pg2.unconfig_ip4()
3520 self.pg1.set_table_ip4(0)
3521 self.pg2.set_table_ip4(0)
3522 self.pg1.config_ip4()
3523 self.pg2.config_ip4()
3524 self.pg1.resolve_arp()
3525 self.pg2.resolve_arp()
3527 def test_mss_clamping(self):
3528 """NAT44EI TCP MSS clamping"""
# Sends the same TCP packet, carrying an MSS option of 1400, three times
# under different clamping settings and checks the MSS seen on the outside
# interface each time.
3529 self.nat44_add_address(self.nat_addr)
3530 flags = self.config_flags.NAT44_EI_IF_INSIDE
3531 self.vapi.nat44_ei_interface_add_del_feature(
3532 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3534 self.vapi.nat44_ei_interface_add_del_feature(
3535 sw_if_index=self.pg1.sw_if_index, is_add=1
# Inside -> outside TCP packet advertising MSS 1400.
3539 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
3540 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
3542 sport=self.tcp_port_in,
3543 dport=self.tcp_external_port,
3545 options=[("MSS", 1400)],
# Case 1: clamping enabled at 1000, below the advertised 1400.
3549 self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1000)
3550 self.pg0.add_stream(p)
3551 self.pg_enable_capture(self.pg_interfaces)
3553 capture = self.pg1.get_capture(1)
3554 # Negotiated MSS value greater than configured - changed
3555 self.verify_mss_value(capture[0], 1000)
# Case 2: clamping disabled; the mss_value passed alongside is 1500.
3557 self.vapi.nat44_ei_set_mss_clamping(enable=0, mss_value=1500)
3558 self.pg0.add_stream(p)
3559 self.pg_enable_capture(self.pg_interfaces)
3561 capture = self.pg1.get_capture(1)
3562 # MSS clamping disabled - negotiated MSS unchanged
3563 self.verify_mss_value(capture[0], 1400)
# Case 3: clamping enabled at 1500, above the advertised 1400.
3565 self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1500)
3566 self.pg0.add_stream(p)
3567 self.pg_enable_capture(self.pg_interfaces)
3569 capture = self.pg1.get_capture(1)
3570 # Negotiated MSS value smaller than configured - unchanged
3571 self.verify_mss_value(capture[0], 1400)
3573 def test_ha_send(self):
3574 """NAT44EI Send HA session synchronization events (active)"""
# Exercises the sending side of HA state sync in four phases:
#   1. creating sessions produces "add" events towards the failover peer,
#   2. an ACK from the peer is counted,
#   3. a "del" event that is never ACKed is retransmitted,
#   4. return traffic produces "refresh" events, which are then ACKed.
3575 flags = self.config_flags.NAT44_EI_IF_INSIDE
3576 self.vapi.nat44_ei_interface_add_del_feature(
3577 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3579 self.vapi.nat44_ei_interface_add_del_feature(
3580 sw_if_index=self.pg1.sw_if_index, is_add=1
3582 self.nat44_add_address(self.nat_addr)
# HA endpoints live on pg3: local listener on port 12345, failover peer on
# port 12346 with session_refresh_interval=10.
3584 self.vapi.nat44_ei_ha_set_listener(
3585 ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
3587 self.vapi.nat44_ei_ha_set_failover(
3588 ip_address=self.pg3.remote_ip4, port=12346, session_refresh_interval=10
# Register a scapy binding so UDP with source port 12345 is dissected as
# HANATStateSync.
3590 bind_layers(UDP, HANATStateSync, sport=12345)
# Phase 1: inside -> outside traffic creates the sessions.
3593 pkts = self.create_stream_in(self.pg0, self.pg1)
3594 self.pg0.add_stream(pkts)
3595 self.pg_enable_capture(self.pg_interfaces)
3597 capture = self.pg1.get_capture(len(pkts))
3598 self.verify_capture_out(capture)
3599 # active send HA events
3600 self.vapi.nat44_ei_ha_flush()
# Column 0 of the counter is summed over its first axis (presumably one row
# per thread -- confirm); three add events are expected.
3601 stats = self.statistics["/nat44-ei/ha/add-event-send"]
3602 self.assertEqual(stats[:, 0].sum(), 3)
# One HA packet is expected on pg3.
3603 capture = self.pg3.get_capture(1)
3605 self.assert_packet_checksums_valid(p)
3609 hanat = p[HANATStateSync]
3611 self.logger.error(ppp("Invalid packet:", p))
# The packet must travel listener -> failover and carry three "add" events.
3614 self.assertEqual(ip.src, self.pg3.local_ip4)
3615 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3616 self.assertEqual(udp.sport, 12345)
3617 self.assertEqual(udp.dport, 12346)
3618 self.assertEqual(hanat.version, 1)
3619 # self.assertEqual(hanat.thread_index, 0)
3620 self.assertEqual(hanat.count, 3)
# Remember the sequence number: the ACK below echoes it.
3621 seq = hanat.sequence_number
3622 for event in hanat.events:
# event_type 1 is "add" in the Event field definition.
3623 self.assertEqual(event.event_type, 1)
3624 self.assertEqual(event.in_addr, self.pg0.remote_ip4)
3625 self.assertEqual(event.out_addr, self.nat_addr)
3626 self.assertEqual(event.fib_index, 0)
3628 # ACK received events
# Phase 2: ACK from the peer with the same sequence number and thread index.
3630 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3631 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3632 / UDP(sport=12346, dport=12345)
3634 sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
3637 self.pg3.add_stream(ack)
3639 stats = self.statistics["/nat44-ei/ha/ack-recv"]
3640 self.assertEqual(stats[:, 0].sum(), 1)
3642 # delete one session
# Phase 3: deleting the TCP session must produce a single "del" event.
3643 self.pg_enable_capture(self.pg_interfaces)
3644 self.vapi.nat44_ei_del_session(
3645 address=self.pg0.remote_ip4,
3646 port=self.tcp_port_in,
3647 protocol=IP_PROTOS.tcp,
3648 flags=self.config_flags.NAT44_EI_IF_INSIDE,
3650 self.vapi.nat44_ei_ha_flush()
3651 stats = self.statistics["/nat44-ei/ha/del-event-send"]
3652 self.assertEqual(stats[:, 0].sum(), 1)
3653 capture = self.pg3.get_capture(1)
3656 hanat = p[HANATStateSync]
3658 self.logger.error(ppp("Invalid packet:", p))
# The new packet must carry a higher sequence number than the add packet.
3661 self.assertGreater(hanat.sequence_number, seq)
3663 # do not send ACK, active retry send HA event again
# After 12 units of virtual sleep with no ACK the test expects retry-count 3,
# missed-count 1 and three captured retransmissions, each equal to p.
3664 self.pg_enable_capture(self.pg_interfaces)
3665 self.virtual_sleep(12)
3666 stats = self.statistics["/nat44-ei/ha/retry-count"]
3667 self.assertEqual(stats[:, 0].sum(), 3)
3668 stats = self.statistics["/nat44-ei/ha/missed-count"]
3669 self.assertEqual(stats[:, 0].sum(), 1)
3670 capture = self.pg3.get_capture(3)
3671 for packet in capture:
3672 self.assertEqual(packet, p)
3674 # session counters refresh
# Phase 4: outside -> inside traffic. Two packets are expected back on pg0
# (presumably for the two sessions left after the TCP one was deleted --
# confirm).
3675 pkts = self.create_stream_out(self.pg1)
3676 self.pg1.add_stream(pkts)
3677 self.pg_enable_capture(self.pg_interfaces)
3679 self.pg0.get_capture(2)
3680 self.vapi.nat44_ei_ha_flush()
3681 stats = self.statistics["/nat44-ei/ha/refresh-event-send"]
3682 self.assertEqual(stats[:, 0].sum(), 2)
3683 capture = self.pg3.get_capture(1)
3685 self.assert_packet_checksums_valid(p)
3689 hanat = p[HANATStateSync]
3691 self.logger.error(ppp("Invalid packet:", p))
3694 self.assertEqual(ip.src, self.pg3.local_ip4)
3695 self.assertEqual(ip.dst, self.pg3.remote_ip4)
3696 self.assertEqual(udp.sport, 12345)
3697 self.assertEqual(udp.dport, 12346)
3698 self.assertEqual(hanat.version, 1)
3699 self.assertEqual(hanat.count, 2)
3700 seq = hanat.sequence_number
3701 for event in hanat.events:
# event_type 3 is "refresh" in the Event field definition.
3702 self.assertEqual(event.event_type, 3)
3703 self.assertEqual(event.out_addr, self.nat_addr)
3704 self.assertEqual(event.fib_index, 0)
3705 self.assertEqual(event.total_pkts, 2)
3706 self.assertGreater(event.total_bytes, 0)
# NOTE(review): this read of ack-recv is overwritten by the identical read
# after the ACK is sent and no use of it is visible in between -- looks
# redundant; confirm before removing.
3708 stats = self.statistics["/nat44-ei/ha/ack-recv"]
# ACK the refresh packet; the ack-recv total must then be 2.
3710 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3711 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3712 / UDP(sport=12346, dport=12345)
3714 sequence_number=seq, flags="ACK", thread_index=hanat.thread_index
3717 self.pg3.add_stream(ack)
3719 stats = self.statistics["/nat44-ei/ha/ack-recv"]
3720 self.assertEqual(stats[:, 0].sum(), 2)
3722 def test_ha_recv(self):
3723 """NAT44EI Receive HA session synchronization events (passive)"""
# Exercises the receiving side of HA state sync: "add", "del" and "refresh"
# events are injected on pg3, the resulting user/session state and counters
# are checked after each, and finally a data packet is passed through the
# session that HA created.
3724 self.nat44_add_address(self.nat_addr)
3725 flags = self.config_flags.NAT44_EI_IF_INSIDE
3726 self.vapi.nat44_ei_interface_add_del_feature(
3727 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
3729 self.vapi.nat44_ei_interface_add_del_feature(
3730 sw_if_index=self.pg1.sw_if_index, is_add=1
# Listener on pg3, port 12345; UDP with source port 12345 is dissected as
# HANATStateSync.
3732 self.vapi.nat44_ei_ha_set_listener(
3733 ip_address=self.pg3.local_ip4, port=12345, path_mtu=512
3735 bind_layers(UDP, HANATStateSync, sport=12345)
3737 # this is a bit tricky - HA dictates thread index due to how it's
3738 # designed, but once we use HA to create a session, we also want
3739 # to pass a packet through said session. so the session must end
3740 # up on the correct thread from both directions - in2out (based on
3741 # IP address) and out2in (based on outside port)
3743 # first choose a thread index which is correct for IP
3744 thread_index = get_nat44_ei_in2out_worker_index(
3745 self.pg0.remote_ip4, self.vpp_worker_count
3747 # now pick a port which is correct for given thread
# (0xFFFF - 1024) ports divided by the worker count, treating 0 workers as 1.
3749 port_per_thread = int((0xFFFF - 1024) / max(1, self.vpp_worker_count))
# Random port within the first share: 1025 .. 1024 + port_per_thread.
3750 self.tcp_port_out = 1024 + random.randint(1, port_per_thread)
3751 self.udp_port_out = 1024 + random.randint(1, port_per_thread)
# With workers, shift into the share of the chosen thread. The "- 1"
# presumably reflects worker thread indices starting at 1 -- confirm.
3752 if self.vpp_worker_count > 0:
3753 self.tcp_port_out += port_per_thread * (thread_index - 1)
3754 self.udp_port_out += port_per_thread * (thread_index - 1)
3756 # send HA session add events to failover/passive
# One packet from the peer carrying two events: a TCP session and a UDP session.
3758 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3759 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3760 / UDP(sport=12346, dport=12345)
3767 in_addr=self.pg0.remote_ip4,
3768 out_addr=self.nat_addr,
3769 in_port=self.tcp_port_in,
3770 out_port=self.tcp_port_out,
3771 eh_addr=self.pg1.remote_ip4,
3772 ehn_addr=self.pg1.remote_ip4,
3773 eh_port=self.tcp_external_port,
3774 ehn_port=self.tcp_external_port,
3780 in_addr=self.pg0.remote_ip4,
3781 out_addr=self.nat_addr,
3782 in_port=self.udp_port_in,
3783 out_port=self.udp_port_out,
3784 eh_addr=self.pg1.remote_ip4,
3785 ehn_addr=self.pg1.remote_ip4,
3786 eh_port=self.udp_external_port,
3787 ehn_port=self.udp_external_port,
3791 thread_index=thread_index,
3795 self.pg3.add_stream(p)
3796 self.pg_enable_capture(self.pg_interfaces)
# Exactly one reply is expected on pg3: an ACK with sequence number 1 and the
# same thread index.
3799 capture = self.pg3.get_capture(1)
3802 hanat = p[HANATStateSync]
3804 self.logger.error(ppp("Invalid packet:", p))
3807 self.assertEqual(hanat.sequence_number, 1)
3808 self.assertEqual(hanat.flags, "ACK")
3809 self.assertEqual(hanat.version, 1)
3810 self.assertEqual(hanat.thread_index, thread_index)
# Counters after the add: one ACK sent, two add events received, one user,
# two sessions.
3811 stats = self.statistics["/nat44-ei/ha/ack-send"]
3812 self.assertEqual(stats[:, 0].sum(), 1)
3813 stats = self.statistics["/nat44-ei/ha/add-event-recv"]
3814 self.assertEqual(stats[:, 0].sum(), 2)
3815 users = self.statistics["/nat44-ei/total-users"]
3816 self.assertEqual(users[:, 0].sum(), 1)
3817 sessions = self.statistics["/nat44-ei/total-sessions"]
3818 self.assertEqual(sessions[:, 0].sum(), 2)
# Same state as seen through the API dumps. Note that `users` and `sessions`
# are rebound from counters to API results here.
3819 users = self.vapi.nat44_ei_user_dump()
3820 self.assertEqual(len(users), 1)
3821 self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3822 # there should be 2 sessions created by HA
3823 sessions = self.vapi.nat44_ei_user_session_dump(
3824 users[0].ip_address, users[0].vrf_id
3826 self.assertEqual(len(sessions), 2)
# Membership checks only: each field must match one of the two sessions, but
# fields are not cross-checked against each other.
3827 for session in sessions:
3828 self.assertEqual(str(session.inside_ip_address), self.pg0.remote_ip4)
3829 self.assertEqual(str(session.outside_ip_address), self.nat_addr)
3830 self.assertIn(session.inside_port, [self.tcp_port_in, self.udp_port_in])
3831 self.assertIn(session.outside_port, [self.tcp_port_out, self.udp_port_out])
3832 self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
3834 # send HA session delete event to failover/passive
# The event describes the UDP session created above.
3836 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3837 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3838 / UDP(sport=12346, dport=12345)
3845 in_addr=self.pg0.remote_ip4,
3846 out_addr=self.nat_addr,
3847 in_port=self.udp_port_in,
3848 out_port=self.udp_port_out,
3849 eh_addr=self.pg1.remote_ip4,
3850 ehn_addr=self.pg1.remote_ip4,
3851 eh_port=self.udp_external_port,
3852 ehn_port=self.udp_external_port,
3856 thread_index=thread_index,
3860 self.pg3.add_stream(p)
3861 self.pg_enable_capture(self.pg_interfaces)
# An ACK with sequence number 2 is expected.
3864 capture = self.pg3.get_capture(1)
3867 hanat = p[HANATStateSync]
3869 self.logger.error(ppp("Invalid packet:", p))
3872 self.assertEqual(hanat.sequence_number, 2)
3873 self.assertEqual(hanat.flags, "ACK")
3874 self.assertEqual(hanat.version, 1)
# The user is still present, now with a single session.
3875 users = self.vapi.nat44_ei_user_dump()
3876 self.assertEqual(len(users), 1)
3877 self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3878 # now we should have only 1 session, 1 deleted by HA
3879 sessions = self.vapi.nat44_ei_user_session_dump(
3880 users[0].ip_address, users[0].vrf_id
3882 self.assertEqual(len(sessions), 1)
3883 stats = self.statistics["/nat44-ei/ha/del-event-recv"]
3884 self.assertEqual(stats[:, 0].sum(), 1)
# The error counter stands at 2 after the add and del packets.
3886 stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
3887 self.assertEqual(stats, 2)
3889 # send HA session refresh event to failover/passive
# The event targets the remaining TCP session.
3891 Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac)
3892 / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4)
3893 / UDP(sport=12346, dport=12345)
3898 event_type="refresh",
3900 in_addr=self.pg0.remote_ip4,
3901 out_addr=self.nat_addr,
3902 in_port=self.tcp_port_in,
3903 out_port=self.tcp_port_out,
3904 eh_addr=self.pg1.remote_ip4,
3905 ehn_addr=self.pg1.remote_ip4,
3906 eh_port=self.tcp_external_port,
3907 ehn_port=self.tcp_external_port,
3913 thread_index=thread_index,
3916 self.pg3.add_stream(p)
3917 self.pg_enable_capture(self.pg_interfaces)
# An ACK with sequence number 3 is expected.
3920 capture = self.pg3.get_capture(1)
3923 hanat = p[HANATStateSync]
3925 self.logger.error(ppp("Invalid packet:", p))
3928 self.assertEqual(hanat.sequence_number, 3)
3929 self.assertEqual(hanat.flags, "ACK")
3930 self.assertEqual(hanat.version, 1)
3931 users = self.vapi.nat44_ei_user_dump()
3932 self.assertEqual(len(users), 1)
3933 self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4)
3934 sessions = self.vapi.nat44_ei_user_session_dump(
3935 users[0].ip_address, users[0].vrf_id
3937 self.assertEqual(len(sessions), 1)
3938 session = sessions[0]
# The session counters must read 1024 bytes / 2 packets (presumably the
# values carried in the refresh event -- confirm against the event fields).
3939 self.assertEqual(session.total_bytes, 1024)
3940 self.assertEqual(session.total_pkts, 2)
3941 stats = self.statistics["/nat44-ei/ha/refresh-event-recv"]
3942 self.assertEqual(stats[:, 0].sum(), 1)
# Third HA packet processed.
3944 stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed")
3945 self.assertEqual(stats, 3)
3947 # send packet to test session created by HA
# Outside -> inside TCP packet aimed at the NAT address and the HA-provided
# outside port.
3949 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
3950 / IP(src=self.pg1.remote_ip4, dst=self.nat_addr)
3951 / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out)
3953 self.pg1.add_stream(p)
3954 self.pg_enable_capture(self.pg_interfaces)
3956 capture = self.pg0.get_capture(1)
3962 self.logger.error(ppp("Invalid packet:", p))
# It must reach the inside host with destination address and port translated
# back to the inside values.
3965 self.assertEqual(ip.src, self.pg1.remote_ip4)
3966 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3967 self.assertEqual(tcp.sport, self.tcp_external_port)
3968 self.assertEqual(tcp.dport, self.tcp_port_in)
3970 def reconfigure_frame_queue_nelts(self, frame_queue_nelts):
# Helper: disables the NAT44-EI plugin, applies the given frame_queue_nelts
# through nat44_ei_set_fq_options, re-enables the plugin via plugin_enable()
# and returns the frame_queue_nelts value that the plugin then reports.
# The options are set while the plugin is disabled (presumably a requirement
# of the API -- confirm).
3971 self.vapi.nat44_ei_plugin_enable_disable(enable=0)
3972 self.vapi.nat44_ei_set_fq_options(frame_queue_nelts=frame_queue_nelts)
3973 # keep plugin configuration persistent
3974 self.plugin_enable()
3975 return self.vapi.nat44_ei_show_fq_options().frame_queue_nelts
3977 def test_set_frame_queue_nelts(self):
3978 """NAT44EI API test - worker handoff frame queue elements"""
# Sets frame_queue_nelts to 512 via the reconfigure helper and checks that
# the value read back afterwards is also 512.
3979 self.assertEqual(self.reconfigure_frame_queue_nelts(512), 512)
3981 def show_commands_at_teardown(self):
# Writes the output of a fixed set of "show nat44 ei ..." CLI commands to the
# test logger at info level. Judging by the name this is a teardown hook
# invoked by the test framework -- confirm against VppTestCase.
3982 self.logger.info(self.vapi.cli("show nat44 ei timeouts"))
3983 self.logger.info(self.vapi.cli("show nat44 ei addresses"))
3984 self.logger.info(self.vapi.cli("show nat44 ei interfaces"))
3985 self.logger.info(self.vapi.cli("show nat44 ei static mappings"))
3986 self.logger.info(self.vapi.cli("show nat44 ei interface address"))
3987 self.logger.info(self.vapi.cli("show nat44 ei sessions detail"))
3988 self.logger.info(self.vapi.cli("show nat44 ei hash tables detail"))
3989 self.logger.info(self.vapi.cli("show nat44 ei ha"))
3990 self.logger.info(self.vapi.cli("show nat44 ei addr-port-assignment-alg"))
3992 def test_outside_address_distribution(self):
3993 """Outside address distribution based on source address"""
# Adds a range of NAT addresses, sends one UDP packet from each of several
# inside hosts and checks that each translated source address is the one
# predicted from the numeric value of the inside source address.
# Build the pool; the loop starts at 1, so it yields x - 1 addresses.
3998 for i in range(1, x):
4000 nat_addresses.append(a)
4002 flags = self.config_flags.NAT44_EI_IF_INSIDE
4003 self.vapi.nat44_ei_interface_add_del_feature(
4004 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4006 self.vapi.nat44_ei_interface_add_del_feature(
4007 sw_if_index=self.pg1.sw_if_index, is_add=1
# Register the pool as one contiguous range, first to last entry.
4010 self.vapi.nat44_ei_add_del_address_range(
4011 first_ip_address=nat_addresses[0],
4012 last_ip_address=nat_addresses[-1],
4017 self.pg0.generate_remote_hosts(x)
# Each packet carries a packet-info payload so it can be matched up on receipt;
# ports are offset by the host index.
4021 info = self.create_packet_info(self.pg0, self.pg1)
4022 payload = self.info_to_payload(info)
4024 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4025 / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4)
4026 / UDP(sport=7000 + i, dport=8000 + i)
4032 self.pg0.add_stream(pkts)
4033 self.pg_enable_capture(self.pg_interfaces)
4035 recvd = self.pg1.get_capture(len(pkts))
4036 for p_recvd in recvd:
# Recover the packet index from the payload and look up the stored info.
4037 payload_info = self.payload_to_info(p_recvd[Raw])
4038 packet_index = payload_info.index
4039 info = self._packet_infos[packet_index]
4040 self.assertTrue(info is not None)
4041 self.assertEqual(packet_index, info.index)
# Expected NAT address: convert the sent source address to an integer and
# index the pool with (value - 1) modulo the pool size.
4043 packed = socket.inet_aton(p_sent[IP].src)
4044 numeric = struct.unpack("!L", packed)[0]
# NOTE(review): "!L" already yields the big-endian integer; socket.htonl then
# byte-swaps it on little-endian hosts and leaves it unchanged on big-endian
# ones, so the expected index depends on host byte order -- confirm this
# mirrors the data-plane computation.
4045 numeric = socket.htonl(numeric)
4046 a = nat_addresses[(numeric - 1) % len(nat_addresses)]
4050 "Invalid packet (src IP %s translated to %s, but expected %s)"
4051 % (p_sent[IP].src, p_recvd[IP].src, a),
4054 def test_default_user_sessions(self):
4055 """NAT44EI default per-user session limit is used and reported"""
# Reads the running configuration and checks only that the reported
# user_sessions value is not zero; no specific default is asserted.
4056 nat44_ei_config = self.vapi.nat44_ei_show_running_config()
4057 # a nonzero default should be reported for user_sessions
4058 self.assertNotEqual(nat44_ei_config.user_sessions, 0)
4061 class TestNAT44Out2InDPO(MethodHolder):
4062 """NAT44EI Test Cases using out2in DPO"""
# Two-interface fixture: pg0 is configured for IPv4 and pg1 for IPv6. The
# plugin is enabled with the NAT44_EI_OUT2IN_DPO flag before each test and
# disabled after it. Both test methods are currently skipped.
4065 def setUpClass(cls):
4066 super(TestNAT44Out2InDPO, cls).setUpClass()
4067 cls.vapi.cli("set log class nat44-ei level debug")
# Port/id values; the "in" and "out" values are deliberately identical here.
4069 cls.tcp_port_in = 6303
4070 cls.tcp_port_out = 6303
4071 cls.udp_port_in = 6304
4072 cls.udp_port_out = 6304
4073 cls.icmp_id_in = 6305
4074 cls.icmp_id_out = 6305
4075 cls.nat_addr = "10.0.0.3"
4076 cls.dst_ip4 = "192.168.70.1"
4078 cls.create_pg_interfaces(range(2))
# pg0: IPv4 side.
4081 cls.pg0.config_ip4()
4082 cls.pg0.resolve_arp()
# pg1: IPv6 side.
4085 cls.pg1.config_ip6()
4086 cls.pg1.resolve_ndp()
# Route path via pg1's IPv6 neighbour (presumably part of a route added
# during class setup -- the surrounding call is not shown here; confirm).
4092 [VppRoutePath(cls.pg1.remote_ip6, cls.pg1.sw_if_index)],
# Per-test setup: enable the plugin in out2in-DPO mode.
4098 super(TestNAT44Out2InDPO, self).setUp()
4099 flags = self.config_flags.NAT44_EI_OUT2IN_DPO
4100 self.vapi.nat44_ei_plugin_enable_disable(enable=1, flags=flags)
# Per-test teardown: disable the plugin and clear the log unless VPP has died.
4103 super(TestNAT44Out2InDPO, self).tearDown()
4104 if not self.vpp_dead:
4105 self.vapi.nat44_ei_plugin_enable_disable(enable=0)
4106 self.vapi.cli("clear logging")
4108 def configure_xlat(self):
# Stores the two /96 IPv6 prefixes (text and packed form) on the instance and
# adds a MAP domain from them.
4109 self.dst_ip6_pfx = "1:2:3::"
4110 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.dst_ip6_pfx)
4111 self.dst_ip6_pfx_len = 96
4112 self.src_ip6_pfx = "4:5:6::"
4113 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.src_ip6_pfx)
4114 self.src_ip6_pfx_len = 96
4115 self.vapi.map_add_domain(
4117 self.dst_ip6_pfx_len,
4119 self.src_ip6_pfx_len,
4124 @unittest.skip("Temporary disabled")
4125 def test_464xlat_ce(self):
4126 """Test 464XLAT CE with NAT44EI"""
# IPv4 traffic from pg0 is expected on pg1 as IPv6, with addresses composed
# from the NAT/destination IPv4 addresses and the configured prefixes; the
# reverse direction is checked as well.
4128 self.configure_xlat()
4130 flags = self.config_flags.NAT44_EI_IF_INSIDE
4131 self.vapi.nat44_ei_interface_add_del_feature(
4132 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
# NOTE(review): nat_addr_n is read here and in the cleanup below, but only
# nat_addr is assigned in this class's setUpClass -- confirm it is defined
# elsewhere before re-enabling this test.
4134 self.vapi.nat44_ei_add_del_address_range(
4135 first_ip_address=self.nat_addr_n,
4136 last_ip_address=self.nat_addr_n,
# Expected IPv6 addresses on the pg1 side.
4141 out_src_ip6 = self.compose_ip6(
4142 self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
4144 out_dst_ip6 = self.compose_ip6(
4145 self.nat_addr, self.src_ip6_pfx, self.src_ip6_pfx_len
# IPv4 in on pg0 -> IPv6 out on pg1.
4149 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4150 self.pg0.add_stream(pkts)
4151 self.pg_enable_capture(self.pg_interfaces)
4153 capture = self.pg1.get_capture(len(pkts))
4154 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6, dst_ip=out_src_ip6)
# IPv6 in on pg1 -> IPv4 out on pg0.
4156 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4157 self.pg1.add_stream(pkts)
4158 self.pg_enable_capture(self.pg_interfaces)
4160 capture = self.pg0.get_capture(len(pkts))
4161 self.verify_capture_in(capture, self.pg0)
# Cleanup calls: the feature call omits is_add (presumably the default means
# delete -- confirm).
4163 self.vapi.nat44_ei_interface_add_del_feature(
4164 sw_if_index=self.pg0.sw_if_index, flags=flags
4166 self.vapi.nat44_ei_add_del_address_range(
4167 first_ip_address=self.nat_addr_n,
4168 last_ip_address=self.nat_addr_n,
4172 @unittest.skip("Temporary disabled")
4173 def test_464xlat_ce_no_nat(self):
4174 """Test 464XLAT CE without NAT44EI"""
# Same flow as above with no NAT address or interface feature configured:
# the IPv6 source is composed from pg0's own IPv4 address and ports are
# expected to be preserved (same_port=True).
4176 self.configure_xlat()
4178 out_src_ip6 = self.compose_ip6(
4179 self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len
4181 out_dst_ip6 = self.compose_ip6(
4182 self.pg0.remote_ip4, self.src_ip6_pfx, self.src_ip6_pfx_len
4185 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4186 self.pg0.add_stream(pkts)
4187 self.pg_enable_capture(self.pg_interfaces)
4189 capture = self.pg1.get_capture(len(pkts))
4190 self.verify_capture_out_ip6(
4191 capture, dst_ip=out_src_ip6, nat_ip=out_dst_ip6, same_port=True
4194 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4195 self.pg1.add_stream(pkts)
4196 self.pg_enable_capture(self.pg_interfaces)
4198 capture = self.pg0.get_capture(len(pkts))
4199 self.verify_capture_in(capture, self.pg0)
4202 class TestNAT44EIMW(MethodHolder):
4203 """NAT44EI Test Cases (multiple workers)"""
4205 vpp_worker_count = 2
4206 max_translations = 10240
4210 def setUpClass(cls):
# Class-level fixture for the multi-worker NAT44-EI tests: sets the default
# port/id values and NAT address, creates ten pg interfaces and prepares
# three groups of them (pg0-pg3, pg4-pg6 in tables 10/20, and pg9).
4211 super(TestNAT44EIMW, cls).setUpClass()
# NOTE(review): the log class here is "nat", while TestNAT44Out2InDPO in this
# file uses "nat44-ei" -- confirm which class name is intended.
4212 cls.vapi.cli("set log class nat level debug")
# Port/id values; the "in" and "out" values start out identical.
4214 cls.tcp_port_in = 6303
4215 cls.tcp_port_out = 6303
4216 cls.udp_port_in = 6304
4217 cls.udp_port_out = 6304
4218 cls.icmp_id_in = 6305
4219 cls.icmp_id_out = 6305
4220 cls.nat_addr = "10.0.0.3"
4221 cls.ipfix_src_port = 4739
4222 cls.ipfix_domain_id = 1
4223 cls.tcp_external_port = 80
4224 cls.udp_external_port = 69
4226 cls.create_pg_interfaces(range(10))
# First group: pg0..pg3.
4227 cls.interfaces = list(cls.pg_interfaces[0:4])
4229 for i in cls.interfaces:
# pg0 gets three remote hosts and pg1 one, each with neighbour entries.
4234 cls.pg0.generate_remote_hosts(3)
4235 cls.pg0.configure_ipv4_neighbors()
4237 cls.pg1.generate_remote_hosts(1)
4238 cls.pg1.configure_ipv4_neighbors()
# Second group: pg4..pg6. NOTE(review): the nested list(list(...)) makes two
# copies of the slice; a single list() would produce the same value.
4240 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
4241 cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10})
4242 cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20})
# pg4 (table 10) and pg6 (table 20) are given identical local/remote
# addresses; pg5 shares table 10 with pg4 but uses different addresses.
4244 cls.pg4._local_ip4 = "172.16.255.1"
4245 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
4246 cls.pg4.set_table_ip4(10)
4247 cls.pg5._local_ip4 = "172.17.255.3"
4248 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
4249 cls.pg5.set_table_ip4(10)
4250 cls.pg6._local_ip4 = "172.16.255.1"
4251 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
4252 cls.pg6.set_table_ip4(20)
4253 for i in cls.overlapping_interfaces:
# pg9: two remote hosts plus an extra 10.0.0.1/24 address on the interface.
4261 cls.pg9.generate_remote_hosts(2)
4262 cls.pg9.config_ip4()
4263 cls.vapi.sw_interface_add_del_address(
4264 sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24"
4268 cls.pg9.resolve_arp()
# Private attributes are overwritten directly: host 1 takes over host 0's
# original address, then one chained assignment sets both pg4's remote
# address and pg9's host 0 to 10.0.0.2, followed by a second ARP resolution.
4269 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
4270 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
4271 cls.pg9.resolve_arp()
# Per-test setup (these statements presumably form the body of setUp, given
# the super().setUp() call -- confirm): runs the base setup, then enables the
# NAT44-EI plugin with the class-level session and user limits.
4274 super(TestNAT44EIMW, self).setUp()
4275 self.vapi.nat44_ei_plugin_enable_disable(
4276 sessions=self.max_translations, users=self.max_users, enable=1
# Per-test teardown (presumably the body of tearDown, given the
# super().tearDown() call -- confirm): runs the base teardown, then undoes
# the NAT configuration unless VPP has died.
4280 super(TestNAT44EIMW, self).tearDown()
4281 if not self.vpp_dead:
# Disable IPFIX logging using whatever domain id / source port are current.
4282 self.vapi.nat44_ei_ipfix_enable_disable(
4283 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0
# Put the IPFIX settings back to the values assigned in setUpClass.
4285 self.ipfix_src_port = 4739
4286 self.ipfix_domain_id = 1
4288 self.vapi.nat44_ei_plugin_enable_disable(enable=0)
4289 self.vapi.cli("clear logging")
4291 def test_hairpinning(self):
4292 """NAT44EI hairpinning - 1:1 NAPT"""
# A host and a server both sit behind pg0. The host reaches the server via
# its static NAT mapping and the server replies via the NAT address; both
# packets must come back out of pg0 translated, and the per-worker
# hairpinning counters are checked after each step.
4294 host = self.pg0.remote_hosts[0]
4295 server = self.pg0.remote_hosts[1]
4298 server_in_port = 5678
4299 server_out_port = 8765
4303 self.nat44_add_address(self.nat_addr)
4304 flags = self.config_flags.NAT44_EI_IF_INSIDE
4305 self.vapi.nat44_ei_interface_add_del_feature(
4306 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4308 self.vapi.nat44_ei_interface_add_del_feature(
4309 sw_if_index=self.pg1.sw_if_index, is_add=1
4312 # add static mapping for server
4313 self.nat44_add_static_mapping(
4318 proto=IP_PROTOS.tcp,
# Baseline of the hairpinning counter before any traffic.
4321 cnt = self.statistics["/nat44-ei/hairpinning"]
4322 # send packet from host to server
4324 Ether(src=host.mac, dst=self.pg0.local_mac)
4325 / IP(src=host.ip4, dst=self.nat_addr)
4326 / TCP(sport=host_in_port, dport=server_out_port)
4328 self.pg0.add_stream(p)
4329 self.pg_enable_capture(self.pg_interfaces)
# The packet is expected back on pg0, the interface it entered on.
4331 capture = self.pg0.get_capture(1)
# Source becomes the NAT address with a new port; destination becomes the
# server's real address and inside port.
4336 self.assertEqual(ip.src, self.nat_addr)
4337 self.assertEqual(ip.dst, server.ip4)
4338 self.assertNotEqual(tcp.sport, host_in_port)
4339 self.assertEqual(tcp.dport, server_in_port)
4340 self.assert_packet_checksums_valid(p)
# Remember the allocated port; the server's reply is addressed to it.
4341 host_out_port = tcp.sport
4343 self.logger.error(ppp("Unexpected or invalid packet:", p))
4346 after = self.statistics["/nat44-ei/hairpinning"]
4348 if_idx = self.pg0.sw_if_index
# NOTE(review): the "after" value is taken from worker_2 but the baseline
# from worker_1, whereas the final checks below pair each worker with its own
# baseline -- this looks like a typo for cnt[worker_2]; confirm.
4349 self.assertEqual(after[worker_2][if_idx] - cnt[worker_1][if_idx], 1)
4351 # send reply from server to host
4353 Ether(src=server.mac, dst=self.pg0.local_mac)
4354 / IP(src=server.ip4, dst=self.nat_addr)
4355 / TCP(sport=server_in_port, dport=host_out_port)
4357 self.pg0.add_stream(p)
4358 self.pg_enable_capture(self.pg_interfaces)
4360 capture = self.pg0.get_capture(1)
# The reply reaches the host from the NAT address and the server's external
# port, addressed to the host's original port.
4365 self.assertEqual(ip.src, self.nat_addr)
4366 self.assertEqual(ip.dst, host.ip4)
4367 self.assertEqual(tcp.sport, server_out_port)
4368 self.assertEqual(tcp.dport, host_in_port)
4369 self.assert_packet_checksums_valid(p)
4371 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Totals against the original baseline: +1 on worker_1, +2 on worker_2.
4374 after = self.statistics["/nat44-ei/hairpinning"]
4375 if_idx = self.pg0.sw_if_index
4376 self.assertEqual(after[worker_1][if_idx] - cnt[worker_1][if_idx], 1)
4377 self.assertEqual(after[worker_2][if_idx] - cnt[worker_2][if_idx], 2)
4379 def test_hairpinning2(self):
4380 """NAT44EI hairpinning - 1:1 NAT"""
4382 server1_nat_ip = "10.0.0.10"
4383 server2_nat_ip = "10.0.0.11"
4384 host = self.pg0.remote_hosts[0]
4385 server1 = self.pg0.remote_hosts[1]
4386 server2 = self.pg0.remote_hosts[2]
4387 server_tcp_port = 22
4388 server_udp_port = 20
4390 self.nat44_add_address(self.nat_addr)
4391 flags = self.config_flags.NAT44_EI_IF_INSIDE
4392 self.vapi.nat44_ei_interface_add_del_feature(
4393 sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1
4395 self.vapi.nat44_ei_interface_add_del_feature(
4396 sw_if_index=self.pg1.sw_if_index, is_add=1
4399 # add static mapping for servers
4400 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
4401 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
4406 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4407 / IP(src=host.ip4, dst=server1_nat_ip)
4408 / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
4412 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4413 / IP(src=host.ip4, dst=server1_nat_ip)
4414 / UDP(sport=self.udp_port_in, dport=server_udp_port)
4418 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4419 / IP(src=host.ip4, dst=server1_nat_ip)
4420 / ICMP(id=self.icmp_id_in, type="echo-request")
4423 self.pg0.add_stream(pkts)
4424 self.pg_enable_capture(self.pg_interfaces)
4426 capture = self.pg0.get_capture(len(pkts))
4427 for packet in capture:
4429 self.assertEqual(packet[IP].src, self.nat_addr)
4430 self.assertEqual(packet[IP].dst, server1.ip4)
4431 if packet.haslayer(TCP):
4432 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
4433 self.assertEqual(packet[TCP].dport, server_tcp_port)
4434 self.tcp_port_out = packet[TCP].sport
4435 self.assert_packet_checksums_valid(packet)
4436 elif packet.haslayer(UDP):
4437 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
4438 self.assertEqual(packet[UDP].dport, server_udp_port)
4439 self.udp_port_out = packet[UDP].sport
4441 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
4442 self.icmp_id_out = packet[ICMP].id
4444 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4450 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4451 / IP(src=server1.ip4, dst=self.nat_addr)
4452 / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
4456 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4457 / IP(src=server1.ip4, dst=self.nat_addr)
4458 / UDP(sport=server_udp_port, dport=self.udp_port_out)
4462 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4463 / IP(src=server1.ip4, dst=self.nat_addr)
4464 / ICMP(id=self.icmp_id_out, type="echo-reply")
4467 self.pg0.add_stream(pkts)
4468 self.pg_enable_capture(self.pg_interfaces)
4470 capture = self.pg0.get_capture(len(pkts))
4471 for packet in capture:
4473 self.assertEqual(packet[IP].src, server1_nat_ip)
4474 self.assertEqual(packet[IP].dst, host.ip4)
4475 if packet.haslayer(TCP):
4476 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
4477 self.assertEqual(packet[TCP].sport, server_tcp_port)
4478 self.assert_packet_checksums_valid(packet)
4479 elif packet.haslayer(UDP):
4480 self.assertEqual(packet[UDP].dport, self.udp_port_in)
4481 self.assertEqual(packet[UDP].sport, server_udp_port)
4483 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4485 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4488 # server2 to server1
4491 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4492 / IP(src=server2.ip4, dst=server1_nat_ip)
4493 / TCP(sport=self.tcp_port_in, dport=server_tcp_port)
4497 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4498 / IP(src=server2.ip4, dst=server1_nat_ip)
4499 / UDP(sport=self.udp_port_in, dport=server_udp_port)
4503 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4504 / IP(src=server2.ip4, dst=server1_nat_ip)
4505 / ICMP(id=self.icmp_id_in, type="echo-request")
4508 self.pg0.add_stream(pkts)
4509 self.pg_enable_capture(self.pg_interfaces)
4511 capture = self.pg0.get_capture(len(pkts))
4512 for packet in capture:
4514 self.assertEqual(packet[IP].src, server2_nat_ip)
4515 self.assertEqual(packet[IP].dst, server1.ip4)
4516 if packet.haslayer(TCP):
4517 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
4518 self.assertEqual(packet[TCP].dport, server_tcp_port)
4519 self.tcp_port_out = packet[TCP].sport
4520 self.assert_packet_checksums_valid(packet)
4521 elif packet.haslayer(UDP):
4522 self.assertEqual(packet[UDP].sport, self.udp_port_in)
4523 self.assertEqual(packet[UDP].dport, server_udp_port)
4524 self.udp_port_out = packet[UDP].sport
4526 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4527 self.icmp_id_out = packet[ICMP].id
4529 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4532 # server1 to server2
4535 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4536 / IP(src=server1.ip4, dst=server2_nat_ip)
4537 / TCP(sport=server_tcp_port, dport=self.tcp_port_out)
4541 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4542 / IP(src=server1.ip4, dst=server2_nat_ip)
4543 / UDP(sport=server_udp_port, dport=self.udp_port_out)
4547 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
4548 / IP(src=server1.ip4, dst=server2_nat_ip)
4549 / ICMP(id=self.icmp_id_out, type="echo-reply")
4552 self.pg0.add_stream(pkts)
4553 self.pg_enable_capture(self.pg_interfaces)
4555 capture = self.pg0.get_capture(len(pkts))
4556 for packet in capture:
4558 self.assertEqual(packet[IP].src, server1_nat_ip)
4559 self.assertEqual(packet[IP].dst, server2.ip4)
4560 if packet.haslayer(TCP):
4561 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
4562 self.assertEqual(packet[TCP].sport, server_tcp_port)
4563 self.assert_packet_checksums_valid(packet)
4564 elif packet.haslayer(UDP):
4565 self.assertEqual(packet[UDP].dport, self.udp_port_in)
4566 self.assertEqual(packet[UDP].sport, server_udp_port)
4568 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4570 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Script entry point: when this module is executed directly (rather than
# imported), hand control to unittest and use VppTestRunner as the runner.
if __name__ == "__main__":
    unittest.main(
        testRunner=VppTestRunner,
    )